Source code for genro_asgi.server.dispatcher

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""Dispatcher - Routes ASGI requests to handlers via genro-routes.

The Dispatcher is the innermost layer of the middleware chain. It:
1. Creates Request from ASGI scope via RequestRegistry
2. Resolves the handler via router.node() with auth and channel filtering
3. Calls the handler with query parameters
4. Sets the result on Response via set_result()
5. Sends the ASGI response

Request flow:
    scope → RequestRegistry.create() → Request
         → router.node(path, auth_tags, channel_channel, env_capabilities, errors)
         → handler(**query)
         → response.set_result(result, metadata)
         → response(scope, receive, send)

Error mapping (ROUTER_ERRORS):
    Router errors are mapped to HTTP exceptions:
    - not_found → HTTPNotFound (404)
    - not_authorized → HTTPForbidden (403)
    - not_authenticated → HTTPUnauthorized (401)
    - not_available → HTTPServiceUnavailable (503)
    - validation_error → HTTPBadRequest (400)
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from genro_routes import is_result_wrapper
from genro_toolbox import DictObj
from genro_toolbox.smartasync import smartasync

from ..exceptions import (
    HTTPBadRequest,
    HTTPForbidden,
    HTTPNotFound,
    HTTPServiceUnavailable,
    HTTPUnauthorized,
)
from ..request import set_current_request

if TYPE_CHECKING:
    from .server import AsgiServer
    from ..types import Receive, Scope, Send

# Error mapping handed to router.node() through its ``errors=`` argument.
# Keys are the router's error codes; values are the HTTP exception classes
# the dispatcher wants surfaced for each of them:
#   not_found         -> HTTPNotFound (404)
#   not_authorized    -> HTTPForbidden (403)
#   not_authenticated -> HTTPUnauthorized (401)
#   not_available     -> HTTPServiceUnavailable (503)
#   validation_error  -> HTTPBadRequest (400)
# NOTE(review): presumably genro-routes raises the mapped class when
# resolution fails with that code - confirm against the router.node() docs.
ROUTER_ERRORS: dict[str, type[Exception]] = {
    "not_found": HTTPNotFound,
    "not_authorized": HTTPForbidden,
    "not_authenticated": HTTPUnauthorized,
    "not_available": HTTPServiceUnavailable,
    "validation_error": HTTPBadRequest,
}


