Hero image for Taming Callback Hell: Practical asyncio Patterns for Production Python

Taming Callback Hell: Practical asyncio Patterns for Production Python


Your asyncio code works perfectly in development, then mysteriously hangs in production with no stack trace. You’ve scattered print statements everywhere, but the event loop swallows errors whole. After hours of debugging, you discover a forgotten await buried three layers deep—a problem that proper patterns would have caught immediately.

The gap between “I understand async/await” and “I can debug asyncio in production at 3 AM” is wider than most Python developers expect. Asyncio’s concurrency model introduces failure modes that don’t exist in synchronous code: silent task failures, orphaned coroutines, resource leaks from improper cancellation, and race conditions that only manifest under load.

Understanding asyncio requires a mental model shift. In synchronous Python, code executes linearly—one thing happens after another, and exceptions bubble up naturally through the call stack. Asyncio introduces cooperative multitasking, where multiple tasks share a single thread by voluntarily yielding control at await points. This cooperative model delivers excellent performance for I/O-bound workloads, but it fundamentally changes how errors propagate, how resources get cleaned up, and how you reason about concurrent operations.

This guide covers the patterns that separate production-ready asyncio from code that works on your laptop. You’ll learn how to surface silent failures, structure concurrent operations for guaranteed cleanup, handle timeouts and cancellation properly, write reliable async tests, and gain visibility into what your event loop is actually doing. Each section addresses a specific category of production problems with concrete solutions you can implement immediately.


Why asyncio Fails Silently (And How to Stop It)

When a synchronous function raises an exception, Python’s default behavior ensures you see it—either through an unhandled exception traceback or your logging configuration. Asyncio breaks this expectation in subtle ways that cost hours of debugging time. Understanding why this happens requires examining how the event loop manages task lifecycles.

The event loop maintains a collection of tasks, each representing an in-progress coroutine. When you call asyncio.create_task(), the loop schedules that coroutine for execution but doesn’t wait for it to complete. This is the “fire-and-forget” pattern—useful for background work, but dangerous when exceptions occur. The event loop handles exceptions in fire-and-forget tasks differently than you’d expect because there’s no caller waiting to receive the exception.

When you create a task with asyncio.create_task() and never await it, any exception raised inside that task gets attached to the task object. If nothing retrieves that exception—by awaiting the task or calling task.exception()—the exception vanishes until the task is garbage collected. At that point, Python logs a warning, but by then you’ve lost the context of what went wrong. The warning appears far from where the error occurred, often after your application has moved on to other work, making correlation nearly impossible.

silent_failure_demo.py
import asyncio
async def background_task():
await asyncio.sleep(1)
raise ValueError("This error disappears into the void")
async def main():
# Fire and forget - exception won't surface until GC
asyncio.create_task(background_task())
# Main continues, unaware of the failure
await asyncio.sleep(5)
print("Main completed - did you notice the error?")
asyncio.run(main())

The exception from background_task() exists—it’s stored on the task object—but nothing retrieves it. Python eventually logs “Task exception was never retrieved” when the task gets garbage collected, but this message appears without the context of what your application was doing when the error occurred. In production, with high throughput and rapid object creation, garbage collection timing becomes unpredictable, and these warnings can appear minutes after the actual failure.

The fix starts with setting a global exception handler on the event loop. This handler catches exceptions from tasks that were never awaited, giving you immediate visibility into failures. The handler receives a context dictionary containing the exception, the message, and other metadata about what went wrong.

exception_handler.py
import asyncio
import logging
import traceback
import sys
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
def handle_exception(loop: asyncio.AbstractEventLoop, context: dict) -> None:
"""Global handler for unhandled exceptions in asyncio tasks.
This handler catches:
- Exceptions from tasks that were never awaited
- Exceptions from callbacks scheduled with call_soon/call_later
- Exceptions from signal handlers
"""
exception = context.get("exception")
message = context.get("message", "Unhandled exception in event loop")
# Extract task information if available
task = context.get("task")
task_name = task.get_name() if task else "unknown"
if exception:
# Log with full traceback for debugging
tb_str = ''.join(traceback.format_exception(
type(exception), exception, exception.__traceback__
))
logger.error(
f"Task '{task_name}' failed: {message}\n"
f"Exception: {exception}\n"
f"Traceback:\n{tb_str}"
)
# In production, you'd also want to:
# - Send to error tracking (Sentry, Rollbar, etc.)
# - Increment error metrics
# - Potentially trigger alerts for critical failures
else:
logger.error(f"Event loop error in task '{task_name}': {message}")
async def main():
loop = asyncio.get_running_loop()
loop.set_exception_handler(handle_exception)
# Now fire-and-forget tasks will log errors immediately
asyncio.create_task(some_background_work())
await asyncio.sleep(10)
asyncio.run(main())

