Source code for genro_asgi.request

# Copyright 2025 Softwell S.r.l.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Transport-agnostic request system.

Classes:
    BaseRequest (ABC) — transport-agnostic interface
        ├── HttpRequest    — ASGI HTTP (stream-based, body via receive)
        └── MsgRequest     — WSX messages (atomic, WebSocket/NATS)
    RequestRegistry — factory and tracking for active requests

Every request has an associated Response (``request.response``).
The handler returns a value; the Dispatcher calls
``response.set_result()`` and sends the ASGI response.

Request/Response lifecycle::

    RequestRegistry.create(scope, receive, send)
        → factory()        # sync: allocate slots
        → request.init()   # async: read body, parse data
        → register in registry
    set_current_request(request)
    router.node(path) → handler
    result = handler(**request.query)
    response.set_result(result)
    response(scope, receive, send)
    set_current_request(None)
    registry.unregister()

Notification pattern (fire-and-forget over HTTP):
    Some protocols (e.g., JSON-RPC 2.0) define messages that expect
    no response payload.  The handler sets the status code and returns
    None; the Dispatcher sends an empty HTTP response::

        request = get_current_request()
        request.response.status_code = 202
        return None  # Dispatcher sends 202 with empty body
"""

from __future__ import annotations

import json as stdlib_json
import time
import uuid
from abc import ABC, abstractmethod
from collections.abc import Iterator
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any

from .datastructures import (
    Address,
    Headers,
    QueryParams,
    State,
    URL,
    headers_from_scope,
    query_params_from_scope,
)
from .types import Receive, Scope, Send

if TYPE_CHECKING:
    from .websocket import WebSocket

__all__ = [
    "BaseRequest",
    "HttpRequest",
    "MsgRequest",
    "RequestRegistry",
    "REQUEST_FACTORIES",
    "get_current_request",
    "set_current_request",
]

# ContextVar for current request - allows any code to access the current request
_current_request: ContextVar["BaseRequest | None"] = ContextVar("current_request", default=None)


[docs] def get_current_request() -> "BaseRequest | None": """Get the current request from context. Returns None if not in request context.""" return _current_request.get()
[docs] def set_current_request(request: "BaseRequest | None") -> Any: """Set the current request in context. Returns token for reset.""" return _current_request.set(request)
[docs] class BaseRequest(ABC): """Abstract base for transport-agnostic requests. Subclasses: HttpRequest (ASGI HTTP), MsgRequest (WSX messages). Handlers see only BaseRequest and work identically across transports. Every instance carries an associated ``response`` (Response object). The handler controls the response via ``request.response``: status code, headers, and — indirectly — body (the Dispatcher calls ``response.set_result()`` with whatever the handler returns). For notification/fire-and-forget over HTTP the handler sets ``request.response.status_code = 202`` and returns ``None``. Abstract properties (subclasses must implement): id, method, path, headers, cookies, query, data, transport Concrete attributes: response: Associated Response object (created in __init__) auth_tags: Security tags injected by AuthMiddleware via scope env_capabilities: Environment flags injected via scope external_id: Client-provided correlation ID (optional) tytx_mode: True when request uses TYTX serialization tytx_transport: TYTX transport type ('json', 'msgpack') or None app_name: App handling this request (set after routing, for metrics) created_at: Timestamp (epoch) when request was created age: Seconds since creation (computed) """ __slots__ = ( "_app_name", "_auth_tags", "_cleanups", "_onerr_callbacks", "_created_at", "_env_capabilities", "_external_id", "_tytx_mode", "_tytx_transport", "response", ) def __init__(self) -> None: from .response import Response self._app_name: str | None = None self._auth_tags: list[str] = [] self._cleanups: list[Any] = [] self._onerr_callbacks: list[Any] = [] self._created_at: float = time.time() self._env_capabilities: list[str] = [] self._external_id: str | None = None self._tytx_mode: bool = False self._tytx_transport: str | None = None self.response: Response = Response(request=self) @property @abstractmethod def id(self) -> str: """Correlation ID for request/response matching.""" @property @abstractmethod def method(self) -> str: """HTTP method: GET, POST, PUT, DELETE, PATCH.""" @property @abstractmethod def path(self) -> str: """Request path (e.g., '/users/42').""" @property @abstractmethod def headers(self) -> dict[str, str]: """Request headers (lowercase keys).""" @property @abstractmethod def cookies(self) -> dict[str, str]: """Request cookies.""" @property @abstractmethod def query(self) -> dict[str, Any]: """Query parameters.""" @property @abstractmethod def data(self) -> Any: """Request body/payload.""" @property @abstractmethod def transport(self) -> str: """Transport type: 'http', 'websocket', 'nats'.""" @property def auth_tags(self) -> list[str]: """Auth tags (set from scope during init by AuthMiddleware).""" return self._auth_tags @property def env_capabilities(self) -> list[str]: """Environment capabilities (set from scope during init).""" return self._env_capabilities @property def ctx(self) -> Any: """Execution context (DictObj set by Dispatcher in scope["ctx"]).""" return self._scope.get("ctx") # type: ignore[attr-defined]
[docs] def add_cleanup(self, callback: Any) -> None: """Register a callback to run at end of request (always, even on error). Args: callback: Callable to invoke. Called in reverse order of registration. """ self._cleanups.append(callback)
[docs] def add_onerr(self, callback: Any) -> None: """Register a callback to run only when the request ends with an error. Args: callback: Callable to invoke with the exception. Called in reverse order. """ self._onerr_callbacks.append(callback)
[docs] def run_cleanups(self, error: BaseException | None = None) -> None: """Run registered callbacks. Called by Dispatcher in finally block. Args: error: The exception if request failed, None on success. """ if error is not None: for cb in reversed(self._onerr_callbacks): try: cb(error) except Exception: pass for cb in reversed(self._cleanups): try: cb() except Exception: pass
@property def external_id(self) -> str | None: """Client-provided ID for correlation (e.g., WSX message id).""" return self._external_id @external_id.setter def external_id(self, value: str | None) -> None: self._external_id = value @property def tytx_mode(self) -> bool: """True if request uses TYTX serialization.""" return self._tytx_mode @tytx_mode.setter def tytx_mode(self, value: bool) -> None: self._tytx_mode = value @property def tytx_transport(self) -> str | None: """TYTX transport type ('json', 'msgpack') or None.""" return self._tytx_transport @tytx_transport.setter def tytx_transport(self, value: str | None) -> None: self._tytx_transport = value @property def app_name(self) -> str | None: """Name of the app handling this request (set after routing).""" return self._app_name @app_name.setter def app_name(self, value: str | None) -> None: self._app_name = value @property def created_at(self) -> float: """Timestamp when request was created.""" return self._created_at @property def age(self) -> float: """Seconds since request was created.""" return time.time() - self._created_at
[docs] @abstractmethod async def init( self, scope: Scope, receive: Receive, send: Send | None = None, **kwargs: Any, ) -> None: """Async initialization — parse transport data from ASGI scope. Subclasses must override to read body/message, populate internal state (headers, cookies, query, data), and extract auth context. Args: scope: ASGI scope dict. receive: ASGI receive callable. send: ASGI send callable (optional, not used by all transports). **kwargs: Transport-specific options (e.g. ``server``, ``websocket``). """
def __repr__(self) -> str: return ( f"<{self.__class__.__name__} " f"id={self.id!r} method={self.method} path={self.path!r} " f"transport={self.transport}>" )
[docs] class HttpRequest(BaseRequest): """HTTP request adapter — wraps ASGI scope, parses body via asgi_data. Supports both TYTX-encoded requests (type hydration for Decimal, date, datetime, time) and standard HTTP requests with plain JSON bodies. TYTX mode is detected from the ``X-TYTX-Transport`` header. When present, Response uses the same transport to serialize the reply. Body parsing is handled by ``genro_tytx.asgi_data()`` in both cases (after genro-tytx 0.8.0, plain JSON is parsed correctly without TYTX markers). Parsing flow (in ``init()``): 1. Headers decoded from ASGI scope (latin-1) 2. TYTX mode detected from X-TYTX-Transport header 3. ``asgi_data(scope, receive)`` parses headers, query, cookies, body 4. Request ID extracted from x-request-id header or generated (UUID) 5. Auth context read from scope (injected by AuthMiddleware) Known limitation: Raw body bytes are not preserved after parsing. ``asgi_data`` consumes the ASGI receive callable internally; ``self.body`` returns ``b""``. Extra properties (beyond BaseRequest): scope, body, scheme, url, headers_obj, query_params, client, state, content_type """ __slots__ = ( "_scope", "_server", "_db", "_body", "_headers", "_headers_obj", "_cookies", "_query", "_query_obj", "_data", "_id", "_url", "_state", ) def __init__(self) -> None: super().__init__() # Slots initialized to None, populated by init() self._scope: Scope = {} self._server: Any = None self._db: Any = None self._body: bytes = b"" self._headers: dict[str, str] = {} self._cookies: dict[str, str] = {} self._query: dict[str, Any] = {} self._data: Any = None self._id: str = "" self._url: URL | None = None self._state: State | None = None self._headers_obj: Headers | None = None self._query_obj: QueryParams | None = None
[docs] async def init( self, scope: Scope, receive: Receive, send: Send | None = None, **kwargs: Any, ) -> None: """Async init — parse headers, body, query, cookies via asgi_data. Delegates to ``genro_tytx.asgi_data(scope, receive)`` which handles TYTX-encoded and plain JSON bodies transparently (since genro-tytx 0.8.0). After this method: - ``self._data``: parsed body (dict for JSON, None if empty/unparseable) - ``self._headers``, ``self._cookies``, ``self._query``: populated - ``self._tytx_mode``: True if X-TYTX-Transport header was present - ``self._auth_tags``, ``self._env_capabilities``: from scope - ``self._body``: b"" (raw bytes not preserved — see class docstring) """ from genro_tytx import asgi_data self._scope = scope self._server = kwargs.pop("server", None) # Parse headers first (needed for TYTX detection) self._headers = {} for name, value in scope.get("headers", []): self._headers[name.decode("latin-1").lower()] = value.decode("latin-1") # Check for TYTX mode via X-TYTX-Transport header tytx_transport = self._headers.get("x-tytx-transport") if tytx_transport: self._tytx_mode = True self._tytx_transport = tytx_transport.lower() # Use asgi_data for parsing (handles both TYTX and normal requests) data = await asgi_data(dict(scope), receive) self._body = b"" self._headers = data.get("headers", self._headers) self._cookies = data.get("cookies", {}) self._query = data.get("query", {}) self._data = data.get("body") # Generate or extract request ID self._id = self._headers.get("x-request-id", str(uuid.uuid4())) self._external_id = self._headers.get("x-external-id") # Set auth_tags and env_capabilities from _filters (set by middleware) filters = scope.get("_filters", {}) self._auth_tags = list(filters.get("auth_tags", [])) self._env_capabilities = list(filters.get("env_capabilities", []))
@property def id(self) -> str: return self._id @property def method(self) -> str: return str(self._scope.get("method", "GET")).upper() @property def path(self) -> str: return str(self._scope.get("path", "/")) @property def headers(self) -> dict[str, str]: return self._headers @property def cookies(self) -> dict[str, str]: return self._cookies @property def query(self) -> dict[str, Any]: return self._query @property def data(self) -> Any: return self._data @property def transport(self) -> str: return "http" @property def server(self) -> Any: """Parent AsgiServer instance.""" return self._server @property def db(self) -> Any: """Database connection, lazy-loaded from ``scope["ctx"]._db``. On first access, reads ``ctx._db`` (set by Dispatcher) and registers ``db.closeConnection`` as a request cleanup callback. Returns: The database connection, or None if ctx is absent or has no ``_db``. """ if self._db is not None: return self._db ctx = self.ctx if ctx is None or "_db" not in ctx: return None self._db = ctx._db self.add_cleanup(self._db.closeConnection) return self._db @property def session(self) -> Any: """Session object (set by SessionMiddleware via scope).""" return self._scope.get("session") @property def scope(self) -> Scope: """Raw ASGI scope dict.""" return self._scope @property def body(self) -> bytes: """Raw body bytes.""" return self._body @property def scheme(self) -> str: """URL scheme: http or https.""" return str(self._scope.get("scheme", "http")) @property def url(self) -> URL: """Full request URL.""" if self._url is None: scheme = self.scheme server = self._scope.get("server") path = self._scope.get("root_path", "") + self.path query_string = self._scope.get("query_string", b"") if server: host, port = server if (scheme == "http" and port == 80) or (scheme == "https" and port == 443): netloc = host else: netloc = f"{host}:{port}" else: netloc = self._headers.get("host", "localhost") url_str = f"{scheme}://{netloc}{path}" if query_string: url_str += f"?{query_string.decode('latin-1')}" self._url = URL(url_str) return self._url @property def headers_obj(self) -> Headers: """Request headers as Headers object (case-insensitive).""" if self._headers_obj is None: self._headers_obj = headers_from_scope(self._scope) return self._headers_obj @property def query_params(self) -> QueryParams: """Query string parameters as QueryParams object.""" if self._query_obj is None: self._query_obj = query_params_from_scope(self._scope) return self._query_obj @property def client(self) -> Address | None: """Client address (host, port) if available.""" client = self._scope.get("client") if client: return Address(host=client[0], port=client[1]) return None @property def state(self) -> State: """Request-scoped state container.""" if self._state is None: self._state = State() return self._state @property def content_type(self) -> str | None: """Content-Type header value.""" return self._headers.get("content-type")
[docs] class MsgRequest(BaseRequest): """ Message-based request adapter (WSX over WebSocket, NATS, etc.). Parses WSX:// formatted messages into BaseRequest interface. Transport-agnostic: works with any message-based protocol. """ __slots__ = ( "_scope", "_send", "_id", "_method", "_path", "_headers", "_cookies", "_query", "_data", "_transport_type", "_websocket", ) def __init__(self) -> None: super().__init__() # Slots initialized to defaults, populated by init() self._scope: Scope = {} self._send: Send | None = None self._id: str = "" self._method: str = "GET" self._path: str = "/" self._headers: dict[str, str] = {} self._cookies: dict[str, str] = {} self._query: dict[str, Any] = {} self._data: Any = None self._transport_type: str = "websocket" self._websocket: "WebSocket | None" = None
[docs] async def init( self, scope: Scope, receive: Receive, send: Send | None = None, **kwargs: Any, ) -> None: """Async initialization - parses WSX message.""" self._scope = scope self._send = send self._transport_type = kwargs.get("transport_type", "websocket") self._websocket = kwargs.get("websocket") # Get message from kwargs message = kwargs.get("message") if message is None: raise ValueError("MsgRequest requires 'message' kwarg") # Parse WSX message parsed = self._parse_wsx_message(message) # Required fields if "id" not in parsed: raise ValueError("WSX message missing required 'id' field") if "method" not in parsed: raise ValueError("WSX message missing required 'method' field") # The WSX message 'id' is the client's external_id self._external_id = parsed["id"] # Generate internal server id self._id = str(uuid.uuid4()) self._method = parsed["method"].upper() self._path = parsed.get("path", "/") self._headers = parsed.get("headers", {}) self._cookies = parsed.get("cookies", {}) self._query = parsed.get("query", {}) self._data = parsed.get("data") # Detect TYTX mode from message marker or header self._tytx_mode = ( parsed.get("tytx", False) or "tytx" in self._headers.get("content-type", "").lower() ) # Set auth_tags and env_capabilities from _filters (set by middleware) filters = scope.get("_filters", {}) self._auth_tags = list(filters.get("auth_tags", [])) self._env_capabilities = list(filters.get("env_capabilities", []))
def _parse_wsx_message(self, data: str | bytes) -> dict[str, Any]: """Parse WSX protocol message into request dict. Handles both text (JSON) and binary (msgpack) WSX messages. Supports TYTX hydration for type-aware data reconstruction. Args: data: Raw message data, either str (JSON) or bytes (msgpack). Returns: Parsed message dict with keys: id, method, path, headers, cookies, query, data. Note: Format detection: - bytes: Attempts msgpack parsing via genro_tytx - str starting with "WSX://": Protocol prefix stripped - str ending with "::JS": TYTX JSON with type markers - Other str: Standard JSON parsing If genro_tytx is not installed, falls back to stdlib json. """ if isinstance(data, bytes): # Binary data - try msgpack via from_tytx try: from genro_tytx import from_tytx return dict(from_tytx(data, transport="msgpack")) except ImportError: data = data.decode("utf-8") # String data if data.startswith("WSX://"): data = data[6:] # Check for TYTX JSON marker if data.endswith("::JS"): try: from genro_tytx import from_tytx return dict(from_tytx(data)) except ImportError: data = data[:-4] # Strip marker, parse as regular JSON return dict(stdlib_json.loads(data)) @property def id(self) -> str: return self._id @property def method(self) -> str: return self._method @property def path(self) -> str: return self._path @property def headers(self) -> dict[str, str]: return self._headers @property def cookies(self) -> dict[str, str]: return self._cookies @property def query(self) -> dict[str, Any]: return self._query @property def data(self) -> Any: return self._data @property def transport(self) -> str: return self._transport_type @property def scope(self) -> Scope: """Access to raw ASGI scope.""" return self._scope @property def websocket(self) -> "WebSocket | None": """Access to underlying WebSocket connection (if available).""" return self._websocket @property def client(self) -> tuple[str, int] | None: """Client address as raw (host, port) tuple from WebSocket scope. Unlike HttpRequest.client (which wraps in Address), WSX messages return the raw tuple because WebSocket connections are long-lived and the overhead of wrapping each message is unnecessary. """ return self._scope.get("client")
[docs] class RequestRegistry: """ Registry for creating and tracking active requests. Responsibilities: - Creates appropriate request based on scope["type"] using factories dict - Calls async init() on the created request - Tracks active requests for monitoring and metrics - Provides iteration and lookup by request ID Example: registry = RequestRegistry() request = await registry.create(scope, receive, send) print(f"Active: {len(registry)}") registry.unregister() """ __slots__ = ("_requests", "factories", "_ctx_request") def __init__( self, factories: dict[str, type[BaseRequest]] | None = None, ) -> None: self._requests: dict[str, BaseRequest] = {} self.factories = factories if factories is not None else REQUEST_FACTORIES.copy() self._ctx_request: ContextVar[BaseRequest | None] = ContextVar('current_request', default=None) @property def current(self) -> BaseRequest | None: """Current request from ContextVar.""" return self._ctx_request.get()
[docs] async def create( self, scope: Scope, receive: Receive, send: Send | None = None, **kwargs: Any, ) -> BaseRequest: """Create and register a request from ASGI scope.""" scope_type = scope.get("type", "") factory = self.factories.get(scope_type) if factory is None: raise ValueError(f"No factory for scope type: {scope_type!r}") request = factory() await request.init(scope, receive, send, **kwargs) self._requests[request.id] = request self._ctx_request.set(request) return request
[docs] def register_factory(self, scope_type: str, factory: type[BaseRequest]) -> None: """Register a factory for a scope type.""" self.factories[scope_type] = factory
[docs] def unregister(self) -> BaseRequest | None: """Unregister current request.""" request = self._ctx_request.get() if request is not None: self._requests.pop(request.id, None) self._ctx_request.set(None) return request
[docs] def get(self, request_id: str) -> BaseRequest | None: """Get a request by id.""" return self._requests.get(request_id)
[docs] def count_by_app(self, app_name: str) -> int: """Count active requests for a specific app.""" return sum(1 for req in self._requests.values() if req.app_name == app_name)
def __len__(self) -> int: """Return number of active requests.""" return len(self._requests) def __iter__(self) -> Iterator[BaseRequest]: """Iterate over active requests.""" return iter(self._requests.values()) def __contains__(self, request_id: str) -> bool: """Check if a request is registered.""" return request_id in self._requests def __repr__(self) -> str: return f"RequestRegistry(active={len(self._requests)})"
# Default factories for request creation REQUEST_FACTORIES: dict[str, type[BaseRequest]] = { "http": HttpRequest, "websocket": MsgRequest, }