Threading Monitoring for Hermetic Tests¶
Threading monitoring is a test enforcement mechanism that detects when small tests create threads during execution. Unlike other isolation mechanisms that block operations, threading monitoring warns about thread creation, allowing tests to continue while flagging potential issues.
When enabled, the pytest-test-categories plugin intercepts thread creation and emits warnings for small tests that use threading.
Why Threading Monitoring Matters¶
Tests that create threads introduce several problems:
Non-Determinism¶
Multi-threaded tests are inherently non-deterministic:
Thread scheduling varies between runs
Race conditions cause intermittent failures
Test results depend on timing, not logic
Debugging multi-threaded failures is difficult
Resource Management¶
Threads require careful lifecycle management:
Threads must be properly joined or they leak
Daemon threads may be killed mid-execution
Thread cleanup in test teardown is error-prone
Resource contention between tests
Isolation Challenges¶
Threading breaks test isolation:
Shared state between threads causes interference
Thread-local storage can leak between tests
Exceptions in threads may not fail the test
Deadlocks can hang the entire test suite
Single-Threaded Principle¶
Google’s test size definitions specify that small tests should be single-threaded:
“Small tests must run in a single process and a single thread.”
Software Engineering at Google
Why Monitoring Instead of Blocking?¶
Unlike network, filesystem, and subprocess isolation which block operations, threading monitoring only warns. This is because:
Library Dependencies: Many libraries use threading internally (logging, garbage collection, some test frameworks)
Infrastructure Code: pytest itself and its plugins may use threads
Breaking Blocking: Completely blocking threading could break legitimate test infrastructure
Gradual Adoption: Warnings allow teams to identify issues without immediate test failures
Threading monitoring provides visibility into thread usage while maintaining test suite stability.
Test Size Restrictions¶
Threading monitoring follows Google’s test size definitions from “Software Engineering at Google”:
Test Size |
Threading |
Monitoring Behavior |
|---|---|---|
Small |
Discouraged |
Warns on thread creation |
Medium |
Allowed |
No monitoring |
Large |
Allowed |
No monitoring |
XLarge |
Allowed |
No monitoring |
Small Tests¶
Small tests should be single-threaded:
Deterministic: No thread scheduling variability
Simple: No synchronization complexity
Fast: No thread creation overhead
Reliable: No race conditions or deadlocks
Threading monitoring emits warnings when small tests create threads, helping teams identify tests that should either be refactored or promoted to medium.
Medium, Large, and XLarge Tests¶
These tests may use threading freely for:
Testing concurrent code
Load testing with multiple workers
Integration tests with background tasks
Performance testing
How It Works¶
The plugin intercepts thread creation by patching Python’s threading modules:
Monitored Entry Points¶
The following thread creation entry points are monitored:
Entry Point |
Module |
Description |
|---|---|---|
|
|
Standard thread class |
|
|
Delayed execution (inherits from Thread) |
|
|
Thread pool for parallel execution |
|
|
Process pool (also monitored) |
Monitoring Behavior¶
When a small test creates a thread:
The monitor intercepts the Thread/Executor constructor
It checks if the current test is marked as
smallFor small tests, it emits a pytest warning
The thread is allowed to continue (not blocked)
Warning Format¶
When threading is detected in a small test:
PytestWarning: Small test 'tests/test_concurrent.py::test_parallel_processing'
uses threading.Thread. Small tests should be single-threaded for determinism.
Consider using @pytest.mark.medium if concurrency testing is required.
Enabling Threading Monitoring¶
Threading monitoring is controlled by the test_categories_enforcement configuration option.
Configuration via pyproject.toml¶
[tool.pytest.ini_options]
# Enable threading monitoring (as part of enforcement)
test_categories_enforcement = "warn"
Configuration via pytest.ini¶
[pytest]
test_categories_enforcement = warn
Configuration via Command Line¶
pytest --test-categories-enforcement=warn
Enforcement Modes¶
Threading monitoring behaves consistently in WARN mode. Unlike other isolation mechanisms:
Mode |
Behavior |
|---|---|
STRICT |
Emits warning (does not block) |
WARN |
Emits warning |
OFF |
No monitoring |
Threading monitoring never blocks thread creation, only warns about it.
Common Remediation Strategies¶
1. Mock Threading for Unit Tests¶
Replace thread creation with mocking:
import pytest
@pytest.mark.small
def test_background_task_scheduled(mocker):
mock_thread = mocker.patch("threading.Thread")
from myapp.tasks import schedule_background_task
schedule_background_task("process_data")
mock_thread.assert_called_once()
assert mock_thread.call_args[1]["target"].__name__ == "process_data"
2. Test Thread Logic Separately¶
Separate the logic from threading:
import pytest
# Production code
def process_items(items):
"""Business logic - no threading here."""
return [transform(item) for item in items]
def process_items_parallel(items, num_workers=4):
"""Parallel wrapper - tested in medium tests."""
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=num_workers) as executor:
return list(executor.map(transform, items))
# Small test: Test the logic
@pytest.mark.small
def test_process_items():
items = [1, 2, 3]
result = process_items(items)
assert result == [2, 4, 6] # Assuming transform doubles
# Medium test: Test the parallel execution
@pytest.mark.medium
def test_process_items_parallel():
items = list(range(100))
result = process_items_parallel(items, num_workers=4)
assert len(result) == 100
3. Use Dependency Injection for Executors¶
Design code to accept executor interfaces:
from abc import ABC, abstractmethod
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, TypeVar
import pytest
T = TypeVar("T")
# Executor abstraction
class TaskExecutor(ABC):
@abstractmethod
def submit(self, fn: Callable[[], T]) -> T: ...
# Production implementation
class ThreadedExecutor(TaskExecutor):
def __init__(self, max_workers: int = 4):
self.pool = ThreadPoolExecutor(max_workers=max_workers)
def submit(self, fn: Callable[[], T]) -> T:
future = self.pool.submit(fn)
return future.result()
# Test implementation (synchronous)
class SyncExecutor(TaskExecutor):
def submit(self, fn: Callable[[], T]) -> T:
return fn() # Execute immediately in same thread
# Code using executor
class DataProcessor:
def __init__(self, executor: TaskExecutor):
self.executor = executor
def process(self, data: list) -> list:
results = []
for item in data:
result = self.executor.submit(lambda: transform(item))
results.append(result)
return results
# Small test with sync executor
@pytest.mark.small
def test_data_processor():
executor = SyncExecutor()
processor = DataProcessor(executor)
result = processor.process([1, 2, 3])
assert result == [2, 4, 6]
4. Use asyncio Instead of Threading¶
For I/O-bound concurrency, prefer asyncio:
import asyncio
import pytest
# Async code (no threading)
async def fetch_all(urls: list[str]) -> list[str]:
async def fetch_one(url: str) -> str:
# Simulated async fetch
await asyncio.sleep(0) # Yield control
return f"content from {url}"
return await asyncio.gather(*[fetch_one(url) for url in urls])
@pytest.mark.small
@pytest.mark.asyncio
async def test_fetch_all():
urls = ["http://a.com", "http://b.com"]
results = await fetch_all(urls)
assert len(results) == 2
assert "content from" in results[0]
5. Change Test Size¶
If the test genuinely requires threading:
import pytest
@pytest.mark.medium # Medium tests can use threading
def test_concurrent_processing():
from concurrent.futures import ThreadPoolExecutor
from myapp.processor import process_batch
items = list(range(1000))
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_batch, items))
assert len(results) == 1000
6. Suppress Warning for Known Cases¶
If a warning is expected and acceptable:
import pytest
import warnings
@pytest.mark.small
def test_with_expected_threading():
# Suppress the specific warning
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="Small test.*uses threading")
# Code that legitimately uses threading
from myapp.tasks import run_with_timeout
result = run_with_timeout(my_function, timeout=1.0)
assert result is not None
Best Practices¶
1. Review Threading Warnings Regularly¶
Treat threading warnings as technical debt:
pytest --test-categories-enforcement=warn 2>&1 | grep "uses threading"
2. Categorize Tests Appropriately¶
If a test needs threading, it probably should be a medium test:
# Before: Small test with threading (generates warning)
@pytest.mark.small
def test_parallel_computation():
from concurrent.futures import ThreadPoolExecutor
# ...
# After: Properly categorized as medium
@pytest.mark.medium
def test_parallel_computation():
from concurrent.futures import ThreadPoolExecutor
# ...
3. Isolate Concurrent Logic¶
Design code so concurrent execution is separate from business logic:
+------------------+ +-------------------+ +------------------+
| Business Logic | --> | Execution Layer | --> | Threading |
| (Small Tests) | | (Abstraction) | | (Medium Tests) |
+------------------+ +-------------------+ +------------------+
4. Use Thread-Safe Test Fixtures¶
When threading is necessary, ensure fixtures are thread-safe:
import threading
import pytest
@pytest.fixture
def thread_safe_counter():
class Counter:
def __init__(self):
self._count = 0
self._lock = threading.Lock()
def increment(self):
with self._lock:
self._count += 1
@property
def value(self):
with self._lock:
return self._count
return Counter()
@pytest.mark.medium
def test_concurrent_increments(thread_safe_counter):
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(thread_safe_counter.increment)
for _ in range(100)
]
concurrent.futures.wait(futures)
assert thread_safe_counter.value == 100
Understanding the Warnings¶
Common Warning Sources¶
Warning Source |
Likely Cause |
Recommended Action |
|---|---|---|
|
Direct thread creation |
Mock or change to medium |
|
Parallel task execution |
Use sync executor or change to medium |
|
Multiprocessing |
Mock or change to medium |
|
Delayed execution |
Mock or use async patterns |
Identifying the Root Cause¶
When you see a threading warning, check:
Is it your code? The thread might be created by a library.
Is it necessary? Could the logic be tested without threading?
Is the test size correct? Should this be a medium test?
Troubleshooting¶
“Threading warning in library code”¶
Some libraries create threads during import or initialization.
Solution: This is expected. Either:
Ignore the warning if it’s from infrastructure
Mock the library’s thread creation
Mark the test as medium if threading is integral
“Warning appears for Timer usage”¶
threading.Timer inherits from threading.Thread, so it triggers monitoring.
Solution: Mock the timer or use a controllable time abstraction:
@pytest.mark.small
def test_scheduled_task(mocker):
mock_timer = mocker.patch("threading.Timer")
from myapp.scheduler import schedule_task
schedule_task("cleanup", delay=60.0)
mock_timer.assert_called_once()
“ThreadPoolExecutor warning even when mocked”¶
The warning triggers on class instantiation. Ensure you’re patching at the right level:
@pytest.mark.small
def test_parallel_fetch(mocker):
# Patch the class, not just the submit method
mock_executor = mocker.patch("concurrent.futures.ThreadPoolExecutor")
mock_instance = mocker.MagicMock()
mock_executor.return_value.__enter__.return_value = mock_instance
from myapp.fetcher import fetch_parallel
fetch_parallel(["url1", "url2"])
mock_instance.submit.assert_called()