# Source code for genro_asgi.applications.mcp_application.mcp_application

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""McpApplication - MCP Streamable HTTP transport for genro-asgi.

Implements the MCP (Model Context Protocol) Streamable HTTP transport
using JSON-RPC 2.0 envelopes over standard genro-asgi routing.

The application works inside the Dispatcher via the default_entry
mechanism: when mounted as an app (e.g., under ``/mcp/``), requests
to ``POST /mcp/`` resolve to the ``index`` route handler.

Spec references:
    JSON-RPC 2.0: https://www.jsonrpc.org/specification
    MCP Streamable HTTP: stateless mode

Protocol summary:
    All requests: POST with JSON-RPC 2.0 body (parsed by framework)
    Request (has "id"): process and return JSON-RPC result
    Notification (no "id"): return HTTP 202, no body
    tools/list: enumerate tools from the external router
    tools/call: dispatch to router node by path
    initialize: return server capabilities

Tool name to route path convention:
    "code_search_symbols" -> "code/search_symbols"

Integration with genro-asgi:
    McpApplication extends AsgiApplication and uses @route for its
    endpoint.  The Dispatcher handles request creation, middleware
    chain (auth, CORS, errors), and response sending.  The handler
    accesses the parsed body via ``get_current_request().data``.
"""

from __future__ import annotations

import inspect
import json
import types
import typing
from typing import TYPE_CHECKING, Any, Union

from genro_routes import route  # type: ignore[import-untyped]
from genro_toolbox.smartasync import smartasync  # type: ignore[import-untyped]

from ...request import get_current_request
from ..asgi_application import AsgiApplication

if TYPE_CHECKING:
    from genro_routes import Router  # type: ignore[import-untyped]

__all__ = ["McpApplication"]

_JSONRPC_INVALID_REQUEST = -32600
_JSONRPC_METHOD_NOT_FOUND = -32601
_JSONRPC_INTERNAL_ERROR = -32603


class McpApplication(AsgiApplication):
    """MCP Streamable HTTP transport as a genro-asgi application.

    Accepts an external genro-routes Router and exposes its @route
    entries as MCP tools via JSON-RPC 2.0 over HTTP POST.

    Mounts inside the server like any other app and participates in
    the full middleware chain (auth, CORS, errors).  The ``index``
    route handles all JSON-RPC messages via the default_entry mechanism.

    Usage::

        class SourcererMcpApp(McpApplication):
            mcp_name = "sourcerer"
            mcp_version = "1.0.0"

            def on_init(self, **kwargs):
                api = SourcererAPI(...)
                self.set_router(api.api)
    """

    mcp_name: str = "genro-mcp"
    mcp_version: str = "1.0.0"

    # JSON-RPC 2.0 "Invalid params" code, used when tools/call receives
    # malformed parameters.
    _JSONRPC_INVALID_PARAMS: int = -32602

    def __init__(self, **kwargs: Any) -> None:
        """Initialize with optional mcp_name/mcp_version overrides.

        Args:
            **kwargs: ``mcp_name`` and ``mcp_version`` are consumed here and
                override the class-level defaults when not None; everything
                else is forwarded to ``AsgiApplication.__init__``.
        """
        # Assigned before super().__init__() so the attribute already exists
        # while the base initializer runs (presumably it triggers on_init,
        # which may call set_router -- confirm against AsgiApplication).
        self._external_router: Router | None = None
        mcp_name = kwargs.pop("mcp_name", None)
        mcp_version = kwargs.pop("mcp_version", None)
        if mcp_name is not None:
            self.mcp_name = mcp_name
        if mcp_version is not None:
            self.mcp_version = mcp_version
        super().__init__(**kwargs)

    def set_router(self, router: Router) -> None:
        """Set the external genro-routes Router to expose as MCP tools."""
        self._external_router = router

    @route(meta_mime_type="application/json")
    def index(self) -> dict | None:
        """JSON-RPC 2.0 endpoint for MCP Streamable HTTP.

        Reads the parsed body from ``request.data`` (populated by the
        framework via asgi_data).  Dispatches based on JSON-RPC method.

        For notifications (no "id" field), sets HTTP 202 and returns None.
        For requests, returns a JSON-RPC response dict.

        Returns:
            A JSON-RPC 2.0 result or error envelope, or None for
            notifications.

        Raises:
            RuntimeError: If there is no current request.
        """
        request = get_current_request()
        if request is None:
            # Explicit check instead of ``assert``: asserts vanish under -O.
            raise RuntimeError("McpApplication.index called outside a request context")

        envelope = request.data
        if not isinstance(envelope, dict):
            return self._jsonrpc_error(
                _JSONRPC_INVALID_REQUEST,
                "Invalid request",
                None,
            )

        # Notification: no id -> HTTP 202, no body
        if "id" not in envelope:
            request.response.status_code = 202
            return None

        jsonrpc_id = envelope.get("id")
        method = envelope.get("method", "")
        params = envelope.get("params") or {}
        auth_tags = request.auth_tags

        try:
            result = self._dispatch(method, params, auth_tags)
        except _McpError as exc:
            return self._jsonrpc_error(exc.code, exc.message, jsonrpc_id)
        except Exception as exc:
            # Boundary handler: any unexpected failure becomes a JSON-RPC
            # internal error carrying the exception text.
            return self._jsonrpc_error(
                _JSONRPC_INTERNAL_ERROR,
                str(exc),
                jsonrpc_id,
            )

        return {"jsonrpc": "2.0", "id": jsonrpc_id, "result": result}

    def _jsonrpc_error(self, code: int, message: str, jsonrpc_id: Any) -> dict:
        """Build a JSON-RPC 2.0 error response dict.

        Args:
            code: JSON-RPC error code (e.g. -32600, -32601).
            message: Human-readable error message.
            jsonrpc_id: Request id to echo back (may be None).

        Returns:
            JSON-RPC 2.0 error envelope dict.
        """
        return {
            "jsonrpc": "2.0",
            "id": jsonrpc_id,
            "error": {"code": code, "message": message},
        }

    def _dispatch(self, method: str, params: dict, auth_tags: list) -> Any:
        """Route JSON-RPC method to the appropriate handler.

        Args:
            method: JSON-RPC method name (e.g. "initialize", "tools/list").
            params: JSON-RPC params dict.
            auth_tags: Authenticated user tags for filtering.

        Returns:
            Handler result dict.

        Raises:
            _McpError: If method is not recognized.
        """
        if method == "initialize":
            return self._handle_initialize()
        if method == "tools/list":
            return self._handle_tools_list()
        if method == "tools/call":
            return self._handle_tools_call(params, auth_tags)
        raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Method not found: {method}")

    def _handle_initialize(self) -> dict:
        """Return MCP server capabilities."""
        return {
            "protocolVersion": "2024-11-05",
            "capabilities": {"tools": {}},
            "serverInfo": {
                "name": self.mcp_name,
                "version": self.mcp_version,
            },
        }

    def _handle_tools_list(self) -> dict:
        """Enumerate all tools from the external router."""
        if self._external_router is None:
            return {"tools": []}

        nodes = self._external_router.nodes(forbidden=True, channel_channel="mcp")
        tools: list[dict] = []
        self._collect_tools(nodes, "", tools)
        return {"tools": tools}

    @staticmethod
    def _join_prefix(prefix: str, name: str, sep: str) -> str:
        """Join *prefix* and *name* with *sep*, omitting *sep* when prefix is empty."""
        return f"{prefix}{sep}{name}" if prefix else f"{name}"

    def _iter_tool_entries(
        self,
        nodes: dict,
        name_prefix: str = "",
        path_prefix: str = "",
    ) -> typing.Iterator[tuple[str, str, dict]]:
        """Walk router nodes, yielding ``(tool_name, route_path, info)``.

        Entries of the current level are yielded before descending into
        sub-routers.  Tool names join the router chain with ``_`` and route
        paths join it with ``/``.
        """
        for name, info in nodes.get("entries", {}).items():
            yield (
                self._join_prefix(name_prefix, name, "_"),
                self._join_prefix(path_prefix, name, "/"),
                info,
            )
        for router_name, sub_nodes in nodes.get("routers", {}).items():
            yield from self._iter_tool_entries(
                sub_nodes,
                self._join_prefix(name_prefix, router_name, "_"),
                self._join_prefix(path_prefix, router_name, "/"),
            )

    def _collect_tools(self, nodes: dict, prefix: str, tools: list) -> None:
        """Recursively collect tool descriptors from router nodes into *tools*."""
        for tool_name, _path, info in self._iter_tool_entries(nodes, prefix):
            tools.append(self._build_tool_descriptor(tool_name, info))

    def _build_tool_descriptor(self, tool_name: str, info: dict) -> dict:
        """Build MCP tool descriptor with JSON Schema for parameters.

        Args:
            tool_name: Name under which the tool is advertised.
            info: Router node info; ``callable``, ``metadata`` and ``doc``
                are read when present.

        Returns:
            Dict with ``name``, ``description`` and ``inputSchema``.
        """
        callable_fn = info.get("callable")
        metadata = info.get("metadata") or {}
        description = metadata.get("description") or info.get("doc") or tool_name

        properties: dict = {}
        required: list = []

        sig = None
        if callable_fn is not None:
            try:
                sig = inspect.signature(callable_fn)
            except (ValueError, TypeError):
                # Best effort: a callable without an introspectable signature
                # is advertised with an empty parameter schema.
                sig = None

        if sig is not None:
            resolved_hints = self._resolve_type_hints(callable_fn)
            variadic = (
                inspect.Parameter.VAR_POSITIONAL,
                inspect.Parameter.VAR_KEYWORD,
            )
            for param_name, param in sig.parameters.items():
                # *args / **kwargs are not named JSON properties and must
                # never be reported as required.
                if param_name == "self" or param.kind in variadic:
                    continue
                ann = resolved_hints.get(param_name, param.annotation)
                properties[param_name] = {"type": self._annotation_to_json_type(ann)}
                if param.default is inspect.Parameter.empty:
                    required.append(param_name)

        schema: dict = {"type": "object", "properties": properties}
        if required:
            schema["required"] = required

        return {
            "name": tool_name,
            "description": description,
            "inputSchema": schema,
        }

    def _resolve_tool_path(self, name: str) -> str:
        """Translate an advertised tool name back to its router path.

        The names produced for ``tools/list`` are the source of truth, so the
        same node tree is walked and the matching entry's path is returned.
        This keeps root-level tools whose own name contains ``_`` and tools
        nested several routers deep callable.  When no advertised tool
        matches, the historical convention (first ``_`` becomes ``/``) is
        used so previously working names keep resolving.

        NOTE(review): assumes nested routers are addressed by ``/``-joined
        segments at every depth, as in "code/search_symbols" -- confirm
        against genro-routes.
        """
        nodes = self._external_router.nodes(forbidden=True, channel_channel="mcp")
        for tool_name, path, _info in self._iter_tool_entries(nodes):
            if tool_name == name:
                return path
        return name.replace("_", "/", 1)

    def _handle_tools_call(self, params: dict, auth_tags: list) -> dict:
        """Dispatch tools/call to the router node.

        Args:
            params: JSON-RPC params; ``name`` selects the tool and
                ``arguments`` holds its keyword arguments.
            auth_tags: Authenticated user tags passed to the router lookup.

        Returns:
            MCP result dict with a single text content item.

        Raises:
            _McpError: No router configured, malformed params, tool not
                found, not authorized, or any other router error.
        """
        if self._external_router is None:
            raise _McpError(_JSONRPC_INTERNAL_ERROR, "No router configured")

        if not isinstance(params, dict):
            raise _McpError(
                self._JSONRPC_INVALID_PARAMS,
                "Invalid params: expected an object",
            )
        name = params.get("name", "")
        arguments = params.get("arguments") or {}
        if not isinstance(name, str) or not name:
            raise _McpError(
                self._JSONRPC_INVALID_PARAMS,
                "Invalid params: 'name' must be a non-empty string",
            )
        if not isinstance(arguments, dict):
            raise _McpError(
                self._JSONRPC_INVALID_PARAMS,
                "Invalid params: 'arguments' must be an object",
            )

        # Convert tool name to route path: "code_search_symbols" -> "code/search_symbols"
        path = self._resolve_tool_path(name)

        node = self._external_router.node(
            path,
            auth_tags=",".join(auth_tags) if isinstance(auth_tags, list) else auth_tags,
            channel_channel="mcp",
        )

        if node.error == "not_authorized":
            raise _McpError(-32000, "Not authorized")
        if node.error == "not_found":
            raise _McpError(_JSONRPC_METHOD_NOT_FOUND, f"Tool not found: {name}")
        if node.error:
            raise _McpError(_JSONRPC_INTERNAL_ERROR, f"Router error: {node.error}")

        result = smartasync(node)(**arguments)
        serialized = self._serialize_result(result)
        return {"content": [{"type": "text", "text": serialized}]}

    def _serialize_result(self, result: Any) -> str:
        """Serialize result to string for MCP content.

        Strings pass through unchanged; everything else is JSON-encoded
        (non-serializable values fall back to ``str``), and ``str(result)``
        is used if encoding still fails.
        """
        if isinstance(result, str):
            return result
        try:
            return json.dumps(result, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            return str(result)

    def _annotation_to_json_type(self, annotation: Any) -> str:
        """Map Python type annotation to JSON Schema type string.

        Handles plain types, unresolved string annotations, ``Optional[X]`` /
        ``Union[X, ...]`` and PEP 604 ``X | None`` unions (the first non-None
        member decides), and parameterized containers such as ``list[str]``.
        Anything unrecognized maps to ``"string"``.
        """
        str_mapping = {"int": "integer", "float": "number", "bool": "boolean", "str": "string"}
        if isinstance(annotation, str):
            return str_mapping.get(annotation, "string")

        # ``X | None`` produces types.UnionType (Python 3.10+), which has no
        # ``__origin__``; typing.get_origin covers both union spellings.
        pep604_union = getattr(types, "UnionType", None)
        origin = typing.get_origin(annotation)
        if origin is Union or (pep604_union is not None and origin is pep604_union):
            members = [a for a in typing.get_args(annotation) if a is not type(None)]
            if members:
                annotation = members[0]
                if isinstance(annotation, str):
                    return str_mapping.get(annotation, "string")
                origin = typing.get_origin(annotation)

        # Parameterized generics (list[str], dict[str, int]) map by container.
        if origin is not None:
            annotation = origin

        type_mapping = {
            int: "integer",
            float: "number",
            bool: "boolean",
            str: "string",
            list: "array",
            tuple: "array",
            set: "array",
            frozenset: "array",
            dict: "object",
        }
        try:
            return type_mapping.get(annotation, "string")
        except TypeError:
            # Unhashable annotation object: fall back to the generic type.
            return "string"

    def _resolve_type_hints(self, callable_fn: Any) -> dict:
        """Resolve string annotations to actual types via typing.get_type_hints.

        Returns an empty dict when resolution fails for any reason (for
        example forward references that cannot be evaluated); callers then
        fall back to the raw ``inspect`` annotations.
        """
        try:
            hints = typing.get_type_hints(callable_fn)
            hints.pop("return", None)
            return hints
        except Exception:
            return {}
class _McpError(Exception):
    """Signal a failure that must be reported as a JSON-RPC error object.

    Attributes:
        code: JSON-RPC error code to place in the response.
        message: Human-readable error text; also the exception's ``str()``.
    """

    def __init__(self, code: int, message: str) -> None:
        self.code = code
        self.message = message
        super().__init__(message)


if __name__ == "__main__":
    # Smoke check: build a default application and show its identity.
    demo_app = McpApplication()
    print(f"McpApplication: {demo_app.mcp_name} v{demo_app.mcp_version}")