⚠️ Warning: The exception handler is a safety net, not a replacement for proper task management. Use it to catch mistakes, not as your primary error handling strategy. A well-designed application should rarely trigger the global exception handler.

Beyond the global handler, enable asyncio’s debug mode during development. Set the environment variable PYTHONASYNCIODEBUG=1 or call asyncio.run(main(), debug=True). Debug mode provides several benefits that dramatically improve your ability to catch problems early:

  • Unawaited coroutine detection: When you create a coroutine but forget to await it, debug mode logs a warning with a traceback showing where the coroutine was created
  • Slow callback warnings: Operations taking longer than 100ms (configurable via loop.slow_callback_duration) generate warnings, helping identify blocking code
  • Enhanced tracebacks: Exception tracebacks include more context about where tasks were created, not just where they failed
  • Resource warning detection: Debug mode surfaces warnings about unclosed resources like file handles and network connections
debug_mode_example.py
import asyncio
import os
# Enable via environment variable (useful for all processes)
os.environ['PYTHONASYNCIODEBUG'] = '1'
# Or enable programmatically for specific runs
async def main():
# Your application code
pass
asyncio.run(main(), debug=True)

The combination of a global exception handler and debug mode transforms asyncio from a debugging nightmare into a system that actively surfaces problems before they reach production. Set up the exception handler in your application’s entry point, enable debug mode in development and staging environments, and you’ll catch the majority of async-related bugs before they affect users.


Structured Concurrency: TaskGroups Over Scattered Tasks

Scattered create_task() calls throughout your codebase create maintenance nightmares. You lose track of which tasks are running, exceptions in one task don’t propagate to related tasks, and cleanup becomes manual and error-prone. Every forgotten task reference is a potential resource leak—database connections held open, HTTP sessions not closed, memory accumulating until someone notices the service degrading.

Structured concurrency solves this by tying task lifetimes to lexical scopes. The principle is simple: tasks created within a scope cannot outlive that scope. When the scope exits—whether normally, through an exception, or via cancellation—all tasks within it are guaranteed to be completed or cancelled. This guarantee eliminates an entire category of resource management bugs.

Python 3.11 introduced asyncio.TaskGroup, which implements structured concurrency as a first-class language feature. A TaskGroup guarantees that all tasks created within the group complete (or fail) before the group exits. This eliminates orphaned tasks and ensures proper exception propagation.

taskgroup_example.py
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""Simulate fetching user data from an API."""
await asyncio.sleep(0.1) # Simulated network latency
if user_id == 3:
raise ValueError(f"User {user_id} not found")
return {"id": user_id, "name": f"User_{user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Fetch multiple users concurrently with automatic cleanup.
If any fetch fails, all other fetches are cancelled and
resources are cleaned up before the exception propagates.
"""
results = []
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(uid)) for uid in user_ids]
# Only reached if ALL tasks succeed
# If any task raises, the exception propagates here
return [task.result() for task in tasks]
async def main():
try:
users = await fetch_all_users([1, 2, 3, 4, 5])
print(f"Fetched {len(users)} users")
except ExceptionGroup as eg:
# TaskGroup wraps exceptions in ExceptionGroup
# This allows handling multiple simultaneous failures
for exc in eg.exceptions:
print(f"Task failed: {exc}")
asyncio.run(main())

When any task in a TaskGroup raises an exception, the group initiates a shutdown sequence. First, it cancels all other tasks in the group. Then, it waits for those cancelled tasks to finish their cleanup (handling their own CancelledError). Finally, it collects all exceptions—both the original failures and any exceptions raised during cancellation—into an ExceptionGroup. This behavior ensures resources are properly released even in failure scenarios.

The ExceptionGroup (introduced in Python 3.11 alongside TaskGroup) represents multiple exceptions that occurred concurrently. You can handle them with the except* syntax, which matches against exception types within the group:

