# Copyright 2025 Softwell S.r.l.
# Licensed under the Apache License, Version 2.0
"""
Local executor for CPU-bound work using ProcessPoolExecutor.
Purpose
=======
LocalExecutor wraps ProcessPoolExecutor to run blocking/CPU-bound functions
without freezing the async event loop. Supports backpressure via semaphore
and bypass mode for testing.
Features:
- ProcessPoolExecutor for true parallelism (bypasses GIL)
- Bypass mode (pool=None) for testing without spawning processes
- Backpressure via semaphore to limit pending tasks
- Metrics collection (submitted, completed, failed, avg duration)
- Clear error messages for pickle serialization failures
Definition::
class LocalExecutor(BaseExecutor):
__slots__ = ("name", "pool", "max_pending", "_semaphore", "_metrics")
def __init__(
self,
name: str = "default",
max_workers: int | None = None,
initializer: Callable | None = None,
initargs: tuple = (),
max_pending: int = 100,
bypass: bool = False,
)
async def submit(self, func: Callable, *args, **kwargs) -> Any
def shutdown(self, wait: bool = True) -> None
@property
def metrics(self) -> dict
Example::
from genro_asgi.executors import LocalExecutor
executor = LocalExecutor(name="pdf", max_workers=2)
@executor
def generate_pdf(data):
# CPU-bound work
return create_pdf(data)
# In async handler
async def handle():
result = await generate_pdf(report_data)
Bypass Mode::
# For testing - no actual processes spawned
executor = LocalExecutor(name="test", bypass=True)
@executor
def my_func(x):
return x * 2
result = await my_func(5) # runs synchronously, returns 10
Environment Variable::
# Set GENRO_EXECUTOR_BYPASS=1 to bypass all LocalExecutors
import os
os.environ['GENRO_EXECUTOR_BYPASS'] = '1'
Design Notes
============
- Uses asyncio.Semaphore for backpressure control
- Catches pickle.PicklingError for clear error messages
- Metrics are simple counters (Prometheus integration is optional)
- Decorated functions must be top-level (not lambdas/closures)
- Arguments and return values must be pickle-serializable
"""
from __future__ import annotations
import asyncio
import os
import pickle
import time
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import Any, Callable
from .base import BaseExecutor, ExecutorError
__all__ = ["LocalExecutor"]
[docs]
class LocalExecutor(BaseExecutor):
"""
Executor using local ProcessPoolExecutor.
Runs functions in separate processes for true parallelism,
bypassing Python's GIL. Ideal for CPU-bound work.
Attributes:
name: Identifier for this executor (used in metrics/logging).
pool: The ProcessPoolExecutor, or None in bypass mode.
max_pending: Maximum pending tasks before backpressure.
Example:
>>> executor = LocalExecutor(name="compute", max_workers=4)
>>>
>>> @executor
... def heavy_work(data):
... return process(data)
>>>
>>> result = await heavy_work(my_data)
"""
__slots__ = ("name", "pool", "max_pending", "_semaphore", "_metrics")
[docs]
def __init__(
self,
name: str = "default",
max_workers: int | None = None,
initializer: Callable[..., None] | None = None,
initargs: tuple[Any, ...] = (),
max_pending: int = 100,
bypass: bool = False,
) -> None:
"""
Initialize LocalExecutor.
Args:
name: Identifier for metrics and logging.
max_workers: Number of worker processes (default: CPU count).
initializer: Function called once per worker at startup.
initargs: Arguments passed to initializer.
max_pending: Maximum concurrent pending tasks.
bypass: If True, run synchronously without pool (for testing).
"""
self.name = name
self.max_pending = max_pending
# Check environment for global bypass
env_bypass = os.environ.get("GENRO_EXECUTOR_BYPASS") == "1"
if bypass or env_bypass:
self.pool = None
self._semaphore: asyncio.Semaphore | None = None
else:
self.pool = ProcessPoolExecutor(
max_workers=max_workers,
initializer=initializer,
initargs=initargs,
)
self._semaphore = asyncio.Semaphore(max_pending)
self._metrics = {
"submitted": 0,
"completed": 0,
"failed": 0,
"total_duration_ms": 0.0,
}
@property
def metrics(self) -> dict[str, Any]:
"""
Return current executor metrics.
Returns:
Dict with name, pending, submitted, completed, failed, avg_duration_ms.
"""
completed = self._metrics["completed"]
return {
"name": self.name,
"mode": "bypass" if self.pool is None else "process",
"pending": (
self._metrics["submitted"] - self._metrics["completed"] - self._metrics["failed"]
),
"submitted": self._metrics["submitted"],
"completed": completed,
"failed": self._metrics["failed"],
"avg_duration_ms": (
self._metrics["total_duration_ms"] / completed if completed > 0 else 0.0
),
}
[docs]
async def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
"""
Submit a function for execution in the process pool.
Args:
func: The function to execute (must be top-level, pickle-able).
*args: Positional arguments (must be pickle-able).
**kwargs: Keyword arguments (must be pickle-able).
Returns:
The result of func(*args, **kwargs).
Raises:
ExecutorError: If serialization fails or execution errors.
ExecutorOverloadError: If semaphore cannot be acquired.
"""
# Bypass mode: run synchronously
if self.pool is None:
return func(*args, **kwargs)
self._metrics["submitted"] += 1
start = time.monotonic()
try:
if self._semaphore is not None:
# Try to acquire semaphore (backpressure)
try:
async with self._semaphore:
result = await self._execute(func, *args, **kwargs)
except asyncio.CancelledError:
raise
else:
result = await self._execute(func, *args, **kwargs)
self._metrics["completed"] += 1
return result
except ExecutorError:
self._metrics["failed"] += 1
raise
except Exception:
self._metrics["failed"] += 1
raise
finally:
self._metrics["total_duration_ms"] += (time.monotonic() - start) * 1000
async def _execute(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
"""Execute function in the process pool."""
loop = asyncio.get_running_loop()
call = partial(func, *args, **kwargs)
try:
return await loop.run_in_executor(self.pool, call)
except pickle.PicklingError as e:
raise ExecutorError(
f"Cannot serialize arguments for {func.__name__}. "
f"Ensure all args are pickle-serializable. Original: {e}"
) from e
[docs]
def shutdown(self, wait: bool = True) -> None:
"""
Shutdown the process pool.
Args:
wait: If True, wait for pending tasks to complete.
"""
if self.pool is not None:
self.pool.shutdown(wait=wait)
def __repr__(self) -> str:
"""Return string representation."""
mode = "bypass" if self.pool is None else "process"
return f"LocalExecutor(name={self.name!r}, mode={mode})"
if __name__ == "__main__":
import asyncio
async def main() -> None:
# Test bypass mode
executor = LocalExecutor(name="test", bypass=True)
print(f"Executor: {executor}")
@executor
def square(x: int) -> int:
return x * x
result = await square(5) # type: ignore[misc]
print(f"square(5) = {result}")
print(f"Metrics: {executor.metrics}")
asyncio.run(main())