Source code for genro_asgi.executors.registry

# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0

"""
Executor registry for managing multiple executor instances.

Purpose
=======
ExecutorRegistry manages named executor instances, providing lazy creation,
caching, and centralized shutdown. Supports multiple executor types
(local, remote) via factory pattern.

Features:
- Named executors with lazy creation
- Support for multiple executor types
- Centralized metrics collection
- Coordinated shutdown of all executors

Definition::

    class ExecutorRegistry:
        __slots__ = ("_executors", "_factories")

        def __init__(self)
        def register_factory(self, executor_type: str, factory: Callable) -> None
        def get_or_create(self, name: str, executor_type: str = "local", **kwargs) -> BaseExecutor
        def get(self, name: str) -> BaseExecutor | None
        def shutdown_all(self, wait: bool = True) -> None
        def all_metrics(self) -> list[dict]
        @property
        def executors(self) -> dict[str, BaseExecutor]

Example::

    from genro_asgi.executors import ExecutorRegistry

    registry = ExecutorRegistry()

    # Get or create named executors
    pdf_executor = registry.get_or_create("pdf", max_workers=2)
    ml_executor = registry.get_or_create("ml", max_workers=4)

    @pdf_executor
    def generate_pdf(data):
        return create_pdf(data)

    @ml_executor
    def predict(model, data):
        return model.predict(data)

    # Get metrics for all executors
    metrics = registry.all_metrics()

    # Shutdown all on application exit
    registry.shutdown_all()

Extensibility::

    # Register custom executor type
    from myapp.executors import RemoteExecutor

    registry.register_factory("remote", lambda **kw: RemoteExecutor(**kw))

    # Use remote executor
    executor = registry.get_or_create("cluster", executor_type="remote", url="...")

Design Notes
============
- Default factory creates LocalExecutor
- Executors are cached by name
- shutdown_all() should be called on application shutdown
- Thread-safe for read operations (creation should be done at startup)
"""

from __future__ import annotations

from typing import Any, Callable

from .base import BaseExecutor
from .local import LocalExecutor

__all__ = ["ExecutorRegistry"]


[docs] class ExecutorRegistry: """ Registry for managing named executor instances. Provides centralized management of executors with lazy creation, caching, and coordinated shutdown. Attributes: executors: Dict of name -> executor instance. Example: >>> registry = ExecutorRegistry() >>> executor = registry.get_or_create("compute", max_workers=4) >>> registry.shutdown_all() """ __slots__ = ("_executors", "_factories")
[docs] def __init__(self) -> None: """Initialize ExecutorRegistry with default factories.""" self._executors: dict[str, BaseExecutor] = {} self._factories: dict[str, Callable[..., BaseExecutor]] = { "local": self._create_local, }
def _create_local(self, name: str, **kwargs: Any) -> LocalExecutor: """Factory for LocalExecutor.""" return LocalExecutor(name=name, **kwargs)
[docs] def register_factory( self, executor_type: str, factory: Callable[..., BaseExecutor], ) -> None: """ Register a factory for a custom executor type. Args: executor_type: Type identifier (e.g., "remote", "hybrid"). factory: Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor Example: >>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw)) """ self._factories[executor_type] = factory
[docs] def get_or_create( self, name: str, executor_type: str = "local", **kwargs: Any, ) -> BaseExecutor: """ Get existing executor or create new one. Args: name: Unique identifier for the executor. executor_type: Type of executor ("local" by default). **kwargs: Arguments passed to executor constructor. Returns: Executor instance (cached if already exists). Raises: ValueError: If executor_type is not registered. Example: >>> executor = registry.get_or_create("pdf", max_workers=2) >>> same_executor = registry.get_or_create("pdf") # returns cached """ if name in self._executors: return self._executors[name] if executor_type not in self._factories: available = ", ".join(self._factories.keys()) raise ValueError( f"Unknown executor type: {executor_type!r}. " f"Available types: {available}" ) factory = self._factories[executor_type] executor = factory(name=name, **kwargs) self._executors[name] = executor return executor
[docs] def get(self, name: str) -> BaseExecutor | None: """ Get executor by name without creating. Args: name: Executor identifier. Returns: Executor instance or None if not found. """ return self._executors.get(name)
[docs] def shutdown_all(self, wait: bool = True) -> None: """ Shutdown all registered executors. Args: wait: If True, wait for pending tasks to complete. """ for executor in self._executors.values(): executor.shutdown(wait=wait) self._executors.clear()
[docs] def all_metrics(self) -> list[dict[str, Any]]: """ Collect metrics from all executors. Returns: List of metric dicts, one per executor. """ return [executor.metrics for executor in self._executors.values()]
@property def executors(self) -> dict[str, BaseExecutor]: """Return dict of all registered executors.""" return dict(self._executors) def __len__(self) -> int: """Return number of registered executors.""" return len(self._executors) def __contains__(self, name: str) -> bool: """Check if executor exists.""" return name in self._executors def __repr__(self) -> str: """Return string representation.""" names = list(self._executors.keys()) return f"ExecutorRegistry(executors={names})"
if __name__ == "__main__": import asyncio async def main() -> None: registry = ExecutorRegistry() # Create executors exec1 = registry.get_or_create("compute", bypass=True) exec2 = registry.get_or_create("io", bypass=True) print(f"Registry: {registry}") print(f"Executor 1: {exec1}") print(f"Executor 2: {exec2}") # Use decorator @exec1 def square(x: int) -> int: return x * x result = await square(7) # type: ignore[misc] print(f"square(7) = {result}") # Metrics print(f"All metrics: {registry.all_metrics()}") # Shutdown registry.shutdown_all() print(f"After shutdown: {registry}") asyncio.run(main())