exception_group_handling.py
import asyncio
async def might_fail(task_id: int) -> str:
if task_id % 2 == 0:
raise ValueError(f"Even task {task_id} failed")
if task_id % 3 == 0:
raise RuntimeError(f"Task {task_id} had a runtime error")
return f"Task {task_id} succeeded"
async def main():
try:
async with asyncio.TaskGroup() as tg:
for i in range(6):
tg.create_task(might_fail(i))
except* ValueError as eg:
print(f"Caught {len(eg.exceptions)} ValueErrors:")
for exc in eg.exceptions:
print(f" - {exc}")
except* RuntimeError as eg:
print(f"Caught {len(eg.exceptions)} RuntimeErrors:")
for exc in eg.exceptions:
print(f" - {exc}")
asyncio.run(main())

For Python 3.10 and earlier, you can implement similar patterns using asyncio.gather() with return_exceptions=False (the default), but you lose automatic cancellation of sibling tasks. This custom context manager provides structured concurrency semantics for older Python versions:

pre_311_structured.py
import asyncio
from contextlib import asynccontextmanager
from typing import Set, Callable, Coroutine, Any
@asynccontextmanager
async def managed_tasks():
"""Context manager for structured concurrency in Python <3.11.
All tasks created through the yielded function are guaranteed
to be cancelled when the context exits, whether normally or
due to an exception.
"""
tasks: Set[asyncio.Task] = set()
def create_task(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
task = asyncio.create_task(coro)
tasks.add(task)
task.add_done_callback(tasks.discard)
return task
try:
yield create_task
finally:
# Cancel all pending tasks on exit
for task in tasks:
if not task.done():
task.cancel()
# Wait for cancellations to complete
# return_exceptions=True prevents CancelledError from propagating
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def worker(name: str, duration: float) -> str:
"""Example worker that simulates some async work."""
await asyncio.sleep(duration)
return f"{name} completed after {duration}s"
async def main():
async with managed_tasks() as create:
t1 = create(worker("A", 1.0))
t2 = create(worker("B", 2.0))
t3 = create(worker("C", 0.5))
# Wait for first to complete, others auto-cancel on exit
done, pending = await asyncio.wait(
[t1, t2, t3],
return_when=asyncio.FIRST_COMPLETED
)
print(f"First completed: {done.pop().result()}")
# All remaining tasks cancelled and awaited here
asyncio.run(main())

💡 Pro Tip: Adopt TaskGroup as your default for any operation spawning multiple tasks. The explicit scope makes code easier to reason about and eliminates an entire category of resource leak bugs. Even if you think you need fire-and-forget semantics, consider whether a TaskGroup with a longer-lived scope might be more appropriate.


Timeouts and Cancellation: Doing It Right

Timeouts in asyncio require understanding the difference between asyncio.wait_for() and the newer asyncio.timeout() context manager introduced in Python 3.11. Both cancel the wrapped operation when time expires, but they differ in ergonomics, composability, and how they handle edge cases.

The fundamental challenge with async timeouts is that you’re not just stopping execution—you’re initiating a cancellation sequence. The cancelled operation must have an opportunity to clean up resources, release locks, and ensure data consistency. A timeout that doesn’t allow for cleanup is a resource leak waiting to happen.

The wait_for() function wraps a single awaitable and raises TimeoutError if it doesn’t complete in time. It’s been available since Python 3.4 and remains useful for simple cases:

wait_for_timeout.py
import asyncio
async def slow_operation() -> str:
"""An operation that takes longer than we want to wait."""
await asyncio.sleep(10)
return "completed"
async def main():
try:
# wait_for cancels the operation and raises TimeoutError
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
print(f"Got result: {result}")
except TimeoutError:
print("Operation timed out after 2 seconds")
# The slow_operation coroutine received CancelledError
# and had a chance to clean up
asyncio.run(main())

The asyncio.timeout() context manager provides cleaner handling for multiple operations and better composability. Instead of wrapping a single awaitable, it establishes a deadline for an entire block of code. Any operation within the block shares the same timeout budget:

timeout_context.py
import asyncio
async def fetch_data(url: str) -> dict:
"""Simulate fetching data from an external service."""
await asyncio.sleep(1)
return {"url": url, "data": "response payload"}
async def validate_data(data: dict) -> bool:
"""Simulate validating the fetched data."""
await asyncio.sleep(0.3)
return True
async def process_data(data: dict) -> dict:
"""Simulate processing the validated data."""
await asyncio.sleep(0.5)
return {"processed": True, **data}
async def main():
try:
async with asyncio.timeout(2.0):
# All three operations share the same 2-second budget
# If fetch takes 1.5s, process only has 0.5s remaining
data = await fetch_data("https://api.example.com")
is_valid = await validate_data(data)
if is_valid:
result = await process_data(data)
print(f"Result: {result}")
except TimeoutError:
print("Combined operations exceeded 2 second budget")
asyncio.run(main())

You can also use asyncio.timeout_at() for absolute deadlines, which is useful when you need to coordinate timeouts across multiple components that each track time independently:

timeout_at_example.py
import asyncio
async def coordinated_operations():
"""Multiple operations that must complete by the same deadline."""
loop = asyncio.get_running_loop()
deadline = loop.time() + 5.0 # 5 seconds from now
async with asyncio.timeout_at(deadline):
result1 = await operation_one()
# Same deadline for the second operation
async with asyncio.timeout_at(deadline):
result2 = await operation_two()
return result1, result2

Cancellation handling requires special attention. When a task is cancelled, Python raises CancelledError at the current await point. The critical rule: never swallow CancelledError. If you catch it and don’t re-raise, you break the cancellation mechanism and create zombie tasks that continue running when they should have stopped.

proper_cancellation.py
import asyncio
async def acquire_resource():
"""Simulate acquiring a resource like a database connection."""
return type('Resource', (), {'close': lambda self: asyncio.sleep(0)})()
async def process_next_item(resource):
"""Simulate processing work."""
await asyncio.sleep(0.1)
async def cleanup_on_cancel():
"""Demonstrates proper cancellation handling with cleanup.
The key insight: you can perform cleanup operations during
cancellation, but you MUST re-raise CancelledError when done.
"""
resource = await acquire_resource()
try:
while True:
await process_next_item(resource)
except asyncio.CancelledError:
# Perform cleanup, then re-raise
# This gives you a chance to release resources
await resource.close()
print("Resource closed during cancellation")
raise # CRITICAL: Always re-raise CancelledError
async def bad_cancellation_handling():
"""DON'T DO THIS - swallowing CancelledError breaks cancellation."""
try:
await some_operation()
except asyncio.CancelledError:
print("Cancelled, but I'm going to keep running anyway")
# Missing raise! This task is now a zombie

For graceful shutdown scenarios—handling SIGTERM in containers, SIGINT from Ctrl+C—you need to coordinate cancellation across multiple tasks:

graceful_shutdown.py
import asyncio
import signal
async def long_running_worker():
"""A worker that processes items indefinitely."""
while True:
await asyncio.sleep(1)
print("Processing...")
async def graceful_shutdown():
"""Handle SIGTERM/SIGINT for clean shutdown."""
loop = asyncio.get_running_loop()
shutdown_event = asyncio.Event()
def signal_handler():
print("Shutdown signal received")
shutdown_event.set()
# Register signal handlers
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, signal_handler)
# Start background tasks
worker_task = asyncio.create_task(long_running_worker())
# Wait for shutdown signal
await shutdown_event.wait()
# Graceful cancellation with timeout
worker_task.cancel()
try:
# Give the task time to clean up
await asyncio.wait_for(worker_task, timeout=5.0)
except asyncio.CancelledError:
pass # Expected after cancellation
except TimeoutError:
print("Worker didn't shut down cleanly within timeout")
print("Shutdown complete")

