Executors

Execution contexts for handlers.

Base Executor

Base executor interface and exceptions.

Purpose

Defines the abstract base class for all executor implementations and common exceptions. All executors (local, remote, hybrid) must implement the BaseExecutor interface.

Definition:

class BaseExecutor(ABC):
    name: str

    @abstractmethod
    async def submit(self, func: Callable, *args, **kwargs) -> Any
        '''Submit a function for execution.'''

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None
        '''Shutdown the executor.'''

    @property
    @abstractmethod
    def metrics(self) -> dict[str, Any]
        '''Return executor metrics.'''

    def __call__(self, func: Callable) -> Callable
        '''Decorator interface - wraps func to use submit().'''

Exceptions:

ExecutorError
    Base exception for executor operations.

ExecutorOverloadError(ExecutorError)
    Raised when executor has too many pending tasks (backpressure).

Design Notes

  • BaseExecutor provides __call__ for decorator pattern (uses submit internally)

  • Subclasses only need to implement submit(), shutdown(), metrics

  • Metrics dict must include: name, pending, submitted, completed, failed

class genro_asgi.executors.base.BaseExecutor[source]

Bases: ABC

Abstract base class for all executor implementations.

Provides a common interface for local and remote executors. Implements the decorator pattern via __call__ which uses submit().

Subclasses must implement: - submit(): Execute function asynchronously - shutdown(): Clean up resources - metrics: Return performance metrics

name

Identifier for this executor instance.

Example

>>> class MyExecutor(BaseExecutor):
...     async def submit(self, func, *args, **kwargs):
...         return func(*args, **kwargs)
...     def shutdown(self, wait=True): pass
...     @property
...     def metrics(self): return {"name": self.name}
>>>
>>> executor = MyExecutor("test")
>>>
>>> @executor
... def my_func(x):
...     return x * 2
>>>
>>> result = await my_func(5)  # returns 10
abstract property metrics: dict[str, Any]

Return executor metrics.

Returns:

  • name: Executor name

  • pending: Number of pending tasks

  • submitted: Total submitted tasks

  • completed: Total completed tasks

  • failed: Total failed tasks

Return type:

Dict containing at minimum

name: str
abstractmethod shutdown(wait=True)[source]

Shutdown the executor and release resources.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None

abstractmethod async submit(func, *args, **kwargs)[source]

Submit a function for execution.

Parameters:
  • func (Callable[..., Any]) – The function to execute.

  • *args (Any) – Positional arguments for func.

  • **kwargs (Any) – Keyword arguments for func.

Return type:

Any

Returns:

The result of func(*args, **kwargs).

Raises:
exception genro_asgi.executors.base.ExecutorError[source]

Bases: Exception

Base exception for executor operations.

exception genro_asgi.executors.base.ExecutorOverloadError[source]

Bases: ExecutorError

Raised when executor has too many pending tasks.

Local Executor

Local executor for CPU-bound work using ProcessPoolExecutor.

Classes:
LocalExecutor – wraps ProcessPoolExecutor for async submission

with backpressure (semaphore) and metrics.

Supports bypass mode (no processes) for testing. Environment variable GENRO_EXECUTOR_BYPASS=1 forces bypass globally. Functions and arguments must be pickle-serializable.

class genro_asgi.executors.local.LocalExecutor(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)[source]

Bases: BaseExecutor

Executor using local ProcessPoolExecutor.

Runs functions in separate processes for true parallelism, bypassing Python’s GIL. Ideal for CPU-bound work.

name

Identifier for this executor (used in metrics/logging).

pool

The ProcessPoolExecutor, or None in bypass mode.

max_pending

Maximum pending tasks before backpressure.

Example

>>> executor = LocalExecutor(name="compute", max_workers=4)
>>>
>>> @executor
... def heavy_work(data):
...     return process(data)
>>>
>>> result = await heavy_work(my_data)
__init__(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)[source]

Initialize LocalExecutor.

Parameters:
  • name (str) – Identifier for metrics and logging.

  • max_workers (int | None) – Number of worker processes (default: CPU count).

  • initializer (Optional[Callable[..., None]]) – Function called once per worker at startup.

  • initargs (tuple[Any, ...]) – Arguments passed to initializer.

  • max_pending (int) – Maximum concurrent pending tasks.

  • bypass (bool) – If True, run synchronously without pool (for testing).

max_pending
property metrics: dict[str, Any]

Return current executor metrics.

Returns:

Dict with name, pending, submitted, completed, failed, avg_duration_ms.

name: str
pool
shutdown(wait=True)[source]

Shutdown the process pool.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None

async submit(func, *args, **kwargs)[source]

Submit a function for execution in the process pool.

Parameters:
  • func (Callable[..., Any]) – The function to execute (must be top-level, pickle-able).

  • *args (Any) – Positional arguments (must be pickle-able).

  • **kwargs (Any) – Keyword arguments (must be pickle-able).

Return type:

Any

Returns:

The result of func(*args, **kwargs).

Raises:

Executor Registry

Executor registry for managing multiple named executor instances.

Classes:
ExecutorRegistry – lazy creation, caching, and coordinated shutdown

of named executors.

Supports multiple executor types via factory pattern (default: LocalExecutor). Provides centralized metrics collection and bulk shutdown.

class genro_asgi.executors.registry.ExecutorRegistry[source]

Bases: object

Registry for managing named executor instances.

Provides centralized management of executors with lazy creation, caching, and coordinated shutdown.

executors

Dict of name -> executor instance.

Example

>>> registry = ExecutorRegistry()
>>> executor = registry.get_or_create("compute", max_workers=4)
>>> registry.shutdown_all()
__init__()[source]

Initialize ExecutorRegistry with default factories.

all_metrics()[source]

Collect metrics from all executors.

Return type:

list[dict[str, Any]]

Returns:

List of metric dicts, one per executor.

property executors: dict[str, BaseExecutor]

Return dict of all registered executors.

get(name)[source]

Get executor by name without creating.

Parameters:

name (str) – Executor identifier.

Return type:

BaseExecutor | None

Returns:

Executor instance or None if not found.

get_or_create(name, executor_type='local', **kwargs)[source]

Get existing executor or create new one.

Parameters:
  • name (str) – Unique identifier for the executor.

  • executor_type (str) – Type of executor (“local” by default).

  • **kwargs (Any) – Arguments passed to executor constructor.

Return type:

BaseExecutor

Returns:

Executor instance (cached if already exists).

Raises:

ValueError – If executor_type is not registered.

Example

>>> executor = registry.get_or_create("pdf", max_workers=2)
>>> same_executor = registry.get_or_create("pdf")  # returns cached
register_factory(executor_type, factory)[source]

Register a factory for a custom executor type.

Parameters:
  • executor_type (str) – Type identifier (e.g., “remote”, “hybrid”).

  • factory (Callable[..., BaseExecutor]) – Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor

Return type:

None

Example

>>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))
shutdown_all(wait=True)[source]

Shutdown all registered executors.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None