# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""McpApplication - MCP Streamable HTTP transport for genro-asgi.
Implements the MCP (Model Context Protocol) Streamable HTTP transport
using JSON-RPC 2.0 envelopes over HTTP POST.
Spec references:
- JSON-RPC 2.0: https://www.jsonrpc.org/specification
- MCP Streamable HTTP: stateless mode (json_response=True, stateless_http=True)
Protocol summary:
- All requests: POST / with JSON-RPC 2.0 body
- Request (has "id"): process and return JSON-RPC result
- Notification (no "id"): return HTTP 202, no body
- tools/list: enumerate tools from the router
- tools/call: dispatch to router node by path
- initialize: return server capabilities
Tool name to route path convention:
"code_search_symbols" → "code/search_symbols"
"""
from __future__ import annotations
import inspect
import json
import typing
from typing import TYPE_CHECKING, Any, Union
from genro_toolbox.smartasync import smartasync # type: ignore[import-untyped]
from .asgi_application import AsgiApplication
if TYPE_CHECKING:
from genro_routes import Router # type: ignore[import-untyped]
from ..types import Receive, Scope, Send
__all__ = ["McpApplication"]
_JSONRPC_PARSE_ERROR = -32700
_JSONRPC_INVALID_REQUEST = -32600
_JSONRPC_METHOD_NOT_FOUND = -32601
_JSONRPC_INTERNAL_ERROR = -32603
[docs]
class McpApplication(AsgiApplication):
"""ASGI application implementing MCP Streamable HTTP transport.
Accepts an external genro-routes Router and exposes its @route
entries as MCP tools via JSON-RPC 2.0 over HTTP POST.
Usage::
class SourcererMcpApp(McpApplication):
mcp_name = "sourcerer"
mcp_version = "1.0.0"
def on_init(self, **kwargs):
api = SourcererAPI(...)
self.set_router(api.api)
"""
mcp_name: str = "genro-mcp"
mcp_version: str = "1.0.0"
[docs]
def __init__(self, **kwargs: Any) -> None:
"""Initialize with optional mcp_name/mcp_version overrides."""
self._external_router: Router | None = None
mcp_name = kwargs.pop("mcp_name", None)
mcp_version = kwargs.pop("mcp_version", None)
if mcp_name is not None:
self.mcp_name = mcp_name
if mcp_version is not None:
self.mcp_version = mcp_version
super().__init__(**kwargs)
[docs]
def set_router(self, router: Router) -> None:
"""Set the external genro-routes Router to expose as MCP tools."""
self._external_router = router
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""ASGI entry point — handle MCP Streamable HTTP requests."""
if scope["type"] != "http":
return
body = await self._read_body(receive)
try:
envelope = json.loads(body)
except (json.JSONDecodeError, ValueError):
await self._send_error(send, _JSONRPC_PARSE_ERROR, "Parse error", None)
return
if not isinstance(envelope, dict):
await self._send_error(send, _JSONRPC_INVALID_REQUEST, "Invalid request", None)
return
jsonrpc_id = envelope.get("id")
method = envelope.get("method", "")
params = envelope.get("params") or {}
# Notification: no id → HTTP 202, no elaboration
if "id" not in envelope:
await self._send_notification_ack(send)
return
auth = scope.get("auth") or {}
auth_tags = auth.get("tags", [])
if isinstance(auth_tags, str):
auth_tags = [t.strip() for t in auth_tags.split(",") if t.strip()]
try:
result = await self._dispatch(method, params, auth_tags)
except _McpError as exc:
await self._send_error(send, exc.code, exc.message, jsonrpc_id)
return
except Exception as exc:
await self._send_error(send, _JSONRPC_INTERNAL_ERROR, str(exc), jsonrpc_id)
return
await self._send_result(send, result, jsonrpc_id)
async def _dispatch(self, method: str, params: dict, auth_tags: list) -> Any:
"""Route JSON-RPC method to the appropriate handler."""
if method == "initialize":
return self._handle_initialize()
if method == "tools/list":
return self._handle_tools_list()
if method == "tools/call":
return await self._handle_tools_call(params, auth_tags)
raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Method not found: {method}")
def _handle_initialize(self) -> dict:
"""Return MCP server capabilities."""
return {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {
"name": self.mcp_name,
"version": self.mcp_version,
},
}
def _handle_tools_list(self) -> dict:
"""Enumerate all tools from the external router."""
if self._external_router is None:
return {"tools": []}
nodes = self._external_router.nodes(forbidden=True)
tools: list[dict] = []
self._collect_tools(nodes, "", tools)
return {"tools": tools}
def _collect_tools(self, nodes: dict, prefix: str, tools: list) -> None:
"""Recursively collect tools from router nodes."""
for name, info in nodes.get("entries", {}).items():
tool_name = f"{prefix}{name}" if not prefix else f"{prefix}_{name}"
tool = self._build_tool_descriptor(tool_name, info)
tools.append(tool)
for router_name, sub_nodes in nodes.get("routers", {}).items():
sub_prefix = f"{prefix}{router_name}" if not prefix else f"{prefix}_{router_name}"
self._collect_tools(sub_nodes, sub_prefix, tools)
def _build_tool_descriptor(self, tool_name: str, info: dict) -> dict:
"""Build MCP tool descriptor with JSON Schema for parameters."""
callable_fn = info.get("callable")
metadata = info.get("metadata") or {}
description = metadata.get("description") or info.get("doc") or tool_name
properties: dict = {}
required: list = []
if callable_fn is not None:
try:
sig = inspect.signature(callable_fn)
resolved_hints = self._resolve_type_hints(callable_fn)
for param_name, param in sig.parameters.items():
if param_name == "self":
continue
ann = resolved_hints.get(param_name, param.annotation)
json_type = self._annotation_to_json_type(ann)
properties[param_name] = {"type": json_type}
if param.default is inspect.Parameter.empty:
required.append(param_name)
except (ValueError, TypeError):
pass
schema: dict = {"type": "object", "properties": properties}
if required:
schema["required"] = required
return {
"name": tool_name,
"description": description,
"inputSchema": schema,
}
async def _handle_tools_call(self, params: dict, auth_tags: list) -> dict:
"""Dispatch tools/call to the router node."""
if self._external_router is None:
raise _McpError(_JSONRPC_INTERNAL_ERROR, "No router configured")
name = params.get("name", "")
arguments = params.get("arguments") or {}
# Convert tool name to route path: "code_search_symbols" → "code/search_symbols"
path = name.replace("_", "/", 1) if "_" in name else name
node = self._external_router.node(
path,
auth_tags=",".join(auth_tags) if isinstance(auth_tags, list) else auth_tags,
)
if node.error == "not_authorized":
raise _McpError(-32000, "Not authorized")
if node.error == "not_found":
raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Tool not found: {name}")
if node.error:
raise _McpError(_JSONRPC_INTERNAL_ERROR, f"Router error: {node.error}")
if node._entry is not None:
arguments = self._coerce_arguments(node._entry.func, arguments)
result = await smartasync(node)(**arguments)
serialized = self._serialize_result(result)
return {"content": [{"type": "text", "text": serialized}]}
def _serialize_result(self, result: Any) -> str:
"""Serialize result to string for MCP content."""
if isinstance(result, str):
return result
try:
return json.dumps(result, ensure_ascii=False, default=str)
except (TypeError, ValueError):
return str(result)
async def _read_body(self, receive: Receive) -> bytes:
"""Read full request body from ASGI receive channel."""
body = b""
while True:
message = await receive()
chunk = message.get("body", b"")
if isinstance(chunk, bytes):
body += chunk
if not message.get("more_body", False):
break
return body
async def _send_result(self, send: Send, result: Any, jsonrpc_id: Any) -> None:
"""Send JSON-RPC success response."""
payload = {
"jsonrpc": "2.0",
"id": jsonrpc_id,
"result": result,
}
await self._send_json(send, 200, payload)
async def _send_error(
self, send: Send, code: int, message: str, jsonrpc_id: Any
) -> None:
"""Send JSON-RPC error response."""
payload = {
"jsonrpc": "2.0",
"id": jsonrpc_id,
"error": {"code": code, "message": message},
}
await self._send_json(send, 200, payload)
async def _send_notification_ack(self, send: Send) -> None:
"""Send HTTP 202 for JSON-RPC notifications (no id)."""
await send({
"type": "http.response.start",
"status": 202,
"headers": [[b"content-length", b"0"]],
})
await send({"type": "http.response.body", "body": b""})
async def _send_json(self, send: Send, status: int, payload: dict) -> None:
"""Send JSON HTTP response."""
body = json.dumps(payload, ensure_ascii=False).encode()
await send({
"type": "http.response.start",
"status": status,
"headers": [
[b"content-type", b"application/json"],
[b"content-length", str(len(body)).encode()],
],
})
await send({"type": "http.response.body", "body": body})
def _annotation_to_json_type(self, annotation: Any) -> str:
"""Map Python type annotation to JSON Schema type string."""
origin = getattr(annotation, "__origin__", None)
if origin is Union:
args = [a for a in annotation.__args__ if a is not type(None)]
if args:
annotation = args[0]
type_mapping = {int: "integer", float: "number", bool: "boolean", str: "string"}
str_mapping = {"int": "integer", "float": "number", "bool": "boolean", "str": "string"}
if isinstance(annotation, str):
return str_mapping.get(annotation, "string")
return type_mapping.get(annotation, "string")
def _resolve_type_hints(self, callable_fn: Any) -> dict:
"""Resolve string annotations to actual types via typing.get_type_hints."""
try:
hints = typing.get_type_hints(callable_fn)
hints.pop("return", None)
return hints
except Exception:
return {}
def _coerce_arguments(self, callable_fn: Any, arguments: dict) -> dict:
"""Coerce argument values to match parameter type annotations."""
resolved = self._resolve_type_hints(callable_fn)
if not resolved:
try:
sig = inspect.signature(callable_fn)
resolved = {
name: param.annotation
for name, param in sig.parameters.items()
if param.annotation is not inspect.Parameter.empty
}
except (ValueError, TypeError):
return arguments
coerced = dict(arguments)
for param_name, value in coerced.items():
if param_name not in resolved:
continue
ann = resolved[param_name]
origin = getattr(ann, "__origin__", None)
if origin is Union:
args = [a for a in ann.__args__ if a is not type(None)]
if args:
ann = args[0]
if isinstance(value, str):
try:
if ann is int:
coerced[param_name] = int(value)
elif ann is float:
coerced[param_name] = float(value)
elif ann is bool:
coerced[param_name] = value.lower() in ("true", "1", "yes")
except (ValueError, TypeError):
pass
return coerced
class _McpError(Exception):
"""Internal exception for JSON-RPC error responses."""
def __init__(self, code: int, message: str) -> None:
super().__init__(message)
self.code = code
self.message = message
if __name__ == "__main__":
app = McpApplication()
print(f"McpApplication: {app.mcp_name} v{app.mcp_version}")