For operations that must complete regardless of cancellation—database commits, message acknowledgments, audit log writes—use asyncio.shield(). Shield creates a barrier that prevents cancellation from propagating into the shielded coroutine:

shield_example.py
import asyncio
async def critical_write(data: dict) -> None:
"""This write must complete even if the parent is cancelled.
Examples: database commits, message queue acknowledgments,
audit log entries, billing records.
"""
await database.commit(data)
await message_queue.ack()
async def process_message(msg: dict) -> None:
"""Process a message with a critical final step."""
result = await compute_result(msg)
# Shield the critical write from cancellation
# Even if process_message is cancelled, the write continues
await asyncio.shield(critical_write(result))

📝 Note: shield() doesn’t prevent cancellation—it just prevents the cancellation from propagating into the shielded coroutine. The outer task still receives CancelledError after the shielded operation completes. If the shielded operation itself is slow, consider adding a timeout to avoid indefinite hangs during shutdown.


Testing Async Code Without the Pain

Testing asyncio code introduces challenges that don’t exist with synchronous tests: fixture scoping across async boundaries, event loop lifecycle management, mocking functions that return awaitables, and time-dependent tests that become flaky in CI. The pytest-asyncio plugin handles most of these concerns, but getting the configuration right matters enormously for test reliability.