class Dispatcher:
    """Routes ASGI requests to handlers via genro_routes Router.

    The dispatcher is an ASGI callable: for every request it makes sure
    authentication data is present in the scope, creates the request object,
    resolves the handler node through the router and either hands control to
    the per-app middleware chain or executes the handler itself.

    Attributes:
        server: Parent AsgiServer instance.
    """

    __slots__ = ("server",)

    def __init__(self, server: AsgiServer) -> None:
        """Initialize dispatcher.

        Args:
            server: Parent AsgiServer instance.
        """
        self.server = server

    @property
    def router(self) -> Any:
        """Proxy to server.router."""
        return self.server.router

    @property
    def request_registry(self) -> Any:
        """Proxy to server.request_registry."""
        return self.server.request_registry

    def _ensure_auth(self, scope: Scope) -> None:
        """Lazy auth: populate scope["auth"] and scope["_filters"] if not set.

        Does nothing when scope["auth"] is already present. Otherwise parses
        raw headers into scope["_headers"] if missing, then calls
        server.authenticate(scope). Sets scope["auth"],
        scope["_filters"]["auth_tags"], and - when the X-Channel header is
        present and non-blank - scope["_filters"]["channel_channel"].

        Args:
            scope: ASGI scope dictionary (mutated in place).
        """
        if "auth" in scope:
            # Something upstream already authenticated this request.
            return

        if "_headers" not in scope:
            # ASGI delivers headers as (bytes, bytes) pairs; names are
            # lower-cased so later lookups are case-insensitive.
            scope["_headers"] = {
                name.decode("latin-1").lower(): value.decode("latin-1")
                for name, value in scope.get("headers", [])
            }

        auth = self.server.authenticate(scope)
        scope["auth"] = auth

        filters = scope.setdefault("_filters", {})
        # A None auth result yields an empty tag list.
        filters["auth_tags"] = auth["tags"] if auth is not None else []

        channel = scope["_headers"].get("x-channel", "").strip()
        if channel:
            filters["channel_channel"] = channel

    def _resolve_app_name(self, path: str) -> str:
        """Extract app name from request path (first segment).

        Args:
            path: Request path (e.g., "/api/products/list").

        Returns:
            First path segment (e.g., "api"), or empty string for root.
        """
        # partition() returns the whole string when there is no "/", so a
        # single-segment path and the root path need no special casing.
        return path.strip("/").partition("/")[0]

    async def _execute_handler(
        self,
        node: Any,
        request: Any,
        scope: Scope,
        receive: Receive,
        send: Send,
    ) -> None:
        """Execute handler node and send ASGI response.

        Used when no per-app middleware chain is configured for the app.

        Args:
            node: Resolved router node (callable handler).
            request: Current request instance.
            scope: ASGI scope dictionary.
            receive: ASGI receive callable.
            send: ASGI send callable.
        """
        # The node is wrapped with smartasync before being awaited; the
        # request's query items are passed as keyword arguments.
        result = await smartasync(node)(**dict(request.query))

        if is_result_wrapper(result):
            # Metadata carried by the wrapper overrides the node's metadata.
            metadata: dict[str, Any] = {**node.metadata, **result.metadata}
            request.response.set_result(result.value, metadata)
        else:
            request.response.set_result(result, node.metadata)

        await request.response(scope, receive, send)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """ASGI interface - dispatch request to handler via router.

        Creates request, resolves handler via router.node(), delegates to
        per-app middleware chain if configured, otherwise executes directly.

        Whatever happens during dispatch, the request's cleanups are run
        (receiving the raised exception, or None on success), the current
        request is reset and the request is unregistered. The reset and the
        unregistration happen even when a cleanup itself raises.

        Args:
            scope: ASGI scope dictionary.
            receive: ASGI receive callable.
            send: ASGI send callable.

        Raises:
            BaseException: Any exception raised while resolving or running
                the handler is re-raised unchanged after cleanup.
        """
        self._ensure_auth(scope)
        request = await self.request_registry.create(
            scope, receive, send, server=self.server
        )
        set_current_request(request)

        error: BaseException | None = None
        try:
            node = self.router.node(
                request.path,
                errors=ROUTER_ERRORS,
                **scope.get("_filters", {}),
            )

            app_name = self._resolve_app_name(request.path)
            request.app_name = app_name
            app = self.server.apps.get(app_name)

            # Per-request context object exposed to downstream code via
            # scope["ctx"].
            ctx = DictObj()
            ctx.server = self.server
            ctx.app = app
            ctx.request = request
            ctx.session = scope.get("session")
            auth = scope.get("auth")
            ctx.avatar = auth.get("avatar") if auth else None

            # Apps without a db_name fall back to the "default" database;
            # the handle is attached only if that name is registered.
            db_name = getattr(app, "db_name", None) or "default"
            if db_name in self.server.db_registry:
                ctx._db = self.server.get_db(db_name)
            scope["ctx"] = ctx

            app_chain = self.server.app_middleware_chains.get(app_name)
            if app_chain is not None:
                # The chain receives the resolved node through the scope.
                scope["_handler_node"] = node
                await app_chain(scope, receive, send)
            else:
                await self._execute_handler(node, request, scope, receive, send)
        except BaseException as exc:
            # Remember the failure so cleanups can see it, then propagate.
            error = exc
            raise
        finally:
            # Cleanups are arbitrary callbacks: if one raises, the request
            # must still be detached and unregistered, otherwise it would
            # leak into whatever runs next in this context.
            try:
                request.run_cleanups(error=error)
            finally:
                set_current_request(None)
                self.request_registry.unregister()
# Script entry guard: this module defines no command-line behaviour, so
# running it directly does nothing.
if __name__ == "__main__":
    pass