# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""ASGI Server - main entry point for genro-asgi applications.
Classes:
AsgiServer -- central coordinator: loads config (YAML), mounts apps,
builds middleware chain, handles lifespan, routes via
genro-routes Router.
Inherits BasicAuthMixin and RoutingClass. Run with ``AsgiServer(server_dir).run()``.
WebSocket connections are handled by WsxHandler (WSX protocol over WebSocket).
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Any
from .auth_mixin import BasicAuthMixin
from .dispatcher import Dispatcher
from .handler_executor import HandlerExecutor
from .server_config import ServerConfig
from ..lifespan import ServerLifespan
from ..session import MemorySessionStore, Session
from ..loader import AppLoader
from ..middleware import middleware_chain
from ..resources import ResourceLoader
from ..response import Response
from ..request import RequestRegistry, BaseRequest
from .server_app.server_app import ServerApplication
from ..storage import LocalStorage
from ..types import Receive, Scope, Send
from ..applications import AsgiApplication, OpenApiApplication
from ..wsx.handler import WsxHandler
from genro_routes import RoutingClass, Router # type: ignore[import-untyped]
__all__ = ["AsgiServer"]
[docs]
class AsgiServer(BasicAuthMixin, RoutingClass):
"""
Base ASGI server with routing via genro_routes.
Attributes:
config: ServerConfig for configuration.
router: genro_routes Router for dispatch.
dispatcher: Dispatcher handling request routing.
lifespan: ServerLifespan for startup/shutdown.
server_application: ServerApplication with system endpoints.
sys_apps: Dict for system apps (mounted at /_sys/).
apps: Dict for user apps.
logger: Server logger instance.
request_registry: RequestRegistry for tracking requests.
request: Current request (from ContextVar).
response: Current response builder.
"""
__slots__ = (
"_auth_config",
"wsgi_app",
"config",
"session_store",
"app_middleware_chains",
"base_dir",
"router",
"_sys_router",
"storage",
"resource_loader",
"logger",
"lifespan",
"request_registry",
"dispatcher",
"wsx_handler",
"openapi_info",
"app_loader",
"server_application",
"sys_apps",
"apps",
"parent",
"db_registry",
)
[docs]
def __init__(
self,
server_dir: str | Path | None = None,
host: str | None = None,
port: int | None = None,
reload: bool | None = None,
argv: list[str] | None = None,
parent: Any = None,
) -> None:
"""Initialize AsgiServer.
Composes BasicAuthMixin (auth) and RoutingClass (routing).
Loads config, builds middleware chain, mounts apps.
Args:
server_dir: Path to server directory with config.yaml.
host: Override host from config.
port: Override port from config.
reload: Override reload from config.
argv: CLI arguments for config override.
parent: Optional parent object with shared resources and logic.
"""
self.parent = parent
self.wsgi_app: Any = None
self.db_registry: dict[str, Any] = {}
self.config = ServerConfig(server_dir, host, port, reload, argv)
auth_config = self.config["auth_middleware"]
if auth_config is not None:
if hasattr(auth_config, "as_dict"):
auth_config = auth_config.as_dict()
BasicAuthMixin.__init__(self, **auth_config)
else:
BasicAuthMixin.__init__(self)
self.session_store = MemorySessionStore()
self.base_dir: Path = self.config.server["server_dir"]
self.router = Router(self, name="root")
self.storage = LocalStorage(self.base_dir)
self.resource_loader = ResourceLoader(self)
for name, opts in self.config.get_plugin_specs().items():
self.router.plug(name, **opts)
self.logger = logging.getLogger("genro_asgi")
self.lifespan = ServerLifespan(self)
self.request_registry = RequestRegistry()
self.dispatcher = middleware_chain(
self.config.middleware, Dispatcher(self), full_config=self.config._opts
)
self.wsx_handler = WsxHandler(self)
# OpenAPI info from config (plain dict via property)
self.openapi_info: dict[str, Any] = self.config.openapi
# Server application - system endpoints (/_server/)
self.server_application = ServerApplication(server=self)
self.attach_instance(self.server_application, name="_server")
# AppLoader for isolated module loading (avoids sys.path pollution)
self.app_loader = AppLoader() # default prefix: "genro_root"
# System apps (/_sys/) and user apps - loaded with same logic
self.sys_apps: dict[str, RoutingClass] = {}
self.apps: dict[str, RoutingClass] = {}
# Create _sys router as child of root router for system apps
self._sys_router = Router(self, name="_sys", parent_router=self.router)
self._load_apps(self.config.get_sys_app_specs_raw(), self.sys_apps, self._sys_router)
# Load user apps with convention support
app_specs = dict(self.config.get_app_specs_raw()) # mutable copy
convention_app = self._detect_convention_app(app_specs)
self._load_apps(app_specs, self.apps)
# Build per-app middleware chains
self.app_middleware_chains: dict[str, Any] = {}
handler_executor = HandlerExecutor(self)
for name, (_, _, mw_config, _) in app_specs.items():
if mw_config:
chain = middleware_chain(mw_config, handler_executor, full_config=self.config._opts)
self.app_middleware_chains[name] = chain
# Set main_app: explicit config > convention app > single app
main_app = self.config["main_app"]
if not main_app and convention_app:
main_app = convention_app
if main_app:
self.config._opts["main_app"] = main_app
# Default entry points to server_application.index
self.router.default_entry = "_server/index"
# Apply persisted plugin configuration from plugin_config.json
self.apply_persisted_config()
[docs]
def create_session(self, auth: dict[str, Any] | None = None) -> Session:
"""Create a new session in the session store.
Args:
auth: Auth dict snapshot to store in session.
Returns:
New Session instance with unique token.
"""
return self.session_store.create(auth=auth)
[docs]
def mount(self, name: str, app: Any) -> None:
"""Mount an application on the server.
Args:
name: Mount name (becomes the URL prefix).
app: Application instance (AsgiApplication or RoutingClass).
"""
app.mount_name = name
self.apps[name] = app
self.attach_instance(app, name=name)
[docs]
def register_db(self, name: str, db: Any) -> None:
"""Register a database connection in the registry.
Args:
name: Logical name for this database (e.g. "sourcerer", "default").
db: Database connection or pool object.
"""
self.db_registry[name] = db
[docs]
def get_db(self, name: str) -> Any:
"""Return a registered database by name.
Args:
name: Logical name used in register_db().
Raises:
KeyError: If no database registered with that name.
"""
return self.db_registry[name]
@property
def db(self) -> Any:
"""Default database (shortcut for get_db("default"))."""
return self.db_registry["default"]
# -- Plugin config persistence --
@property
def plugin_config_file(self) -> Path:
"""Path to plugin_config.json in server_dir."""
return self.base_dir / "plugin_config.json"
[docs]
def get_plugin_config(self, router_path: str = "", entry: str = "") -> dict[str, Any]:
"""Read live plugin configuration for a router/entry.
Uses routing.configure("?") for introspection via genro-routes
public API.
Args:
router_path: Slash-separated path to the router (empty = all).
entry: Specific entry/handler name to filter (empty = all).
Returns:
Dict with router_path, plugin_info. Error dict if router not found.
"""
try:
description = self.routing.configure("?")
except (AttributeError, KeyError) as exc:
return {"error": str(exc)}
if router_path:
parts = router_path.split("/")
current = description
for part in parts:
routers = current.get("routers", {}) if isinstance(current, dict) else {}
if part not in routers:
return {"error": f"Router not found: {router_path}"}
current = routers[part]
description = {router_path.split("/")[-1]: current}
return {"router_path": router_path, "entry": entry, "plugin_info": description}
[docs]
def set_plugin_config(
self,
router_path: str = "",
plugin_name: str = "",
target: str = "_all_",
**config_values: Any,
) -> dict[str, Any]:
"""Apply a plugin config change at runtime without persisting.
Delegates to routing.configure() via genro-routes public API.
Args:
router_path: Slash-separated path to the router (empty = root).
plugin_name: Name of the plugin to configure.
target: Target slot ("_all_" or specific entry name).
**config_values: Plugin configuration key-value pairs.
Returns:
Dict with "ok": True on success, or "error" key on failure.
"""
if not plugin_name:
return {"error": "plugin_name is required"}
router_spec = router_path.replace("/", "/") if router_path else "root"
selector = f"/{target}" if target != "_all_" else ""
config_target = f"{router_spec}:{plugin_name}{selector}"
try:
self.routing.configure(config_target, **config_values)
except (AttributeError, KeyError, ValueError) as exc:
return {"error": str(exc)}
return {"ok": True}
[docs]
def save_plugin_config(self) -> dict[str, Any]:
"""Persist current runtime plugin config to plugin_config.json.
Uses routing.configure("?") to snapshot the full tree via
genro-routes public API.
Returns:
Dict with "ok": True and "file" path on success.
"""
snapshot = self.routing.configure("?")
self.plugin_config_file.write_text(json.dumps(snapshot, indent=2))
return {"ok": True, "file": str(self.plugin_config_file)}
[docs]
def revert_plugin_config(self) -> dict[str, Any]:
"""Revert runtime config to what is saved in plugin_config.json.
Returns:
Dict with "ok": True. Includes "message" if no saved config exists.
"""
data = self._load_persisted_config()
if not data:
return {"ok": True, "message": "No saved config, nothing to revert"}
self._apply_config_data(data)
return {"ok": True}
[docs]
def apply_persisted_config(self) -> None:
"""Apply plugin_config.json to the current router tree.
Called at boot after plugins are attached.
"""
data = self._load_persisted_config()
if data:
self._apply_config_data(data)
def _load_persisted_config(self) -> dict[str, Any]:
"""Load plugin_config.json if it exists."""
if self.plugin_config_file.exists():
result: dict[str, Any] = json.loads(self.plugin_config_file.read_text())
return result
return {}
def _apply_config_data(self, data: dict[str, Any]) -> None:
"""Apply config data dict to the live router tree.
Delegates to routing.configure() for each entry.
"""
for router_name, router_info in data.items():
for plugin_info in router_info.get("plugins", []):
plugin_name = plugin_info.get("name", "")
config = plugin_info.get("config", {})
if not config:
continue
config_target = f"{router_name}:{plugin_name}"
try:
self.routing.configure(config_target, **config)
except (AttributeError, KeyError, ValueError) as exc:
self.logger.warning(
"plugin_config.json: cannot apply %s: %s", config_target, exc
)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Handle ASGI request.
Dispatches via genro_routes Router.
Handles lifespan events via self.lifespan.
Args:
scope: ASGI scope dict.
receive: ASGI receive callable.
send: ASGI send callable.
"""
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
elif scope["type"] == "websocket":
await self.wsx_handler(scope, receive, send)
else:
await self.dispatcher(scope, receive, send)
[docs]
def run(self) -> None:
"""Run the server using Uvicorn."""
import os
import uvicorn
host = self.config.server["host"]
port = self.config.server["port"]
reload = self.config.server["reload"] or False
self.logger.info(f"Starting server on {host}:{port}")
if reload:
# Uvicorn requires import string for reload mode
# Pass server_dir via env var for the factory
os.environ["GENRO_ASGI_SERVER_DIR"] = str(self.base_dir)
uvicorn.run(
"genro_asgi:AsgiServer",
host=host,
port=port,
reload=True,
reload_dirs=[str(self.base_dir)],
factory=True,
)
else:
uvicorn.run(self, host=host, port=port)
def __repr__(self) -> str:
"""Return string representation."""
if self.router is not None:
return f"AsgiServer(router={self.router.name!r})"
apps_str = ", ".join(f"{path!r}" for path in self.apps)
return f"AsgiServer(apps=[{apps_str}])"
def _detect_convention_app(
self,
app_specs: dict[str, tuple[str, str, dict[str, Any] | None, dict[str, Any]]],
) -> str | None:
"""Detect convention-based app in server_dir.
Looks for {server_dir.name}_app.py with class Application.
If found and not already in app_specs, adds it.
Args:
app_specs: Mutable dict of app specs to potentially update.
Returns:
Name of convention app if found and added, None otherwise.
"""
server_dir = self.config.server_dir
app_name = server_dir.name
convention_module = f"{app_name}_app"
convention_file = server_dir / f"{convention_module}.py"
if not convention_file.exists():
return None
# Check if class Application exists in the module
try:
self.app_loader.load_package(app_name, server_dir)
module = self.app_loader.get_module(app_name, convention_module)
if module is None or not hasattr(module, "Application"):
return None
except Exception:
return None
# Add to specs if not already present
if app_name not in app_specs:
app_specs[app_name] = (convention_module, "Application", None, {"base_dir": server_dir})
return app_name
def _maybe_wrap(self, cls: type, kwargs: dict[str, Any]) -> tuple[type, dict[str, Any]]:
"""If cls is a plain RoutingClass, wrap it in OpenApiApplication.
AsgiApplication subclasses and non-RoutingClass types pass through
unchanged.
Args:
cls: The class loaded from the app module spec.
kwargs: Instantiation kwargs (may include ``docs``, ``base_dir``).
Returns:
``(effective_cls, effective_kwargs)`` ready for instantiation.
"""
if issubclass(cls, AsgiApplication):
return cls, kwargs
if not issubclass(cls, RoutingClass):
return cls, kwargs
# Plain RoutingClass: extract wrapper kwargs, pass rest to inner class
docs = kwargs.pop("docs", "off")
base_dir = kwargs.pop("base_dir", None)
db_name = kwargs.pop("db_name", None)
routing_instance = cls(**kwargs)
wrapper_kwargs: dict[str, Any] = {
"routing_class": routing_instance,
"docs": docs,
}
if base_dir is not None:
wrapper_kwargs["base_dir"] = base_dir
if db_name is not None:
wrapper_kwargs["db_name"] = db_name
return OpenApiApplication, wrapper_kwargs
def _load_apps(
self,
specs: dict[str, tuple[str, str, dict[str, Any] | None, dict[str, Any]]],
target: dict[str, RoutingClass],
target_router: Router | None = None,
) -> None:
"""Load apps from specs into target dict and mount on router.
Supports two module formats:
1. Local modules (relative to server_dir): "shop_app:Application", "myapp.core:MyApp"
2. Installed packages (absolute): "genro_asgi.sys_applications.swagger.swagger_app:Application"
Args:
specs: Dict of {name: (module_name, class_name, mw_config, kwargs)}.
target: Dict to store loaded app instances (apps or sys_apps).
target_router: Router to mount apps on. If None, uses self.router.
"""
import importlib
server_dir_path = self.config.server_dir
for name, (module_name, class_name, _mw_config, kwargs) in specs.items():
# Check if already loaded by convention detection
app_module = self.app_loader.get_module(name, module_name)
if app_module is None:
# Not pre-loaded, try to load it
module_path = module_name.replace(".", "/")
module_as_file = server_dir_path / f"{module_path}.py"
module_as_dir = server_dir_path / module_path
if module_as_file.exists() or module_as_dir.exists():
# Local module: use AppLoader
app_dir = server_dir_path if module_as_file.exists() else module_as_dir
self.app_loader.load_package(name, app_dir)
app_module = self.app_loader.get_module(name, module_name)
if app_module is None:
raise ImportError(f"Cannot load module '{module_name}' for app '{name}'")
if "base_dir" not in kwargs:
kwargs["base_dir"] = app_dir
else:
# Installed package: use importlib
app_module = importlib.import_module(module_name)
module_file = getattr(app_module, "__file__", None)
if module_file and "base_dir" not in kwargs:
kwargs["base_dir"] = Path(module_file).parent
cls = getattr(app_module, class_name)
cls, kwargs = self._maybe_wrap(cls, kwargs)
instance = cls(**kwargs)
instance.mount_name = name
target[name] = instance
if target_router is None:
self.attach_instance(instance, name=name)
else:
target_router.include(instance.default_router, name=name)
@property
def request(self) -> BaseRequest | None:
"""Current request from registry."""
return self.request_registry.current
@property
def response(self) -> Response | None:
"""Current response from request."""
req = self.request
return req.response if req else None
if __name__ == "__main__":
server = AsgiServer()
server.run()