The most common mistake is using an event loop that persists across tests. Shared state in the event loop—pending callbacks, signal handlers, exception handlers—can cause tests to interfere with each other, creating failures that only appear when running the full test suite. Each test should get a fresh event loop with no residual state.

Start with proper pytest configuration in pyproject.toml:

pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"error::DeprecationWarning",
"error::PendingDeprecationWarning",
]

The asyncio_mode = "auto" setting automatically wraps async test functions without requiring the @pytest.mark.asyncio decorator on every test. This reduces boilerplate and ensures you don’t accidentally forget the decorator (which results in the test appearing to pass instantly without actually running). The asyncio_default_fixture_loop_scope = "function" setting ensures each test gets a fresh event loop.

Async fixtures work naturally with pytest-asyncio and can perform setup and teardown operations that involve awaiting:

test_async_service.py
import asyncio
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from myapp.service import UserService
@pytest.fixture
async def user_service():
"""Async fixture that handles connection lifecycle.
The service connects before the test runs and
disconnects after, even if the test fails.
"""
service = UserService()
await service.connect()
yield service
await service.disconnect()
@pytest.fixture
async def mock_database():
"""Fixture providing a mock database connection."""
mock_db = AsyncMock()
mock_db.query = AsyncMock(return_value=[{"id": 1, "name": "Test User"}])
mock_db.execute = AsyncMock(return_value={"rows_affected": 1})
return mock_db
async def test_fetch_user_success(user_service):
"""Test basic user fetch with real service fixture."""
user = await user_service.fetch_user(123)
assert user["id"] == 123
assert "name" in user
async def test_fetch_user_timeout():
"""Test timeout handling with mocked slow response."""
service = UserService()
async def slow_fetch(*args):
await asyncio.sleep(10) # Longer than any reasonable timeout
return {"id": 1}
with patch.object(service, "_make_request", side_effect=slow_fetch):
with pytest.raises(TimeoutError):
await asyncio.wait_for(service.fetch_user(1), timeout=0.1)

Mocking async context managers requires the AsyncMock class with proper configuration for __aenter__ and __aexit__ methods. The mock must return itself from __aenter__ to enable the as binding:

test_async_context.py
import pytest
from unittest.mock import AsyncMock, MagicMock
async def test_database_transaction():
"""Test code that uses async context managers."""
mock_conn = AsyncMock()
# __aenter__ returns the connection object for 'as conn' binding
mock_conn.__aenter__.return_value = mock_conn
# __aexit__ returns None (no exception suppression)
mock_conn.__aexit__.return_value = None
mock_conn.execute = AsyncMock(return_value={"rows_affected": 1})
mock_conn.commit = AsyncMock()
mock_conn.rollback = AsyncMock()
# Use the mock in your test
async with mock_conn as conn:
result = await conn.execute("UPDATE users SET active = true")
assert result["rows_affected"] == 1
await conn.commit()
# Verify the context manager was used correctly
mock_conn.__aenter__.assert_called_once()
mock_conn.__aexit__.assert_called_once()
mock_conn.commit.assert_called_once()
async def test_context_manager_error_handling():
"""Test that errors in context managers are handled."""
mock_conn = AsyncMock()
mock_conn.__aenter__.return_value = mock_conn
mock_conn.__aexit__.return_value = None
mock_conn.execute = AsyncMock(side_effect=RuntimeError("DB error"))
mock_conn.rollback = AsyncMock()
with pytest.raises(RuntimeError, match="DB error"):
async with mock_conn as conn:
await conn.execute("INVALID SQL")

For time-dependent tests, avoid asyncio.sleep() in production code where possible—inject a sleep function that tests can mock. When you must test timing behavior directly, use short timeouts and structure tests to verify behavior rather than exact timing:

