Source code for genro_asgi.applications.mcp_application

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""McpApplication - MCP Streamable HTTP transport for genro-asgi.

Implements the MCP (Model Context Protocol) Streamable HTTP transport
using JSON-RPC 2.0 envelopes over HTTP POST.

Spec references:
- JSON-RPC 2.0: https://www.jsonrpc.org/specification
- MCP Streamable HTTP: stateless mode (json_response=True, stateless_http=True)

Protocol summary:
- All requests: POST / with JSON-RPC 2.0 body
- Request (has "id"): process and return JSON-RPC result
- Notification (no "id"): return HTTP 202, no body
- tools/list: enumerate tools from the router
- tools/call: dispatch to router node by path
- initialize: return server capabilities

Tool name to route path convention:
    "code_search_symbols" → "code/search_symbols"
"""

from __future__ import annotations

import inspect
import json
import typing
from typing import TYPE_CHECKING, Any, Union

from genro_toolbox.smartasync import smartasync  # type: ignore[import-untyped]

from .asgi_application import AsgiApplication

if TYPE_CHECKING:
    from genro_routes import Router  # type: ignore[import-untyped]
    from ..types import Receive, Scope, Send

__all__ = ["McpApplication"]

_JSONRPC_PARSE_ERROR = -32700
_JSONRPC_INVALID_REQUEST = -32600
_JSONRPC_METHOD_NOT_FOUND = -32601
_JSONRPC_INTERNAL_ERROR = -32603

[docs] class McpApplication(AsgiApplication): """ASGI application implementing MCP Streamable HTTP transport. Accepts an external genro-routes Router and exposes its @route entries as MCP tools via JSON-RPC 2.0 over HTTP POST. Usage:: class SourcererMcpApp(McpApplication): mcp_name = "sourcerer" mcp_version = "1.0.0" def on_init(self, **kwargs): api = SourcererAPI(...) self.set_router(api.api) """ mcp_name: str = "genro-mcp" mcp_version: str = "1.0.0"
[docs] def __init__(self, **kwargs: Any) -> None: """Initialize with optional mcp_name/mcp_version overrides.""" self._external_router: Router | None = None mcp_name = kwargs.pop("mcp_name", None) mcp_version = kwargs.pop("mcp_version", None) if mcp_name is not None: self.mcp_name = mcp_name if mcp_version is not None: self.mcp_version = mcp_version super().__init__(**kwargs)
[docs] def set_router(self, router: Router) -> None: """Set the external genro-routes Router to expose as MCP tools.""" self._external_router = router
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: """ASGI entry point — handle MCP Streamable HTTP requests.""" if scope["type"] != "http": return body = await self._read_body(receive) try: envelope = json.loads(body) except (json.JSONDecodeError, ValueError): await self._send_error(send, _JSONRPC_PARSE_ERROR, "Parse error", None) return if not isinstance(envelope, dict): await self._send_error(send, _JSONRPC_INVALID_REQUEST, "Invalid request", None) return jsonrpc_id = envelope.get("id") method = envelope.get("method", "") params = envelope.get("params") or {} # Notification: no id → HTTP 202, no elaboration if "id" not in envelope: await self._send_notification_ack(send) return auth = scope.get("auth") or {} auth_tags = auth.get("tags", []) if isinstance(auth_tags, str): auth_tags = [t.strip() for t in auth_tags.split(",") if t.strip()] try: result = await self._dispatch(method, params, auth_tags) except _McpError as exc: await self._send_error(send, exc.code, exc.message, jsonrpc_id) return except Exception as exc: await self._send_error(send, _JSONRPC_INTERNAL_ERROR, str(exc), jsonrpc_id) return await self._send_result(send, result, jsonrpc_id) async def _dispatch(self, method: str, params: dict, auth_tags: list) -> Any: """Route JSON-RPC method to the appropriate handler.""" if method == "initialize": return self._handle_initialize() if method == "tools/list": return self._handle_tools_list() if method == "tools/call": return await self._handle_tools_call(params, auth_tags) raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Method not found: {method}") def _handle_initialize(self) -> dict: """Return MCP server capabilities.""" return { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": { "name": self.mcp_name, "version": self.mcp_version, }, } def _handle_tools_list(self) -> dict: """Enumerate all tools from the external router.""" if self._external_router is None: return {"tools": []} nodes = self._external_router.nodes(forbidden=True) tools: list[dict] = [] self._collect_tools(nodes, "", tools) return {"tools": tools} def _collect_tools(self, nodes: dict, prefix: str, tools: list) -> None: """Recursively collect tools from router nodes.""" for name, info in nodes.get("entries", {}).items(): tool_name = f"{prefix}{name}" if not prefix else f"{prefix}_{name}" tool = self._build_tool_descriptor(tool_name, info) tools.append(tool) for router_name, sub_nodes in nodes.get("routers", {}).items(): sub_prefix = f"{prefix}{router_name}" if not prefix else f"{prefix}_{router_name}" self._collect_tools(sub_nodes, sub_prefix, tools) def _build_tool_descriptor(self, tool_name: str, info: dict) -> dict: """Build MCP tool descriptor with JSON Schema for parameters.""" callable_fn = info.get("callable") metadata = info.get("metadata") or {} description = metadata.get("description") or info.get("doc") or tool_name properties: dict = {} required: list = [] if callable_fn is not None: try: sig = inspect.signature(callable_fn) resolved_hints = self._resolve_type_hints(callable_fn) for param_name, param in sig.parameters.items(): if param_name == "self": continue ann = resolved_hints.get(param_name, param.annotation) json_type = self._annotation_to_json_type(ann) properties[param_name] = {"type": json_type} if param.default is inspect.Parameter.empty: required.append(param_name) except (ValueError, TypeError): pass schema: dict = {"type": "object", "properties": properties} if required: schema["required"] = required return { "name": tool_name, "description": description, "inputSchema": schema, } async def _handle_tools_call(self, params: dict, auth_tags: list) -> dict: """Dispatch tools/call to the router node.""" if self._external_router is None: raise _McpError(_JSONRPC_INTERNAL_ERROR, "No router configured") name = params.get("name", "") arguments = params.get("arguments") or {} # Convert tool name to route path: "code_search_symbols" → "code/search_symbols" path = name.replace("_", "/", 1) if "_" in name else name node = self._external_router.node( path, auth_tags=",".join(auth_tags) if isinstance(auth_tags, list) else auth_tags, ) if node.error == "not_authorized": raise _McpError(-32000, "Not authorized") if node.error == "not_found": raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Tool not found: {name}") if node.error: raise _McpError(_JSONRPC_INTERNAL_ERROR, f"Router error: {node.error}") if node._entry is not None: arguments = self._coerce_arguments(node._entry.func, arguments) result = await smartasync(node)(**arguments) serialized = self._serialize_result(result) return {"content": [{"type": "text", "text": serialized}]} def _serialize_result(self, result: Any) -> str: """Serialize result to string for MCP content.""" if isinstance(result, str): return result try: return json.dumps(result, ensure_ascii=False, default=str) except (TypeError, ValueError): return str(result) async def _read_body(self, receive: Receive) -> bytes: """Read full request body from ASGI receive channel.""" body = b"" while True: message = await receive() chunk = message.get("body", b"") if isinstance(chunk, bytes): body += chunk if not message.get("more_body", False): break return body async def _send_result(self, send: Send, result: Any, jsonrpc_id: Any) -> None: """Send JSON-RPC success response.""" payload = { "jsonrpc": "2.0", "id": jsonrpc_id, "result": result, } await self._send_json(send, 200, payload) async def _send_error( self, send: Send, code: int, message: str, jsonrpc_id: Any ) -> None: """Send JSON-RPC error response.""" payload = { "jsonrpc": "2.0", "id": jsonrpc_id, "error": {"code": code, "message": message}, } await self._send_json(send, 200, payload) async def _send_notification_ack(self, send: Send) -> None: """Send HTTP 202 for JSON-RPC notifications (no id).""" await send({ "type": "http.response.start", "status": 202, "headers": [[b"content-length", b"0"]], }) await send({"type": "http.response.body", "body": b""}) async def _send_json(self, send: Send, status: int, payload: dict) -> None: """Send JSON HTTP response.""" body = json.dumps(payload, ensure_ascii=False).encode() await send({ "type": "http.response.start", "status": status, "headers": [ [b"content-type", b"application/json"], [b"content-length", str(len(body)).encode()], ], }) await send({"type": "http.response.body", "body": body}) def _annotation_to_json_type(self, annotation: Any) -> str: """Map Python type annotation to JSON Schema type string.""" origin = getattr(annotation, "__origin__", None) if origin is Union: args = [a for a in annotation.__args__ if a is not type(None)] if args: annotation = args[0] type_mapping = {int: "integer", float: "number", bool: "boolean", str: "string"} str_mapping = {"int": "integer", "float": "number", "bool": "boolean", "str": "string"} if isinstance(annotation, str): return str_mapping.get(annotation, "string") return type_mapping.get(annotation, "string") def _resolve_type_hints(self, callable_fn: Any) -> dict: """Resolve string annotations to actual types via typing.get_type_hints.""" try: hints = typing.get_type_hints(callable_fn) hints.pop("return", None) return hints except Exception: return {} def _coerce_arguments(self, callable_fn: Any, arguments: dict) -> dict: """Coerce argument values to match parameter type annotations.""" resolved = self._resolve_type_hints(callable_fn) if not resolved: try: sig = inspect.signature(callable_fn) resolved = { name: param.annotation for name, param in sig.parameters.items() if param.annotation is not inspect.Parameter.empty } except (ValueError, TypeError): return arguments coerced = dict(arguments) for param_name, value in coerced.items(): if param_name not in resolved: continue ann = resolved[param_name] origin = getattr(ann, "__origin__", None) if origin is Union: args = [a for a in ann.__args__ if a is not type(None)] if args: ann = args[0] if isinstance(value, str): try: if ann is int: coerced[param_name] = int(value) elif ann is float: coerced[param_name] = float(value) elif ann is bool: coerced[param_name] = value.lower() in ("true", "1", "yes") except (ValueError, TypeError): pass return coerced
class _McpError(Exception): """Internal exception for JSON-RPC error responses.""" def __init__(self, code: int, message: str) -> None: super().__init__(message) self.code = code self.message = message if __name__ == "__main__": app = McpApplication() print(f"McpApplication: {app.mcp_name} v{app.mcp_version}")