Executors
Version: 1.0.0 Status: SOURCE OF TRUTH Last Updated: 2025-12-03
Overview
Executors handle blocking and CPU-bound operations without freezing the async event loop.
Key features:
ProcessPoolExecutor for CPU-bound work
Decorator pattern for easy usage
Worker initialization for preloaded data
Bypass mode for testing
Metrics for observability
Architecture
Main Process (asyncio event loop)
│
├── Executor Pool "pdf" (2 workers)
│ ├── Worker 1 → generate_pdf()
│ └── Worker 2 → generate_pdf()
│
├── Executor Pool "ml" (4 workers)
│ ├── Worker 1 → predict() [with preloaded model]
│ ├── Worker 2 → predict()
│ ├── Worker 3 → predict()
│ └── Worker 4 → predict()
│
└── async handlers await executor results
Decorator Pattern
from myproject import server
# Two independent process pools, each identified by its name
executor_pdf = server.executor(name="pdf", max_workers=2)
executor_ml = server.executor(name="ml", max_workers=4)


@executor_pdf
def generate_pdf(data):
    """Build a PDF from ``data``; CPU-bound, so it runs in the process pool."""
    return create_pdf(data)


@executor_ml
def predict(data):
    """Run a prediction with the model preloaded in worker memory."""
    return _model.predict(data)


# Inside an async handler the decorated functions are simply awaited
async def handle_request(request):
    """Await both executor-backed calls, one after the other."""
    pdf_document = await generate_pdf(report_data)
    model_output = await predict(input_data)
Server Integration
server.executor()
class AsgiServer:
    """Server fragment that owns the registry of named process-pool executors."""

    def __init__(self):
        # Registry of decorators keyed by pool name; filled lazily by executor().
        self._executors: dict[str, ExecutorDecorator] = {}

    def executor(
        self,
        name: str = "default",
        max_workers: int | None = None,
        initializer: Callable | None = None,
        initargs: tuple = (),
    ) -> ExecutorDecorator:
        """
        Get or create a named process pool executor.

        The pool is created on the first call for ``name``. Later calls with
        the same name return the already registered decorator, and their
        ``max_workers``/``initializer``/``initargs`` are not applied.

        Args:
            name: Pool identifier (allows multiple isolated pools)
            max_workers: Number of workers (default: CPU count)
            initializer: Function called once per worker at startup
            initargs: Arguments passed to initializer

        Returns:
            ExecutorDecorator that can decorate functions
        """
        if name not in self._executors:
            pool = ProcessPoolExecutor(
                max_workers=max_workers,
                initializer=initializer,
                initargs=initargs,
            )
            self._executors[name] = ExecutorDecorator(pool, name)
        return self._executors[name]

    def shutdown(self):
        """
        Shutdown all executors and remove them from the registry.

        Each entry is removed before its pool is shut down, so a later
        ``executor(name)`` call builds a fresh pool instead of handing back a
        decorator bound to a pool that no longer accepts work. If a shutdown
        raises, the executors not yet processed stay registered.
        """
        # Iterate over a snapshot of the keys because the dict is mutated.
        for name in list(self._executors):
            self._executors.pop(name).shutdown(wait=True)
ExecutorDecorator
class ExecutorDecorator:
"""Wraps a function to run in an executor pool."""
def __init__(self, pool: Executor | None, name: str = "default"):
self.pool = pool
self.name = name
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Bypass mode: if no pool, run synchronously
if self.pool is None:
return func(*args, **kwargs)
loop = asyncio.get_running_loop()
call = partial(func, *args, **kwargs)
try:
return await loop.run_in_executor(self.pool, call)
except pickle.PicklingError as e:
raise ExecutorError(
f"Cannot serialize arguments for {func.__name__}. "
f"Ensure all args are pickle-serializable. Original: {e}"
) from e
return wrapper
def shutdown(self, wait=True):
if self.pool is not None:
self.pool.shutdown(wait=wait)
Worker Initialization (Preloaded Data)
Workers can load data once at startup and reuse it across all tasks:
# Worker-side globals: populated by init_ml_worker() inside each worker process
_model = None
_lookup_table = None


def init_ml_worker(model_path, lookup_path):
    """Called once when each worker process starts.

    Args:
        model_path: Passed to load_heavy_model(); the result is kept in _model.
        lookup_path: Passed to load_lookup(); the result is kept in _lookup_table.
    """
    global _model, _lookup_table
    _model = load_heavy_model(model_path)
    _lookup_table = load_lookup(lookup_path)


# Create pool with initializer
executor_ml = server.executor(
    name="ml",
    max_workers=4,
    initializer=init_ml_worker,
    initargs=("/models/v1.pkl", "/data/lookup.json"),
)


# Defined once and decorated directly: a second plain definition of the same
# name would silently replace this one and skip the lookup table.
@executor_ml
def predict(data):
    """Uses preloaded data - no loading overhead."""
    features = _lookup_table[data["category"]]
    return _model.predict(features)  # Model already loaded
Use cases:
ML models (sklearn, pytorch, tensorflow)
Large lookup tables / dictionaries
Compiled regex patterns
Database connection pools (per-worker)
Configuration that doesn’t change
Multiple Pools
Isolated pools for different workloads:
# One slow PDF doesn't block ML predictions
# Each distinct name gets its own pool with its own worker count.
executor_pdf = server.executor(name="pdf", max_workers=2)
executor_ml = server.executor(name="ml", max_workers=4)
executor_image = server.executor(name="image", max_workers=2)
Multithreaded Workers
For I/O-bound tasks, workers can use internal thread pools:
# Worker globals: set by init_db_worker() inside each worker process
_thread_pool = None
_db_url = None


def init_db_worker(db_url, n_threads):
    """Prepare a worker: remember the database URL and create its thread pool.

    Args:
        db_url: Connection URL later passed to connect() by db_transaction().
        n_threads: Size of the per-worker thread pool.
    """
    global _thread_pool, _db_url
    # Keep the URL in a global: db_transaction() cannot see this function's
    # parameters, so reading a bare ``db_url`` there would be a NameError.
    _db_url = db_url
    _thread_pool = ThreadPoolExecutor(max_workers=n_threads)


def db_transaction(operations):
    """Execute ``operations`` on one connection and commit them.

    The work runs on the worker's thread pool and this call blocks until it
    has finished. The connection is closed even when an operation fails.

    Args:
        operations: Iterable of statements, each passed to ``conn.execute``.

    Raises:
        RuntimeError: If init_db_worker() has not run in this process.
    """
    if _thread_pool is None:
        raise RuntimeError("init_db_worker() must run before db_transaction()")

    def _execute():
        conn = connect(_db_url)
        try:
            for op in operations:
                conn.execute(op)
            conn.commit()
        finally:
            conn.close()

    return _thread_pool.submit(_execute).result()
# Pool "db": init_db_worker(DB_URL, 8) is the initializer run in each worker.
# NOTE(review): this snippet never shows db_transaction being decorated with
# executor_db - confirm it is wrapped where it is defined.
executor_db = server.executor(
name="db",
max_workers=2, # 2 processes
initializer=init_db_worker,
initargs=(DB_URL, 8), # 8 threads per process
)
Bypass Mode for Testing
# A decorator constructed with pool=None operates in bypass mode
executor_pdf = ExecutorDecorator(None)


@executor_pdf
def generate_pdf(data):
    """Build the PDF; with no pool attached this executes in-process."""
    return create_pdf(data)


# No pool required: awaiting the wrapper runs the function synchronously
async def test_generate_pdf():
    """Check that the bypassed wrapper can be awaited and yields a result."""
    pdf = await generate_pdf({"title": "Test"})
    assert pdf is not None
Environment variable bypass:
import os
def executor(...) -> ExecutorDecorator:
if os.environ.get("GENRO_EXECUTOR_BYPASS") == "1":
return ExecutorDecorator(None) # Bypass all pools
# Normal pool creation...
Backpressure & Queue Management
Semaphore-based Throttling
class ExecutorDecorator:
def __init__(self, pool, max_pending: int = 100):
self.pool = pool
self._semaphore = asyncio.Semaphore(max_pending) if pool else None
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.pool is None:
return func(*args, **kwargs)
# Block if too many pending tasks
async with self._semaphore:
loop = asyncio.get_running_loop()
call = partial(func, *args, **kwargs)
return await loop.run_in_executor(self.pool, call)
return wrapper
Fail-Fast on Overload
async def wrapper(*args, **kwargs):
    # The semaphore is None when the decorator has no pool (bypass mode),
    # so only consult it when it exists.
    if self._semaphore is not None and self._semaphore.locked():
        # Reject immediately instead of queueing behind the pending calls.
        raise ExecutorOverloadError(
            f"Executor '{self.name}' has {self.max_pending} pending tasks."
        )
    # ... proceed
Metrics
class ExecutorDecorator:
    """Executor wrapper variant that keeps per-pool task counters."""

    def __init__(self, pool, name: str = "default"):
        self.pool = pool
        self.name = name
        # All counters start at zero; nothing in this block updates them.
        self._tasks_submitted = 0
        self._tasks_completed = 0
        self._tasks_failed = 0
        self._total_duration_ms = 0.0

    @property
    def metrics(self) -> dict:
        """Return a snapshot of the counters as a plain dict.

        ``pending`` is submitted minus completed minus failed, and
        ``avg_duration_ms`` is the total duration divided by the completed
        count, or 0 when nothing has completed yet.
        """
        submitted = self._tasks_submitted
        completed = self._tasks_completed
        failed = self._tasks_failed

        if completed > 0:
            average_ms = self._total_duration_ms / completed
        else:
            average_ms = 0

        return {
            "name": self.name,
            "pending": submitted - completed - failed,
            "submitted": submitted,
            "completed": completed,
            "failed": failed,
            "avg_duration_ms": average_ms,
        }
Server-level metrics:
class AsgiServer:
    def get_executor_metrics(self) -> list[dict]:
        """Collect the ``metrics`` value of every registered executor, in registry order."""
        snapshots = []
        for decorator in self._executors.values():
            snapshots.append(decorator.metrics)
        return snapshots
Constraints
Decorated functions must be top-level (not lambdas or methods)
Arguments and return values must be pickle-serializable
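Preloaded data in workers should be treated as read-only: each worker process holds its own copy, so a change made in one worker is not visible to the other workers or to the main process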
Workers are persistent - ProcessPoolExecutor starts them when work is submitted (not when the pool object is created), and each worker is then reused for later tasks
Remote Executors (Future)
# NOTE(review): under the executor(name=..., max_workers=..., ...) signature
# documented above, the positional "nats" and the name= keyword would both
# bind to `name`. Presumably the future API adds a leading backend
# parameter - confirm before implementing.
executor_remote = server.executor(
"nats",
name="heavy",
subject="tasks.heavy",
url="nats://worker:4222",
)
@executor_remote
def heavy_task(data):
return process(data) # Executed on remote worker
result = await heavy_task(data)
Benefits:
Horizontal scaling across machines
Automatic load balancing (NATS)
Fault tolerance
Same API as local executors
Copyright: Softwell S.r.l. (2025) License: Apache License 2.0