test_time_dependent.py
import asyncio
import pytest
async def test_rate_limiter_rejects_excess():
"""Test rate limiting without actual delays.
Instead of verifying exact timing, verify that the rate
limiter blocks when it should and allows when it should.
"""
from myapp.rate_limiter import RateLimiter
limiter = RateLimiter(max_requests=2, window_seconds=1.0)
# First two requests succeed immediately
assert await limiter.acquire() is True
assert await limiter.acquire() is True
# Third request would wait - verify with short timeout
with pytest.raises(TimeoutError):
await asyncio.wait_for(limiter.acquire(), timeout=0.01)
async def test_retry_mechanism():
"""Test retry logic with controlled failures."""
attempt_count = 0
async def flaky_operation():
nonlocal attempt_count
attempt_count += 1
if attempt_count < 3:
raise ConnectionError("Temporary failure")
return "success"
from myapp.retry import with_retry
result = await with_retry(flaky_operation, max_attempts=5)
assert result == "success"
assert attempt_count == 3 # Succeeded on third attempt

Common testing antipatterns that cause flaky tests:

  • Shared event loop state: Use function-scoped loops unless you have a specific reason not to. Session-scoped loops accumulate state that can cause interference between tests.
  • Real network calls: Always mock external services. Real network calls introduce latency variability and external dependencies that make tests unreliable.
  • Time.sleep in async tests: Use asyncio.sleep() for async code, and prefer mocking time entirely for tests that depend on specific timing.
  • Missing cleanup: Ensure fixtures properly close connections and cancel tasks. Leaked tasks can cause “task was destroyed but it is pending” warnings.
  • Assuming execution order: Async operations may complete in any order. Don’t write tests that assume tasks complete in creation order.

Mixing Sync and Async: The Boundaries That Matter

Real applications rarely achieve 100% async purity. You’ll integrate with synchronous libraries that lack async alternatives, call blocking system APIs for operations like file I/O, and work with legacy code that predates asyncio. The key is managing these boundaries carefully to prevent blocking the event loop.

Blocking the event loop is asyncio’s cardinal sin. When the event loop is blocked—waiting for synchronous I/O, crunching CPU-bound calculations, or holding a synchronous lock—no other tasks can make progress. All those concurrent connections you’re handling? They’re all frozen until the blocking operation completes. A single 100ms blocking call in a server handling 1000 concurrent requests means 1000 users experience latency spikes.

asyncio.to_thread() (Python 3.9+) runs synchronous functions in a thread pool executor, freeing the event loop to continue processing other tasks. The function call happens in a separate thread, and the await returns when that thread completes:

sync_integration.py
import asyncio
import hashlib
from pathlib import Path
def compute_file_hash(filepath: Path) -> str:
"""CPU-bound synchronous function.
File I/O and hash computation are both blocking operations
that would freeze the event loop if called directly.
"""
hasher = hashlib.sha256()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
hasher.update(chunk)
return hasher.hexdigest()
async def process_files(filepaths: list[Path]) -> dict[str, str]:
"""Process multiple files without blocking the event loop.
Each file hash runs in its own thread, allowing
concurrent processing without blocking async operations.
"""
tasks = [
asyncio.to_thread(compute_file_hash, fp)
for fp in filepaths
]
results = await asyncio.gather(*tasks)
return dict(zip(filepaths, results))
async def main():
files = list(Path("/var/log").glob("*.log"))[:10]
hashes = await process_files(files)
for path, hash_value in hashes.items():
print(f"{path.name}: {hash_value[:16]}...")
asyncio.run(main())

For Python 3.8 and earlier, use loop.run_in_executor() with an explicit executor. This also gives you more control over the thread pool configuration:

executor_example.py
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# Create dedicated executors for different workloads
io_executor = ThreadPoolExecutor(
max_workers=20,
thread_name_prefix="io_"
)
cpu_executor = ProcessPoolExecutor(
max_workers=4 # For CPU-bound work, use processes
)
async def legacy_integration():
"""Integrate with synchronous library using explicit executor."""
loop = asyncio.get_running_loop()
# Run sync function in thread pool
result = await loop.run_in_executor(
io_executor,
blocking_library_call,
arg1, arg2
)
return result
async def parallel_cpu_work(items: list) -> list:
"""CPU-bound work benefits from ProcessPoolExecutor."""
loop = asyncio.get_running_loop()
# Each item processed in a separate process
tasks = [
loop.run_in_executor(cpu_executor, cpu_intensive_function, item)
for item in items
]
return await asyncio.gather(*tasks)

The blocking call trap catches many developers, especially those coming from synchronous Python. Any synchronous I/O or CPU-intensive operation inside an async function blocks the entire event loop:

