Taming Callback Hell: Practical asyncio Patterns for Production Python
Your asyncio code works perfectly in development, then mysteriously hangs in production with no stack trace. You’ve scattered print statements everywhere, but the event loop swallows errors whole. After hours of debugging, you discover a forgotten await buried three layers deep—a problem that proper patterns would have caught immediately.
The gap between “I understand async/await” and “I can debug asyncio in production at 3 AM” is wider than most Python developers expect. Asyncio’s concurrency model introduces failure modes that don’t exist in synchronous code: silent task failures, orphaned coroutines, resource leaks from improper cancellation, and race conditions that only manifest under load.
Understanding asyncio requires a mental model shift. In synchronous Python, code executes linearly—one thing happens after another, and exceptions bubble up naturally through the call stack. Asyncio introduces cooperative multitasking, where multiple tasks share a single thread by voluntarily yielding control at await points. This cooperative model delivers excellent performance for I/O-bound workloads, but it fundamentally changes how errors propagate, how resources get cleaned up, and how you reason about concurrent operations.
This guide covers the patterns that separate production-ready asyncio from code that works on your laptop. You’ll learn how to surface silent failures, structure concurrent operations for guaranteed cleanup, handle timeouts and cancellation properly, write reliable async tests, and gain visibility into what your event loop is actually doing. Each section addresses a specific category of production problems with concrete solutions you can implement immediately.
Why asyncio Fails Silently (And How to Stop It)
When a synchronous function raises an exception, Python’s default behavior ensures you see it—either through an unhandled exception traceback or your logging configuration. Asyncio breaks this expectation in subtle ways that cost hours of debugging time. Understanding why this happens requires examining how the event loop manages task lifecycles.
The event loop maintains a collection of tasks, each representing an in-progress coroutine. When you call asyncio.create_task(), the loop schedules that coroutine for execution but doesn’t wait for it to complete. This is the “fire-and-forget” pattern—useful for background work, but dangerous when exceptions occur. The event loop handles exceptions in fire-and-forget tasks differently than you’d expect because there’s no caller waiting to receive the exception.
When you create a task with asyncio.create_task() and never await it, any exception raised inside that task gets attached to the task object. If nothing retrieves that exception—by awaiting the task or calling task.exception()—the exception stays hidden until the task is garbage collected. At that point, Python logs a warning, but by then you’ve lost the context of what went wrong. The warning appears far from where the error occurred, often after your application has moved on to other work, making correlation nearly impossible.
```python
import asyncio

async def background_task():
    await asyncio.sleep(1)
    raise ValueError("This error disappears into the void")

async def main():
    # Fire and forget - exception won't surface until GC
    asyncio.create_task(background_task())

    # Main continues, unaware of the failure
    await asyncio.sleep(5)
    print("Main completed - did you notice the error?")

asyncio.run(main())
```

The exception from `background_task()` exists—it’s stored on the task object—but nothing retrieves it. Python eventually logs “Task exception was never retrieved” when the task gets garbage collected, but this message appears without the context of what your application was doing when the error occurred. In production, with high throughput and rapid object creation, garbage collection timing becomes unpredictable, and these warnings can appear minutes after the actual failure.
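When you control the call site, you can surface the failure yourself: keep a reference to the task and attach a done-callback that retrieves the exception the moment the task finishes. A minimal sketch (`log_task_exception` and `failing_job` are illustrative names, not stdlib helpers):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

def log_task_exception(task: asyncio.Task) -> None:
    """Done-callback that retrieves a task's exception as soon as it finishes."""
    if task.cancelled():
        return  # Cancellation is not a failure; nothing to report
    exc = task.exception()  # Marks the exception as retrieved
    if exc is not None:
        logger.error("Background task %r failed", task.get_name(), exc_info=exc)

async def failing_job() -> None:
    await asyncio.sleep(1)
    raise ValueError("surfaces immediately via the done-callback")

async def main() -> None:
    # Keeping a reference also protects the task from premature GC
    task = asyncio.create_task(failing_job(), name="background-work")
    task.add_done_callback(log_task_exception)
    await asyncio.sleep(5)

asyncio.run(main())
```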
For application-wide coverage, set a global exception handler on the event loop. This handler catches exceptions from tasks that were never awaited, giving you immediate visibility into failures. It receives a context dictionary containing the exception, the message, and other metadata about what went wrong.
```python
import asyncio
import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def handle_exception(loop: asyncio.AbstractEventLoop, context: dict) -> None:
    """Global handler for unhandled exceptions in asyncio tasks.

    This handler catches:
    - Exceptions from tasks that were never awaited
    - Exceptions from callbacks scheduled with call_soon/call_later
    - Exceptions from signal handlers
    """
    exception = context.get("exception")
    message = context.get("message", "Unhandled exception in event loop")

    # Extract task information if available
    task = context.get("task")
    task_name = task.get_name() if task else "unknown"

    if exception:
        # Log with full traceback for debugging
        tb_str = ''.join(traceback.format_exception(
            type(exception), exception, exception.__traceback__
        ))
        logger.error(
            f"Task '{task_name}' failed: {message}\n"
            f"Exception: {exception}\n"
            f"Traceback:\n{tb_str}"
        )
        # In production, you'd also want to:
        # - Send to error tracking (Sentry, Rollbar, etc.)
        # - Increment error metrics
        # - Potentially trigger alerts for critical failures
    else:
        logger.error(f"Event loop error in task '{task_name}': {message}")

async def some_background_work():
    """Stand-in for any fire-and-forget job; fails to exercise the handler."""
    raise RuntimeError("background work failed")

async def main():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handle_exception)

    # Now fire-and-forget tasks will log errors immediately
    asyncio.create_task(some_background_work())
    await asyncio.sleep(10)

asyncio.run(main())
```

⚠️ Warning: The exception handler is a safety net, not a replacement for proper task management. Use it to catch mistakes, not as your primary error handling strategy. A well-designed application should rarely trigger the global exception handler.
Beyond the global handler, enable asyncio’s debug mode during development. Set the environment variable PYTHONASYNCIODEBUG=1 or call asyncio.run(main(), debug=True). Debug mode provides several benefits that dramatically improve your ability to catch problems early:
- Unawaited coroutine detection: When you create a coroutine but forget to await it, debug mode logs a warning with a traceback showing where the coroutine was created
- Slow callback warnings: Operations taking longer than 100ms (configurable via `loop.slow_callback_duration`) generate warnings, helping identify blocking code
- Enhanced tracebacks: Exception tracebacks include more context about where tasks were created, not just where they failed
- Resource warning detection: Debug mode surfaces warnings about unclosed resources like file handles and network connections
```python
import asyncio
import os

# Enable via environment variable (useful for all processes)
os.environ['PYTHONASYNCIODEBUG'] = '1'

# Or enable programmatically for specific runs
async def main():
    # Your application code
    pass

asyncio.run(main(), debug=True)
```

The combination of a global exception handler and debug mode transforms asyncio from a debugging nightmare into a system that actively surfaces problems before they reach production. Set up the exception handler in your application’s entry point, enable debug mode in development and staging environments, and you’ll catch the majority of async-related bugs before they affect users.
Structured Concurrency: TaskGroups Over Scattered Tasks
Scattered create_task() calls throughout your codebase create maintenance nightmares. You lose track of which tasks are running, exceptions in one task don’t propagate to related tasks, and cleanup becomes manual and error-prone. Every forgotten task reference is a potential resource leak—database connections held open, HTTP sessions not closed, memory accumulating until someone notices the service degrading.
Structured concurrency solves this by tying task lifetimes to lexical scopes. The principle is simple: tasks created within a scope cannot outlive that scope. When the scope exits—whether normally, through an exception, or via cancellation—all tasks within it are guaranteed to be completed or cancelled. This guarantee eliminates an entire category of resource management bugs.
Python 3.11 introduced asyncio.TaskGroup, which implements structured concurrency as a first-class language feature. A TaskGroup guarantees that all tasks created within the group complete (or fail) before the group exits. This eliminates orphaned tasks and ensures proper exception propagation.
```python
import asyncio
from typing import List

async def fetch_user(user_id: int) -> dict:
    """Simulate fetching user data from an API."""
    await asyncio.sleep(0.1)  # Simulated network latency
    if user_id == 3:
        raise ValueError(f"User {user_id} not found")
    return {"id": user_id, "name": f"User_{user_id}"}

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """Fetch multiple users concurrently with automatic cleanup.

    If any fetch fails, all other fetches are cancelled and
    resources are cleaned up before the exception propagates.
    """
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]

    # Only reached if ALL tasks succeed
    # If any task raises, the exception propagates here
    return [task.result() for task in tasks]

async def main():
    try:
        users = await fetch_all_users([1, 2, 3, 4, 5])
        print(f"Fetched {len(users)} users")
    except ExceptionGroup as eg:
        # TaskGroup wraps exceptions in ExceptionGroup
        # This allows handling multiple simultaneous failures
        for exc in eg.exceptions:
            print(f"Task failed: {exc}")

asyncio.run(main())
```

When any task in a TaskGroup raises an exception, the group initiates a shutdown sequence. First, it cancels all other tasks in the group. Then, it waits for those cancelled tasks to finish their cleanup (handling their own CancelledError). Finally, it collects all exceptions—both the original failures and any exceptions raised during cancellation—into an ExceptionGroup. This behavior ensures resources are properly released even in failure scenarios.
The ExceptionGroup (introduced in Python 3.11 alongside TaskGroup) represents multiple exceptions that occurred concurrently. You can handle them with the except* syntax, which matches against exception types within the group:
```python
import asyncio

async def might_fail(task_id: int) -> str:
    if task_id % 2 == 0:
        raise ValueError(f"Even task {task_id} failed")
    if task_id % 3 == 0:
        raise RuntimeError(f"Task {task_id} had a runtime error")
    return f"Task {task_id} succeeded"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            for i in range(6):
                tg.create_task(might_fail(i))
    except* ValueError as eg:
        print(f"Caught {len(eg.exceptions)} ValueErrors:")
        for exc in eg.exceptions:
            print(f"  - {exc}")
    except* RuntimeError as eg:
        print(f"Caught {len(eg.exceptions)} RuntimeErrors:")
        for exc in eg.exceptions:
            print(f"  - {exc}")

asyncio.run(main())
```

For Python 3.10 and earlier, you can implement similar patterns using asyncio.gather() with return_exceptions=False (the default), but you lose automatic cancellation of sibling tasks. This custom context manager provides structured concurrency semantics for older Python versions:
```python
import asyncio
from contextlib import asynccontextmanager
from typing import Set, Coroutine, Any

@asynccontextmanager
async def managed_tasks():
    """Context manager for structured concurrency in Python <3.11.

    All tasks created through the yielded function are guaranteed
    to be cancelled when the context exits, whether normally or
    due to an exception.
    """
    tasks: Set[asyncio.Task] = set()

    def create_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        task = asyncio.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    try:
        yield create_task
    finally:
        # Cancel all pending tasks on exit
        for task in tasks:
            if not task.done():
                task.cancel()

        # Wait for cancellations to complete
        # return_exceptions=True prevents CancelledError from propagating
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

async def worker(name: str, duration: float) -> str:
    """Example worker that simulates some async work."""
    await asyncio.sleep(duration)
    return f"{name} completed after {duration}s"

async def main():
    async with managed_tasks() as create:
        t1 = create(worker("A", 1.0))
        t2 = create(worker("B", 2.0))
        t3 = create(worker("C", 0.5))

        # Wait for first to complete, others auto-cancel on exit
        done, pending = await asyncio.wait(
            [t1, t2, t3],
            return_when=asyncio.FIRST_COMPLETED
        )
        print(f"First completed: {done.pop().result()}")
        # All remaining tasks cancelled and awaited here

asyncio.run(main())
```

💡 Pro Tip: Adopt TaskGroup as your default for any operation spawning multiple tasks. The explicit scope makes code easier to reason about and eliminates an entire category of resource leak bugs. Even if you think you need fire-and-forget semantics, consider whether a TaskGroup with a longer-lived scope might be more appropriate.
Timeouts and Cancellation: Doing It Right
Timeouts in asyncio require understanding the difference between asyncio.wait_for() and the newer asyncio.timeout() context manager introduced in Python 3.11. Both cancel the wrapped operation when time expires, but they differ in ergonomics, composability, and how they handle edge cases.
The fundamental challenge with async timeouts is that you’re not just stopping execution—you’re initiating a cancellation sequence. The cancelled operation must have an opportunity to clean up resources, release locks, and ensure data consistency. A timeout that doesn’t allow for cleanup is a resource leak waiting to happen.
The wait_for() function wraps a single awaitable and raises TimeoutError if it doesn’t complete in time. It’s been available since Python 3.4 and remains useful for simple cases:
```python
import asyncio

async def slow_operation() -> str:
    """An operation that takes longer than we want to wait."""
    await asyncio.sleep(10)
    return "completed"

async def main():
    try:
        # wait_for cancels the operation and raises TimeoutError
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(f"Got result: {result}")
    except TimeoutError:
        print("Operation timed out after 2 seconds")
        # The slow_operation coroutine received CancelledError
        # and had a chance to clean up

asyncio.run(main())
```

The asyncio.timeout() context manager provides cleaner handling for multiple operations and better composability. Instead of wrapping a single awaitable, it establishes a deadline for an entire block of code. Any operation within the block shares the same timeout budget:
```python
import asyncio

async def fetch_data(url: str) -> dict:
    """Simulate fetching data from an external service."""
    await asyncio.sleep(1)
    return {"url": url, "data": "response payload"}

async def validate_data(data: dict) -> bool:
    """Simulate validating the fetched data."""
    await asyncio.sleep(0.3)
    return True

async def process_data(data: dict) -> dict:
    """Simulate processing the validated data."""
    await asyncio.sleep(0.5)
    return {"processed": True, **data}

async def main():
    try:
        async with asyncio.timeout(2.0):
            # All three operations share the same 2-second budget
            # If fetch takes 1.5s, process only has 0.5s remaining
            data = await fetch_data("https://api.example.com")
            is_valid = await validate_data(data)
            if is_valid:
                result = await process_data(data)
                print(f"Result: {result}")
    except TimeoutError:
        print("Combined operations exceeded 2 second budget")

asyncio.run(main())
```

You can also use asyncio.timeout_at() for absolute deadlines, which is useful when you need to coordinate timeouts across multiple components that each track time independently:
```python
import asyncio

async def operation_one() -> str:
    """Placeholder for the first deadline-bound operation."""
    await asyncio.sleep(1)
    return "one"

async def operation_two() -> str:
    """Placeholder for the second deadline-bound operation."""
    await asyncio.sleep(1)
    return "two"

async def coordinated_operations():
    """Multiple operations that must complete by the same deadline."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 5.0  # 5 seconds from now

    async with asyncio.timeout_at(deadline):
        result1 = await operation_one()

    # Same deadline for the second operation
    async with asyncio.timeout_at(deadline):
        result2 = await operation_two()

    return result1, result2
```

Cancellation handling requires special attention. When a task is cancelled, Python raises CancelledError at the current await point. The critical rule: never swallow CancelledError. If you catch it and don’t re-raise, you break the cancellation mechanism and create zombie tasks that continue running when they should have stopped.
```python
import asyncio

class Resource:
    """Simulates an acquired resource like a database connection."""
    async def close(self) -> None:
        await asyncio.sleep(0)

async def acquire_resource() -> Resource:
    """Simulate acquiring a resource like a database connection."""
    return Resource()

async def process_next_item(resource: Resource) -> None:
    """Simulate processing work."""
    await asyncio.sleep(0.1)

async def cleanup_on_cancel():
    """Demonstrates proper cancellation handling with cleanup.

    The key insight: you can perform cleanup operations during
    cancellation, but you MUST re-raise CancelledError when done.
    """
    resource = await acquire_resource()
    try:
        while True:
            await process_next_item(resource)
    except asyncio.CancelledError:
        # Perform cleanup, then re-raise
        # This gives you a chance to release resources
        await resource.close()
        print("Resource closed during cancellation")
        raise  # CRITICAL: Always re-raise CancelledError

async def bad_cancellation_handling():
    """DON'T DO THIS - swallowing CancelledError breaks cancellation."""
    try:
        await some_operation()  # Placeholder for any awaitable work
    except asyncio.CancelledError:
        print("Cancelled, but I'm going to keep running anyway")
        # Missing raise! This task is now a zombie
```

For graceful shutdown scenarios—handling SIGTERM in containers, SIGINT from Ctrl+C—you need to coordinate cancellation across multiple tasks:
```python
import asyncio
import signal

async def long_running_worker():
    """A worker that processes items indefinitely."""
    while True:
        await asyncio.sleep(1)
        print("Processing...")

async def graceful_shutdown():
    """Handle SIGTERM/SIGINT for clean shutdown."""
    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def signal_handler():
        print("Shutdown signal received")
        shutdown_event.set()

    # Register signal handlers (Unix only; not supported on Windows)
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    # Start background tasks
    worker_task = asyncio.create_task(long_running_worker())

    # Wait for shutdown signal
    await shutdown_event.wait()

    # Graceful cancellation with timeout
    worker_task.cancel()
    try:
        # Give the task time to clean up
        await asyncio.wait_for(worker_task, timeout=5.0)
    except asyncio.CancelledError:
        pass  # Expected after cancellation
    except TimeoutError:
        print("Worker didn't shut down cleanly within timeout")

    print("Shutdown complete")

asyncio.run(graceful_shutdown())
```

For operations that must complete regardless of cancellation—database commits, message acknowledgments, audit log writes—use asyncio.shield(). Shield creates a barrier that prevents cancellation from propagating into the shielded coroutine:
```python
import asyncio

async def critical_write(data: dict) -> None:
    """This write must complete even if the parent is cancelled.

    Examples: database commits, message queue acknowledgments,
    audit log entries, billing records.
    """
    # database and message_queue are placeholders for your own clients
    await database.commit(data)
    await message_queue.ack()

async def process_message(msg: dict) -> None:
    """Process a message with a critical final step."""
    result = await compute_result(msg)  # Placeholder for your processing step

    # Shield the critical write from cancellation
    # Even if process_message is cancelled, the write continues
    await asyncio.shield(critical_write(result))
```

📝 Note: `shield()` doesn’t prevent cancellation—it just prevents the cancellation from propagating into the shielded coroutine. The outer task still receives `CancelledError` after the shielded operation completes. If the shielded operation itself is slow, consider adding a timeout to avoid indefinite hangs during shutdown.
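To act on that advice, one option is to run the critical write as a real task, shield it, and bound the wait with `wait_for()`. A sketch, with `finish_write` standing in for `critical_write()` above:

```python
import asyncio

async def finish_write(msg: dict) -> None:
    """Stand-in for the critical_write() step above."""
    await asyncio.sleep(0.1)

async def process_with_bounded_shield(msg: dict) -> None:
    # Run the write as a real task so it keeps making progress even if
    # the timeout fires and the shield's outer future gets cancelled
    write_task = asyncio.create_task(finish_write(msg))
    try:
        await asyncio.wait_for(asyncio.shield(write_task), timeout=10.0)
    except TimeoutError:
        # The write is still running in the background; decide explicitly
        # whether to cancel it or hand it to a shutdown-phase awaiter
        write_task.cancel()

asyncio.run(process_with_bounded_shield({"id": 1}))
```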
Testing Async Code Without the Pain
Testing asyncio code introduces challenges that don’t exist with synchronous tests: fixture scoping across async boundaries, event loop lifecycle management, mocking functions that return awaitables, and time-dependent tests that become flaky in CI. The pytest-asyncio plugin handles most of these concerns, but getting the configuration right matters enormously for test reliability.
The most common mistake is using an event loop that persists across tests. Shared state in the event loop—pending callbacks, signal handlers, exception handlers—can cause tests to interfere with each other, creating failures that only appear when running the full test suite. Each test should get a fresh event loop with no residual state.
Start with proper pytest configuration in pyproject.toml:
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
    "error::DeprecationWarning",
    "error::PendingDeprecationWarning",
]
```

The `asyncio_mode = "auto"` setting automatically wraps async test functions without requiring the `@pytest.mark.asyncio` decorator on every test. This reduces boilerplate and ensures you don’t accidentally forget the decorator (which results in the test appearing to pass instantly without actually running). The `asyncio_default_fixture_loop_scope = "function"` setting ensures each test gets a fresh event loop.
Async fixtures work naturally with pytest-asyncio and can perform setup and teardown operations that involve awaiting:
```python
import asyncio
import pytest
from unittest.mock import AsyncMock, patch

from myapp.service import UserService

@pytest.fixture
async def user_service():
    """Async fixture that handles connection lifecycle.

    The service connects before the test runs and disconnects
    after, even if the test fails.
    """
    service = UserService()
    await service.connect()
    yield service
    await service.disconnect()

@pytest.fixture
async def mock_database():
    """Fixture providing a mock database connection."""
    mock_db = AsyncMock()
    mock_db.query = AsyncMock(return_value=[{"id": 1, "name": "Test User"}])
    mock_db.execute = AsyncMock(return_value={"rows_affected": 1})
    return mock_db

async def test_fetch_user_success(user_service):
    """Test basic user fetch with real service fixture."""
    user = await user_service.fetch_user(123)
    assert user["id"] == 123
    assert "name" in user

async def test_fetch_user_timeout():
    """Test timeout handling with mocked slow response."""
    service = UserService()

    async def slow_fetch(*args):
        await asyncio.sleep(10)  # Longer than any reasonable timeout
        return {"id": 1}

    with patch.object(service, "_make_request", side_effect=slow_fetch):
        with pytest.raises(TimeoutError):
            await asyncio.wait_for(service.fetch_user(1), timeout=0.1)
```

Mocking async context managers requires the AsyncMock class with proper configuration for `__aenter__` and `__aexit__` methods. The mock must return itself from `__aenter__` to enable the `as` binding:
```python
import pytest
from unittest.mock import AsyncMock

async def test_database_transaction():
    """Test code that uses async context managers."""
    mock_conn = AsyncMock()
    # __aenter__ returns the connection object for 'as conn' binding
    mock_conn.__aenter__.return_value = mock_conn
    # __aexit__ returns None (no exception suppression)
    mock_conn.__aexit__.return_value = None
    mock_conn.execute = AsyncMock(return_value={"rows_affected": 1})
    mock_conn.commit = AsyncMock()
    mock_conn.rollback = AsyncMock()

    # Use the mock in your test
    async with mock_conn as conn:
        result = await conn.execute("UPDATE users SET active = true")
        assert result["rows_affected"] == 1
        await conn.commit()

    # Verify the context manager was used correctly
    mock_conn.__aenter__.assert_called_once()
    mock_conn.__aexit__.assert_called_once()
    mock_conn.commit.assert_called_once()

async def test_context_manager_error_handling():
    """Test that errors in context managers are handled."""
    mock_conn = AsyncMock()
    mock_conn.__aenter__.return_value = mock_conn
    mock_conn.__aexit__.return_value = None
    mock_conn.execute = AsyncMock(side_effect=RuntimeError("DB error"))
    mock_conn.rollback = AsyncMock()

    with pytest.raises(RuntimeError, match="DB error"):
        async with mock_conn as conn:
            await conn.execute("INVALID SQL")
```

For time-dependent tests, avoid `asyncio.sleep()` in production code where possible—inject a sleep function that tests can mock. When you must test timing behavior directly, use short timeouts and structure tests to verify behavior rather than exact timing:
```python
import asyncio
import pytest

async def test_rate_limiter_rejects_excess():
    """Test rate limiting without actual delays.

    Instead of verifying exact timing, verify that the rate
    limiter blocks when it should and allows when it should.
    """
    from myapp.rate_limiter import RateLimiter

    limiter = RateLimiter(max_requests=2, window_seconds=1.0)

    # First two requests succeed immediately
    assert await limiter.acquire() is True
    assert await limiter.acquire() is True

    # Third request would wait - verify with short timeout
    with pytest.raises(TimeoutError):
        await asyncio.wait_for(limiter.acquire(), timeout=0.01)

async def test_retry_mechanism():
    """Test retry logic with controlled failures."""
    attempt_count = 0

    async def flaky_operation():
        nonlocal attempt_count
        attempt_count += 1
        if attempt_count < 3:
            raise ConnectionError("Temporary failure")
        return "success"

    from myapp.retry import with_retry
    result = await with_retry(flaky_operation, max_attempts=5)

    assert result == "success"
    assert attempt_count == 3  # Succeeded on third attempt
```
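Here’s what the inject-a-sleep approach might look like in practice—a sketch where `Poller` is a hypothetical class that accepts its sleep function as a constructor argument:

```python
import asyncio
from unittest.mock import AsyncMock

class Poller:
    """Hypothetical poller that takes its sleep function as a dependency."""
    def __init__(self, sleep=asyncio.sleep):
        self._sleep = sleep

    async def poll_until_ready(self, check, interval: float = 5.0) -> None:
        while not await check():
            await self._sleep(interval)

async def test_poller_retries_without_real_delays():
    check = AsyncMock(side_effect=[False, False, True])
    fake_sleep = AsyncMock()  # Resolves instantly; records call args

    poller = Poller(sleep=fake_sleep)
    await poller.poll_until_ready(check, interval=5.0)

    assert check.await_count == 3
    fake_sleep.assert_awaited_with(5.0)  # Verify the configured interval
```

Because the fake sleep resolves instantly, the test exercises three polling rounds without any real delay.

Common testing antipatterns that cause flaky tests: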
- Shared event loop state: Use function-scoped loops unless you have a specific reason not to. Session-scoped loops accumulate state that can cause interference between tests.
- Real network calls: Always mock external services. Real network calls introduce latency variability and external dependencies that make tests unreliable.
- `time.sleep()` in async tests: Use `asyncio.sleep()` for async code, and prefer mocking time entirely for tests that depend on specific timing.
- Missing cleanup: Ensure fixtures properly close connections and cancel tasks. Leaked tasks can cause “task was destroyed but it is pending” warnings. A guard fixture for this is sketched after this list.
- Assuming execution order: Async operations may complete in any order. Don’t write tests that assume tasks complete in creation order.
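For the missing-cleanup item above, a guard fixture can turn leaked tasks into loud test failures. A sketch, assuming pytest-asyncio in auto mode (`fail_on_leaked_tasks` is a hypothetical helper, not part of the plugin):

```python
import asyncio
import pytest

@pytest.fixture(autouse=True)
async def fail_on_leaked_tasks():
    """Cancel and report any tasks a test left running at teardown."""
    yield
    current = asyncio.current_task()
    leaked = [
        t for t in asyncio.all_tasks()
        if t is not current and not t.done()
    ]
    for task in leaked:
        task.cancel()
    if leaked:
        # Let the cancellations finish before the loop closes
        await asyncio.gather(*leaked, return_exceptions=True)
        pytest.fail(f"Test leaked {len(leaked)} task(s): "
                    f"{[t.get_name() for t in leaked]}")
```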
Mixing Sync and Async: The Boundaries That Matter
Real applications rarely achieve 100% async purity. You’ll integrate with synchronous libraries that lack async alternatives, call blocking system APIs for operations like file I/O, and work with legacy code that predates asyncio. The key is managing these boundaries carefully to prevent blocking the event loop.
Blocking the event loop is asyncio’s cardinal sin. When the event loop is blocked—waiting for synchronous I/O, crunching CPU-bound calculations, or holding a synchronous lock—no other tasks can make progress. All those concurrent connections you’re handling? They’re all frozen until the blocking operation completes. A single 100ms blocking call in a server handling 1000 concurrent requests means 1000 users experience latency spikes.
asyncio.to_thread() (Python 3.9+) runs synchronous functions in a thread pool executor, freeing the event loop to continue processing other tasks. The function call happens in a separate thread, and the await returns when that thread completes:
```python
import asyncio
import hashlib
from pathlib import Path

def compute_file_hash(filepath: Path) -> str:
    """CPU-bound synchronous function.

    File I/O and hash computation are both blocking operations
    that would freeze the event loop if called directly.
    """
    hasher = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            hasher.update(chunk)
    return hasher.hexdigest()

async def process_files(filepaths: list[Path]) -> dict[Path, str]:
    """Process multiple files without blocking the event loop.

    Each file hash runs in its own thread, allowing concurrent
    processing without blocking async operations.
    """
    tasks = [
        asyncio.to_thread(compute_file_hash, fp)
        for fp in filepaths
    ]
    results = await asyncio.gather(*tasks)
    return dict(zip(filepaths, results))

async def main():
    files = list(Path("/var/log").glob("*.log"))[:10]
    hashes = await process_files(files)
    for path, hash_value in hashes.items():
        print(f"{path.name}: {hash_value[:16]}...")

asyncio.run(main())
```

For Python 3.8 and earlier, use `loop.run_in_executor()` with an explicit executor. This also gives you more control over the thread pool configuration:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Create dedicated executors for different workloads
io_executor = ThreadPoolExecutor(
    max_workers=20,
    thread_name_prefix="io_"
)
cpu_executor = ProcessPoolExecutor(
    max_workers=4  # For CPU-bound work, use processes
)

async def legacy_integration():
    """Integrate with a synchronous library using an explicit executor."""
    loop = asyncio.get_running_loop()

    # Run sync function in thread pool
    # (blocking_library_call, arg1, and arg2 are placeholders)
    result = await loop.run_in_executor(
        io_executor, blocking_library_call, arg1, arg2
    )
    return result

async def parallel_cpu_work(items: list) -> list:
    """CPU-bound work benefits from ProcessPoolExecutor."""
    loop = asyncio.get_running_loop()

    # Each item processed in a separate process
    # (cpu_intensive_function is a placeholder)
    tasks = [
        loop.run_in_executor(cpu_executor, cpu_intensive_function, item)
        for item in items
    ]
    return await asyncio.gather(*tasks)
```

The blocking call trap catches many developers, especially those coming from synchronous Python. Any synchronous I/O or CPU-intensive operation inside an async function blocks the entire event loop:
```python
import asyncio
import requests  # Synchronous HTTP library - DO NOT USE IN ASYNC CODE

async def bad_fetch(url: str) -> str:
    # WRONG: blocks the event loop for every request
    # All other concurrent operations freeze during this call
    response = requests.get(url)
    return response.text

async def acceptable_fetch(url: str) -> str:
    # ACCEPTABLE: offload to a thread if you must use a sync library
    # Still has thread overhead and pool contention
    response = await asyncio.to_thread(requests.get, url)
    return response.text

# BEST: use an async HTTP library designed for asyncio
import aiohttp

async def best_fetch(url: str) -> str:
    # RIGHT: fully async, no blocking, efficient connection pooling
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```

⚠️ Warning: File I/O in Python is synchronous, even when using `aiofiles`. The `aiofiles` library wraps file operations in thread executors, which adds overhead. For high-throughput file operations, consider memory-mapped files, dedicated I/O threads with batch processing, or async-native solutions like `aioboto3` for cloud storage.
Context switching between threads and the event loop has measurable overhead—typically 50-100 microseconds per switch. For operations under 1ms, the thread dispatch cost can exceed the operation itself. Profile before offloading trivial operations. Sometimes it’s acceptable to block briefly rather than pay the thread switching cost thousands of times per second.
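One quick way to check whether offloading pays off is to time both paths on your own workload. A rough benchmark sketch (the numbers it prints will vary by machine and Python version):

```python
import asyncio
import time

def tiny_operation() -> int:
    """A sub-millisecond operation that may not be worth offloading."""
    return sum(range(1000))

async def compare_dispatch_cost(iterations: int = 1000) -> None:
    start = time.perf_counter()
    for _ in range(iterations):
        tiny_operation()  # Direct call: blocks the loop, but only briefly
    direct = time.perf_counter() - start

    start = time.perf_counter()
    for _ in range(iterations):
        await asyncio.to_thread(tiny_operation)  # Pays thread dispatch each time
    threaded = time.perf_counter() - start

    print(f"direct:    {direct * 1e6 / iterations:.1f} us/call")
    print(f"to_thread: {threaded * 1e6 / iterations:.1f} us/call")

asyncio.run(compare_dispatch_cost())
```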
Observability: Logging and Debugging in Event Loops
Standard logging works in async code but loses critical context that synchronous code takes for granted. When multiple requests interleave in a single thread, log messages from different requests mix together. A log line reading “User fetch failed” tells you nothing about which user or which request. Structured logging with context propagation solves this problem.
Python’s contextvars module provides task-local storage that propagates across await boundaries. Unlike thread-local storage, context variables follow the async task rather than the thread, maintaining correct context even as tasks hop between threads in executors:
```python
import logging
import uuid
from contextvars import ContextVar
from typing import Optional

# Context variables for request tracking
request_id: ContextVar[str] = ContextVar("request_id", default="no-request")
user_id: ContextVar[Optional[int]] = ContextVar("user_id", default=None)

class RequestContextFilter(logging.Filter):
    """Inject request context into all log records.

    This filter runs for every log message and adds context
    variables as record attributes for the formatter.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id.get()
        record.user_id = user_id.get() or "anonymous"
        return True

# Configure logging with context
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s [%(request_id)s] user=%(user_id)s %(levelname)s: %(message)s"
))
handler.addFilter(RequestContextFilter())

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

async def handle_request(data: dict) -> dict:
    """Process a request with context-aware logging.

    All log messages within this function and any functions it
    calls will include the request ID automatically.
    """
    # Set context for this request
    token = request_id.set(str(uuid.uuid4())[:8])
    user_token = user_id.set(data.get("user_id"))

    try:
        logger.info(f"Processing request: {data.get('action')}")

        # These nested calls inherit the context
        # (validate_input and process_data are placeholders)
        result = await validate_input(data)
        result = await process_data(result)

        logger.info("Request completed successfully")
        return result
    except Exception as e:
        logger.error(f"Request failed: {e}")
        raise
    finally:
        # Reset context when request completes
        request_id.reset(token)
        user_id.reset(user_token)
```

For identifying slow callbacks and bottlenecks, asyncio’s debug mode logs warnings when callbacks take longer than the slow callback threshold (100ms default). You can customize this threshold for more or less sensitive detection:
```python
import asyncio
import logging

# Enable asyncio debug logging
logging.basicConfig(level=logging.DEBUG)
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.setLevel(logging.WARNING)  # Only show problems

async def main():
    loop = asyncio.get_running_loop()

    # Customize slow callback threshold (default is 0.1 seconds)
    loop.slow_callback_duration = 0.05  # 50ms - more aggressive detection

    # Your application code here (run_server is a placeholder)
    await run_server()

# Run with debug mode enabled
asyncio.run(main(), debug=True)
```

When debug mode flags a slow callback, investigate with profiling. The callback’s name and arguments appear in the log message, pointing you to the problematic code. Common causes include:
- Synchronous library calls that should be offloaded to threads
- CPU-intensive operations like JSON parsing of large payloads
- Blocking database drivers instead of async alternatives
- Accidental synchronous file operations
Integrating with APM tools like DataDog, New Relic, or OpenTelemetry requires instrumenting key async operations. Most APM libraries provide asyncio-aware instrumentation that preserves trace context across await boundaries:
```python
from opentelemetry import trace
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor

# Enable asyncio instrumentation for automatic span propagation
AsyncioInstrumentor().instrument()

tracer = trace.get_tracer(__name__)

async def traced_operation(item_id: int) -> dict:
    """Operation with explicit span for detailed tracing."""
    with tracer.start_as_current_span("process_item") as span:
        span.set_attribute("item.id", item_id)

        result = await fetch_item(item_id)  # Placeholder for your data access
        span.set_attribute("item.status", result["status"])

        if result["status"] == "error":
            span.set_status(trace.Status(trace.StatusCode.ERROR))

        return result
```

💡 Pro Tip: Make sure `PYTHONASYNCIODEBUG` is unset in production; setting it to any non-empty string (even `0`) enables debug mode. Debug mode adds overhead—tracking coroutine creation points, checking for slow callbacks—that impacts performance. Enable it only during development or when actively troubleshooting production issues.
Production Checklist: Before You Deploy
Before your asyncio service handles production traffic, verify these critical configurations. Production failures in async code are particularly painful because they often manifest as mysterious hangs rather than clear error messages.
Connection Pool Sizing
Database and HTTP connection pools need limits that match your concurrency model. An unbounded pool under load creates thousands of connections, exhausting database limits and file descriptors. But a pool that’s too small becomes a bottleneck.
- Set explicit `min_size` and `max_size` for database connection pools. A good starting point is `max_size = 2 × number_of_CPU_cores × number_of_database_servers` (see the sketch after this list)
- For HTTP client pools, size to your expected concurrent request count per upstream host, not total throughput
- Configure connection acquisition timeouts shorter than your request timeout—if you can’t get a connection quickly, fail fast
- Monitor pool utilization; consistently near-full pools indicate you need to scale up or optimize queries
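As a concrete example, here’s how those knobs might look with asyncpg (an assumed driver; the DSN and sizes are illustrative, and most async pools expose equivalent settings):

```python
import asyncpg

async def make_pool() -> asyncpg.Pool:
    return await asyncpg.create_pool(
        dsn="postgresql://app@db/appdb",  # Illustrative DSN
        min_size=4,           # Keep warm connections ready
        max_size=16,          # e.g. 2 x 8 cores x 1 database server
        command_timeout=5.0,  # Per-query timeout in seconds
    )

async def fetch_user_row(pool: asyncpg.Pool, user_id: int):
    # Acquisition timeout: fail fast when the pool is exhausted
    # instead of queueing indefinitely behind slow queries
    async with pool.acquire(timeout=1.0) as conn:
        return await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
```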
Resource Limits
- Set `ulimit -n` appropriately for expected connection counts. A service handling 10,000 concurrent connections needs at least that many file descriptors available (typically 65535+)
- Configure OS-level TCP settings for high-connection servers: `net.core.somaxconn`, `net.ipv4.tcp_max_syn_backlog`
- Monitor file descriptor usage with health checks—exhaustion causes cryptic “too many open files” errors (a monitoring sketch follows this list)
- Set memory limits that account for per-connection buffers; each WebSocket connection typically uses 8-64KB
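For the descriptor-monitoring item, a health endpoint can report usage directly. A Linux-specific sketch (it reads /proc, so it won’t work on macOS or Windows):

```python
import os
import resource

def fd_usage() -> dict:
    """Report open file descriptors vs. the soft limit (Linux only)."""
    soft_limit, _hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
    open_fds = len(os.listdir("/proc/self/fd"))
    return {
        "open_fds": open_fds,
        "limit": soft_limit,
        "utilization": open_fds / soft_limit,  # Alert well before this hits 1.0
    }
```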
Health Checks That Work
A health check that returns 200 OK doesn’t verify your event loop is responsive. A blocked event loop will still accept connections at the socket level while being unable to process them. Implement checks that actually exercise async operations:
- Include a small async operation in health checks: a Redis ping, a database query, or a simple `asyncio.sleep(0)` followed by an immediate response
- Set health check timeouts shorter than your load balancer timeout—if the health check times out, the load balancer should mark the instance unhealthy
- Monitor event loop lag: measure the time between scheduling a callback and its execution. If callbacks queue for seconds, the loop is blocked (a lag monitor is sketched after this list)
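The event-loop-lag check can be implemented as a small self-timing task. A sketch (`monitor_event_loop_lag` is an illustrative name; export the lag to your metrics system rather than logs if you have one):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def monitor_event_loop_lag(interval: float = 1.0,
                                 threshold: float = 0.1) -> None:
    """Log a warning whenever the loop wakes up later than requested.

    A healthy loop keeps lag near zero; a blocked loop shows lag roughly
    equal to the duration of whatever blocked it.
    """
    loop = asyncio.get_running_loop()
    while True:
        scheduled = loop.time()
        await asyncio.sleep(interval)
        lag = loop.time() - scheduled - interval
        if lag > threshold:
            logger.warning("Event loop lag: %.3fs", lag)
```

Start it once at application startup with `asyncio.create_task(monitor_event_loop_lag())`, ideally inside a long-lived TaskGroup per the structured concurrency section.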
Common Production Pitfalls
| Symptom | Likely Cause | Fix |
|---|---|---|
| Requests hang, no errors | Forgotten await | Enable debug mode, add linting for unawaited coroutines |
| Memory grows unbounded | Uncollected task references | Use TaskGroups, check for task leaks with asyncio.all_tasks() |
| Intermittent timeouts | Event loop blocking | Profile with debug mode, offload sync code to threads |
| Connection refused under load | Pool exhaustion | Increase pool size, add backpressure, check for connection leaks |
| Graceful shutdown hangs | Tasks ignoring cancellation | Audit CancelledError handling, add shutdown timeouts |
| High latency variance | GC pauses or blocking calls | Profile with py-spy, check for sync I/O |
Quick Reference Patterns
For HTTP services:
- One `ClientSession` per upstream service, reused across requests (creating sessions is expensive); see the sketch after this list
- Connection pool sized to 100 × number of upstream hosts as a starting point
- Timeout budget: 80% of client-facing timeout for upstream calls, leaving room for processing
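Combining these guidelines, session setup might look like the following sketch (assuming aiohttp; the base URL, pool limit, and budget numbers are illustrative):

```python
import aiohttp

# 80% of a 5-second client-facing budget, leaving room for processing
UPSTREAM_TIMEOUT = aiohttp.ClientTimeout(total=4.0)

async def create_upstream_session() -> aiohttp.ClientSession:
    """Create one long-lived session per upstream, reused across requests."""
    return aiohttp.ClientSession(
        base_url="https://api.example.com",         # Illustrative upstream
        timeout=UPSTREAM_TIMEOUT,
        connector=aiohttp.TCPConnector(limit=100),  # Pool size per the guideline
    )
```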
For background workers:
- TaskGroup for batch processing with automatic cleanup
- Graceful shutdown with SIGTERM handler and shutdown timeout
- Dead letter queue for failed tasks that exhaust retries
- Idempotency for tasks that might be processed more than once
For mixed workloads:
- Separate thread pools for CPU-bound vs blocking I/O operations
- Semaphores to limit concurrent expensive operations and prevent resource exhaustion (sketched after this list)
- Circuit breakers for external service calls to prevent cascade failures
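The semaphore pattern from the list above takes only a few lines. A sketch (`do_expensive_work` is a placeholder for your own operation):

```python
import asyncio

# Cap concurrent expensive operations to prevent resource exhaustion
expensive_op_semaphore = asyncio.Semaphore(10)

async def do_expensive_work(job_id: int) -> None:
    """Stand-in for a CPU- or memory-heavy operation."""
    await asyncio.sleep(0.5)

async def run_expensive(job_id: int) -> None:
    # At most 10 jobs proceed at once; the rest wait here
    async with expensive_op_semaphore:
        await do_expensive_work(job_id)
```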
Key Takeaways
- Configure a global exception handler with `loop.set_exception_handler()` on day one—never let exceptions vanish silently into garbage collection
- Replace scattered `create_task()` calls with TaskGroup context managers to guarantee cleanup and proper exception propagation
- Use `asyncio.timeout()` context managers instead of `wait_for()` for cleaner timeout handling, and always let `CancelledError` propagate through your code
- Run blocking I/O through `asyncio.to_thread()` rather than calling synchronous code directly in async functions—profile first to ensure the threading overhead is worth it
- Enable asyncio debug mode in development and set up structured logging with context IDs to trace requests across await boundaries
Resources
- Python asyncio documentation
- PEP 654 – Exception Groups and except* (for understanding TaskGroup exceptions)
- Python contextvars documentation
- pytest-asyncio documentation
- A Conceptual Overview of asyncio (Python official guide)