"""Core plugin implementation.
This module provides the pytest plugin entry point and hook implementations.
It follows hexagonal architecture by orchestrating services and adapters
rather than containing business logic.
The plugin's sole responsibility is:
- Registering pytest hooks
- Orchestrating calls to services through ports
- Managing session lifecycle
All business logic is delegated to services:
- TestDiscoveryService: Finding test size markers
- TimingValidationService: Validating test timing
- DistributionValidationService: Validating distribution
- TestReportingService: Managing test reports
All pytest interactions go through adapters:
- PytestConfigAdapter: Config state management
- PytestItemAdapter: Test item abstraction
- PytestWarningAdapter: Warning system
- TerminalReporterAdapter: Terminal output
This design makes the hooks thin orchestration layers (5-15 lines each)
that are easy to understand and maintain.
"""
from __future__ import annotations
import json
from collections import defaultdict
from contextlib import ExitStack
from importlib.metadata import version
from pathlib import Path
from typing import (
TYPE_CHECKING,
cast,
)
import pytest
from pytest_test_categories.adapters.database import DatabasePatchingBlocker
from pytest_test_categories.adapters.external_systems import ExternalSystemsDetector
from pytest_test_categories.adapters.filesystem import FilesystemPatchingBlocker
from pytest_test_categories.adapters.network import SocketPatchingNetworkBlocker
from pytest_test_categories.adapters.process import SubprocessPatchingBlocker
from pytest_test_categories.adapters.pytest_adapter import (
PytestConfigAdapter,
PytestItemAdapter,
PytestWarningAdapter,
TerminalReporterAdapter,
)
from pytest_test_categories.adapters.sleep import SleepPatchingBlocker
from pytest_test_categories.adapters.threading import ThreadPatchingMonitor
from pytest_test_categories.distribution.config import (
DEFAULT_DISTRIBUTION_CONFIG,
DistributionConfig,
)
from pytest_test_categories.distribution.stats import (
DistributionStats,
TestCounts,
)
from pytest_test_categories.json_report import JsonReport
from pytest_test_categories.ports.network import EnforcementMode
from pytest_test_categories.services.distribution_validation import (
DistributionValidationService,
DistributionViolationError,
)
from pytest_test_categories.services.hermeticity_summary import HermeticitySummaryService
from pytest_test_categories.services.suggestion_summary import SuggestionSummaryService
from pytest_test_categories.services.test_discovery import TestDiscoveryService
from pytest_test_categories.services.test_reporting import TestReportingService
from pytest_test_categories.services.timing_validation import TimingValidationService
from pytest_test_categories.suggestion import (
SuggestionCollector,
)
from pytest_test_categories.timers import WallTimer
from pytest_test_categories.timing import (
PerformanceBaselineViolationError,
TimingViolationError,
)
from pytest_test_categories.types import (
TestSize,
TimerState,
)
from pytest_test_categories.violation_tracking import (
ViolationTracker,
ViolationType,
)
from pytest_test_categories.xdist_compat import (
WORKEROUTPUT_DISTRIBUTION_KEY,
WORKEROUTPUT_REPORT_KEY,
deserialize_distribution_counts,
is_xdist_worker,
merge_report_data,
serialize_distribution_counts,
serialize_report_data,
)
if TYPE_CHECKING:
from collections.abc import Generator
import pytest_test_categories.types
from pytest_test_categories.adapters.pytest_adapter import PytestConfigAdapter as PytestConfigAdapterType
from pytest_test_categories.distribution.stats import DistributionStats as DistributionStatsType
from pytest_test_categories.reporting import TestSizeReport
# Package version for JSON report
[docs]
PLUGIN_VERSION = version('pytest-test-categories')
# Valid enforcement modes for ini option validation
_VALID_ENFORCEMENT_MODES = {'off', 'warn', 'strict'}
[docs]
def pytest_addoption(parser: pytest.Parser) -> None:
"""Add plugin-specific command-line options.
This hook registers the --test-size-report option that controls
whether and how test size reports are generated and the
--test-categories-enforcement option for resource blocking control.
Args:
parser: The pytest command-line option parser.
"""
group = parser.getgroup('test-categories')
group.addoption(
'--test-size-report',
action='store',
default=None,
choices=[None, 'basic', 'detailed', 'json'],
nargs='?',
const='basic',
help='Generate a report of test sizes (basic, detailed, or json)',
)
group.addoption(
'--test-size-report-file',
action='store',
default=None,
help='Output file path for JSON report (requires --test-size-report=json)',
)
group.addoption(
'--test-categories-enforcement',
action='store',
default=None,
choices=['off', 'warn', 'strict'],
help='Set enforcement mode for test hermeticity (off, warn, strict). Overrides ini option.',
)
parser.addini(
'test_categories_enforcement',
help='Enforcement mode for test hermeticity: off (default), warn, or strict',
default='off',
)
# Distribution enforcement options
group.addoption(
'--test-categories-distribution-enforcement',
action='store',
default=None,
choices=['off', 'warn', 'strict'],
help='Set enforcement mode for distribution validation (off, warn, strict). Overrides ini option.',
)
parser.addini(
'test_categories_distribution_enforcement',
help='Enforcement mode for distribution validation: off (default), warn, or strict',
default='off',
)
# Distribution target configuration options
group.addoption(
'--test-categories-small-target',
action='store',
type=float,
default=None,
help='Target percentage for small tests (default: 80.0). Overrides ini option.',
)
group.addoption(
'--test-categories-medium-target',
action='store',
type=float,
default=None,
help='Target percentage for medium tests (default: 15.0). Overrides ini option.',
)
group.addoption(
'--test-categories-large-target',
action='store',
type=float,
default=None,
help='Target percentage for large/xlarge tests (default: 5.0). Overrides ini option.',
)
group.addoption(
'--test-categories-tolerance',
action='store',
type=float,
default=None,
help='Tolerance percentage for all sizes (default: 5.0 small/medium, 3.0 large). Overrides ini.',
)
# Distribution target ini options
parser.addini(
'test_categories_small_target',
help='Target percentage for small tests (default: 80.0)',
default='',
)
parser.addini(
'test_categories_medium_target',
help='Target percentage for medium tests (default: 15.0)',
default='',
)
parser.addini(
'test_categories_large_target',
help='Target percentage for large/xlarge tests (default: 5.0)',
default='',
)
parser.addini(
'test_categories_tolerance',
help='Tolerance percentage for all test sizes (default: 5.0 for small/medium, 3.0 for large)',
default='',
)
# Auto-categorization suggestion options
group.addoption(
'--test-categories-suggest',
action='store_true',
default=False,
help='Analyze tests and suggest appropriate size categories based on observed behavior.',
)
group.addoption(
'--test-categories-suggest-output',
action='store',
default=None,
help='Output file path for JSON suggestion report (requires --test-categories-suggest).',
)
@pytest.hookimpl(tryfirst=True)
@pytest.hookimpl(tryfirst=True)
[docs]
def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None:
"""Count tests by size and append size labels to test IDs."""
config_adapter = PytestConfigAdapter(config)
session_state = config_adapter.get_plugin_state()
discovery_service = _ensure_discovery_service(session_state)
# Get suggestion collector if in suggest mode
suggestion_collector = cast('SuggestionCollector | None', session_state.suggestion_collector)
# Count tests by size using the discovery service
counts: dict[TestSize, int] = defaultdict(int)
for item in items:
item_adapter = PytestItemAdapter(item)
test_size = discovery_service.find_test_size(item_adapter)
if test_size:
counts[test_size] += 1
# Append size label to test node ID
item_adapter.set_nodeid(f'{item_adapter.nodeid} {test_size.label}')
# Record current size in suggestion collector (includes None for uncategorized)
if suggestion_collector is not None:
suggestion_collector.record_current_size(item.nodeid, test_size)
# Update distribution stats with the counts
current_stats = config_adapter.get_distribution_stats()
updated_stats = current_stats.update_counts(counts=counts)
config_adapter.set_distribution_stats(updated_stats)
@pytest.hookimpl
[docs]
def pytest_collection_finish(session: pytest.Session) -> None:
"""Validate test distribution after collection.
Uses the distribution enforcement mode to determine behavior:
- OFF: Skip validation entirely
- WARN: Emit warning if out of spec, allow build to continue
- STRICT: Raise DistributionViolationError if out of spec
Args:
session: The pytest session object.
Raises:
pytest.UsageError: If enforcement mode is STRICT and distribution
is outside acceptable range.
"""
config_adapter = PytestConfigAdapter(session.config)
session_state = config_adapter.get_plugin_state()
stats = config_adapter.get_distribution_stats()
warning_system = PytestWarningAdapter()
validation_service = DistributionValidationService()
enforcement_mode = _get_distribution_enforcement_mode(session.config)
# Get distribution config from session state (initialized in pytest_configure)
distribution_config = cast('DistributionConfig | None', session_state.distribution_config)
try:
validation_service.validate_distribution(stats, warning_system, enforcement_mode, config=distribution_config)
except DistributionViolationError as e:
raise pytest.UsageError(str(e)) from e
@pytest.hookimpl(hookwrapper=True)
[docs]
def pytest_runtest_protocol(item: pytest.Item, nextitem: pytest.Item | None) -> Generator[None, None, None]: # noqa: ARG001
"""Track test timing during execution."""
config_adapter = PytestConfigAdapter(item.config)
session_state = config_adapter.get_plugin_state()
discovery_service = _ensure_discovery_service(session_state)
# Add test to report if reporting is enabled
if session_state.test_size_report is not None:
item_adapter = PytestItemAdapter(item)
test_size = discovery_service.find_test_size(item_adapter)
reporting_service = TestReportingService()
test_report = cast('TestSizeReport', session_state.test_size_report)
reporting_service.add_test_to_report(test_report, item.nodeid, test_size)
# Create and start timer for this test
if item.nodeid not in session_state.timers:
# Type narrowing: timer_factory is guaranteed to be set in pytest_configure
if session_state.timer_factory is None:
msg = 'timer_factory must be initialized in pytest_configure'
raise RuntimeError(msg)
timer = session_state.timer_factory(state=TimerState.READY)
session_state.timers[item.nodeid] = timer
else:
timer = session_state.timers[item.nodeid]
try:
timer.start()
yield # Let the test run
finally:
# Ensure timer is always stopped, even if test fails
if timer.state == TimerState.RUNNING:
timer.stop()
@pytest.hookimpl(hookwrapper=True)
[docs]
def pytest_runtest_call(item: pytest.Item) -> Generator[None, None, None]: # noqa: PLR0915
"""Block resources based on test size during execution and monitor threading.
This hook enforces resource isolation based on test size and enforcement
configuration. When enforcement is enabled (strict or warn):
Network access (based on Google's test size definitions):
- Small tests: All network blocked (BLOCK_ALL)
- Medium tests: Localhost only (LOCALHOST_ONLY)
- Large/XLarge tests: Full network access (ALLOW_ALL)
Filesystem and process isolation (small tests only):
- Small tests: Filesystem access blocked (no escape hatches)
- Small tests: Subprocess spawning blocked
- Small tests: Database connections blocked
- Small tests: Thread creation warnings emitted
External systems detection (medium tests only):
- Medium tests: Warn if testcontainers/docker imported (unless suppressed)
- Suppressed via @pytest.mark.medium(allow_external_systems=True)
Note: Thread monitoring WARNS instead of blocking because many libraries
use threading internally. Blocking would break legitimate test infrastructure.
Uses ExitStack pattern to manage all resource blockers together,
ensuring proper cleanup even if exceptions occur.
Args:
item: The test item being executed.
Yields:
Control to pytest to run the test.
"""
enforcement_mode = _get_enforcement_mode(item.config)
if enforcement_mode == EnforcementMode.OFF:
yield
return
config_adapter = PytestConfigAdapter(item.config)
session_state = config_adapter.get_plugin_state()
discovery_service = _ensure_discovery_service(session_state)
item_adapter = PytestItemAdapter(item)
test_size = discovery_service.find_test_size(item_adapter)
# Large, XLarge, and unsized tests have no restrictions
if test_size is None or test_size in (TestSize.LARGE, TestSize.XLARGE):
yield
return
# Use ExitStack for combined resource blocking
# At this point test_size is guaranteed to be SMALL or MEDIUM
with ExitStack() as stack:
# Network blocking applies to both small and medium tests
# - Small: BLOCK_ALL (no network)
# - Medium: LOCALHOST_ONLY (localhost only)
network_blocker = _get_network_blocker(item.config)
network_blocker.current_test_nodeid = item.nodeid
network_blocker.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_network, network_blocker)
# Filesystem and process blocking only applies to small tests
if test_size == TestSize.SMALL:
# Activate filesystem blocker - blocks ALL filesystem access (no exceptions)
filesystem_blocker = _get_filesystem_blocker(item.config)
filesystem_blocker.current_test_nodeid = item.nodeid
filesystem_blocker.activate(test_size, enforcement_mode, frozenset())
stack.callback(_safe_deactivate_filesystem, filesystem_blocker)
# Activate process blocker
process_blocker = _get_process_blocker(item.config)
process_blocker.current_test_nodeid = item.nodeid
process_blocker.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_process, process_blocker)
# Activate sleep blocker
sleep_blocker = _get_sleep_blocker(item.config)
sleep_blocker.current_test_nodeid = item.nodeid
sleep_blocker.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_sleep, sleep_blocker)
# Activate database blocker
database_blocker = _get_database_blocker(item.config)
database_blocker.current_test_nodeid = item.nodeid
database_blocker.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_database, database_blocker)
# Activate thread monitor (warns instead of blocking)
thread_monitor = _get_thread_monitor(item.config)
thread_monitor.current_test_nodeid = item.nodeid
thread_monitor.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_thread_monitor, thread_monitor)
# External systems detection for MEDIUM tests only
# Per Google's test sizes, external systems are DISCOURAGED (not prohibited)
external_systems_detector: ExternalSystemsDetector | None = None
if test_size == TestSize.MEDIUM:
# Check if warning is suppressed via marker kwarg
marker_kwargs = item_adapter.get_marker_kwargs('medium')
allow_external = marker_kwargs.get('allow_external_systems', False)
if not allow_external:
external_systems_detector = _get_external_systems_detector(item.config)
external_systems_detector.current_test_nodeid = item.nodeid
external_systems_detector.activate(test_size, enforcement_mode)
stack.callback(_safe_deactivate_external_systems, external_systems_detector)
yield
# After test execution, check for external systems imports (medium tests only)
if external_systems_detector is not None and external_systems_detector.is_active:
detected = external_systems_detector.check_external_systems_detected()
if detected:
external_systems_detector.on_external_systems_detected(detected, item.nodeid)
@pytest.hookimpl(hookwrapper=True)
[docs]
def pytest_runtest_makereport(item: pytest.Item) -> Generator[None, None, None]:
"""Validate timing and update test reports.
Args:
item: The test item that ran.
Yields:
Control to pytest to generate the report.
"""
config_adapter = PytestConfigAdapter(item.config)
session_state = config_adapter.get_plugin_state()
discovery_service = _ensure_discovery_service(session_state)
item_adapter = PytestItemAdapter(item)
test_size = discovery_service.find_test_size(item_adapter)
outcome = yield
report = outcome.get_result() # type: ignore[attr-defined]
# Only process call phase reports
if report.when != 'call':
return
# Get duration once for both validation and reporting
timer = session_state.timers.get(item.nodeid)
timing_service = TimingValidationService()
duration = timing_service.get_test_duration(
timer,
report.duration if hasattr(report, 'duration') else None,
)
# Validate timing if test has a size marker
if test_size and duration is not None:
try:
# Check for custom timeout baseline from marker
custom_timeout = discovery_service.get_timeout(item_adapter)
timing_service.validate_timing_with_baseline(test_size, duration, custom_timeout, item.nodeid)
except (PerformanceBaselineViolationError, TimingViolationError, ValueError) as e:
report.longrepr = str(e)
report.outcome = 'failed'
# Update test size report if enabled
if session_state.test_size_report is not None:
test_report = cast('TestSizeReport', session_state.test_size_report)
TestReportingService().update_test_result(
test_report,
item.nodeid,
report.outcome,
duration,
)
timing_service.cleanup_timer(session_state.timers, item.nodeid)
# Record execution time in suggestion collector if in suggest mode
suggestion_collector = cast('SuggestionCollector | None', session_state.suggestion_collector)
if suggestion_collector is not None and duration is not None:
suggestion_collector.record_execution_time(item.nodeid, duration)
@pytest.hookimpl
[docs]
def pytest_terminal_summary(terminalreporter: pytest.TerminalReporter) -> None:
"""Write distribution summary, hermeticity violations, and optional size report."""
config_adapter = PytestConfigAdapter(terminalreporter.config)
session_state = config_adapter.get_plugin_state()
stats = config_adapter.get_distribution_stats()
# Write distribution summary through the port interface
writer = TerminalReporterAdapter(terminalreporter)
reporting_service = TestReportingService()
reporting_service.write_distribution_summary(stats, writer)
# Write hermeticity violation summary if any violations occurred
violation_tracker = cast('ViolationTracker', session_state.violation_tracker)
if violation_tracker.has_violations:
enforcement_mode = _get_enforcement_mode(terminalreporter.config)
quiet = terminalreporter.verbosity < 0
hermeticity_service = HermeticitySummaryService()
hermeticity_service.write_hermeticity_summary(violation_tracker, enforcement_mode, writer, quiet=quiet)
# Add test size report if requested
if session_state.test_size_report is not None:
test_report = cast('TestSizeReport', session_state.test_size_report)
report_type = config_adapter.get_option('--test-size-report')
if report_type == 'json':
_write_json_report(test_report, stats, config_adapter, terminalreporter, violation_tracker)
elif report_type == 'detailed':
test_report.write_detailed_report(terminalreporter)
else:
test_report.write_basic_report(terminalreporter)
# Write suggestion summary if in suggest mode
suggestion_collector = cast('SuggestionCollector | None', session_state.suggestion_collector)
if suggestion_collector is not None:
suggestion_service = SuggestionSummaryService()
suggestion_service.write_suggestion_summary(suggestion_collector, writer)
# Write JSON output if requested
json_output_path = config_adapter.get_option('--test-categories-suggest-output')
if json_output_path:
_write_suggestion_json_report(suggestion_collector, json_output_path)
# =============================================================================
# pytest-xdist Compatibility Hooks
# =============================================================================
@pytest.hookimpl
[docs]
def pytest_sessionfinish(session: pytest.Session) -> None:
"""Send distribution stats and report data to controller when running as xdist worker.
When running with pytest-xdist, workers collect and run tests but the terminal
summary is displayed by the controller. This hook sends the worker's stats back
to the controller via the workeroutput mechanism.
Args:
session: The pytest session object.
"""
if not is_xdist_worker():
return
# Get config and check for workeroutput (only present on workers)
config = session.config
workeroutput = getattr(config, 'workeroutput', None)
if workeroutput is None:
return
config_adapter = PytestConfigAdapter(config)
session_state = config_adapter.get_plugin_state()
stats = config_adapter.get_distribution_stats()
# Serialize and send distribution counts
if stats is not None:
workeroutput[WORKEROUTPUT_DISTRIBUTION_KEY] = serialize_distribution_counts(stats.counts)
# Serialize and send report data if reporting is enabled
if session_state.test_size_report is not None:
test_report = cast('TestSizeReport', session_state.test_size_report)
workeroutput[WORKEROUTPUT_REPORT_KEY] = serialize_report_data(test_report)
@pytest.hookimpl(optionalhook=True)
[docs]
def pytest_testnodedown(node: object, error: object | None) -> None: # noqa: ARG001
"""Aggregate distribution stats from a worker when it shuts down.
This xdist hook is called on the controller when a worker node completes.
We use it to aggregate the distribution counts and report data from all workers.
Args:
node: The xdist WorkerController node that shut down.
error: Error if the worker crashed, None otherwise.
"""
# This hook only runs on controller, but verify we're not a worker
if is_xdist_worker():
return
# Access workeroutput from the node
workeroutput = getattr(node, 'workeroutput', None)
if workeroutput is None:
return
# Get controller's config from the node
config = getattr(node, 'config', None)
if config is None:
return
config_adapter = PytestConfigAdapter(config)
session_state = config_adapter.get_plugin_state()
# Get distribution counts from worker
# With xdist, each worker collects ALL tests but only runs assigned ones.
# The distribution counts from any worker represent the full test suite,
# so we only need to take the counts from the first worker (controller starts at 0).
worker_dist_data = workeroutput.get(WORKEROUTPUT_DISTRIBUTION_KEY)
if worker_dist_data is not None:
worker_counts = deserialize_distribution_counts(worker_dist_data)
current_stats = config_adapter.get_distribution_stats()
# Only update if controller hasn't been populated yet (counts are all 0)
# This ensures we use the first worker's counts and don't double-count
current_total = (
current_stats.counts.small
+ current_stats.counts.medium
+ current_stats.counts.large
+ current_stats.counts.xlarge
)
if current_total == 0:
updated_stats = DistributionStats(counts=TestCounts(**worker_counts))
config_adapter.set_distribution_stats(updated_stats)
# Aggregate report data from worker
worker_report_data = workeroutput.get(WORKEROUTPUT_REPORT_KEY)
if worker_report_data is not None and session_state.test_size_report is not None:
test_report = cast('TestSizeReport', session_state.test_size_report)
merge_report_data(test_report, worker_report_data)
def _ensure_discovery_service(session_state: pytest_test_categories.types.PluginState) -> TestDiscoveryService:
"""Ensure TestDiscoveryService is initialized.
This is a helper function that creates the discovery service if it
doesn't exist. This should never be needed in normal operation (the
service is created in pytest_configure), but provides a safety net.
Args:
session_state: The plugin state containing the discovery service.
Returns:
The TestDiscoveryService instance.
"""
if session_state.test_discovery_service is None:
warning_system = PytestWarningAdapter()
session_state.test_discovery_service = TestDiscoveryService(warning_system=warning_system)
return cast('TestDiscoveryService', session_state.test_discovery_service)
def _get_enforcement_mode(config: pytest.Config) -> EnforcementMode:
"""Get the enforcement mode from configuration.
CLI option takes precedence over ini setting.
Args:
config: The pytest configuration object.
Returns:
The EnforcementMode enum value.
"""
cli_value = config.getoption('--test-categories-enforcement', default=None)
if cli_value is not None:
return EnforcementMode(cli_value)
ini_value = config.getini('test_categories_enforcement')
if ini_value and ini_value in _VALID_ENFORCEMENT_MODES:
return EnforcementMode(ini_value)
return EnforcementMode.OFF
def _get_distribution_enforcement_mode(config: pytest.Config) -> EnforcementMode:
"""Get the distribution enforcement mode from configuration.
CLI option takes precedence over ini setting.
Args:
config: The pytest configuration object.
Returns:
The EnforcementMode enum value for distribution validation.
"""
cli_value = config.getoption('--test-categories-distribution-enforcement', default=None)
if cli_value is not None:
return EnforcementMode(cli_value)
ini_value = config.getini('test_categories_distribution_enforcement')
if ini_value and ini_value in _VALID_ENFORCEMENT_MODES:
return EnforcementMode(ini_value)
return EnforcementMode.OFF
def _get_distribution_config(config: pytest.Config) -> DistributionConfig:
"""Get the distribution target configuration from CLI and ini options.
CLI options take precedence over ini settings.
Priority (highest to lowest):
1. CLI options (--test-categories-small-target, etc.)
2. Ini options (test_categories_small_target, etc.)
3. Default values from DEFAULT_DISTRIBUTION_CONFIG
Args:
config: The pytest configuration object.
Returns:
A DistributionConfig with the resolved targets and tolerances.
"""
# Start with defaults
targets: dict[str, float] = {
'small_target': DEFAULT_DISTRIBUTION_CONFIG.small_target,
'medium_target': DEFAULT_DISTRIBUTION_CONFIG.medium_target,
'large_target': DEFAULT_DISTRIBUTION_CONFIG.large_target,
'small_tolerance': DEFAULT_DISTRIBUTION_CONFIG.small_tolerance,
'medium_tolerance': DEFAULT_DISTRIBUTION_CONFIG.medium_tolerance,
'large_tolerance': DEFAULT_DISTRIBUTION_CONFIG.large_tolerance,
}
# Override with ini values for targets (lower priority)
for category in ['small', 'medium', 'large']:
ini_value = config.getini(f'test_categories_{category}_target')
if ini_value and ini_value.strip():
targets[f'{category}_target'] = float(ini_value)
# Override with ini tolerance (applies to all sizes)
ini_tolerance = config.getini('test_categories_tolerance')
if ini_tolerance and ini_tolerance.strip():
tolerance_value = float(ini_tolerance)
targets['small_tolerance'] = tolerance_value
targets['medium_tolerance'] = tolerance_value
targets['large_tolerance'] = tolerance_value
# Override with CLI values for targets (highest priority)
for category in ['small', 'medium', 'large']:
cli_value = config.getoption(f'--test-categories-{category}-target', default=None)
if cli_value is not None:
targets[f'{category}_target'] = float(cli_value)
# Override with CLI tolerance (applies to all sizes)
cli_tolerance = config.getoption('--test-categories-tolerance', default=None)
if cli_tolerance is not None:
tolerance_value = float(cli_tolerance)
targets['small_tolerance'] = tolerance_value
targets['medium_tolerance'] = tolerance_value
targets['large_tolerance'] = tolerance_value
return DistributionConfig(**targets)
def _make_violation_callback(config: pytest.Config) -> object:
"""Create a violation callback that records to the session's ViolationTracker.
This callback is passed to blocker adapters to record violations for the
terminal summary output.
Args:
config: The pytest configuration object.
Returns:
A callable that records violations to the ViolationTracker.
"""
config_adapter = PytestConfigAdapter(config)
def callback(violation_type_str: str, test_nodeid: str, details: str, *, failed: bool) -> None:
"""Record a violation to the session's ViolationTracker."""
session_state = config_adapter.get_plugin_state()
violation_tracker = cast('ViolationTracker', session_state.violation_tracker)
# Map string to ViolationType enum
violation_type_map = {
'network': ViolationType.NETWORK,
'filesystem': ViolationType.FILESYSTEM,
'process': ViolationType.PROCESS,
'database': ViolationType.DATABASE,
'sleep': ViolationType.SLEEP,
}
violation_type = violation_type_map.get(violation_type_str, ViolationType.NETWORK)
violation_tracker.record_violation(violation_type, test_nodeid, details, failed=failed)
return callback
def _get_network_blocker(config: pytest.Config) -> SocketPatchingNetworkBlocker:
"""Get or create the network blocker instance.
The blocker is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The SocketPatchingNetworkBlocker instance.
"""
blocker_attr = '_test_categories_network_blocker'
if not hasattr(config, blocker_attr):
blocker = SocketPatchingNetworkBlocker()
blocker.violation_callback = _make_violation_callback(config)
setattr(config, blocker_attr, blocker)
return cast('SocketPatchingNetworkBlocker', getattr(config, blocker_attr))
def _get_filesystem_blocker(config: pytest.Config) -> FilesystemPatchingBlocker:
"""Get or create the filesystem blocker instance.
The blocker is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The FilesystemPatchingBlocker instance.
"""
blocker_attr = '_test_categories_filesystem_blocker'
if not hasattr(config, blocker_attr):
blocker = FilesystemPatchingBlocker()
blocker.violation_callback = _make_violation_callback(config)
setattr(config, blocker_attr, blocker)
return cast('FilesystemPatchingBlocker', getattr(config, blocker_attr))
def _safe_deactivate_network(blocker: SocketPatchingNetworkBlocker) -> None:
"""Safely deactivate network blocker, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
blocker: The network blocker to deactivate.
"""
if blocker.state.value == 'active':
blocker.deactivate()
def _safe_deactivate_filesystem(blocker: FilesystemPatchingBlocker) -> None:
"""Safely deactivate filesystem blocker, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
blocker: The filesystem blocker to deactivate.
"""
if blocker.state.value == 'active':
blocker.deactivate()
def _get_process_blocker(config: pytest.Config) -> SubprocessPatchingBlocker:
"""Get or create the process blocker instance.
The blocker is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The SubprocessPatchingBlocker instance.
"""
blocker_attr = '_test_categories_process_blocker'
if not hasattr(config, blocker_attr):
blocker = SubprocessPatchingBlocker()
blocker.violation_callback = _make_violation_callback(config)
setattr(config, blocker_attr, blocker)
return cast('SubprocessPatchingBlocker', getattr(config, blocker_attr))
def _safe_deactivate_process(blocker: SubprocessPatchingBlocker) -> None:
"""Safely deactivate process blocker, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
blocker: The process blocker to deactivate.
"""
if blocker.state.value == 'active':
blocker.deactivate()
def _get_sleep_blocker(config: pytest.Config) -> SleepPatchingBlocker:
"""Get or create the sleep blocker instance.
The blocker is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The SleepPatchingBlocker instance.
"""
blocker_attr = '_test_categories_sleep_blocker'
if not hasattr(config, blocker_attr):
blocker = SleepPatchingBlocker()
blocker.violation_callback = _make_violation_callback(config)
setattr(config, blocker_attr, blocker)
return cast('SleepPatchingBlocker', getattr(config, blocker_attr))
def _safe_deactivate_sleep(blocker: SleepPatchingBlocker) -> None:
"""Safely deactivate sleep blocker, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
blocker: The sleep blocker to deactivate.
"""
if blocker.state.value == 'active':
blocker.deactivate()
def _get_database_blocker(config: pytest.Config) -> DatabasePatchingBlocker:
"""Get or create the database blocker instance.
The blocker is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The DatabasePatchingBlocker instance.
"""
blocker_attr = '_test_categories_database_blocker'
if not hasattr(config, blocker_attr):
blocker = DatabasePatchingBlocker()
blocker.violation_callback = _make_violation_callback(config)
setattr(config, blocker_attr, blocker)
return cast('DatabasePatchingBlocker', getattr(config, blocker_attr))
def _safe_deactivate_database(blocker: DatabasePatchingBlocker) -> None:
"""Safely deactivate database blocker, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
blocker: The database blocker to deactivate.
"""
if blocker.state.value == 'active':
blocker.deactivate()
def _get_thread_monitor(config: pytest.Config) -> ThreadPatchingMonitor:
"""Get or create the thread monitor instance.
The monitor is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The ThreadPatchingMonitor instance.
"""
monitor_attr = '_test_categories_thread_monitor'
if not hasattr(config, monitor_attr):
monitor = ThreadPatchingMonitor()
setattr(config, monitor_attr, monitor)
return cast('ThreadPatchingMonitor', getattr(config, monitor_attr))
def _safe_deactivate_thread_monitor(monitor: ThreadPatchingMonitor) -> None:
"""Safely deactivate thread monitor, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
monitor: The thread monitor to deactivate.
"""
if monitor.state.value == 'active':
monitor.deactivate()
def _get_external_systems_detector(config: pytest.Config) -> ExternalSystemsDetector:
"""Get or create the external systems detector instance.
The detector is stored on the config object to ensure proper lifecycle
management across test execution.
Args:
config: The pytest configuration object.
Returns:
The ExternalSystemsDetector instance.
"""
detector_attr = '_test_categories_external_systems_detector'
if not hasattr(config, detector_attr):
detector = ExternalSystemsDetector()
setattr(config, detector_attr, detector)
return cast('ExternalSystemsDetector', getattr(config, detector_attr))
def _safe_deactivate_external_systems(detector: ExternalSystemsDetector) -> None:
"""Safely deactivate external systems detector, handling edge cases.
This function is used as a callback in ExitStack to ensure cleanup.
Args:
detector: The external systems detector to deactivate.
"""
if detector.state.value == 'active':
detector.deactivate()
def _write_json_report(
test_report: TestSizeReport,
stats: DistributionStatsType,
config_adapter: PytestConfigAdapterType,
terminalreporter: pytest.TerminalReporter,
violation_tracker: ViolationTracker | None = None,
) -> None:
"""Write JSON report to file or stdout.
Args:
test_report: The test size report containing test data.
stats: The distribution statistics.
config_adapter: The config adapter for accessing options.
terminalreporter: The terminal reporter for output.
violation_tracker: Optional violation tracker with hermeticity violations.
"""
json_report = JsonReport.from_test_size_report(
test_report=test_report,
distribution_stats=stats,
version=PLUGIN_VERSION,
violation_tracker=violation_tracker,
)
json_output = json_report.model_dump_json(indent=2)
file_path = config_adapter.get_option('--test-size-report-file')
if file_path:
output_path = Path(str(file_path))
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json_output)
else:
terminalreporter.write_line('')
terminalreporter.write_line(json_output)
def _write_suggestion_json_report(
collector: SuggestionCollector,
output_path_str: object,
) -> None:
"""Write suggestion report to JSON file.
Args:
collector: The SuggestionCollector with observed data.
output_path_str: The output file path.
"""
suggestions = collector.generate_suggestions()
output_data = {
'version': PLUGIN_VERSION,
'suggestions': [
{
'test_nodeid': s.test_nodeid,
'current_size': s.current_size.value if s.current_size else None,
'suggested_size': s.suggested_size.value,
'reason': s.reason,
}
for s in suggestions
],
}
output_path = Path(str(output_path_str))
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(output_data, indent=2))