Executors
Execution contexts for handlers.
Base Executor
Base executor interface and exceptions.
Purpose
Defines the abstract base class for all executor implementations and common exceptions. All executors (local, remote, hybrid) must implement the BaseExecutor interface.
Definition:
class BaseExecutor(ABC):
    name: str

    @abstractmethod
    async def submit(self, func: Callable, *args, **kwargs) -> Any:
        '''Submit a function for execution.'''

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        '''Shutdown the executor.'''

    @property
    @abstractmethod
    def metrics(self) -> dict[str, Any]:
        '''Return executor metrics.'''

    def __call__(self, func: Callable) -> Callable:
        '''Decorator interface - wraps func to use submit().'''
Exceptions:
ExecutorError
Base exception for executor operations.
ExecutorOverloadError(ExecutorError)
Raised when executor has too many pending tasks (backpressure).
Design Notes
BaseExecutor provides __call__ for decorator pattern (uses submit internally)
Subclasses only need to implement submit(), shutdown(), metrics
Metrics dict must include: name, pending, submitted, completed, failed
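The decorator behaviour described in these notes can be sketched as follows. This is a minimal illustration, not the genro_asgi source: `SketchExecutor` and `InlineExecutor` are hypothetical stand-ins, and the real `__call__` may differ in detail.

```python
import asyncio
import functools
from abc import ABC, abstractmethod
from typing import Any, Callable


class SketchExecutor(ABC):
    """Hypothetical stand-in for BaseExecutor (not the library class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @abstractmethod
    async def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        """Run func and return its result."""

    def __call__(self, func: Callable) -> Callable:
        # Decorator interface: calling the wrapper routes through submit().
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            return await self.submit(func, *args, **kwargs)

        return wrapper


class InlineExecutor(SketchExecutor):
    """Runs the function directly in the event loop, for illustration only."""

    async def submit(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
        return func(*args, **kwargs)


executor = InlineExecutor("demo")


@executor
def double(x: int) -> int:
    return x * 2


# The decorated function is now a coroutine function and must be awaited.
result = asyncio.run(double(5))
```

Because the wrapper is a coroutine function, a decorated function can only be called from async code (or through `asyncio.run`), even when the original function is synchronous.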
- class genro_asgi.executors.base.BaseExecutor
Bases: ABC
Abstract base class for all executor implementations.
Provides a common interface for local and remote executors. Implements the decorator pattern via __call__ which uses submit().
Subclasses must implement:
- submit(): Execute function asynchronously
- shutdown(): Clean up resources
- metrics: Return performance metrics
- name
Identifier for this executor instance.
Example
>>> class MyExecutor(BaseExecutor):
...     async def submit(self, func, *args, **kwargs):
...         return func(*args, **kwargs)
...     def shutdown(self, wait=True): pass
...     @property
...     def metrics(self): return {"name": self.name}
>>>
>>> executor = MyExecutor("test")
>>>
>>> @executor
... def my_func(x):
...     return x * 2
>>>
>>> result = await my_func(5)  # returns 10
- abstract property metrics: dict[str, Any]
Return executor metrics.
- Returns:
Dict containing at minimum:
name: Executor name
pending: Number of pending tasks
submitted: Total submitted tasks
completed: Total completed tasks
failed: Total failed tasks
- Return type:
dict[str, Any]
- abstractmethod async submit(func, *args, **kwargs)
Submit a function for execution.
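- Parameters:
func (Callable) – Function to execute.
*args – Positional arguments passed to func.
**kwargs – Keyword arguments passed to func.
- Return type:
Any
- Returns:
The result of func.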
- Raises:
ExecutorError – If execution fails.
ExecutorOverloadError – If too many tasks are pending.
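On the caller side, the two exceptions can be treated differently: an overload is usually transient, while other executor errors are not. The sketch below retries only on overload. `submit_with_retry` and `flaky_submit` are hypothetical helpers, and the exception classes are local stand-ins that mirror the documented hierarchy so the example runs without genro_asgi installed.

```python
import asyncio
from typing import Any, Awaitable, Callable


class ExecutorError(Exception):
    """Stand-in mirroring the documented base exception."""


class ExecutorOverloadError(ExecutorError):
    """Stand-in mirroring the documented backpressure exception."""


async def submit_with_retry(
    submit: Callable[..., Awaitable[Any]],
    func: Callable,
    *args: Any,
    attempts: int = 3,
    delay: float = 0.01,
) -> Any:
    """Hypothetical helper: retry on overload, let other errors propagate."""
    for attempt in range(1, attempts + 1):
        try:
            return await submit(func, *args)
        except ExecutorOverloadError:
            if attempt == attempts:
                raise
            # Back off a little longer after each rejected attempt.
            await asyncio.sleep(delay * attempt)


calls = {"n": 0}


async def flaky_submit(func: Callable, *args: Any) -> Any:
    # Simulated executor that reports overload on the first two calls.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ExecutorOverloadError("too many pending tasks")
    return func(*args)


outcome = asyncio.run(submit_with_retry(flaky_submit, pow, 2, 5))
```

Whether retrying is appropriate depends on the handler; returning an HTTP 503 to the client is an equally valid response to backpressure.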
- exception genro_asgi.executors.base.ExecutorError
Bases: Exception
Base exception for executor operations.
- exception genro_asgi.executors.base.ExecutorOverloadError
Bases: ExecutorError
Raised when executor has too many pending tasks.
Local Executor
Local executor for CPU-bound work using ProcessPoolExecutor.
Purpose
LocalExecutor wraps ProcessPoolExecutor to run blocking/CPU-bound functions without freezing the async event loop. Supports backpressure via semaphore and bypass mode for testing.
Features:
- ProcessPoolExecutor for true parallelism (bypasses GIL)
- Bypass mode (pool=None) for testing without spawning processes
- Backpressure via semaphore to limit pending tasks
- Metrics collection (submitted, completed, failed, avg duration)
- Clear error messages for pickle serialization failures
Definition:
class LocalExecutor(BaseExecutor):
    __slots__ = ("name", "pool", "max_pending", "_semaphore", "_metrics")

    def __init__(
        self,
        name: str = "default",
        max_workers: int | None = None,
        initializer: Callable | None = None,
        initargs: tuple = (),
        max_pending: int = 100,
        bypass: bool = False,
    ): ...

    async def submit(self, func: Callable, *args, **kwargs) -> Any: ...

    def shutdown(self, wait: bool = True) -> None: ...

    @property
    def metrics(self) -> dict: ...
Example:
from genro_asgi.executors import LocalExecutor

executor = LocalExecutor(name="pdf", max_workers=2)

@executor
def generate_pdf(data):
    # CPU-bound work
    return create_pdf(data)

# In async handler
async def handle():
    result = await generate_pdf(report_data)
Bypass Mode:
# For testing - no actual processes spawned
executor = LocalExecutor(name="test", bypass=True)

@executor
def my_func(x):
    return x * 2

result = await my_func(5)  # runs synchronously, returns 10
Environment Variable:
# Set GENRO_EXECUTOR_BYPASS=1 to bypass all LocalExecutors
import os
os.environ['GENRO_EXECUTOR_BYPASS'] = '1'
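Setting the variable globally, as above, affects the whole process. In a test suite it may be preferable to scope the override, for example with `unittest.mock.patch.dict`. The sketch below assumes the value `"1"` is what enables bypass. `bypass_requested` is a hypothetical check written for this example, and when the library actually reads the variable (at import or at executor construction) is not stated in this page.

```python
import os
from unittest import mock


def bypass_requested() -> bool:
    """Hypothetical check; the library's own parsing of the variable may differ."""
    return os.environ.get("GENRO_EXECUTOR_BYPASS") == "1"


before = os.environ.get("GENRO_EXECUTOR_BYPASS")

# Scope the override to a block so it does not leak into other tests.
with mock.patch.dict(os.environ, {"GENRO_EXECUTOR_BYPASS": "1"}):
    inside = bypass_requested()

# patch.dict restores the previous environment on exit.
after = os.environ.get("GENRO_EXECUTOR_BYPASS")
```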
Design Notes
Uses asyncio.Semaphore for backpressure control
Catches pickle.PicklingError for clear error messages
Metrics are simple counters (Prometheus integration is optional)
Decorated functions must be top-level (not lambdas/closures)
Arguments and return values must be pickle-serializable
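The semaphore-based backpressure mentioned in these notes can be sketched as below. This is an illustration under stated assumptions, not the LocalExecutor source: `BoundedRunner` and `OverloadError` are hypothetical names, the sketch rejects immediately when no slot is free (the library might wait or use a timeout instead), and it swaps the process pool for the event loop's default thread pool so the example stays self-contained.

```python
import asyncio
import time
from typing import Any, Callable


class OverloadError(Exception):
    """Hypothetical stand-in for ExecutorOverloadError."""


class BoundedRunner:
    """Sketch: cap in-flight tasks with asyncio.Semaphore."""

    def __init__(self, max_pending: int) -> None:
        self._semaphore = asyncio.Semaphore(max_pending)

    async def submit(self, func: Callable, *args: Any) -> Any:
        # Reject immediately instead of queueing when every slot is taken.
        if self._semaphore.locked():
            raise OverloadError("too many pending tasks")
        async with self._semaphore:
            loop = asyncio.get_running_loop()
            # None selects the loop's default thread pool; the documented
            # executor uses a ProcessPoolExecutor at this point instead.
            return await loop.run_in_executor(None, func, *args)


async def main() -> tuple:
    runner = BoundedRunner(max_pending=2)
    # Three concurrent submissions against two slots.
    tasks = [
        asyncio.create_task(runner.submit(time.sleep, 0.05)) for _ in range(3)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    rejected = sum(isinstance(r, OverloadError) for r in results)
    return results, rejected


results, rejected = asyncio.run(main())
```

With two slots and three concurrent submissions, the third call finds the semaphore exhausted and is rejected while the first two are still running.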
- class genro_asgi.executors.local.LocalExecutor(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)
Bases: BaseExecutor
Executor using local ProcessPoolExecutor.
Runs functions in separate processes for true parallelism, bypassing Python’s GIL. Ideal for CPU-bound work.
- name
Identifier for this executor (used in metrics/logging).
- pool
The ProcessPoolExecutor, or None in bypass mode.
- max_pending
Maximum pending tasks before backpressure.
Example
>>> executor = LocalExecutor(name="compute", max_workers=4)
>>>
>>> @executor
... def heavy_work(data):
...     return process(data)
>>>
>>> result = await heavy_work(my_data)
- __init__(name='default', max_workers=None, initializer=None, initargs=(), max_pending=100, bypass=False)
Initialize LocalExecutor.
- Parameters:
name (str) – Identifier for metrics and logging.
max_workers (int | None) – Number of worker processes (default: CPU count).
initializer (Optional[Callable[..., None]]) – Function called once per worker at startup.
initargs (tuple[Any, ...]) – Arguments passed to initializer.
max_pending (int) – Maximum concurrent pending tasks.
bypass (bool) – If True, run synchronously without pool (for testing).
- property metrics: dict[str, Any]
Return current executor metrics.
- Returns:
Dict with name, pending, submitted, completed, failed, avg_duration_ms.
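One way such counters could be maintained is sketched below. `MetricsSketch` is a hypothetical class written for this example. Only the dictionary keys come from the documentation, and the choice to average over completed tasks only is an assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class MetricsSketch:
    """Hypothetical counter bundle; dict keys follow the documented names."""

    name: str
    pending: int = 0
    submitted: int = 0
    completed: int = 0
    failed: int = 0
    total_duration_ms: float = 0.0

    def start(self) -> None:
        # Called when a task is accepted.
        self.submitted += 1
        self.pending += 1

    def finish(self, duration_ms: float, ok: bool = True) -> None:
        # Called when a task ends, successfully or not.
        self.pending -= 1
        if ok:
            self.completed += 1
            self.total_duration_ms += duration_ms
        else:
            self.failed += 1

    def as_dict(self) -> dict[str, Any]:
        # Assumption: the average covers completed tasks only.
        avg = self.total_duration_ms / self.completed if self.completed else 0.0
        return {
            "name": self.name,
            "pending": self.pending,
            "submitted": self.submitted,
            "completed": self.completed,
            "failed": self.failed,
            "avg_duration_ms": avg,
        }


m = MetricsSketch("pdf")
for duration, ok in [(10.0, True), (30.0, True), (5.0, False)]:
    m.start()
    m.finish(duration, ok)
snapshot = m.as_dict()
```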
- async submit(func, *args, **kwargs)
Submit a function for execution in the process pool.
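- Parameters:
func (Callable) – Function to execute. Must be pickle-serializable (top-level, not a lambda or closure).
*args – Positional arguments passed to func.
**kwargs – Keyword arguments passed to func.
- Return type:
Any
- Returns:
The result of func.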
- Raises:
ExecutorError – If serialization fails or execution errors.
ExecutorOverloadError – If semaphore cannot be acquired.
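The serialization failure case can be made concrete with a pre-flight check. The helper below is a hypothetical sketch, not part of genro_asgi: it tries to pickle an object up front and converts the low-level failure into a single readable error. `ExecutorError` is a local stand-in for the documented class.

```python
import pickle
from typing import Any


class ExecutorError(Exception):
    """Stand-in mirroring the documented base exception."""


def ensure_picklable(obj: Any, label: str) -> None:
    """Hypothetical pre-flight check before handing work to a process pool."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError) as exc:
        # Lambdas, closures and objects such as locks typically fail here;
        # the exact exception type varies by object and Python version.
        raise ExecutorError(
            f"{label} is not pickle-serializable: {exc}. "
            "Use a top-level function and plain data arguments."
        ) from exc


# Built-in functions and plain data pickle without error.
ensure_picklable(len, "func")
ensure_picklable({"rows": [1, 2]}, "args")

# A lambda cannot be pickled, so the check raises ExecutorError.
try:
    ensure_picklable(lambda x: x * 2, "func")
    lambda_rejected = False
except ExecutorError:
    lambda_rejected = True
```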
Executor Registry
Executor registry for managing multiple executor instances.
Purpose
ExecutorRegistry manages named executor instances, providing lazy creation, caching, and centralized shutdown. Supports multiple executor types (local, remote) via factory pattern.
Features:
- Named executors with lazy creation
- Support for multiple executor types
- Centralized metrics collection
- Coordinated shutdown of all executors
Definition:
class ExecutorRegistry:
    __slots__ = ("_executors", "_factories")

    def __init__(self): ...

    def register_factory(self, executor_type: str, factory: Callable) -> None: ...

    def get_or_create(self, name: str, executor_type: str = "local", **kwargs) -> BaseExecutor: ...

    def get(self, name: str) -> BaseExecutor | None: ...

    def shutdown_all(self, wait: bool = True) -> None: ...

    def all_metrics(self) -> list[dict]: ...

    @property
    def executors(self) -> dict[str, BaseExecutor]: ...
Example:
from genro_asgi.executors import ExecutorRegistry

registry = ExecutorRegistry()

# Get or create named executors
pdf_executor = registry.get_or_create("pdf", max_workers=2)
ml_executor = registry.get_or_create("ml", max_workers=4)

@pdf_executor
def generate_pdf(data):
    return create_pdf(data)

@ml_executor
def predict(model, data):
    return model.predict(data)

# Get metrics for all executors
metrics = registry.all_metrics()

# Shutdown all on application exit
registry.shutdown_all()
Extensibility:
# Register custom executor type
from myapp.executors import RemoteExecutor
registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))
# Use remote executor
executor = registry.get_or_create("cluster", executor_type="remote", url="...")
Design Notes
Default factory creates LocalExecutor
Executors are cached by name
shutdown_all() should be called on application shutdown
Thread-safe for read operations (creation should be done at startup)
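The caching and factory lookup summarized in these notes can be sketched as follows. `RegistrySketch` and `FakeExecutor` are hypothetical stand-ins written for this example. The real registry may differ, for instance in how it treats keyword arguments passed for an executor that already exists.

```python
from typing import Any, Callable


class FakeExecutor:
    """Hypothetical executor stand-in that only records its settings."""

    def __init__(self, name: str, **kwargs: Any) -> None:
        self.name = name
        self.options = kwargs
        self.closed = False

    def shutdown(self, wait: bool = True) -> None:
        self.closed = True


class RegistrySketch:
    """Sketch of lazy creation and caching by name (not the library source)."""

    def __init__(self) -> None:
        self._executors: dict = {}
        # The default "local" type is registered up front.
        self._factories: dict = {"local": FakeExecutor}

    def register_factory(self, executor_type: str, factory: Callable) -> None:
        self._factories[executor_type] = factory

    def get_or_create(
        self, name: str, executor_type: str = "local", **kwargs: Any
    ) -> FakeExecutor:
        if name in self._executors:
            # Cached: kwargs of later calls are ignored in this sketch.
            return self._executors[name]
        try:
            factory = self._factories[executor_type]
        except KeyError:
            raise ValueError(f"unknown executor type: {executor_type!r}") from None
        executor = self._executors[name] = factory(name, **kwargs)
        return executor

    def shutdown_all(self, wait: bool = True) -> None:
        for executor in self._executors.values():
            executor.shutdown(wait=wait)


registry = RegistrySketch()
pdf = registry.get_or_create("pdf", max_workers=2)
same = registry.get_or_create("pdf")  # second call hits the cache
registry.shutdown_all()
```

Since creation mutates the cache without locking in this sketch, it matches the note above: create executors at startup and treat the registry as read-only afterwards.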
- class genro_asgi.executors.registry.ExecutorRegistry
Bases: object
Registry for managing named executor instances.
Provides centralized management of executors with lazy creation, caching, and coordinated shutdown.
- executors
Dict of name -> executor instance.
Example
>>> registry = ExecutorRegistry()
>>> executor = registry.get_or_create("compute", max_workers=4)
>>> registry.shutdown_all()
- property executors: dict[str, BaseExecutor]
Return dict of all registered executors.
- get(name)
Get executor by name without creating.
- Parameters:
name (str) – Executor identifier.
- Return type:
BaseExecutor | None
- Returns:
Executor instance or None if not found.
- get_or_create(name, executor_type='local', **kwargs)
Get existing executor or create new one.
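- Parameters:
name (str) – Executor identifier.
executor_type (str) – Type identifier of a registered factory (default: 'local').
**kwargs – Passed to the factory when a new executor is created.
- Return type:
BaseExecutor
- Returns: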
Executor instance (cached if already exists).
- Raises:
ValueError – If executor_type is not registered.
Example
>>> executor = registry.get_or_create("pdf", max_workers=2)
>>> same_executor = registry.get_or_create("pdf")  # returns cached
- register_factory(executor_type, factory)
Register a factory for a custom executor type.
- Parameters:
executor_type (str) – Type identifier (e.g., “remote”, “hybrid”).
factory (Callable[..., BaseExecutor]) – Callable that creates executor instances. Signature: factory(name: str, **kwargs) -> BaseExecutor
- Return type:
None
Example
>>> registry.register_factory("remote", lambda name, **kw: RemoteExecutor(name, **kw))