blocking_trap.py
import asyncio
import requests # Synchronous HTTP library - DO NOT USE IN ASYNC CODE
async def bad_fetch(url: str) -> str:
# WRONG: blocks the event loop for every request
# All other concurrent operations freeze during this call
response = requests.get(url)
return response.text
async def acceptable_fetch(url: str) -> str:
# ACCEPTABLE: offload to thread if you must use sync library
# Still has thread overhead and pool contention
response = await asyncio.to_thread(requests.get, url)
return response.text
# BEST: use an async HTTP library designed for asyncio
import aiohttp
async def best_fetch(url: str) -> str:
# RIGHT: fully async, no blocking, efficient connection pooling
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()

⚠️ Warning: File I/O in Python is synchronous, even when using async with aiofiles. The aiofiles library wraps file operations in thread executors, which adds overhead. For high-throughput file operations, consider memory-mapped files, dedicated I/O threads with batch processing, or async-native solutions like aioboto3 for cloud storage.

Context switching between threads and the event loop has measurable overhead—typically 50-100 microseconds per switch. For operations under 1ms, the thread dispatch cost can exceed the operation itself. Profile before offloading trivial operations. Sometimes it’s acceptable to block briefly rather than pay the thread switching cost thousands of times per second.


Observability: Logging and Debugging in Event Loops

Standard logging works in async code, but loses critical context that synchronous code takes for granted. When multiple requests interleave in a single thread, log messages from different requests mix together. A log line reading “User fetch failed” tells you nothing about which user or which request. Structured logging with context propagation solves this problem.

Python’s contextvars module provides task-local storage that propagates across await boundaries. Unlike thread-local storage, context variables follow the async task rather than the thread, maintaining correct context even as tasks hop between threads in executors:

async_logging.py
import asyncio
import logging
import uuid
from contextvars import ContextVar
from typing import Optional
# Context variables for request tracking
request_id: ContextVar[str] = ContextVar("request_id", default="no-request")
user_id: ContextVar[Optional[int]] = ContextVar("user_id", default=None)
class RequestContextFilter(logging.Filter):
"""Inject request context into all log records.
This filter runs for every log message and adds context
variables as record attributes for the formatter.
"""
def filter(self, record: logging.LogRecord) -> bool:
record.request_id = request_id.get()
record.user_id = user_id.get() or "anonymous"
return True
# Configure logging with context
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(request_id)s] user=%(user_id)s %(levelname)s: %(message)s"
))
handler.addFilter(RequestContextFilter())
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
async def handle_request(data: dict) -> dict:
"""Process a request with context-aware logging.
All log messages within this function and any functions
it calls will include the request ID automatically.
"""
# Set context for this request
token = request_id.set(str(uuid.uuid4())[:8])
user_token = user_id.set(data.get("user_id"))
try:
logger.info(f"Processing request: {data.get('action')}")
# These nested calls inherit the context
result = await validate_input(data)
result = await process_data(result)
logger.info(f"Request completed successfully")
return result
except Exception as e:
logger.error(f"Request failed: {e}")
raise
finally:
# Reset context when request completes
request_id.reset(token)
user_id.reset(user_token)

For identifying slow callbacks and bottlenecks, asyncio’s debug mode logs warnings when callbacks take longer than the slow callback threshold (100ms default). You can customize this threshold for more or less sensitive detection:

debug_mode_detailed.py
import asyncio
import logging
# Enable asyncio debug logging
logging.basicConfig(level=logging.DEBUG)
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.setLevel(logging.WARNING) # Only show problems
async def main():
loop = asyncio.get_running_loop()
# Customize slow callback threshold (default is 0.1 seconds)
loop.slow_callback_duration = 0.05 # 50ms - more aggressive detection
# Your application code here
await run_server()
# Run with debug mode enabled
asyncio.run(main(), debug=True)

When debug mode flags a slow callback, investigate with profiling. The callback’s name and arguments appear in the log message, pointing you to the problematic code. Common causes include:

  • Synchronous library calls that should be offloaded to threads
  • CPU-intensive operations like JSON parsing of large payloads
  • Blocking database drivers instead of async alternatives
  • Accidental synchronous file operations

Integrating with APM tools like DataDog, New Relic, or OpenTelemetry requires instrumenting key async operations. Most APM libraries provide asyncio-aware instrumentation that preserves trace context across await boundaries:

otel_tracing.py
from opentelemetry import trace
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
# Enable asyncio instrumentation for automatic span propagation
AsyncioInstrumentor().instrument()
tracer = trace.get_tracer(__name__)
async def traced_operation(item_id: int) -> dict:
"""Operation with explicit span for detailed tracing."""
with tracer.start_as_current_span("process_item") as span:
span.set_attribute("item.id", item_id)
result = await fetch_item(item_id)
span.set_attribute("item.status", result["status"])
if result["status"] == "error":
span.set_status(trace.Status(trace.StatusCode.ERROR))
return result

