Source code for genro_asgi.executors.registry

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""Executor registry for managing multiple named executor instances.

Classes:
    ExecutorRegistry -- lazy creation, caching, and coordinated shutdown
                        of named executors.

Supports multiple executor types via factory pattern (default: LocalExecutor).
Provides centralized metrics collection and bulk shutdown.
"""

from __future__ import annotations

from typing import Any, Callable

from .base import BaseExecutor
from .local import LocalExecutor

__all__ = ["ExecutorRegistry"]


[docs] class ExecutorRegistry: """ Registry for managing named executor instances. Provides centralized management of executors with lazy creation, caching, and coordinated shutdown. Attributes: executors: Dict of name -> executor instance. Example: >>> registry = ExecutorRegistry() >>> executor = registry.get_or_create("compute", max_workers=4) >>> registry.shutdown_all() """ __slots__ = ("_executors", "_factories")
[docs] def __init__(self) -> None: """Initialize ExecutorRegistry with default factories.""" self._executors: dict[str, BaseExecutor] = {} self._factories: dict[str, Callable[..., BaseExecutor]] = { "local": self._create_local, }
def _create_local(self, name: str, **kwargs: Any) -> LocalExecutor: """Factory for LocalExecutor.""" return LocalExecutor(name=name, **kwargs)
[docs] def register_factory( self, executor_type: str, factory: Callable[..., BaseExecutor], ) -> None: """ Register a factory for a custom executor type. Args: executor_type: Type identifier (e.g., "remote", "hybrid"). factory: Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor Example: >>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw)) """ self._factories[executor_type] = factory
[docs] def get_or_create( self, name: str, executor_type: str = "local", **kwargs: Any, ) -> BaseExecutor: """ Get existing executor or create new one. Args: name: Unique identifier for the executor. executor_type: Type of executor ("local" by default). **kwargs: Arguments passed to executor constructor. Returns: Executor instance (cached if already exists). Raises: ValueError: If executor_type is not registered. Example: >>> executor = registry.get_or_create("pdf", max_workers=2) >>> same_executor = registry.get_or_create("pdf") # returns cached """ if name in self._executors: return self._executors[name] if executor_type not in self._factories: available = ", ".join(self._factories.keys()) raise ValueError( f"Unknown executor type: {executor_type!r}. " f"Available types: {available}" ) factory = self._factories[executor_type] executor = factory(name=name, **kwargs) self._executors[name] = executor return executor
[docs] def get(self, name: str) -> BaseExecutor | None: """ Get executor by name without creating. Args: name: Executor identifier. Returns: Executor instance or None if not found. """ return self._executors.get(name)
[docs] def shutdown_all(self, wait: bool = True) -> None: """ Shutdown all registered executors. Args: wait: If True, wait for pending tasks to complete. """ for executor in self._executors.values(): executor.shutdown(wait=wait) self._executors.clear()
[docs] def all_metrics(self) -> list[dict[str, Any]]: """ Collect metrics from all executors. Returns: List of metric dicts, one per executor. """ return [executor.metrics for executor in self._executors.values()]
@property def executors(self) -> dict[str, BaseExecutor]: """Return dict of all registered executors.""" return dict(self._executors) def __len__(self) -> int: """Return number of registered executors.""" return len(self._executors) def __contains__(self, name: str) -> bool: """Check if executor exists.""" return name in self._executors def __repr__(self) -> str: """Return string representation.""" names = list(self._executors.keys()) return f"ExecutorRegistry(executors={names})"
if __name__ == "__main__": import asyncio async def main() -> None: registry = ExecutorRegistry() # Create executors exec1 = registry.get_or_create("compute", bypass=True) exec2 = registry.get_or_create("io", bypass=True) print(f"Registry: {registry}") print(f"Executor 1: {exec1}") print(f"Executor 2: {exec2}") # Use decorator @exec1 def square(x: int) -> int: return x * x result = await square(7) # type: ignore[misc] print(f"square(7) = {result}") # Metrics print(f"All metrics: {registry.all_metrics()}") # Shutdown registry.shutdown_all() print(f"After shutdown: {registry}") asyncio.run(main())