Source code for genro_asgi.server.server

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""ASGI Server - main entry point for genro-asgi applications.

Classes:
    AsgiServer -- central coordinator: loads config (YAML), mounts apps,
                  builds middleware chain, handles lifespan, routes via
                  genro-routes Router.

Inherits BasicAuthMixin and RoutingClass. Run with ``AsgiServer(server_dir).run()``.

WebSocket connections are handled by WsxHandler (WSX protocol over WebSocket).
"""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any

from .auth_mixin import BasicAuthMixin
from .dispatcher import Dispatcher
from .handler_executor import HandlerExecutor
from .server_config import ServerConfig
from ..lifespan import ServerLifespan
from ..session import MemorySessionStore, Session
from ..loader import AppLoader
from ..middleware import middleware_chain
from ..resources import ResourceLoader
from ..response import Response
from ..request import RequestRegistry, BaseRequest
from .server_app.server_app import ServerApplication
from ..storage import LocalStorage
from ..types import Receive, Scope, Send
from ..applications import AsgiApplication, OpenApiApplication
from ..wsx.handler import WsxHandler

from genro_routes import RoutingClass, Router  # type: ignore[import-untyped]

__all__ = ["AsgiServer"]


[docs] class AsgiServer(BasicAuthMixin, RoutingClass): """ Base ASGI server with routing via genro_routes. Attributes: config: ServerConfig for configuration. router: genro_routes Router for dispatch. dispatcher: Dispatcher handling request routing. lifespan: ServerLifespan for startup/shutdown. server_application: ServerApplication with system endpoints. sys_apps: Dict for system apps (mounted at /_sys/). apps: Dict for user apps. logger: Server logger instance. request_registry: RequestRegistry for tracking requests. request: Current request (from ContextVar). response: Current response builder. """ __slots__ = ( "_auth_config", "wsgi_app", "config", "session_store", "app_middleware_chains", "base_dir", "router", "_sys_router", "storage", "resource_loader", "logger", "lifespan", "request_registry", "dispatcher", "wsx_handler", "openapi_info", "app_loader", "server_application", "sys_apps", "apps", "parent", "db_registry", )
[docs] def __init__( self, server_dir: str | Path | None = None, host: str | None = None, port: int | None = None, reload: bool | None = None, argv: list[str] | None = None, parent: Any = None, ) -> None: """Initialize AsgiServer. Composes BasicAuthMixin (auth) and RoutingClass (routing). Loads config, builds middleware chain, mounts apps. Args: server_dir: Path to server directory with config.yaml. host: Override host from config. port: Override port from config. reload: Override reload from config. argv: CLI arguments for config override. parent: Optional parent object with shared resources and logic. """ self.parent = parent self.wsgi_app: Any = None self.db_registry: dict[str, Any] = {} self.config = ServerConfig(server_dir, host, port, reload, argv) auth_config = self.config["auth_middleware"] if auth_config is not None: if hasattr(auth_config, "as_dict"): auth_config = auth_config.as_dict() BasicAuthMixin.__init__(self, **auth_config) else: BasicAuthMixin.__init__(self) self.session_store = MemorySessionStore() self.base_dir: Path = self.config.server["server_dir"] self.router = Router(self, name="root") self.storage = LocalStorage(self.base_dir) self.resource_loader = ResourceLoader(self) for name, opts in self.config.get_plugin_specs().items(): self.router.plug(name, **opts) self.logger = logging.getLogger("genro_asgi") self.lifespan = ServerLifespan(self) self.request_registry = RequestRegistry() self.dispatcher = middleware_chain( self.config.middleware, Dispatcher(self), full_config=self.config._opts ) self.wsx_handler = WsxHandler(self) # OpenAPI info from config (plain dict via property) self.openapi_info: dict[str, Any] = self.config.openapi # Server application - system endpoints (/_server/) self.server_application = ServerApplication(server=self) self.attach_instance(self.server_application, name="_server") # AppLoader for isolated module loading (avoids sys.path pollution) self.app_loader = AppLoader() # default prefix: "genro_root" # System apps (/_sys/) and user apps - loaded with same logic self.sys_apps: dict[str, RoutingClass] = {} self.apps: dict[str, RoutingClass] = {} # Create _sys router as child of root router for system apps self._sys_router = Router(self, name="_sys", parent_router=self.router) self._load_apps(self.config.get_sys_app_specs_raw(), self.sys_apps, self._sys_router) # Load user apps with convention support app_specs = dict(self.config.get_app_specs_raw()) # mutable copy convention_app = self._detect_convention_app(app_specs) self._load_apps(app_specs, self.apps) # Build per-app middleware chains self.app_middleware_chains: dict[str, Any] = {} handler_executor = HandlerExecutor(self) for name, (_, _, mw_config, _) in app_specs.items(): if mw_config: chain = middleware_chain(mw_config, handler_executor, full_config=self.config._opts) self.app_middleware_chains[name] = chain # Set main_app: explicit config > convention app > single app main_app = self.config["main_app"] if not main_app and convention_app: main_app = convention_app if main_app: self.config._opts["main_app"] = main_app # Default entry points to server_application.index self.router.default_entry = "_server/index" # Apply persisted plugin configuration from plugin_config.json self.apply_persisted_config()
[docs] def create_session(self, auth: dict[str, Any] | None = None) -> Session: """Create a new session in the session store. Args: auth: Auth dict snapshot to store in session. Returns: New Session instance with unique token. """ return self.session_store.create(auth=auth)
[docs] def mount(self, name: str, app: Any) -> None: """Mount an application on the server. Args: name: Mount name (becomes the URL prefix). app: Application instance (AsgiApplication or RoutingClass). """ app.mount_name = name self.apps[name] = app self.attach_instance(app, name=name)
[docs] def register_db(self, name: str, db: Any) -> None: """Register a database connection in the registry. Args: name: Logical name for this database (e.g. "sourcerer", "default"). db: Database connection or pool object. """ self.db_registry[name] = db
[docs] def get_db(self, name: str) -> Any: """Return a registered database by name. Args: name: Logical name used in register_db(). Raises: KeyError: If no database registered with that name. """ return self.db_registry[name]
@property def db(self) -> Any: """Default database (shortcut for get_db("default")).""" return self.db_registry["default"] # -- Plugin config persistence -- @property def plugin_config_file(self) -> Path: """Path to plugin_config.json in server_dir.""" return self.base_dir / "plugin_config.json"
[docs] def get_plugin_config(self, router_path: str = "", entry: str = "") -> dict[str, Any]: """Read live plugin configuration for a router/entry. Uses routing.configure("?") for introspection via genro-routes public API. Args: router_path: Slash-separated path to the router (empty = all). entry: Specific entry/handler name to filter (empty = all). Returns: Dict with router_path, plugin_info. Error dict if router not found. """ try: description = self.routing.configure("?") except (AttributeError, KeyError) as exc: return {"error": str(exc)} if router_path: parts = router_path.split("/") current = description for part in parts: routers = current.get("routers", {}) if isinstance(current, dict) else {} if part not in routers: return {"error": f"Router not found: {router_path}"} current = routers[part] description = {router_path.split("/")[-1]: current} return {"router_path": router_path, "entry": entry, "plugin_info": description}
[docs] def set_plugin_config( self, router_path: str = "", plugin_name: str = "", target: str = "_all_", **config_values: Any, ) -> dict[str, Any]: """Apply a plugin config change at runtime without persisting. Delegates to routing.configure() via genro-routes public API. Args: router_path: Slash-separated path to the router (empty = root). plugin_name: Name of the plugin to configure. target: Target slot ("_all_" or specific entry name). **config_values: Plugin configuration key-value pairs. Returns: Dict with "ok": True on success, or "error" key on failure. """ if not plugin_name: return {"error": "plugin_name is required"} router_spec = router_path.replace("/", "/") if router_path else "root" selector = f"/{target}" if target != "_all_" else "" config_target = f"{router_spec}:{plugin_name}{selector}" try: self.routing.configure(config_target, **config_values) except (AttributeError, KeyError, ValueError) as exc: return {"error": str(exc)} return {"ok": True}
[docs] def save_plugin_config(self) -> dict[str, Any]: """Persist current runtime plugin config to plugin_config.json. Uses routing.configure("?") to snapshot the full tree via genro-routes public API. Returns: Dict with "ok": True and "file" path on success. """ snapshot = self.routing.configure("?") self.plugin_config_file.write_text(json.dumps(snapshot, indent=2)) return {"ok": True, "file": str(self.plugin_config_file)}
[docs] def revert_plugin_config(self) -> dict[str, Any]: """Revert runtime config to what is saved in plugin_config.json. Returns: Dict with "ok": True. Includes "message" if no saved config exists. """ data = self._load_persisted_config() if not data: return {"ok": True, "message": "No saved config, nothing to revert"} self._apply_config_data(data) return {"ok": True}
[docs] def apply_persisted_config(self) -> None: """Apply plugin_config.json to the current router tree. Called at boot after plugins are attached. """ data = self._load_persisted_config() if data: self._apply_config_data(data)
def _load_persisted_config(self) -> dict[str, Any]: """Load plugin_config.json if it exists.""" if self.plugin_config_file.exists(): result: dict[str, Any] = json.loads(self.plugin_config_file.read_text()) return result return {} def _apply_config_data(self, data: dict[str, Any]) -> None: """Apply config data dict to the live router tree. Delegates to routing.configure() for each entry. """ for router_name, router_info in data.items(): for plugin_info in router_info.get("plugins", []): plugin_name = plugin_info.get("name", "") config = plugin_info.get("config", {}) if not config: continue config_target = f"{router_name}:{plugin_name}" try: self.routing.configure(config_target, **config) except (AttributeError, KeyError, ValueError) as exc: self.logger.warning( "plugin_config.json: cannot apply %s: %s", config_target, exc ) async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: """ Handle ASGI request. Dispatches via genro_routes Router. Handles lifespan events via self.lifespan. Args: scope: ASGI scope dict. receive: ASGI receive callable. send: ASGI send callable. """ if scope["type"] == "lifespan": await self.lifespan(scope, receive, send) elif scope["type"] == "websocket": await self.wsx_handler(scope, receive, send) else: await self.dispatcher(scope, receive, send)
[docs] def run(self) -> None: """Run the server using Uvicorn.""" import os import uvicorn host = self.config.server["host"] port = self.config.server["port"] reload = self.config.server["reload"] or False self.logger.info(f"Starting server on {host}:{port}") if reload: # Uvicorn requires import string for reload mode # Pass server_dir via env var for the factory os.environ["GENRO_ASGI_SERVER_DIR"] = str(self.base_dir) uvicorn.run( "genro_asgi:AsgiServer", host=host, port=port, reload=True, reload_dirs=[str(self.base_dir)], factory=True, ) else: uvicorn.run(self, host=host, port=port)
def __repr__(self) -> str: """Return string representation.""" if self.router is not None: return f"AsgiServer(router={self.router.name!r})" apps_str = ", ".join(f"{path!r}" for path in self.apps) return f"AsgiServer(apps=[{apps_str}])" def _detect_convention_app( self, app_specs: dict[str, tuple[str, str, dict[str, Any] | None, dict[str, Any]]], ) -> str | None: """Detect convention-based app in server_dir. Looks for {server_dir.name}_app.py with class Application. If found and not already in app_specs, adds it. Args: app_specs: Mutable dict of app specs to potentially update. Returns: Name of convention app if found and added, None otherwise. """ server_dir = self.config.server_dir app_name = server_dir.name convention_module = f"{app_name}_app" convention_file = server_dir / f"{convention_module}.py" if not convention_file.exists(): return None # Check if class Application exists in the module try: self.app_loader.load_package(app_name, server_dir) module = self.app_loader.get_module(app_name, convention_module) if module is None or not hasattr(module, "Application"): return None except Exception: return None # Add to specs if not already present if app_name not in app_specs: app_specs[app_name] = (convention_module, "Application", None, {"base_dir": server_dir}) return app_name def _maybe_wrap(self, cls: type, kwargs: dict[str, Any]) -> tuple[type, dict[str, Any]]: """If cls is a plain RoutingClass, wrap it in OpenApiApplication. AsgiApplication subclasses and non-RoutingClass types pass through unchanged. Args: cls: The class loaded from the app module spec. kwargs: Instantiation kwargs (may include ``docs``, ``base_dir``). Returns: ``(effective_cls, effective_kwargs)`` ready for instantiation. """ if issubclass(cls, AsgiApplication): return cls, kwargs if not issubclass(cls, RoutingClass): return cls, kwargs # Plain RoutingClass: extract wrapper kwargs, pass rest to inner class docs = kwargs.pop("docs", "off") base_dir = kwargs.pop("base_dir", None) db_name = kwargs.pop("db_name", None) routing_instance = cls(**kwargs) wrapper_kwargs: dict[str, Any] = { "routing_class": routing_instance, "docs": docs, } if base_dir is not None: wrapper_kwargs["base_dir"] = base_dir if db_name is not None: wrapper_kwargs["db_name"] = db_name return OpenApiApplication, wrapper_kwargs def _load_apps( self, specs: dict[str, tuple[str, str, dict[str, Any] | None, dict[str, Any]]], target: dict[str, RoutingClass], target_router: Router | None = None, ) -> None: """Load apps from specs into target dict and mount on router. Supports two module formats: 1. Local modules (relative to server_dir): "shop_app:Application", "myapp.core:MyApp" 2. Installed packages (absolute): "genro_asgi.sys_applications.swagger.swagger_app:Application" Args: specs: Dict of {name: (module_name, class_name, mw_config, kwargs)}. target: Dict to store loaded app instances (apps or sys_apps). target_router: Router to mount apps on. If None, uses self.router. """ import importlib server_dir_path = self.config.server_dir for name, (module_name, class_name, _mw_config, kwargs) in specs.items(): # Check if already loaded by convention detection app_module = self.app_loader.get_module(name, module_name) if app_module is None: # Not pre-loaded, try to load it module_path = module_name.replace(".", "/") module_as_file = server_dir_path / f"{module_path}.py" module_as_dir = server_dir_path / module_path if module_as_file.exists() or module_as_dir.exists(): # Local module: use AppLoader app_dir = server_dir_path if module_as_file.exists() else module_as_dir self.app_loader.load_package(name, app_dir) app_module = self.app_loader.get_module(name, module_name) if app_module is None: raise ImportError(f"Cannot load module '{module_name}' for app '{name}'") if "base_dir" not in kwargs: kwargs["base_dir"] = app_dir else: # Installed package: use importlib app_module = importlib.import_module(module_name) module_file = getattr(app_module, "__file__", None) if module_file and "base_dir" not in kwargs: kwargs["base_dir"] = Path(module_file).parent cls = getattr(app_module, class_name) cls, kwargs = self._maybe_wrap(cls, kwargs) instance = cls(**kwargs) instance.mount_name = name target[name] = instance if target_router is None: self.attach_instance(instance, name=name) else: target_router.include(instance.default_router, name=name) @property def request(self) -> BaseRequest | None: """Current request from registry.""" return self.request_registry.current @property def response(self) -> Response | None: """Current response from request.""" req = self.request return req.response if req else None
if __name__ == "__main__": server = AsgiServer() server.run()