Executors
Execution contexts for handlers.
Base Executor
Base executor interface and exceptions.
Purpose
Defines the abstract base class for all executor implementations and common exceptions. All executors (local, remote, hybrid) must implement the BaseExecutor interface.
Definition:
    class BaseExecutor(ABC):
        name: str

        @abstractmethod
        async def submit(self, func: Callable, *args, **kwargs) -> Any:
            '''Submit a function for execution.'''

        @abstractmethod
        def shutdown(self, wait: bool = True) -> None:
            '''Shutdown the executor.'''

        @property
        @abstractmethod
        def metrics(self) -> dict[str, Any]:
            '''Return executor metrics.'''

        def __call__(self, func: Callable) -> Callable:
            '''Decorator interface - wraps func to use submit().'''
Exceptions:
ExecutorError
Base exception for executor operations.
ExecutorOverloadError(ExecutorError)
Raised when executor has too many pending tasks (backpressure).
Design Notes
BaseExecutor provides __call__ for decorator pattern (uses submit internally)
Subclasses only need to implement submit(), shutdown(), metrics
Metrics dict must include: name, pending, submitted, completed, failed
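The decorator pattern described in the notes above can be sketched as follows. This is a minimal illustration, not the genro_asgi source: SketchBaseExecutor and InlineExecutor are hypothetical stand-ins, and the use of functools.wraps is an assumption about how such a wrapper would typically be written.

```python
import asyncio
import functools
from abc import ABC, abstractmethod
from typing import Any, Callable


class SketchBaseExecutor(ABC):
    """Hypothetical stand-in for BaseExecutor, for illustration only."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """Run func and return its result."""

    def __call__(self, func: Callable) -> Callable:
        # Decorator interface: every call to the wrapper goes through submit().
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            return await self.submit(func, *args, **kwargs)

        return wrapper


class InlineExecutor(SketchBaseExecutor):
    """Hypothetical subclass that runs the function in the calling task."""

    async def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)


executor = InlineExecutor("inline")


@executor
def double(x: int) -> int:
    return x * 2


# The decorated function is now a coroutine function and must be awaited.
result = asyncio.run(double(5))
```

Because the wrapper is a coroutine function, decorating a plain function changes how callers invoke it: they must await the result.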
- class genro_asgi.executors.base.BaseExecutor[source]
Bases: ABC
Abstract base class for all executor implementations.
Provides a common interface for local and remote executors. Implements the decorator pattern via __call__, which uses submit().
Subclasses must implement:
- submit(): Execute function asynchronously
- shutdown(): Clean up resources
- metrics: Return performance metrics
- name
Identifier for this executor instance.
Example
>>> class MyExecutor(BaseExecutor):
...     async def submit(self, func, *args, **kwargs):
...         return func(*args, **kwargs)
...     def shutdown(self, wait=True): pass
...     @property
...     def metrics(self): return {"name": self.name}
>>>
>>> executor = MyExecutor("test")
>>>
>>> @executor
... def my_func(x):
...     return x * 2
>>>
>>> result = await my_func(5)  # returns 10
- abstract property metrics: dict[str, Any]
Return executor metrics.
- Returns:
Dict containing at minimum:
name: Executor name
pending: Number of pending tasks
submitted: Total submitted tasks
completed: Total completed tasks
failed: Total failed tasks
- Return type:
dict[str, Any]
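A custom executor can be checked against the minimum metrics keys listed above with a small helper. The sketch below is illustrative only: REQUIRED_METRIC_KEYS and missing_metric_keys are hypothetical names, not part of genro_asgi.

```python
from typing import Any

# Minimum keys named in the metrics documentation.
REQUIRED_METRIC_KEYS = ("name", "pending", "submitted", "completed", "failed")


def missing_metric_keys(metrics: dict[str, Any]) -> list[str]:
    """Return the required keys that are absent from a metrics dict."""
    return [key for key in REQUIRED_METRIC_KEYS if key not in metrics]


complete = {"name": "compute", "pending": 0, "submitted": 3, "completed": 2, "failed": 1}
incomplete = {"name": "test"}

missing_from_complete = missing_metric_keys(complete)
missing_from_incomplete = missing_metric_keys(incomplete)
```

A check like this could run in a test suite for each BaseExecutor subclass.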
- abstractmethod async submit(func, *args, **kwargs)[source]
Submit a function for execution.
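- Parameters:
func (Callable) – Function to execute.
*args, **kwargs – Positional and keyword arguments passed to func.
- Return type:
Any
- Returns:
The value returned by func.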
- Raises:
ExecutorError – If execution fails.
ExecutorOverloadError – If too many tasks are pending.
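Callers may want to treat ExecutorOverloadError as a transient condition and retry after a short pause. The following is a sketch of one possible approach, not a documented genro_asgi feature: submit_with_retry, its attempts and delay parameters, and flaky_submit are all hypothetical, and the two exception classes are local stand-ins so the example runs without the library installed.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ExecutorError(Exception):
    """Local stand-in for genro_asgi.executors.base.ExecutorError."""


class ExecutorOverloadError(ExecutorError):
    """Local stand-in for genro_asgi.executors.base.ExecutorOverloadError."""


async def submit_with_retry(
    submit: Callable[..., Awaitable[Any]],
    func: Callable,
    *args: Any,
    attempts: int = 3,
    delay: float = 0.01,
) -> Any:
    """Call submit(), backing off exponentially while the executor is overloaded."""
    for attempt in range(attempts):
        try:
            return await submit(func, *args)
        except ExecutorOverloadError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the overload to the caller
            await asyncio.sleep(delay * (2 ** attempt))


calls = {"count": 0}


async def flaky_submit(func: Callable, *args: Any) -> Any:
    """Hypothetical submit() that is overloaded for the first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ExecutorOverloadError("too many pending tasks")
    return func(*args)


result = asyncio.run(submit_with_retry(flaky_submit, pow, 2, 5))
```

Other ExecutorError subclasses are deliberately not retried here, since they signal a failed execution rather than backpressure.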
- exception genro_asgi.executors.base.ExecutorError[source]
Bases: Exception
Base exception for executor operations.
- exception genro_asgi.executors.base.ExecutorOverloadError[source]
Bases: ExecutorError
Raised when executor has too many pending tasks.
Local Executor
Local executor for CPU-bound work using ProcessPoolExecutor.
- Classes:
- LocalExecutor – wraps ProcessPoolExecutor for async submission
with backpressure (semaphore) and metrics.
Supports bypass mode (no processes) for testing. Environment variable
GENRO_EXECUTOR_BYPASS=1 forces bypass globally. Functions and
arguments must be pickle-serializable.
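The pickle requirement and the bypass switch can be probed with the standard library alone. The sketch below makes two assumptions: is_picklable and bypass_forced are hypothetical helpers, and the comparison against the literal string "1" only mirrors the documented GENRO_EXECUTOR_BYPASS=1 setting; how the library itself parses the variable is not shown in this reference.

```python
import math
import os
import pickle
from typing import Any, Mapping


def is_picklable(obj: Any) -> bool:
    """Return True if obj survives pickle.dumps(), as a process pool requires."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def bypass_forced(environ: Mapping[str, str] = os.environ) -> bool:
    """Assumed check: bypass is forced when the variable is exactly '1'."""
    return environ.get("GENRO_EXECUTOR_BYPASS") == "1"


# Functions importable by name pickle by reference; lambdas do not.
builtin_ok = is_picklable(math.sqrt)
lambda_ok = is_picklable(lambda x: x * 2)

forced = bypass_forced({"GENRO_EXECUTOR_BYPASS": "1"})
not_forced = bypass_forced({})
```

In practice this means work functions should be defined at module level rather than as lambdas or nested functions.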
- class genro_asgi.executors.local.LocalExecutor(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)[source]
Bases: BaseExecutor
Executor using local ProcessPoolExecutor.
Runs functions in separate processes for true parallelism, bypassing Python’s GIL. Ideal for CPU-bound work.
- name
Identifier for this executor (used in metrics/logging).
- pool
The ProcessPoolExecutor, or None in bypass mode.
- max_pending
Maximum pending tasks before backpressure.
Example
>>> executor = LocalExecutor(name="compute", max_workers=4)
>>>
>>> @executor
... def heavy_work(data):
...     return process(data)
>>>
>>> result = await heavy_work(my_data)
- __init__(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)[source]
Initialize LocalExecutor.
- Parameters:
name (str) – Identifier for metrics and logging.
max_workers (int | None) – Number of worker processes (default: CPU count).
initializer (Optional[Callable[..., None]]) – Function called once per worker at startup.
initargs (tuple[Any, ...]) – Arguments passed to initializer.
max_pending (int) – Maximum concurrent pending tasks.
bypass (bool) – If True, run synchronously without pool (for testing).
- property metrics: dict[str, Any]
Return current executor metrics.
- Returns:
Dict with name, pending, submitted, completed, failed, avg_duration_ms.
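One way such a metrics dict could be maintained is with simple counters plus an accumulated duration. This is a sketch under stated assumptions, not the LocalExecutor internals: SketchMetrics and its methods are hypothetical, and averaging over every finished task (failed or not) is a guess at how avg_duration_ms is defined.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMetrics:
    """Hypothetical counter set; not the LocalExecutor internals."""

    name: str
    pending: int = 0
    submitted: int = 0
    completed: int = 0
    failed: int = 0
    total_duration_ms: float = 0.0

    def task_started(self) -> None:
        self.submitted += 1
        self.pending += 1

    def task_finished(self, duration_ms: float, ok: bool) -> None:
        self.pending -= 1
        if ok:
            self.completed += 1
        else:
            self.failed += 1
        self.total_duration_ms += duration_ms

    def as_dict(self) -> dict[str, Any]:
        finished = self.completed + self.failed
        # Assumption: the average covers every finished task, failed or not.
        avg = self.total_duration_ms / finished if finished else 0.0
        return {
            "name": self.name,
            "pending": self.pending,
            "submitted": self.submitted,
            "completed": self.completed,
            "failed": self.failed,
            "avg_duration_ms": avg,
        }


metrics = SketchMetrics("compute")
for duration_ms, ok in [(10.0, True), (30.0, False)]:
    metrics.task_started()
    metrics.task_finished(duration_ms, ok)
metrics.task_started()  # a third task that has not finished yet
snapshot = metrics.as_dict()
```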
- async submit(func, *args, **kwargs)[source]
Submit a function for execution in the process pool.
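- Parameters:
func (Callable) – Function to execute. Must be pickle-serializable.
*args, **kwargs – Arguments passed to func. Must be pickle-serializable.
- Return type:
Any
- Returns:
The value returned by func.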
- Raises:
ExecutorError – If serialization fails or execution errors.
ExecutorOverloadError – If semaphore cannot be acquired.
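The combination of a pool, a semaphore, and an overload error could look roughly like the sketch below. It is an illustration of the general technique, not the LocalExecutor implementation: SketchPoolExecutor and OverloadError are hypothetical, the fail-fast check via Semaphore.locked() is an assumption, and a ThreadPoolExecutor stands in for the ProcessPoolExecutor so the example runs without pickling concerns.

```python
import asyncio
import functools
import time
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Optional


class OverloadError(Exception):
    """Hypothetical stand-in for ExecutorOverloadError."""


class SketchPoolExecutor:
    """Hypothetical pool wrapper with semaphore-based backpressure."""

    def __init__(self, pool: Optional[Executor], max_pending: int = 100) -> None:
        self.pool = pool  # None plays the role of bypass mode
        self._semaphore = asyncio.Semaphore(max_pending)

    async def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        # Fail fast instead of queueing when every slot is taken.
        if self._semaphore.locked():
            raise OverloadError("too many pending tasks")
        async with self._semaphore:
            if self.pool is None:
                return func(*args, **kwargs)  # bypass: run in the caller
            loop = asyncio.get_running_loop()
            call = functools.partial(func, *args, **kwargs)
            return await loop.run_in_executor(self.pool, call)


async def demo() -> tuple:
    with ThreadPoolExecutor(max_workers=2) as pool:
        executor = SketchPoolExecutor(pool, max_pending=1)
        slow = asyncio.ensure_future(executor.submit(time.sleep, 0.05))
        await asyncio.sleep(0)  # let the slow task take the only slot
        try:
            await executor.submit(pow, 2, 3)
            overloaded = False
        except OverloadError:
            overloaded = True
        await slow  # slot is free again once the slow task finishes
        value = await executor.submit(pow, 2, 3)
    return overloaded, value


overloaded, value = asyncio.run(demo())
```

The executor is created inside the coroutine so that the semaphore is bound to the running event loop on all supported Python versions.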
Executor Registry
Executor registry for managing multiple named executor instances.
- Classes:
- ExecutorRegistry – lazy creation, caching, and coordinated shutdown
of named executors.
Supports multiple executor types via factory pattern (default: LocalExecutor). Provides centralized metrics collection and bulk shutdown.
- class genro_asgi.executors.registry.ExecutorRegistry[source]
Bases: object
Registry for managing named executor instances.
Provides centralized management of executors with lazy creation, caching, and coordinated shutdown.
- executors
Dict of name -> executor instance.
Example
>>> registry = ExecutorRegistry()
>>> executor = registry.get_or_create("compute", max_workers=4)
>>> registry.shutdown_all()
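Centralized metrics collection and bulk shutdown amount to iterating over the name -> executor mapping. The sketch below shows the idea with free functions: StubExecutor, collect_metrics, and this shutdown_all function are hypothetical, and the registry's real method names for metrics collection are not given in this reference.

```python
from typing import Any


class StubExecutor:
    """Hypothetical executor exposing only what this sketch needs."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    @property
    def metrics(self) -> dict[str, Any]:
        return {"name": self.name, "pending": 0, "submitted": 0,
                "completed": 0, "failed": 0}

    def shutdown(self, wait: bool = True) -> None:
        self.closed = True


def collect_metrics(executors: dict[str, StubExecutor]) -> dict[str, dict[str, Any]]:
    """Gather one metrics dict per registered executor, keyed by name."""
    return {name: executor.metrics for name, executor in executors.items()}


def shutdown_all(executors: dict[str, StubExecutor], wait: bool = True) -> None:
    """Shut down every executor in the mapping."""
    for executor in list(executors.values()):
        executor.shutdown(wait=wait)


executors = {name: StubExecutor(name) for name in ("compute", "pdf")}
all_metrics = collect_metrics(executors)
shutdown_all(executors)
```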
- property executors: dict[str, BaseExecutor]
Return dict of all registered executors.
- get(name)[source]
Get executor by name without creating.
- Parameters:
name (str) – Executor identifier.
- Return type:
BaseExecutor | None
- Returns:
Executor instance or None if not found.
- get_or_create(name, executor_type='local', **kwargs)[source]
Get existing executor or create new one.
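- Parameters:
name (str) – Executor identifier.
executor_type (str) – Type identifier of the factory to use (default: 'local').
**kwargs – Arguments passed to the factory when a new executor is created.
- Return type:
BaseExecutor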
- Returns:
Executor instance (cached if already exists).
- Raises:
ValueError – If executor_type is not registered.
Example
>>> executor = registry.get_or_create("pdf", max_workers=2)
>>> same_executor = registry.get_or_create("pdf")  # returns cached
- register_factory(executor_type, factory)[source]
Register a factory for a custom executor type.
- Parameters:
executor_type (str) – Type identifier (e.g., "remote", "hybrid").
factory (Callable[..., BaseExecutor]) – Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor
- Return type:
Example
>>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))
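The factory pattern with lazy creation and caching can be sketched in a few lines. This is an illustration of the documented behaviour, not the ExecutorRegistry source: SketchRegistry and SketchExecutor are hypothetical, and checking the cache before validating executor_type is an assumption about ordering.

```python
from typing import Any, Callable, Optional


class SketchExecutor:
    """Hypothetical executor that just records how it was created."""

    def __init__(self, name: str, **options: Any) -> None:
        self.name = name
        self.options = options


class SketchRegistry:
    """Hypothetical registry: lazy creation, caching, pluggable factories."""

    def __init__(self) -> None:
        self._executors: dict[str, SketchExecutor] = {}
        self._factories: dict[str, Callable[..., SketchExecutor]] = {
            "local": SketchExecutor,
        }

    def register_factory(self, executor_type: str,
                         factory: Callable[..., SketchExecutor]) -> None:
        self._factories[executor_type] = factory

    def get(self, name: str) -> Optional[SketchExecutor]:
        return self._executors.get(name)

    def get_or_create(self, name: str, executor_type: str = "local",
                      **kwargs: Any) -> SketchExecutor:
        # Assumption: a cached executor wins, whatever type is requested.
        cached = self._executors.get(name)
        if cached is not None:
            return cached
        if executor_type not in self._factories:
            raise ValueError(f"Unknown executor type: {executor_type!r}")
        executor = self._factories[executor_type](name, **kwargs)
        self._executors[name] = executor
        return executor


registry = SketchRegistry()
first = registry.get_or_create("pdf", max_workers=2)
second = registry.get_or_create("pdf")  # served from the cache

registry.register_factory("remote", lambda name, **kw: SketchExecutor(name, remote=True, **kw))
remote = registry.get_or_create("mail", executor_type="remote")

try:
    registry.get_or_create("other", executor_type="hybrid")
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
```

Note that keyword arguments passed on a later get_or_create call for an existing name are ignored in this sketch, since the cached instance is returned unchanged.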