Async Testing with pytest-asyncio¶
This guide covers testing asynchronous Python code with pytest-test-categories, including timing considerations and async mocking patterns.
Why Async Testing Matters¶
Async code introduces unique testing challenges:
Event loop management: Tests need an event loop to run coroutines
Concurrency timing: Race conditions and timing issues
Async mocking: Standard mocks do not work with
awaitResource cleanup: Async resources need proper cleanup
Installation¶
pip install pytest-asyncio
# or
uv add --dev pytest-asyncio
Basic Configuration¶
pytest.ini or pyproject.toml¶
[tool.pytest.ini_options]
asyncio_mode = "auto" # Automatically handle async tests
asyncio_default_fixture_loop_scope = "function" # New in 0.23+
Or use the marker explicitly:
[tool.pytest.ini_options]
asyncio_mode = "strict" # Require explicit @pytest.mark.asyncio
Basic Async Tests¶
Simple Async Function¶
import pytest
async def fetch_data() -> dict:
"""Simulated async data fetch."""
return {"status": "ok", "data": [1, 2, 3]}
@pytest.mark.small
@pytest.mark.asyncio
async def test_fetch_data_returns_dict():
"""Test async function returns expected structure."""
result = await fetch_data()
assert result["status"] == "ok"
assert len(result["data"]) == 3
Async Context Managers¶
import pytest
from contextlib import asynccontextmanager
@asynccontextmanager
async def database_connection():
"""Simulated async database connection."""
connection = {"connected": True}
try:
yield connection
finally:
connection["connected"] = False
@pytest.mark.small
@pytest.mark.asyncio
async def test_database_connection_lifecycle():
"""Test async context manager properly cleans up."""
async with database_connection() as conn:
assert conn["connected"] is True
assert conn["connected"] is False
Testing Async Generators¶
import pytest
async def generate_items(count: int):
"""Async generator yielding items."""
for i in range(count):
yield {"id": i, "value": i * 10}
@pytest.mark.small
@pytest.mark.asyncio
async def test_async_generator_yields_items():
"""Test async generator produces expected items."""
items = [item async for item in generate_items(3)]
assert len(items) == 3
assert items[0]["value"] == 0
assert items[2]["value"] == 20
Async Fixtures¶
Basic Async Fixture¶
import pytest
@pytest.fixture
async def async_client():
"""Async fixture providing a client."""
client = AsyncClient()
await client.connect()
yield client
await client.disconnect()
@pytest.mark.small
@pytest.mark.asyncio
async def test_with_async_fixture(async_client):
"""Test using async fixture."""
result = await async_client.get("/status")
assert result["status"] == "ok"
Async Fixture with Cleanup¶
import pytest
class AsyncResource:
def __init__(self):
self.initialized = False
self.closed = False
async def initialize(self):
self.initialized = True
async def close(self):
self.closed = True
@pytest.fixture
async def resource():
"""Async fixture with proper cleanup."""
r = AsyncResource()
await r.initialize()
yield r
await r.close()
@pytest.mark.small
@pytest.mark.asyncio
async def test_resource_is_initialized(resource):
"""Resource is initialized by fixture."""
assert resource.initialized is True
assert resource.closed is False
Timing Considerations¶
Async Operations and Test Size¶
Async tests measure wall-clock time, which includes any await calls:
import asyncio
@pytest.mark.small # Must complete in < 1 second
@pytest.mark.asyncio
async def test_quick_async_operation():
"""This passes: total time < 1 second."""
await asyncio.sleep(0.1) # 100ms
await asyncio.sleep(0.1) # 100ms
# Total: ~200ms, well under 1 second
@pytest.mark.small # FAILS: takes > 1 second
@pytest.mark.asyncio
async def test_slow_async_operation():
"""This fails: total time > 1 second."""
await asyncio.sleep(2) # 2 seconds
Concurrent Async Operations¶
Concurrent operations run in parallel, saving time:
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(0.3)
return "done"
@pytest.mark.small # Passes: concurrent execution
@pytest.mark.asyncio
async def test_concurrent_operations():
"""Three 300ms operations run concurrently in ~300ms total."""
results = await asyncio.gather(
slow_operation(),
slow_operation(),
slow_operation(),
)
assert len(results) == 3
assert all(r == "done" for r in results)
Timeouts in Async Tests¶
Use asyncio.timeout (Python 3.11+) or asyncio.wait_for for explicit timeouts:
import asyncio
@pytest.mark.small
@pytest.mark.asyncio
async def test_with_timeout():
"""Test with explicit timeout."""
async with asyncio.timeout(0.5):
result = await quick_operation()
assert result is not None
@pytest.mark.small
@pytest.mark.asyncio
async def test_timeout_raises():
"""Test that slow operation times out."""
with pytest.raises(asyncio.TimeoutError):
async with asyncio.timeout(0.1):
await asyncio.sleep(1) # Longer than timeout
Async HTTP Mocking with respx¶
respx mocks httpx async clients.
Installation¶
pip install respx
# or
uv add --dev respx
Basic Usage¶
import pytest
import httpx
import respx
@pytest.mark.small
@pytest.mark.asyncio
@respx.mock
async def test_async_http_request():
"""Mock async HTTP request."""
respx.get("https://api.example.com/users/1").respond(
json={"id": 1, "name": "Alice"},
)
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/users/1")
assert response.status_code == 200
assert response.json()["name"] == "Alice"
Using respx as a Fixture¶
import pytest
import httpx
import respx
@pytest.fixture
def mock_api():
"""Fixture providing respx mock."""
with respx.mock:
yield respx
@pytest.mark.small
@pytest.mark.asyncio
async def test_with_respx_fixture(mock_api):
"""Use respx fixture for cleaner setup."""
mock_api.get("https://api.example.com/data").respond(
json={"status": "ok"},
)
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
assert response.json()["status"] == "ok"
Mocking Multiple Endpoints¶
@pytest.mark.small
@pytest.mark.asyncio
@respx.mock
async def test_multiple_endpoints():
"""Mock multiple API endpoints."""
respx.get("https://api.example.com/users/1").respond(
json={"id": 1, "name": "Alice"},
)
respx.get("https://api.example.com/users/1/orders").respond(
json=[{"order_id": "A001", "total": 99.99}],
)
async with httpx.AsyncClient() as client:
user_response = await client.get("https://api.example.com/users/1")
orders_response = await client.get("https://api.example.com/users/1/orders")
assert user_response.json()["name"] == "Alice"
assert len(orders_response.json()) == 1
Mocking Errors¶
import httpx
import respx
@pytest.mark.small
@pytest.mark.asyncio
@respx.mock
async def test_handles_network_error():
"""Mock network errors."""
respx.get("https://api.example.com/unreachable").mock(
side_effect=httpx.ConnectError("Connection refused"),
)
async with httpx.AsyncClient() as client:
with pytest.raises(httpx.ConnectError):
await client.get("https://api.example.com/unreachable")
@pytest.mark.small
@pytest.mark.asyncio
@respx.mock
async def test_handles_timeout():
"""Mock timeout errors."""
respx.get("https://api.example.com/slow").mock(
side_effect=httpx.TimeoutException("Request timed out"),
)
async with httpx.AsyncClient() as client:
with pytest.raises(httpx.TimeoutException):
await client.get("https://api.example.com/slow")
Verifying Requests¶
import respx
@pytest.mark.small
@pytest.mark.asyncio
@respx.mock
async def test_request_verification():
"""Verify correct requests were made."""
route = respx.post("https://api.example.com/users").respond(
json={"id": 42},
status_code=201,
)
async with httpx.AsyncClient() as client:
await client.post(
"https://api.example.com/users",
json={"name": "Bob", "email": "bob@example.com"},
)
assert route.called
assert route.call_count == 1
request = route.calls[0].request
assert b'"name": "Bob"' in request.content
Mocking Async Functions¶
Using pytest-mock with Async¶
import pytest
async def external_api_call(user_id: str) -> dict:
"""External API call to mock."""
# In reality, this would make an HTTP request
...
async def get_user_profile(user_id: str) -> dict:
"""Function that uses external API."""
data = await external_api_call(user_id)
return {"id": user_id, "name": data["name"], "active": True}
@pytest.mark.small
@pytest.mark.asyncio
async def test_get_user_profile(mocker):
"""Mock async function with mocker."""
# Create async mock
mock_api = mocker.AsyncMock(return_value={"name": "Alice", "email": "alice@example.com"})
mocker.patch("mymodule.external_api_call", mock_api)
result = await get_user_profile("123")
assert result["name"] == "Alice"
assert result["active"] is True
mock_api.assert_called_once_with("123")
AsyncMock for Complex Scenarios¶
import pytest
@pytest.mark.small
@pytest.mark.asyncio
async def test_async_mock_side_effects(mocker):
"""AsyncMock with side effects."""
mock = mocker.AsyncMock(
side_effect=[
{"attempt": 1, "status": "failed"},
{"attempt": 2, "status": "success"},
],
)
result1 = await mock()
result2 = await mock()
assert result1["status"] == "failed"
assert result2["status"] == "success"
@pytest.mark.small
@pytest.mark.asyncio
async def test_async_mock_raises(mocker):
"""AsyncMock that raises exception."""
mock = mocker.AsyncMock(side_effect=ValueError("Invalid input"))
with pytest.raises(ValueError, match="Invalid input"):
await mock()
Testing Async Context Managers¶
import pytest
from contextlib import asynccontextmanager
class AsyncDatabasePool:
"""Simulated async database pool."""
async def acquire(self):
return AsyncConnection()
async def release(self, conn):
await conn.close()
class AsyncConnection:
"""Simulated async connection."""
def __init__(self):
self.closed = False
async def execute(self, query: str) -> list:
return [{"id": 1, "name": "Test"}]
async def close(self):
self.closed = True
@asynccontextmanager
async def get_connection(pool: AsyncDatabasePool):
"""Async context manager for connections."""
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)
@pytest.mark.small
@pytest.mark.asyncio
async def test_async_context_manager(mocker):
"""Test async context manager behavior."""
pool = AsyncDatabasePool()
async with get_connection(pool) as conn:
result = await conn.execute("SELECT * FROM users")
assert len(result) == 1
assert conn.closed is True
Testing Async Iterators¶
import pytest
class AsyncPaginator:
"""Async iterator for paginated results."""
def __init__(self, items: list, page_size: int = 2):
self.items = items
self.page_size = page_size
self.offset = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.offset >= len(self.items):
raise StopAsyncIteration
page = self.items[self.offset : self.offset + self.page_size]
self.offset += self.page_size
return page
@pytest.mark.small
@pytest.mark.asyncio
async def test_async_paginator():
"""Test async iterator produces pages."""
items = [1, 2, 3, 4, 5]
paginator = AsyncPaginator(items, page_size=2)
pages = [page async for page in paginator]
assert len(pages) == 3
assert pages[0] == [1, 2]
assert pages[1] == [3, 4]
assert pages[2] == [5]
Concurrent Test Execution¶
Testing Race Conditions¶
import asyncio
import pytest
class Counter:
"""Shared counter for concurrency testing."""
def __init__(self):
self.value = 0
self._lock = asyncio.Lock()
async def increment_unsafe(self):
"""Unsafe increment (race condition)."""
current = self.value
await asyncio.sleep(0.001) # Simulate work
self.value = current + 1
async def increment_safe(self):
"""Safe increment with lock."""
async with self._lock:
current = self.value
await asyncio.sleep(0.001)
self.value = current + 1
@pytest.mark.small
@pytest.mark.asyncio
async def test_safe_concurrent_increment():
"""Test that locked increment is thread-safe."""
counter = Counter()
await asyncio.gather(*[counter.increment_safe() for _ in range(10)])
assert counter.value == 10
Testing Task Cancellation¶
import asyncio
import pytest
async def long_running_task():
"""Task that can be cancelled."""
try:
await asyncio.sleep(10)
return "completed"
except asyncio.CancelledError:
return "cancelled"
@pytest.mark.small
@pytest.mark.asyncio
async def test_task_cancellation():
"""Test that task handles cancellation gracefully."""
task = asyncio.create_task(long_running_task())
await asyncio.sleep(0.1) # Let task start
task.cancel()
result = await task
assert result == "cancelled"
Best Practices¶
1. Use asyncio_mode = “auto”¶
Reduces boilerplate by auto-detecting async tests:
[tool.pytest.ini_options]
asyncio_mode = "auto"
2. Prefer AsyncMock Over Manual Coroutines¶
# Good: Use AsyncMock
mock = mocker.AsyncMock(return_value={"data": "value"})
# Less good: Manual async wrapper
async def mock_coro():
return {"data": "value"}
3. Clean Up Async Resources¶
Always use try/finally or context managers:
@pytest.fixture
async def client():
"""Fixture with guaranteed cleanup."""
client = AsyncClient()
await client.connect()
try:
yield client
finally:
await client.disconnect()
4. Avoid asyncio.sleep in Small Tests¶
Replace sleeps with mocks or reduce sleep duration:
# Bad: Long sleep in small test
@pytest.mark.small
@pytest.mark.asyncio
async def test_with_long_sleep():
await asyncio.sleep(2) # Exceeds 1 second limit
# Good: Mock the sleep or use tiny delays
@pytest.mark.small
@pytest.mark.asyncio
async def test_with_mocked_time(mocker):
mocker.patch("asyncio.sleep", new_callable=mocker.AsyncMock)
await some_function_that_sleeps()
5. Use Timeouts for External Calls¶
Even in medium/large tests, use explicit timeouts:
@pytest.mark.medium
@pytest.mark.asyncio
async def test_external_service():
"""Test with explicit timeout."""
async with asyncio.timeout(30): # 30 second timeout
result = await call_external_service()
assert result is not None