💡 Pro Tip: In production, set PYTHONASYNCIODEBUG=0 explicitly. Debug mode adds overhead—tracking coroutine creation points, checking for slow callbacks—that impacts performance. Enable it only during development or when actively troubleshooting production issues.


Production Checklist: Before You Deploy

Before your asyncio service handles production traffic, verify these critical configurations. Production failures in async code are particularly painful because they often manifest as mysterious hangs rather than clear error messages.

Connection Pool Sizing

Database and HTTP connection pools need limits that match your concurrency model. An unbounded pool under load creates thousands of connections, exhausting database limits and file descriptors. But a pool that’s too small becomes a bottleneck.

  • Set explicit min_size and max_size for database connection pools. A good starting point is max_size = 2 × number_of_CPU_cores × number_of_database_servers
  • For HTTP client pools, size to your expected concurrent request count per upstream host, not total throughput
  • Configure connection acquisition timeouts shorter than your request timeout—if you can’t get a connection quickly, fail fast
  • Monitor pool utilization; consistently near-full pools indicate you need to scale up or optimize queries

Resource Limits

  • Set ulimit -n appropriately for expected connection counts. A service handling 10,000 concurrent connections needs at least that many file descriptors available (typically 65535+)
  • Configure OS-level TCP settings for high-connection servers: net.core.somaxconn, net.ipv4.tcp_max_syn_backlog
  • Monitor file descriptor usage with health checks—exhaustion causes cryptic “too many open files” errors
  • Set memory limits that account for per-connection buffers; each WebSocket connection typically uses 8-64KB

Health Checks That Work

A health check that returns 200 OK doesn’t verify your event loop is responsive. A blocked event loop will still accept connections at the socket level while being unable to process them. Implement checks that actually exercise async operations:

  • Include a small async operation in health checks: redis ping, database query, or a simple asyncio.sleep(0) followed by immediate response
  • Set health check timeouts shorter than your load balancer timeout—if the health check times out, the load balancer should mark the instance unhealthy
  • Monitor event loop lag: measure time between scheduling a callback and its execution. If callbacks queue for seconds, the loop is blocked

Common Production Pitfalls

SymptomLikely CauseFix
Requests hang, no errorsForgotten awaitEnable debug mode, add linting for unawaited coroutines
Memory grows unboundedUncollected task referencesUse TaskGroups, check for task leaks with asyncio.all_tasks()
Intermittent timeoutsEvent loop blockingProfile with debug mode, offload sync code to threads
Connection refused under loadPool exhaustionIncrease pool size, add backpressure, check for connection leaks
Graceful shutdown hangsTasks ignoring cancellationAudit CancelledError handling, add shutdown timeouts
High latency varianceGC pauses or blocking callsProfile with py-spy, check for sync I/O

Quick Reference Patterns

For HTTP services:

  • One ClientSession per upstream service, reused across requests (creating sessions is expensive)
  • Connection pool sized to 100 × number of upstream hosts as a starting point
  • Timeout budget: 80% of client-facing timeout for upstream calls, leaving room for processing

For background workers:

  • TaskGroup for batch processing with automatic cleanup
  • Graceful shutdown with SIGTERM handler and shutdown timeout
  • Dead letter queue for failed tasks that exhaust retries
  • Idempotency for tasks that might be processed more than once

For mixed workloads:

  • Separate thread pools for CPU-bound vs blocking I/O operations
  • Semaphores to limit concurrent expensive operations and prevent resource exhaustion
  • Circuit breakers for external service calls to prevent cascade failures

Key Takeaways

  • Configure a global exception handler with loop.set_exception_handler() on day one—never let exceptions vanish silently into garbage collection
  • Replace scattered create_task() calls with TaskGroup context managers to guarantee cleanup and proper exception propagation
  • Use asyncio.timeout() context managers instead of wait_for() for cleaner timeout handling, and always let CancelledError propagate through your code
  • Run blocking I/O through asyncio.to_thread() rather than calling synchronous code directly in async functions—profile first to ensure the threading overhead is worth it
  • Enable asyncio debug mode in development and set up structured logging with context IDs to trace requests across await boundaries

Resources