# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""McpApplication - MCP Streamable HTTP transport for genro-asgi.
Implements the MCP (Model Context Protocol) Streamable HTTP transport
using JSON-RPC 2.0 envelopes over standard genro-asgi routing.
The application works inside the Dispatcher via the default_entry
mechanism: when mounted as an app (e.g., under ``/mcp/``), requests
to ``POST /mcp/`` resolve to the ``index`` route handler.
Spec references:
JSON-RPC 2.0: https://www.jsonrpc.org/specification
MCP Streamable HTTP: stateless mode
Protocol summary:
All requests: POST with JSON-RPC 2.0 body (parsed by framework)
Request (has "id"): process and return JSON-RPC result
Notification (no "id"): return HTTP 202, no body
tools/list: enumerate tools from the external router
tools/call: dispatch to router node by path
initialize: return server capabilities
Tool name to route path convention:
"code_search_symbols" -> "code/search_symbols"
Integration with genro-asgi:
McpApplication extends AsgiApplication and uses @route for its
endpoint. The Dispatcher handles request creation, middleware
chain (auth, CORS, errors), and response sending. The handler
accesses the parsed body via ``get_current_request().data``.
"""
from __future__ import annotations
import inspect
import json
import typing
from typing import TYPE_CHECKING, Any, Union
from genro_routes import route # type: ignore[import-untyped]
from genro_toolbox.smartasync import smartasync # type: ignore[import-untyped]
from ...request import get_current_request
from ..asgi_application import AsgiApplication
if TYPE_CHECKING:
from genro_routes import Router # type: ignore[import-untyped]
__all__ = ["McpApplication"]
_JSONRPC_INVALID_REQUEST = -32600
_JSONRPC_METHOD_NOT_FOUND = -32601
_JSONRPC_INTERNAL_ERROR = -32603
[docs]
class McpApplication(AsgiApplication):
"""MCP Streamable HTTP transport as a genro-asgi application.
Accepts an external genro-routes Router and exposes its @route
entries as MCP tools via JSON-RPC 2.0 over HTTP POST.
Mounts inside the server like any other app and participates in
the full middleware chain (auth, CORS, errors). The ``index``
route handles all JSON-RPC messages via the default_entry mechanism.
Usage::
class SourcererMcpApp(McpApplication):
mcp_name = "sourcerer"
mcp_version = "1.0.0"
def on_init(self, **kwargs):
api = SourcererAPI(...)
self.set_router(api.api)
"""
mcp_name: str = "genro-mcp"
mcp_version: str = "1.0.0"
[docs]
def __init__(self, **kwargs: Any) -> None:
"""Initialize with optional mcp_name/mcp_version overrides."""
self._external_router: Router | None = None
mcp_name = kwargs.pop("mcp_name", None)
mcp_version = kwargs.pop("mcp_version", None)
if mcp_name is not None:
self.mcp_name = mcp_name
if mcp_version is not None:
self.mcp_version = mcp_version
super().__init__(**kwargs)
[docs]
def set_router(self, router: Router) -> None:
"""Set the external genro-routes Router to expose as MCP tools."""
self._external_router = router
[docs]
@route(meta_mime_type="application/json")
def index(self) -> dict | None:
"""JSON-RPC 2.0 endpoint for MCP Streamable HTTP.
Reads the parsed body from ``request.data`` (populated by the
framework via asgi_data). Dispatches based on JSON-RPC method.
For notifications (no "id" field), sets HTTP 202 and returns None.
For requests, returns a JSON-RPC response dict.
"""
request = get_current_request()
assert request is not None # guaranteed inside Dispatcher
envelope = request.data
if not isinstance(envelope, dict):
return self._jsonrpc_error(
_JSONRPC_INVALID_REQUEST, "Invalid request", None,
)
jsonrpc_id = envelope.get("id")
method = envelope.get("method", "")
params = envelope.get("params") or {}
# Notification: no id → HTTP 202, no body
if "id" not in envelope:
request.response.status_code = 202
return None
auth_tags = request.auth_tags
try:
result = self._dispatch(method, params, auth_tags)
except _McpError as exc:
return self._jsonrpc_error(exc.code, exc.message, jsonrpc_id)
except Exception as exc:
return self._jsonrpc_error(
_JSONRPC_INTERNAL_ERROR, str(exc), jsonrpc_id,
)
return {"jsonrpc": "2.0", "id": jsonrpc_id, "result": result}
def _jsonrpc_error(self, code: int, message: str, jsonrpc_id: Any) -> dict:
"""Build a JSON-RPC 2.0 error response dict.
Args:
code: JSON-RPC error code (e.g. -32600, -32601).
message: Human-readable error message.
jsonrpc_id: Request id to echo back (may be None).
Returns:
JSON-RPC 2.0 error envelope dict.
"""
return {
"jsonrpc": "2.0",
"id": jsonrpc_id,
"error": {"code": code, "message": message},
}
def _dispatch(self, method: str, params: dict, auth_tags: list) -> Any:
"""Route JSON-RPC method to the appropriate handler.
Args:
method: JSON-RPC method name (e.g. "initialize", "tools/list").
params: JSON-RPC params dict.
auth_tags: Authenticated user tags for filtering.
Returns:
Handler result dict.
Raises:
_McpError: If method is not recognized.
"""
if method == "initialize":
return self._handle_initialize()
if method == "tools/list":
return self._handle_tools_list()
if method == "tools/call":
return self._handle_tools_call(params, auth_tags)
raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Method not found: {method}")
def _handle_initialize(self) -> dict:
"""Return MCP server capabilities."""
return {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": self.mcp_name,
"version": self.mcp_version,
},
}
def _handle_tools_list(self) -> dict:
"""Enumerate all tools from the external router."""
if self._external_router is None:
return {"tools": []}
nodes = self._external_router.nodes(forbidden=True, channel_channel="mcp")
tools: list[dict] = []
self._collect_tools(nodes, "", tools)
return {"tools": tools}
def _collect_tools(self, nodes: dict, prefix: str, tools: list) -> None:
"""Recursively collect tools from router nodes."""
for name, info in nodes.get("entries", {}).items():
tool_name = f"{prefix}{name}" if not prefix else f"{prefix}_{name}"
tool = self._build_tool_descriptor(tool_name, info)
tools.append(tool)
for router_name, sub_nodes in nodes.get("routers", {}).items():
sub_prefix = f"{prefix}{router_name}" if not prefix else f"{prefix}_{router_name}"
self._collect_tools(sub_nodes, sub_prefix, tools)
def _build_tool_descriptor(self, tool_name: str, info: dict) -> dict:
"""Build MCP tool descriptor with JSON Schema for parameters."""
callable_fn = info.get("callable")
metadata = info.get("metadata") or {}
description = metadata.get("description") or info.get("doc") or tool_name
properties: dict = {}
required: list = []
if callable_fn is not None:
try:
sig = inspect.signature(callable_fn)
resolved_hints = self._resolve_type_hints(callable_fn)
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
ann = resolved_hints.get(param_name, param.annotation)
json_type = self._annotation_to_json_type(ann)
properties[param_name] = {"type": json_type}
if param.default is inspect.Parameter.empty:
required.append(param_name)
except (ValueError, TypeError):
pass
schema: dict = {"type": "object", "properties": properties}
if required:
schema["required"] = required
return {
"name": tool_name,
"description": description,
"inputSchema": schema,
}
def _handle_tools_call(self, params: dict, auth_tags: list) -> dict:
"""Dispatch tools/call to the router node."""
if self._external_router is None:
raise _McpError(_JSONRPC_INTERNAL_ERROR, "No router configured")
name = params.get("name", "")
arguments = params.get("arguments") or {}
# Convert tool name to route path: "code_search_symbols" → "code/search_symbols"
path = name.replace("_", "/", 1) if "_" in name else name
node = self._external_router.node(
path,
auth_tags=",".join(auth_tags) if isinstance(auth_tags, list) else auth_tags,
channel_channel="mcp",
)
if node.error == "not_authorized":
raise _McpError(-32000, "Not authorized")
if node.error == "not_found":
raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Tool not found: {name}")
if node.error:
raise _McpError(_JSONRPC_INTERNAL_ERROR, f"Router error: {node.error}")
result = smartasync(node)(**arguments)
serialized = self._serialize_result(result)
return {"content": [{"type": "text", "text": serialized}]}
def _serialize_result(self, result: Any) -> str:
"""Serialize result to string for MCP content."""
if isinstance(result, str):
return result
try:
return json.dumps(result, ensure_ascii=False, default=str)
except (TypeError, ValueError):
return str(result)
def _annotation_to_json_type(self, annotation: Any) -> str:
"""Map Python type annotation to JSON Schema type string."""
origin = getattr(annotation, "__origin__", None)
if origin is Union:
args = [a for a in annotation.__args__ if a is not type(None)]
if args:
annotation = args[0]
type_mapping = {int: "integer", float: "number", bool: "boolean", str: "string"}
str_mapping = {"int": "integer", "float": "number", "bool": "boolean", "str": "string"}
if isinstance(annotation, str):
return str_mapping.get(annotation, "string")
return type_mapping.get(annotation, "string")
def _resolve_type_hints(self, callable_fn: Any) -> dict:
"""Resolve string annotations to actual types via typing.get_type_hints."""
try:
hints = typing.get_type_hints(callable_fn)
hints.pop("return", None)
return hints
except Exception:
return {}
class _McpError(Exception):
"""Internal exception for JSON-RPC error responses."""
def __init__(self, code: int, message: str) -> None:
super().__init__(message)
self.code = code
self.message = message
if __name__ == "__main__":
app = McpApplication()
print(f"McpApplication: {app.mcp_name} v{app.mcp_version}")