Executors

Execution contexts for handlers.

Base Executor

Base executor interface and exceptions.

Purpose

Defines the abstract base class for all executor implementations and common exceptions. All executors (local, remote, hybrid) must implement the BaseExecutor interface.

Definition:

class BaseExecutor(ABC):
    name: str

    @abstractmethod
    async def submit(self, func: Callable, *args, **kwargs) -> Any:
        '''Submit a function for execution.'''

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        '''Shut down the executor.'''

    @property
    @abstractmethod
    def metrics(self) -> dict[str, Any]:
        '''Return executor metrics.'''

    def __call__(self, func: Callable) -> Callable:
        '''Decorator interface - wraps func to use submit().'''

Exceptions:

ExecutorError
    Base exception for executor operations.

ExecutorOverloadError(ExecutorError)
    Raised when executor has too many pending tasks (backpressure).

Design Notes

  • BaseExecutor provides __call__ for decorator pattern (uses submit internally)

  • Subclasses only need to implement submit(), shutdown(), metrics

  • Metrics dict must include: name, pending, submitted, completed, failed
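The decorator pattern described in these notes can be sketched as follows. This is a minimal illustration, not the genro_asgi source: SketchExecutor and InlineExecutor are hypothetical stand-ins, and the use of functools.wraps is an assumption about how the wrapper preserves the function's metadata.

```python
import asyncio
import functools
from abc import ABC, abstractmethod
from typing import Any, Callable


class SketchExecutor(ABC):
    """Hypothetical stand-in for BaseExecutor (not the genro_asgi class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Run func and return its result."""

    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        # Wrap func so that calling it returns an awaitable routed through submit().
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            return await self.submit(func, *args, **kwargs)

        return wrapper


class InlineExecutor(SketchExecutor):
    """Runs the function directly in the event loop; for illustration only."""

    async def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)


executor = InlineExecutor("demo")


@executor
def double(x: int) -> int:
    return x * 2


# The decorated function is now a coroutine function and must be awaited.
result = asyncio.run(double(5))
```

Because the wrapper is a coroutine function, a decorated function can no longer be called synchronously; callers always await it, whichever executor backs it.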

class genro_asgi.executors.base.BaseExecutor

Bases: ABC

Abstract base class for all executor implementations.

Provides a common interface for local and remote executors. Implements the decorator pattern via __call__ which uses submit().

Subclasses must implement:

  • submit(): Execute function asynchronously

  • shutdown(): Clean up resources

  • metrics: Return performance metrics

name

Identifier for this executor instance.

Example

>>> class MyExecutor(BaseExecutor):
...     async def submit(self, func, *args, **kwargs):
...         return func(*args, **kwargs)
...     def shutdown(self, wait=True): pass
...     @property
...     def metrics(self): return {"name": self.name}
>>>
>>> executor = MyExecutor("test")
>>>
>>> @executor
... def my_func(x):
...     return x * 2
>>>
>>> result = await my_func(5)  # returns 10

abstract property metrics: dict[str, Any]

Return executor metrics.

Returns:

Dict containing at minimum:

  • name: Executor name

  • pending: Number of pending tasks

  • submitted: Total submitted tasks

  • completed: Total completed tasks

  • failed: Total failed tasks

Return type:

dict[str, Any]
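A custom executor's metrics can be checked against the required keys with a small helper. This is a sketch only: missing_metric_keys and REQUIRED_METRIC_KEYS are hypothetical names, not part of genro_asgi, and the sample dicts are made up.

```python
from typing import Any

# Keys the Design Notes list as mandatory for every executor's metrics dict.
REQUIRED_METRIC_KEYS = ("name", "pending", "submitted", "completed", "failed")


def missing_metric_keys(metrics: dict[str, Any]) -> list[str]:
    """Return the required keys that are absent from a metrics dict."""
    return [key for key in REQUIRED_METRIC_KEYS if key not in metrics]


# Made-up sample data for illustration.
complete = {"name": "pdf", "pending": 0, "submitted": 3, "completed": 2, "failed": 1}
partial = {"name": "test"}

complete_missing = missing_metric_keys(complete)
partial_missing = missing_metric_keys(partial)
```

A check like this fits naturally in a unit test for each BaseExecutor subclass.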

name: str

abstractmethod shutdown(wait=True)

Shutdown the executor and release resources.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None

abstractmethod async submit(func, *args, **kwargs)

Submit a function for execution.

Parameters:
  • func (Callable[..., Any]) – The function to execute.

  • *args (Any) – Positional arguments for func.

  • **kwargs (Any) – Keyword arguments for func.

Return type:

Any

Returns:

The result of func(*args, **kwargs).

Raises:

exception genro_asgi.executors.base.ExecutorError

Bases: Exception

Base exception for executor operations.

exception genro_asgi.executors.base.ExecutorOverloadError

Bases: ExecutorError

Raised when executor has too many pending tasks.
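One way a caller might react to backpressure is to retry with a short backoff before giving up. The sketch below is an assumption about usage, not documented behavior: submit_with_retry and flaky_submit are hypothetical, and ExecutorOverloadError is redefined locally so the example runs without genro_asgi installed.

```python
import asyncio
from typing import Any, Callable


class ExecutorOverloadError(Exception):
    """Local stand-in for genro_asgi.executors.base.ExecutorOverloadError."""


async def submit_with_retry(
    submit: Callable[..., Any],
    func: Callable[..., Any],
    *args: Any,
    attempts: int = 3,
    delay: float = 0.01,
) -> Any:
    """Call submit(func, *args), backing off briefly while the executor is overloaded."""
    for attempt in range(attempts):
        try:
            return await submit(func, *args)
        except ExecutorOverloadError:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller decide how to report it
            await asyncio.sleep(delay * (2 ** attempt))  # exponential backoff


# Hypothetical submit() that rejects the first two calls, then succeeds.
calls = {"n": 0}


async def flaky_submit(func: Callable[..., Any], *args: Any) -> Any:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ExecutorOverloadError("too many pending tasks")
    return func(*args)


result = asyncio.run(submit_with_retry(flaky_submit, abs, -7))
```

In a request handler, the final re-raised error would typically be mapped to a "service busy" response rather than retried indefinitely.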

Local Executor

Local executor for CPU-bound work using ProcessPoolExecutor.

Purpose

LocalExecutor wraps ProcessPoolExecutor to run blocking/CPU-bound functions without freezing the async event loop. Supports backpressure via semaphore and bypass mode for testing.

Features:

  • ProcessPoolExecutor for true parallelism (bypasses GIL)

  • Bypass mode (pool=None) for testing without spawning processes

  • Backpressure via semaphore to limit pending tasks

  • Metrics collection (submitted, completed, failed, avg duration)

  • Clear error messages for pickle serialization failures
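The interplay of the pool, bypass mode, and the semaphore can be sketched roughly as below. This is not the LocalExecutor source: SketchLocalExecutor and OverloadError are hypothetical, and rejecting immediately when the semaphore is exhausted (rather than waiting) is an assumed policy.

```python
import asyncio
import functools
from concurrent.futures import Executor
from typing import Any, Callable, Optional


class OverloadError(Exception):
    """Stand-in for ExecutorOverloadError."""


class SketchLocalExecutor:
    """Illustrative only; not the genro_asgi implementation."""

    def __init__(self, pool: Optional[Executor] = None, max_pending: int = 100) -> None:
        self.pool = pool  # None means bypass mode
        self._semaphore = asyncio.Semaphore(max_pending)
        self.submitted = 0
        self.completed = 0
        self.failed = 0

    async def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._semaphore.locked():
            # Assumed policy: reject at once instead of queueing without bound.
            raise OverloadError("too many pending tasks")
        async with self._semaphore:
            self.submitted += 1
            try:
                if self.pool is None:
                    result = func(*args, **kwargs)  # bypass: run inline
                else:
                    loop = asyncio.get_running_loop()
                    # run_in_executor accepts only positional args, so bind kwargs first.
                    call = functools.partial(func, *args, **kwargs)
                    result = await loop.run_in_executor(self.pool, call)
            except Exception:
                self.failed += 1
                raise
            self.completed += 1
            return result


async def demo() -> tuple[int, int]:
    executor = SketchLocalExecutor(pool=None, max_pending=2)
    value = await executor.submit(pow, 2, 5)
    return value, executor.completed


value, completed = asyncio.run(demo())
```

Passing a concurrent.futures.ProcessPoolExecutor as pool would move the call into a worker process, at which point func and its arguments must be picklable.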

Definition:

class LocalExecutor(BaseExecutor):
    __slots__ = ("name", "pool", "max_pending", "_semaphore", "_metrics")

    def __init__(
        self,
        name: str = "default",
        max_workers: int | None = None,
        initializer: Callable | None = None,
        initargs: tuple = (),
        max_pending: int = 100,
        bypass: bool = False,
    ) -> None: ...
    async def submit(self, func: Callable, *args, **kwargs) -> Any: ...
    def shutdown(self, wait: bool = True) -> None: ...
    @property
    def metrics(self) -> dict[str, Any]: ...

Example:

from genro_asgi.executors import LocalExecutor

executor = LocalExecutor(name="pdf", max_workers=2)

@executor
def generate_pdf(data):
    # CPU-bound work
    return create_pdf(data)

# In async handler
async def handle():
    result = await generate_pdf(report_data)

Bypass Mode:

# For testing - no actual processes spawned
executor = LocalExecutor(name="test", bypass=True)

@executor
def my_func(x):
    return x * 2

result = await my_func(5)  # runs synchronously, returns 10

Environment Variable:

# Set GENRO_EXECUTOR_BYPASS=1 to bypass all LocalExecutors
import os
os.environ['GENRO_EXECUTOR_BYPASS'] = '1'
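How the explicit flag and the environment variable combine is not spelled out here. A plausible reading, sketched under the assumption that only the literal value "1" enables bypass, is shown below; bypass_enabled is a hypothetical helper, not a genro_asgi function.

```python
import os
from typing import Mapping, Optional


def bypass_enabled(explicit: bool = False, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True if bypass was requested explicitly or via GENRO_EXECUTOR_BYPASS."""
    environ = os.environ if environ is None else environ
    # Assumption: only the literal value "1" enables bypass, as in the example above.
    return explicit or environ.get("GENRO_EXECUTOR_BYPASS") == "1"


off = bypass_enabled(False, {})
from_env = bypass_enabled(False, {"GENRO_EXECUTOR_BYPASS": "1"})
from_flag = bypass_enabled(True, {})
```

Since the source does not say when the variable is read, the safe approach is to set it before any LocalExecutor is created, for example in a test fixture or conftest module.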

Design Notes

  • Uses asyncio.Semaphore for backpressure control

  • Catches pickle.PicklingError for clear error messages

  • Metrics are simple counters (Prometheus integration is optional)

  • Decorated functions must be top-level (not lambdas/closures)

  • Arguments and return values must be pickle-serializable
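The pickling constraints in these notes can be checked ahead of time with the standard pickle module. The helper below is a hypothetical sketch, not part of genro_asgi; it shows why a lambda cannot be sent to a worker process while a function importable by name can.

```python
import math
import pickle
from typing import Any


def is_picklable(obj: Any) -> bool:
    """Return True if obj survives pickle.dumps, as needed to cross a process boundary."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # Lambdas and nested functions fail here: pickle stores functions by
        # qualified name, and these have no name a worker could import.
        return False
    return True


top_level_ok = is_picklable(math.sqrt)      # importable as math.sqrt
lambda_ok = is_picklable(lambda x: x * 2)   # no importable name
```

Running a check like this on arguments in a test suite surfaces serialization problems before they appear as runtime errors inside the pool.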

class genro_asgi.executors.local.LocalExecutor(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)

Bases: BaseExecutor

Executor using local ProcessPoolExecutor.

Runs functions in separate processes for true parallelism, bypassing Python’s GIL. Ideal for CPU-bound work.

name

Identifier for this executor (used in metrics/logging).

pool

The ProcessPoolExecutor, or None in bypass mode.

max_pending

Maximum pending tasks before backpressure.

Example

>>> executor = LocalExecutor(name="compute", max_workers=4)
>>>
>>> @executor
... def heavy_work(data):
...     return process(data)
>>>
>>> result = await heavy_work(my_data)

__init__(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)

Initialize LocalExecutor.

Parameters:
  • name (str) – Identifier for metrics and logging.

  • max_workers (int | None) – Number of worker processes (default: CPU count).

  • initializer (Optional[Callable[..., None]]) – Function called once per worker at startup.

  • initargs (tuple[Any, ...]) – Arguments passed to initializer.

  • max_pending (int) – Maximum concurrent pending tasks.

  • bypass (bool) – If True, run synchronously without pool (for testing).

max_pending

property metrics: dict[str, Any]

Return current executor metrics.

Returns:

Dict with name, pending, submitted, completed, failed, avg_duration_ms.
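A metrics dict of this shape could be produced from simple counters, as in the sketch below. The Counters class is hypothetical, and averaging over completed tasks only (with 0.0 when none have finished) is an assumption; the source does not state how avg_duration_ms is computed.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Counters:
    """Hypothetical counter holder; not the genro_asgi internal structure."""

    name: str
    pending: int = 0
    submitted: int = 0
    completed: int = 0
    failed: int = 0
    total_duration_ms: float = 0.0

    def record_success(self, duration_ms: float) -> None:
        self.completed += 1
        self.total_duration_ms += duration_ms

    def snapshot(self) -> dict[str, Any]:
        # Assumption: average over completed tasks only; 0.0 when none finished.
        avg = self.total_duration_ms / self.completed if self.completed else 0.0
        return {
            "name": self.name,
            "pending": self.pending,
            "submitted": self.submitted,
            "completed": self.completed,
            "failed": self.failed,
            "avg_duration_ms": avg,
        }


counters = Counters("pdf", submitted=2)
counters.record_success(10.0)
counters.record_success(30.0)
snapshot = counters.snapshot()
```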

name: str

pool

shutdown(wait=True)

Shutdown the process pool.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None

async submit(func, *args, **kwargs)

Submit a function for execution in the process pool.

Parameters:
  • func (Callable[..., Any]) – The function to execute (must be top-level, pickle-able).

  • *args (Any) – Positional arguments (must be pickle-able).

  • **kwargs (Any) – Keyword arguments (must be pickle-able).

Return type:

Any

Returns:

The result of func(*args, **kwargs).

Raises:

Executor Registry

Executor registry for managing multiple executor instances.

Purpose

ExecutorRegistry manages named executor instances, providing lazy creation, caching, and centralized shutdown. Supports multiple executor types (local, remote) via factory pattern.

Features:

  • Named executors with lazy creation

  • Support for multiple executor types

  • Centralized metrics collection

  • Coordinated shutdown of all executors

Definition:

class ExecutorRegistry:
    __slots__ = ("_executors", "_factories")

    def __init__(self) -> None: ...
    def register_factory(self, executor_type: str, factory: Callable) -> None: ...
    def get_or_create(self, name: str, executor_type: str = "local", **kwargs) -> BaseExecutor: ...
    def get(self, name: str) -> BaseExecutor | None: ...
    def shutdown_all(self, wait: bool = True) -> None: ...
    def all_metrics(self) -> list[dict[str, Any]]: ...
    @property
    def executors(self) -> dict[str, BaseExecutor]: ...

Example:

from genro_asgi.executors import ExecutorRegistry

registry = ExecutorRegistry()

# Get or create named executors
pdf_executor = registry.get_or_create("pdf", max_workers=2)
ml_executor = registry.get_or_create("ml", max_workers=4)

@pdf_executor
def generate_pdf(data):
    return create_pdf(data)

@ml_executor
def predict(model, data):
    return model.predict(data)

# Get metrics for all executors
metrics = registry.all_metrics()

# Shutdown all on application exit
registry.shutdown_all()

Extensibility:

# Register custom executor type
from myapp.executors import RemoteExecutor

registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))

# Use remote executor
executor = registry.get_or_create("cluster", executor_type="remote", url="...")

Design Notes

  • Default factory creates LocalExecutor

  • Executors are cached by name

  • shutdown_all() should be called on application shutdown

  • Thread-safe for read operations (creation should be done at startup)
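The caching and factory lookup described in these notes can be sketched as follows. SketchRegistry and StubExecutor are hypothetical stand-ins, not the genro_asgi classes; ignoring kwargs on a cache hit and checking the cache before the executor type are assumptions.

```python
from typing import Any, Callable


class StubExecutor:
    """Hypothetical executor stand-in that only records its constructor arguments."""

    def __init__(self, name: str, **options: Any) -> None:
        self.name = name
        self.options = options


class SketchRegistry:
    """Illustrative registry; not the genro_asgi implementation."""

    def __init__(self) -> None:
        self._executors: dict[str, StubExecutor] = {}
        # The default "local" factory here builds the stub instead of LocalExecutor.
        self._factories: dict[str, Callable[..., StubExecutor]] = {"local": StubExecutor}

    def register_factory(self, executor_type: str, factory: Callable[..., StubExecutor]) -> None:
        self._factories[executor_type] = factory

    def get_or_create(self, name: str, executor_type: str = "local", **kwargs: Any) -> StubExecutor:
        if name in self._executors:
            return self._executors[name]  # cached: kwargs are ignored on later calls
        if executor_type not in self._factories:
            raise ValueError(f"Unknown executor type: {executor_type!r}")
        executor = self._factories[executor_type](name, **kwargs)
        self._executors[name] = executor
        return executor


registry = SketchRegistry()
first = registry.get_or_create("pdf", max_workers=2)
second = registry.get_or_create("pdf")
```

Because an unsynchronized check-then-insert like this can race under concurrent creation, creating all executors at startup, as the notes recommend, avoids the problem.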

class genro_asgi.executors.registry.ExecutorRegistry

Bases: object

Registry for managing named executor instances.

Provides centralized management of executors with lazy creation, caching, and coordinated shutdown.

executors

Dict of name -> executor instance.

Example

>>> registry = ExecutorRegistry()
>>> executor = registry.get_or_create("compute", max_workers=4)
>>> registry.shutdown_all()

__init__()

Initialize ExecutorRegistry with default factories.

all_metrics()

Collect metrics from all executors.

Return type:

list[dict[str, Any]]

Returns:

List of metric dicts, one per executor.
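The per-executor list can be reduced to application-wide totals, for example for a health endpoint. The summarize helper and the sample numbers below are hypothetical; only the key names come from the documented metrics contract.

```python
from typing import Any

COUNTER_KEYS = ("pending", "submitted", "completed", "failed")


def summarize(all_metrics: list[dict[str, Any]]) -> dict[str, int]:
    """Sum the counter fields across the per-executor metric dicts."""
    return {key: sum(m.get(key, 0) for m in all_metrics) for key in COUNTER_KEYS}


# Sample data shaped like the documented metrics; the numbers are made up.
sample = [
    {"name": "pdf", "pending": 1, "submitted": 10, "completed": 8, "failed": 1},
    {"name": "ml", "pending": 0, "submitted": 4, "completed": 4, "failed": 0},
]
totals = summarize(sample)
```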

property executors: dict[str, BaseExecutor]

Return dict of all registered executors.

get(name)

Get executor by name without creating.

Parameters:

name (str) – Executor identifier.

Return type:

BaseExecutor | None

Returns:

Executor instance or None if not found.

get_or_create(name, executor_type='local', **kwargs)

Get existing executor or create new one.

Parameters:
  • name (str) – Unique identifier for the executor.

  • executor_type (str) – Type of executor (“local” by default).

  • **kwargs (Any) – Arguments passed to executor constructor.

Return type:

BaseExecutor

Returns:

Executor instance (cached if already exists).

Raises:

ValueError – If executor_type is not registered.

Example

>>> executor = registry.get_or_create("pdf", max_workers=2)
>>> same_executor = registry.get_or_create("pdf")  # returns cached

register_factory(executor_type, factory)

Register a factory for a custom executor type.

Parameters:
  • executor_type (str) – Type identifier (e.g., “remote”, “hybrid”).

  • factory (Callable[..., BaseExecutor]) – Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor

Return type:

None

Example

>>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))

shutdown_all(wait=True)

Shutdown all registered executors.

Parameters:

wait (bool) – If True, wait for pending tasks to complete.

Return type:

None
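To make sure shutdown_all() runs even when the application exits through an exception, the call can be wrapped in a context manager. This is a sketch under assumptions: shutting_down and FakeRegistry are hypothetical names, and the fake registry only records calls so the example runs without genro_asgi.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def shutting_down(registry: Any, wait: bool = True) -> Iterator[Any]:
    """Yield the registry and always call shutdown_all() on exit, even after an error."""
    try:
        yield registry
    finally:
        registry.shutdown_all(wait=wait)


class FakeRegistry:
    """Hypothetical stand-in that records shutdown calls."""

    def __init__(self) -> None:
        self.shutdown_calls: list[bool] = []

    def shutdown_all(self, wait: bool = True) -> None:
        self.shutdown_calls.append(wait)


fake = FakeRegistry()
with shutting_down(fake) as registry:
    pass  # application work would happen here
```

In an ASGI application the same try/finally shape would sit around the lifespan's running phase, so worker processes are released when the server stops.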