Taming Asyncio: Production Patterns That Prevent Silent Failures
Your asyncio code works perfectly in development, then silently drops requests in production. Tasks vanish without trace, exceptions disappear into the void, and your monitoring shows nothing wrong. This happens because asyncio’s elegant syntax hides sharp edges that only reveal themselves under real-world load.
The promise of asyncio is compelling: write concurrent code that looks almost synchronous, handle thousands of connections with minimal resources, and build responsive applications without the complexity of threads. But that promise comes with hidden traps that can turn your production system into an unreliable mess. The event loop model that makes asyncio so efficient also creates subtle failure modes that traditional debugging techniques simply cannot catch.
After debugging dozens of production asyncio failures, I’ve identified the patterns that separate reliable async systems from fragile ones. These aren’t theoretical concerns—they’re hard-won lessons from systems handling millions of requests where a single missed exception or orphaned task can cascade into a full outage. This guide covers the seven critical areas where asyncio implementations fail and provides battle-tested solutions for each.
The Hidden Cost of Fire-and-Forget Tasks
The most insidious asyncio bug looks completely harmless:
```python
import asyncio


async def handle_request(data):
    # Process the request
    result = await process(data)

    # Fire and forget - this is dangerous
    asyncio.create_task(send_notification(result))

    return result
```
This code creates a task but stores no reference to it. Under light load, everything works. Under heavy load, tasks silently disappear.
Here’s why: the event loop keeps only weak references to tasks, so Python’s garbage collector can collect a task that nothing else references. When that happens mid-execution, the task simply stops. No exception reaches your code, and the only trace is a "Task was destroyed but it is pending!" message sent to the asyncio logger, which is easy to miss if that logger isn’t wired into your alerting. Your notification never sends, but your monitoring shows zero errors.
The underlying mechanism is straightforward but easy to overlook. When you call asyncio.create_task(), Python creates a Task object and schedules it on the event loop. However, if you don’t keep the returned Task object in a variable or container that stays alive, the event loop’s own bookkeeping holds only a weak reference to it. Weak references don’t keep an object alive, so the garbage collector is free to collect the task even though the loop still knows about it. In CPython, cyclic collection is triggered by allocation activity rather than on a schedule, so from your code’s point of view it runs at unpredictable moments, and when it does, your task can simply vanish.
The symptoms are maddening to debug. You’ll see intermittent failures that only appear under load. Logs show the task was created but never completed. Adding print statements can seem to fix the problem, but only because they change timing and allocation patterns, not because they address the missing reference. Reducing load makes failures disappear because fewer allocations mean fewer collection cycles.
What makes this particularly treacherous is that your test suite will almost never catch it. Unit tests typically run quickly with minimal concurrent load, so the garbage collector rarely triggers mid-task. Integration tests might occasionally fail, but the failures seem random and unreproducible. Only in production, under sustained load with heavy allocation churn, does the problem consistently manifest.
The fix requires maintaining strong references to all tasks:
```python
import asyncio
from typing import Set


class TaskRegistry:
    def __init__(self):
        self._tasks: Set[asyncio.Task] = set()

    def create_task(self, coro, *, name=None):
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)
        # Drop the reference once the task finishes
        task.add_done_callback(self._tasks.discard)
        return task

    async def wait_all(self, timeout=None):
        if self._tasks:
            await asyncio.wait(self._tasks, timeout=timeout)

    def cancel_all(self):
        for task in self._tasks:
            task.cancel()


# Module-level registry for background tasks
background_tasks = TaskRegistry()


async def handle_request(data):
    result = await process(data)

    # Task is now tracked and won't be garbage collected
    background_tasks.create_task(
        send_notification(result),
        name=f"notify-{data['id']}"
    )

    return result
```
The add_done_callback with discard ensures completed tasks are removed from the set, preventing memory leaks. The set itself maintains strong references, preventing garbage collection. Named tasks make debugging easier when you need to inspect running tasks.
This pattern also enables graceful shutdown—you can call wait_all() to ensure background work completes before your application exits. Without task tracking, you have no way to know whether pending work exists or how long to wait for it.
Consider also using a context-aware registry that groups tasks by request or session. This allows you to cancel all tasks associated with a specific request if that request is cancelled, preventing orphaned work that continues executing after its results are no longer needed.
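A grouped registry along these lines might look like the sketch below. The class name `ScopedTaskRegistry`, the `cancel_scope` method, and the use of plain string keys such as a request ID are assumptions made for illustration; they do not come from the registry shown earlier.

```python
import asyncio
from collections import defaultdict


class ScopedTaskRegistry:
    """Hypothetical registry that groups background tasks by a scope key."""

    def __init__(self):
        # scope key -> set of tasks; the sets hold the strong references
        self._scopes = defaultdict(set)

    def create_task(self, scope, coro, *, name=None):
        task = asyncio.create_task(coro, name=name)
        self._scopes[scope].add(task)
        task.add_done_callback(lambda t: self._forget(scope, t))
        return task

    def _forget(self, scope, task):
        tasks = self._scopes.get(scope)
        if tasks is None:
            return
        tasks.discard(task)
        if not tasks:
            # Drop empty scopes so the mapping does not grow without bound
            self._scopes.pop(scope, None)

    async def cancel_scope(self, scope):
        """Cancel every task in one scope and wait until they have finished."""
        tasks = list(self._scopes.get(scope, ()))
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it
        await asyncio.gather(*tasks, return_exceptions=True)
        return len(tasks)
```

A request handler would call `create_task(request_id, ...)` for its background work and `await cancel_scope(request_id)` from its own cancellation path, leaving tasks that belong to other requests untouched.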
Pro Tip: Use asyncio.all_tasks() during debugging to see all running tasks. If tasks are disappearing, this function will show you which ones remain and help identify where references are being lost. You can also periodically log the task count in production to detect task leaks before they cause problems.
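As a rough sketch of that kind of inspection, the helpers below build on `asyncio.all_tasks()`. The names `snapshot_tasks` and `log_task_count`, and the 60-second default interval, are illustrative assumptions.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


def snapshot_tasks():
    """Describe every unfinished task on the running loop as (name, coroutine)."""
    return sorted(
        (task.get_name(), getattr(task.get_coro(), "__qualname__", "?"))
        for task in asyncio.all_tasks()
    )


async def log_task_count(interval=60.0):
    """Hypothetical reporter: log the task count every `interval` seconds."""
    while True:
        tasks = snapshot_tasks()
        logger.info("asyncio tasks alive: %d", len(tasks))
        logger.debug("task snapshot: %s", tasks)
        await asyncio.sleep(interval)
```

`asyncio.all_tasks()` must be called while a loop is running, and it returns only tasks that have not finished yet, so a steadily growing count is a hint that something is leaking.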
Exception Handling That Actually Works
Exceptions in asyncio tasks don’t propagate the way you expect. Consider this code:
```python
import asyncio


async def fetch_data():
    raise ValueError("Connection failed")


async def main():
    task = asyncio.create_task(fetch_data())
    await asyncio.sleep(1)
    print("Finished")  # This prints even though fetch_data raised
```
The exception is stored in the task object, but it’s never raised to your code. When the task is eventually garbage collected, asyncio logs "Task exception was never retrieved", but by then the context is lost and debugging is much harder.
Understanding why this happens requires understanding asyncio’s execution model. When you create a task, you’re scheduling a coroutine to run independently of your current execution flow. The event loop runs the coroutine when it can, but your code continues immediately. If that independent coroutine raises an exception, where should it go? Your code has already moved on—it might be in a completely different function or even awaiting something else entirely. There’s no call stack to propagate up.
This is fundamentally different from synchronous code or even threaded code. In synchronous code, an exception propagates up the call stack until something catches it or the program terminates. In threaded code, an uncaught exception terminates the thread, which is visible (if you’re watching for it). But in asyncio, the exception gets stored in the task object, waiting for someone to retrieve it by awaiting the task or calling task.exception().
There’s an important distinction between task exceptions and coroutine exceptions. Awaiting a coroutine directly propagates exceptions immediately—the exception travels up your call stack just like synchronous code. Awaiting a task also propagates exceptions, but only at the point where you await. But if you create a task and never await it, the exception stays trapped inside. This is the source of countless production bugs: developers create tasks for “fire and forget” operations, those operations fail, and nobody ever knows.
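The three cases can be compared side by side in a small sketch. The function names `fail` and `demo` are made up for this example.

```python
import asyncio


async def fail():
    raise ValueError("Connection failed")


async def demo():
    observed = []

    # 1. Awaiting the coroutine directly: the exception is raised right here.
    try:
        await fail()
    except ValueError as exc:
        observed.append(f"direct: {exc}")

    # 2. Awaiting a task: raised at the await, not at the moment the task failed.
    task = asyncio.create_task(fail())
    await asyncio.sleep(0.01)  # the task has already failed by now
    observed.append(f"done before await: {task.done()}")
    try:
        await task
    except ValueError as exc:
        observed.append(f"task: {exc}")

    # 3. Never awaited: the exception is only reachable through task.exception().
    orphan = asyncio.create_task(fail())
    await asyncio.sleep(0.01)
    observed.append(f"stored: {orphan.exception()!r}")
    return observed
```

In the third case nothing is raised anywhere; the failure surfaces only because the sketch asks the task for it explicitly.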
The problem compounds in complex systems. A task might spawn child tasks, each of which might spawn more. If a deeply nested task fails, the exception is trapped there, invisible to all the parent tasks and to your monitoring. Your application continues running, but part of its functionality has silently stopped working.
Build exception handlers that capture context:
```python
import asyncio
import logging
import traceback
from functools import wraps

logger = logging.getLogger(__name__)


def log_exceptions(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except asyncio.CancelledError:
            raise  # Don't log cancellation, it's intentional
        except Exception as e:
            # "args" is a reserved LogRecord attribute and would raise
            # KeyError if passed via extra, so the key is "call_args".
            logger.exception(
                f"Unhandled exception in {func.__name__}: {e}",
                extra={
                    "function": func.__name__,
                    "call_args": str(args)[:200],
                    "traceback": traceback.format_exc()
                }
            )
            raise
    return wrapper


class SafeTaskRegistry:
    def __init__(self, on_exception=None):
        self._tasks: set = set()
        self._on_exception = on_exception or self._default_handler

    def _default_handler(self, task):
        if not task.cancelled() and task.exception():
            logger.error(
                f"Task {task.get_name()} failed: {task.exception()}",
                exc_info=task.exception()
            )

    def create_task(self, coro, *, name=None):
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)
        task.add_done_callback(self._handle_done)
        return task

    def _handle_done(self, task):
        self._tasks.discard(task)
        self._on_exception(task)
```
Notice that we explicitly re-raise CancelledError. This is critical: cancellation is a normal control flow mechanism in asyncio, not an error condition. If you catch and suppress it, you break the cancellation chain and create tasks that refuse to die. Since Python 3.8, CancelledError derives from BaseException, so except Exception no longer catches it by accident, but the explicit clause documents the intent and protects you if someone later widens the handler.
Python 3.11 introduced asyncio.TaskGroup for structured concurrency, which solves many exception handling problems:
```python
import asyncio


async def fetch_all_data(urls):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_url(url)) for url in urls]

    # If any task raises, the TaskGroup:
    # 1. Cancels all other tasks
    # 2. Waits for cancellation to complete
    # 3. Raises ExceptionGroup with all exceptions

    # Reaching this line means every task finished successfully
    return [task.result() for task in tasks]
```
TaskGroup guarantees that all tasks complete (or are cancelled) before the context manager exits. Exceptions are collected into an ExceptionGroup, ensuring nothing is lost. This structured approach makes reasoning about async code far simpler.
The structured concurrency model that TaskGroup implements represents a fundamental shift in how we think about concurrent programming. Instead of launching tasks that float freely in the system, you create a scope where tasks exist. When that scope ends, either all tasks have completed successfully, or something went wrong and you know about it. There’s no in-between state where tasks are running but you’ve lost track of them.
Warning: When catching exceptions from a TaskGroup, remember you’re catching ExceptionGroup, not individual exceptions. Use except* syntax for granular handling. This syntax lets you handle specific exception types while letting others propagate, which is essential when different failures require different responses.
Graceful Shutdown Without Data Loss
Most asyncio shutdown implementations are wrong. The typical pattern looks like this:
```python
async def main():
    server = await start_server()
    try:
        await server.serve_forever()
    except asyncio.CancelledError:
        server.close()  # Abrupt shutdown, requests dropped
```
When SIGTERM arrives, in-flight requests get cancelled mid-execution. Database transactions roll back. HTTP responses never send. Users see cryptic connection reset errors.
The consequences of abrupt shutdown extend beyond user experience. Partial writes to databases can leave data in inconsistent states. Background jobs that were updating caches now leave stale data. Distributed transactions might partially commit on some services but not others. The cleanup required after a dirty shutdown often takes longer than a proper graceful shutdown would have.
Container orchestration systems like Kubernetes send SIGTERM and then wait for a grace period before sending SIGKILL. This is your window for graceful shutdown, typically 30 seconds by default. If your application doesn’t shut down cleanly within that window, it gets forcibly terminated, which is even worse than the abrupt cancellation case.
Proper shutdown requires a multi-phase approach:
```python
import asyncio
import signal
from contextlib import asynccontextmanager


class GracefulShutdown:
    def __init__(self, shutdown_timeout=30):
        self._shutdown_event = asyncio.Event()
        self._shutdown_timeout = shutdown_timeout
        self._active_requests = 0
        self._active_lock = asyncio.Lock()

    @asynccontextmanager
    async def request_context(self):
        async with self._active_lock:
            if self._shutdown_event.is_set():
                raise RuntimeError("Server is shutting down")
            self._active_requests += 1
        try:
            yield
        finally:
            async with self._active_lock:
                self._active_requests -= 1

    def request_shutdown(self):
        # Synchronous and idempotent, so it is safe to call from a signal handler
        self._shutdown_event.set()

    async def wait_for_shutdown(self):
        await self._shutdown_event.wait()

    async def initiate_shutdown(self):
        self._shutdown_event.set()

        # Wait for active requests to complete
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._shutdown_timeout
        while self._active_requests > 0:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            await asyncio.sleep(min(0.1, remaining))

        return self._active_requests == 0


def setup_signal_handlers(shutdown: GracefulShutdown):
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGTERM, signal.SIGINT):
        # Only set the event here; main() runs the drain exactly once
        loop.add_signal_handler(sig, shutdown.request_shutdown)


async def main():
    shutdown = GracefulShutdown(shutdown_timeout=30)
    setup_signal_handlers(shutdown)

    server = await start_server(shutdown)

    await shutdown.wait_for_shutdown()

    # Phase 1: Stop accepting new connections
    server.close()
    await server.wait_closed()

    # Phase 2: Wait for in-flight requests (with timeout)
    success = await shutdown.initiate_shutdown()

    if not success:
        # Phase 3: Force cancel remaining tasks
        remaining = asyncio.all_tasks() - {asyncio.current_task()}
        for task in remaining:
            task.cancel()

        await asyncio.gather(*remaining, return_exceptions=True)
```
The shutdown sequence matters: stop accepting new work, drain existing work, then force cleanup. The signal handler only sets the event, so the drain runs once, inside main(), rather than in an untracked background task. The timeout prevents hanging shutdowns when tasks refuse to complete.
Each phase serves a specific purpose. Stopping new connections ensures the problem doesn’t grow while you’re trying to shut down. Draining existing work gives in-flight operations a chance to complete normally, preserving data integrity. The forced cancellation is the last resort, ensuring you meet your shutdown deadline even if some tasks are stuck.
The polling loop with small sleeps (100ms intervals) gives you regular opportunities to check whether all work has completed. This is more responsive than a single long wait, and it lets you log progress during shutdown for debugging purposes.
Consider also implementing health check integration with your shutdown process. When shutdown begins, your health check endpoint should immediately start returning unhealthy status. This tells load balancers to stop sending new traffic, giving your drain phase a head start.
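One possible shape for that integration is sketched below. The `Readiness` class and the 503 response are assumptions for illustration; how the tuple is turned into an HTTP response depends on your web framework, which is left out here.

```python
class Readiness:
    """Hypothetical readiness flag shared by the health endpoint and shutdown code."""

    def __init__(self):
        self._draining = False

    def begin_drain(self):
        # Call this first when SIGTERM arrives, before closing the server
        self._draining = True

    def health_response(self):
        """Return (status_code, body) for a health endpoint."""
        if self._draining:
            # 503 is a common way to tell a load balancer to stop routing here
            return 503, {"status": "draining"}
        return 200, {"status": "ok"}
```

Calling `begin_drain()` at the very start of the shutdown sequence gives the load balancer time to notice the change while in-flight requests are still completing.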
Backpressure: The Missing Piece in Most Async Systems
Unbounded queues and unlimited concurrency cause cascading failures. Without backpressure, a slow downstream service causes requests to pile up in memory until your application crashes.
The failure mode is insidious. Your service receives requests faster than it can process them because an external dependency is slow. Without backpressure, you accept every request and queue it internally. Memory usage grows. Eventually, either your application runs out of memory and crashes, or the queued requests time out on the client side, meaning all that queued work was wasted. Meanwhile, the downstream service that was already slow is now overwhelmed by the backlog you’ve accumulated.
Backpressure is the principle that when a system is overloaded, it should signal that fact upstream rather than trying to absorb infinite work. In synchronous systems, this happens naturally—if your threads are all busy, new requests block at the connection level. But asyncio can schedule essentially unlimited work on a single thread, so you have to implement backpressure explicitly.
Implement semaphores for connection pooling and rate limiting:
```python
import asyncio
from contextlib import asynccontextmanager


class ConnectionPool:
    def __init__(self, max_connections=10, timeout=30):
        self._semaphore = asyncio.Semaphore(max_connections)
        self._timeout = timeout

    @asynccontextmanager
    async def acquire(self):
        try:
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self._timeout
            )
        except asyncio.TimeoutError:
            raise RuntimeError("Connection pool exhausted")

        try:
            yield
        finally:
            self._semaphore.release()


# Usage
pool = ConnectionPool(max_connections=20)


async def fetch_with_limit(url):
    async with pool.acquire():
        return await http_client.get(url)
```
The semaphore ensures you never have more than max_connections concurrent operations. The timeout prevents indefinite waiting: if the pool is exhausted for too long, you fail fast and inform the caller rather than queueing indefinitely.
Choosing the right concurrency limit requires understanding your downstream dependencies. If you’re calling a database that can handle 100 concurrent connections, setting your semaphore to 1000 provides no backpressure—requests will queue inside the database. Set your semaphore below the downstream capacity to ensure you’re the limiting factor, not an overwhelmed dependency.
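To check that a limit really is the bottleneck you intend, it can help to measure the peak concurrency a semaphore allows. The sketch below does that; `run_limited` and its `(results, peak)` return value are hypothetical names chosen for this example.

```python
import asyncio


async def run_limited(coro_factories, limit):
    """Run all jobs, never more than `limit` at once; return (results, peak)."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(factory):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await factory()
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(f) for f in coro_factories))
    return results, peak
```

Each element of `coro_factories` is a zero-argument callable returning a coroutine, so no work is created before a slot is free. If the observed peak matches the limit while the downstream dependency stays healthy, the limit is doing its job.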
For producer-consumer patterns, use bounded queues:
```python
import asyncio


class BackpressureQueue:
    def __init__(self, maxsize=100, high_water=80, low_water=20):
        self._queue = asyncio.Queue(maxsize=maxsize)
        self._high_water = high_water
        self._low_water = low_water
        self._backpressure_event = asyncio.Event()
        self._backpressure_event.set()  # Not under pressure initially

    async def put(self, item, timeout=None):
        if self._queue.qsize() >= self._high_water:
            self._backpressure_event.clear()

        await asyncio.wait_for(
            self._queue.put(item),
            timeout=timeout
        )

    async def get(self):
        item = await self._queue.get()

        if self._queue.qsize() <= self._low_water:
            self._backpressure_event.set()

        return item

    async def wait_for_capacity(self, timeout=None):
        await asyncio.wait_for(
            self._backpressure_event.wait(),
            timeout=timeout
        )

    @property
    def is_under_pressure(self):
        return not self._backpressure_event.is_set()
```
High and low water marks prevent oscillation. Producers that await wait_for_capacity() before calling put() pause once the queue reaches the high water mark, and they resume only when the queue drains to the low water mark. This hysteresis prevents the rapid back-and-forth that would occur if producers restarted immediately when a single slot became available.
The water mark pattern appears throughout systems engineering—TCP uses it for flow control, message brokers use it for consumer throttling, and your application should use it for internal backpressure. The specific values depend on your workload characteristics, but a common starting point is 80% for high water and 20% for low water.
When designing systems with backpressure, consider how the pressure should propagate. If your queue is full, should you reject new work with an error (fail-fast), block until capacity is available (blocking backpressure), or drop the oldest items (shedding)? Each approach is appropriate for different scenarios—fail-fast works well for user-facing requests, blocking backpressure suits background processing, and shedding fits real-time systems where stale data has no value.
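The three policies can be sketched as small helpers around a bounded `asyncio.Queue`. The function names `offer_fail_fast`, `offer_blocking`, and `offer_shedding` are illustrative only.

```python
import asyncio


def offer_fail_fast(queue, item):
    """Fail-fast: reject immediately when the queue is full."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False


async def offer_blocking(queue, item, timeout=None):
    """Blocking backpressure: wait (optionally bounded) for a free slot."""
    await asyncio.wait_for(queue.put(item), timeout=timeout)


def offer_shedding(queue, item):
    """Shedding: drop the oldest item to make room; return what was dropped."""
    dropped = None
    if queue.full():
        dropped = queue.get_nowait()
    queue.put_nowait(item)
    return dropped
```

One caveat for the shedding variant: if consumers rely on `queue.join()` and `task_done()`, an item removed this way also needs a matching `task_done()` call, which the sketch omits.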
Note: Adding backpressure after launch is painful because it requires changing how producers behave when the system is overloaded. Design for backpressure from day one. It’s much easier to relax limits that are too tight than to retrofit limits into a system that assumes unlimited capacity.
Timeouts That Protect Without Breaking
Naive timeouts leave resources in inconsistent states. Consider this problematic pattern:
```python
async def transfer_funds(from_account, to_account, amount):
    try:
        async with asyncio.timeout(5):
            await debit(from_account, amount)
            await credit(to_account, amount)  # What if timeout hits here?
    except TimeoutError:
        # Money was debited but not credited - inconsistent state!
        pass
```
The timeout can fire at any await point within its scope. If it fires between the debit and credit operations, you’ve taken money from one account without adding it to another. The customer’s money has vanished from their perspective, even though it’s probably sitting in some internal ledger waiting for reconciliation.
This problem extends to any operation with multiple steps: database transactions that involve multiple queries, API calls that require setup and teardown, or file operations that open handles before writing. A timeout can interrupt any of these mid-operation, leaving resources in undefined states.
Python provides three related tools with different behaviors: asyncio.timeout (Python 3.11+), asyncio.wait_for, and asyncio.shield, which is not a timeout itself but controls what a timeout is allowed to cancel:
```python
import asyncio


# asyncio.timeout - cancels the enclosed work, raises TimeoutError
async def with_timeout_cancel():
    async with asyncio.timeout(5):
        await long_operation()


# asyncio.wait_for - similar but functional style
async def with_wait_for():
    await asyncio.wait_for(long_operation(), timeout=5)


# asyncio.shield - protects from cancellation but not timeout
async def with_shield():
    try:
        async with asyncio.timeout(5):
            await asyncio.shield(critical_operation())
    except TimeoutError:
        # critical_operation continues running even after timeout!
        pass
```
The asyncio.timeout context manager is the preferred approach for new code. It clearly scopes where the timeout applies and integrates cleanly with Python’s context management protocol. The older wait_for function achieves the same result but doesn’t compose as well with other context managers.
The asyncio.shield function is particularly important for cleanup operations. When a timeout fires, Python cancels the awaited coroutine by injecting a CancelledError. But sometimes you need an operation to complete regardless—rolling back a transaction, releasing a lock, or acknowledging a message. Shield prevents the cancellation from reaching the protected coroutine.
However, shield doesn’t prevent the timeout from firing. Your outer code still sees the TimeoutError, and your cleanup code still runs in the background. You need to be careful here—if your cleanup code might take a long time, you might need to track it to ensure it completes before shutdown.
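One way to keep track of shielded cleanup work is sketched below, so that shutdown can wait for it. The module-level set `_cleanup_tasks` and the helpers `shielded_cleanup` and `drain_cleanups` are assumptions introduced for this example.

```python
import asyncio

# Strong references to in-flight cleanup tasks (hypothetical module-level set)
_cleanup_tasks = set()


def shielded_cleanup(coro):
    """Start `coro` as a tracked task and return a shielded awaitable for it."""
    task = asyncio.create_task(coro)
    _cleanup_tasks.add(task)
    task.add_done_callback(_cleanup_tasks.discard)
    return asyncio.shield(task)


async def drain_cleanups():
    """Call during shutdown: wait for every cleanup that is still running."""
    if _cleanup_tasks:
        await asyncio.gather(*_cleanup_tasks, return_exceptions=True)
```

A caller writes `await shielded_cleanup(release_lock())` inside a `finally` block. If the caller is cancelled again while waiting, the cleanup keeps running in the background, and `drain_cleanups()` gives the shutdown path a way to wait for it. Holding the task in a set also keeps a strong reference to it, for the same garbage-collection reason that applies to any other background task.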
Implement deadline propagation across nested calls:
```python
import asyncio
from contextlib import asynccontextmanager
from contextvars import ContextVar
from typing import Optional

deadline_var: ContextVar[Optional[float]] = ContextVar('deadline', default=None)


def get_remaining_time() -> Optional[float]:
    deadline = deadline_var.get()
    if deadline is None:
        return None
    remaining = deadline - asyncio.get_running_loop().time()
    return max(0, remaining)


@asynccontextmanager
async def with_deadline(timeout: float):
    loop = asyncio.get_running_loop()
    current_deadline = deadline_var.get()
    new_deadline = loop.time() + timeout

    # Never extend a deadline inherited from an outer scope
    if current_deadline is not None:
        new_deadline = min(new_deadline, current_deadline)

    token = deadline_var.set(new_deadline)
    try:
        remaining = get_remaining_time()
        async with asyncio.timeout(remaining):
            yield
    finally:
        deadline_var.reset(token)


async def nested_operation():
    remaining = get_remaining_time()
    if remaining is not None and remaining < 1:
        raise TimeoutError("Not enough time for operation")

    # asyncio.timeout(None) means "no timeout"
    async with asyncio.timeout(remaining):
        await do_work()
```
Deadline propagation solves a subtle problem: per-call timeouts know nothing about the overall budget. Suppose a request has a 10-second budget and makes three nested calls, each guarded by its own 5-second timeout. If each call takes 4 seconds, no individual timeout fires, yet the third call starts at the 8-second mark with only 2 seconds left. Without a shared deadline it either overruns the budget or, if an outer asyncio.timeout(10) is in place, gets cancelled partway through work it should never have started.
With deadline propagation, each layer of your call stack can check how much time remains and set its timeout accordingly. The context variable carries the deadline through all nested calls, even across await points where the event loop might run other tasks in between.
Combine timeouts with proper cleanup:
```python
async def safe_transfer(from_account, to_account, amount):
    transaction_id = await begin_transaction()

    try:
        async with asyncio.timeout(10):
            await debit(from_account, amount, transaction_id)
            await credit(to_account, amount, transaction_id)
            await commit_transaction(transaction_id)
    except (Exception, asyncio.CancelledError):
        # Covers TimeoutError and ordinary errors, plus CancelledError,
        # which is a BaseException and would slip past `except Exception`.
        # Always clean up, then re-raise.
        await asyncio.shield(rollback_transaction(transaction_id))
        raise
```
The asyncio.shield around rollback lets the rollback keep running even if the parent task is cancelled while waiting for it. This pattern of “try operation, cleanup on failure” appears throughout production async code. The rollback operation is non-cancellable because leaving a transaction open would be worse than blocking slightly longer on shutdown.
Debugging Async Code in Production
Debugging asyncio applications presents unique challenges. The call stack at any moment doesn’t tell you how you got there—the event loop ran some callbacks, awaited some coroutines, and eventually reached your current code. Traditional debuggers struggle because breakpoints pause the entire event loop, affecting all concurrent operations.
Enable asyncio debug mode to catch slow callbacks and unawaited coroutines:
```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    # Customize slow callback threshold (default 0.1 seconds)
    loop.slow_callback_duration = 0.05
    ...


# Enable programmatically when starting the loop
asyncio.run(main(), debug=True)

# Or enable via environment variable, set before the process starts
# (the script name is illustrative):
#   PYTHONASYNCIODEBUG=1 python app.py
```
Debug mode reveals several classes of problems. Blocking calls that freeze the event loop generate warnings: if you accidentally call a synchronous function that takes 200ms, debug mode will tell you. Coroutines that were created but never awaited (a common mistake when forgetting the await keyword) generate warnings at garbage collection time with the source line that created them. Tasks that are destroyed while still pending are reported together with the traceback of where they were created.
The performance overhead of debug mode is modest but noticeable, typically 10-20% slower than normal operation. This makes it suitable for staging environments and sometimes production systems that aren’t CPU-bound. The insights you gain often justify the overhead.
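To see what a slow-callback warning looks like, the sketch below blocks the loop on purpose and captures what asyncio logs. The names `ListHandler`, `blocks_the_loop`, and `run_with_debug` are invented for this example.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect log messages in memory so the example can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocks_the_loop():
    time.sleep(0.2)  # synchronous sleep: nothing else can run meanwhile


def run_with_debug():
    handler = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    try:
        # debug=True turns on slow-callback detection (threshold 0.1 s by default)
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        asyncio_logger.removeHandler(handler)
    return handler.messages
```

The captured warning names the offending task and how long it held the loop, which is usually enough to find the blocking call.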
Use contextvars for request tracing across await boundaries:
```python
import asyncio
import uuid
import logging
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar('request_id', default='unknown')


class RequestContextFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id_var.get()
        return True


logger = logging.getLogger(__name__)
logger.addFilter(RequestContextFilter())


async def handle_request(request):
    request_id = str(uuid.uuid4())[:8]
    request_id_var.set(request_id)

    logger.info("Request started")

    try:
        result = await process_request(request)
        logger.info("Request completed")
        return result
    except Exception:
        logger.exception("Request failed")
        raise
```
Context variables are specifically designed for asyncio. Unlike thread-local storage, context variables are inherited by child tasks and preserved across await points. This means you can set a request ID at the start of handling a request, and every log message, metric, and error report from that request will include the ID, even when the event loop interleaves execution with other requests.
This request tracing capability is essential for production debugging. When a user reports an error, you can search your logs for their request ID and see every step of processing, in order, despite the concurrent execution. Without it, logs from concurrent requests are interleaved and nearly impossible to follow.
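The isolation between concurrent requests can be checked with a short sketch. `child_step`, `handle`, and `main` are names made up for the example.

```python
import asyncio
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")


async def child_step():
    # Runs in its own task, but sees the ID set by the task that created it
    await asyncio.sleep(0)
    return request_id_var.get()


async def handle(request_id):
    request_id_var.set(request_id)
    child = asyncio.create_task(child_step())  # the context is copied here
    await asyncio.sleep(0)  # let other requests interleave
    return request_id, await child, request_id_var.get()


async def main():
    # gather wraps each coroutine in its own task, and each task gets its own
    # copy of the context, so the three set() calls never overwrite each other
    results = await asyncio.gather(
        handle("req-a"), handle("req-b"), handle("req-c")
    )
    return results, request_id_var.get()
```

Each handler and its child task see the same ID throughout, while the caller's own context keeps the default value.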
Instrument coroutines without changing business logic:
```python
import time
from functools import wraps

# Reuses `logger` and `request_id_var` from the request-tracing example above


def trace_async(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        request_id = request_id_var.get()

        try:
            result = await func(*args, **kwargs)
            duration = time.perf_counter() - start
            logger.debug(
                f"{func.__name__} completed",
                extra={"duration_ms": duration * 1000, "request_id": request_id}
            )
            return result
        except Exception as e:
            duration = time.perf_counter() - start
            logger.warning(
                f"{func.__name__} failed: {e}",
                extra={"duration_ms": duration * 1000, "request_id": request_id}
            )
            raise

    return wrapper
```
This decorator pattern lets you add timing and logging to any async function without modifying its implementation. Apply it to functions you suspect of being slow or unreliable, then remove it once you’ve diagnosed the issue. The @wraps decorator preserves the original function’s name and docstring, so debugging tools still show meaningful information.
For production systems, consider integrating with distributed tracing systems like OpenTelemetry. These systems provide visualization of request flows across services, helping you identify which service or function is the bottleneck in a complex system. The context variable pattern shown above integrates naturally with these tools.
Pro Tip: Enable PYTHONASYNCIODEBUG in staging environments to catch issues before production. The performance overhead is usually acceptable for non-production workloads. Consider also implementing periodic task dumps that log all running tasks, their creation times, and their current state. This information is invaluable when debugging stuck systems.
Putting It Together: A Production-Ready Async Service Template
Here’s a complete service structure combining all the patterns:
```python
import asyncio
import signal
import logging
from contextlib import asynccontextmanager
from contextvars import ContextVar

logger = logging.getLogger(__name__)
request_id_var: ContextVar[str] = ContextVar('request_id')


class ProductionService:
    def __init__(self, config):
        self.config = config
        self._shutdown_event = asyncio.Event()
        self._tasks: set = set()
        self._semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        self._request_queue = asyncio.Queue(maxsize=config.queue_size)
        self._healthy = True

    def create_task(self, coro, *, name=None):
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        self._tasks.discard(task)
        if not task.cancelled() and task.exception():
            logger.error(f"Task {task.get_name()} failed", exc_info=task.exception())

    @asynccontextmanager
    async def request_context(self):
        if self._shutdown_event.is_set():
            raise RuntimeError("Service shutting down")

        try:
            await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self.config.acquire_timeout
            )
        except asyncio.TimeoutError:
            self._healthy = False
            raise RuntimeError("Service overloaded")

        self._healthy = True  # A slot was available again
        try:
            yield
        finally:
            self._semaphore.release()

    async def health_check(self):
        return {
            "healthy": self._healthy and not self._shutdown_event.is_set(),
            "active_tasks": len(self._tasks),
            "queue_size": self._request_queue.qsize(),
            # _value is a private Semaphore attribute and may change
            "semaphore_available": self._semaphore._value
        }

    async def shutdown(self, timeout=30):
        logger.info("Initiating graceful shutdown")
        self._shutdown_event.set()

        # Wait for tasks with timeout (never wait on the task running shutdown)
        tasks = self._tasks - {asyncio.current_task()}
        if tasks:
            done, pending = await asyncio.wait(tasks, timeout=timeout)

            for task in pending:
                task.cancel()

            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

        logger.info("Shutdown complete")

    async def run(self):
        loop = asyncio.get_running_loop()

        for sig in (signal.SIGTERM, signal.SIGINT):
            # The handler only sets the event; run() performs the drain below
            loop.add_signal_handler(sig, self._shutdown_event.set)

        # Start workers
        workers = [
            self.create_task(self._worker(i), name=f"worker-{i}")
            for i in range(self.config.num_workers)
        ]

        await self._shutdown_event.wait()
        await self.shutdown(timeout=self.config.shutdown_timeout)

    async def _worker(self, worker_id):
        while not self._shutdown_event.is_set():
            try:
                async with asyncio.timeout(1):
                    request = await self._request_queue.get()

                async with self.request_context():
                    await self._process_request(request)

            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                raise  # Let cancellation propagate
            except Exception:
                logger.exception("Worker error")
```
The service includes task tracking with exception logging, graceful shutdown with configurable timeout, semaphore-based concurrency limiting, bounded request queue for backpressure, health checks reflecting actual async state, and signal handlers for container orchestration.
Each component serves a specific purpose in building reliability. The task registry prevents orphaned tasks and enables monitoring of background work. The shutdown event coordinates the graceful termination sequence. The semaphore prevents overload from cascading into failure. The bounded queue enforces backpressure when work arrives faster than it can be processed.
The health check method deserves special attention. Many health checks simply return “OK” if the process is running, but this tells you nothing about whether the service can actually handle requests. The health check here reports actual capacity: how many tasks are active, how deep the request queue is, and how many concurrency slots remain. This information lets load balancers make intelligent decisions about traffic routing.
Configure concurrency limits based on your downstream dependencies:
```python
from dataclasses import dataclass


@dataclass
class ServiceConfig:
    max_concurrent_requests: int = 100
    queue_size: int = 1000
    num_workers: int = 10
    acquire_timeout: float = 5.0
    shutdown_timeout: float = 30.0
```
These defaults are starting points, not final answers. Tune them based on your specific workload and dependencies. If your downstream database can handle 50 concurrent connections, setting max_concurrent_requests to 100 creates contention at the database level rather than at your service level. If your requests typically take 100ms, a 5-second acquire timeout might be too generous: you might prefer to fail fast and return an error to the client rather than queue requests indefinitely. Note also that in this template each worker processes one request at a time, so effective concurrency is capped by num_workers as well as by the semaphore.
Monitor these metrics in production: queue depth over time, semaphore wait times, task completion rates, and shutdown duration. Increasing queue depths indicate you’re falling behind on processing. Long semaphore waits suggest your concurrency limit is too low (or your downstream services are too slow). Failed task rates show reliability problems. Long shutdown durations indicate tasks that don’t respect cancellation.
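Semaphore wait time is the least obvious of these metrics to collect, so here is one possible sketch. The `TimedSemaphore` wrapper is hypothetical, and a real service would feed a metrics histogram rather than an unbounded list.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class TimedSemaphore:
    """Hypothetical wrapper that records how long callers wait for a slot."""

    def __init__(self, limit):
        self._semaphore = asyncio.Semaphore(limit)
        # Seconds spent waiting, one entry per acquire (illustration only;
        # this list grows without bound)
        self.wait_times = []

    @asynccontextmanager
    async def slot(self):
        start = time.perf_counter()
        await self._semaphore.acquire()
        self.wait_times.append(time.perf_counter() - start)
        try:
            yield
        finally:
            self._semaphore.release()

    def max_wait(self):
        return max(self.wait_times, default=0.0)
```

Replacing a plain `async with semaphore:` with `async with timed.slot():` is enough to start collecting the numbers. Waits that climb steadily suggest the limit is too low or the downstream service is slowing down.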
Key Takeaways
- Always store task references in a set and use add_done_callback to remove them. Never fire and forget: the garbage collector can silently destroy orphaned tasks
- Log failures from inside the task body or from a done callback, and use TaskGroup when possible to prevent silent exception loss. Exceptions in unawaited tasks are trapped and invisible to your monitoring
- Implement bounded queues and semaphores from day one. Adding backpressure after launch is painful because it requires changing how producers behave under load
- Use asyncio.timeout (Python 3.11+) or the async-timeout library with explicit cleanup handlers, and consider deadline propagation for nested operations
- Enable PYTHONASYNCIODEBUG in staging environments to catch slow callbacks and unawaited coroutines before they become production incidents
- Design graceful shutdown from the start with clear phases: stop accepting work, drain existing work, then force cleanup with timeouts
- Use contextvars for request tracing to maintain debugging context across await points where traditional stack traces fail
- Build health checks that reflect actual async state (queue depth, semaphore availability, active task counts), not just process liveness