Skip to content

[FEAT]: pytest-xdist support for distributed test execution#73

Open
nina-msft wants to merge 4 commits into
microsoft:mainfrom
nina-msft:dev/nina-msft/8259
Open

[FEAT]: pytest-xdist support for distributed test execution#73
nina-msft wants to merge 4 commits into
microsoft:mainfrom
nina-msft:dev/nina-msft/8259

Conversation

@nina-msft

@nina-msft nina-msft commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

Description

Summary

Adds first-class pytest-xdist support to RAMPART so attack/probe sessions can shard across worker processes and still produce a single coherent report. Worker processes serialize their results through xdist's workeroutput channel; the controller deserializes, merges, and writes the final report.

What's included

New: _xdist.py

  • Versioned schema (rampart.xdist.v1) for worker → controller payloads
  • _sanitize JSON-safe coercion at the trust boundary with depth limit, byte-size cap, and ANSI-escape stripping on deserialize to defend against terminal injection from worker output
  • SizeLimitError raised by finalize_worker when serialized payload exceeds the configured cap; controller logs and continues
  • Configurable cap via new pytest CLI option --rampart-xdist-max-bytes / ini option rampart_xdist_max_bytes (default 64 MiB), validated positive
  • Sink resolution re-raises KeyboardInterrupt/SystemExit, catches Exception

Trial-group aggregation across workers

  • Trial metadata is captured as data on RampartSession at collection time and shipped through the rampart.xdist.v1 payload (trial_specs), so the controller aggregates trial groups from merged worker data instead of session.items — which is not reliably populated with trial clones on the controller at session finish. Without this, trial-group FAIL/PASS verdicts silently disappeared under xdist even though per-clone results were present.
  • trial_specs is back-compatible: payloads without trials emit an empty list; malformed entries are skipped and non-finite thresholds are clamped on deserialize.

Updated plugin hooks (plugin.py, _session.py)

  • pytest_addoption registers the new CLI/ini option
  • Worker, controller, and non-xdist branches dispatched cleanly; worker branch wraps finalize_worker to surface size-limit truncation as a warning

Reporting

  • JsonFileReportSink now projects report.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) lands in the emitted JSON file

Tests

  • 73 unit tests in test_xdist.py covering serialization, sanitization, size limits, ANSI stripping, sink resolution, option parsing, and trial-spec round-trip/merge
  • Integration test test_xdist_aggregation.py exercising end-to-end aggregation across workers, including trial-group verdicts under both --dist=loadgroup and --dist=load

Docs

  • New xdist.md user guide
  • Configuration, CI integration, pytest-integration, and API reference pages updated to describe the new option (under a dedicated "Pytest Options" section, not under env vars)
  • Architecture page notes the controller/worker split

Incidental

  • onedrive.py: file-level pyright directive suppressing the Unknown* cascade caused by msgraph-sdk shipping without type stubs. No behavior change.

Security notes

The xdist boundary treats worker output as untrusted:

  • All values pass through _sanitize → JSON-safe primitives only
  • Strings are stripped of ANSI escape sequences on deserialize (including nested strings inside dicts/lists) to prevent terminal injection from a compromised/misbehaving worker
  • Total payload size is capped to prevent controller memory exhaustion; truncation is recorded with a marker and raised as SizeLimitError
  • Depth cap (MAX_METADATA_DEPTH = 6) prevents pathological nesting

Validation

  • uv run pre-commit run --all-files → all hooks pass (ruff, ruff-format, pyright)
  • pytest tests/unit/pytest_plugin/test_xdist.py → 73 passed
  • pytest tests/integration/test_xdist_aggregation.py → 13 passed (trial-group verdicts verified under --dist=loadgroup and --dist=load)
  • Ran pytest -n 4 --dist=loadgroup against HelpDesk Bot example in rampart-examples, verified 1 report generated. Running with xdist also took 99.66s versus serial execution at 230.73s (yay for speed!)

Breaking changes

None

Checklist

  • pre-commit run --all-files passes
  • Tests added or updated for changes
  • Documentation updated

@nina-msft nina-msft requested review from a team and bashirpartovi June 4, 2026 01:35
The controller's _aggregate_trial_results iterated session.items looking for a _rampart_trial_base attribute set on cloned items at collection time. Under pytest-xdist that attribute is not reliably reachable on the controller at pytest_sessionfinish, so trial-group verdicts silently disappeared from the terminal summary and the JSON report -- per-clone results were present but no aggregate FAIL/PASS line was emitted.

Decouple aggregation from pytest.Item state:

- Store trial metadata as data on RampartSession._trial_specs (a dict[clone_nodeid, TrialSpec]) at collection time on every process.

- Ship 	rial_specs through the existing 
ampart.xdist.v1 worker payload (back-compatible: missing/empty list is treated as no trials).

- Controller merges specs from each worker and aggregates from the merged data instead of session.items.

Also fixes a related JSON-sink gap: JsonFileReportSink now projects 
eport.metadata, so xdist run-mode info (worker_count, dist_mode, incomplete reasons) actually lands in the emitted file.

Tests:

- New TestTrialSpecs unit class (round-trip, malformed entries, non-finite thresholds, idempotent merge, first-writer-wins).

- handle_testnodedown test covering trial-spec merge.

- Strengthened 	ests/integration/test_xdist_aggregation.py to assert the trial-group line is present and correct under --dist=loadgroup and --dist=load (the prior tests only checked per-clone counts and would have missed this regression).

- JSON-sink metadata projection unit tests.

565 unit tests + 13 xdist integration tests pass; ruff clean.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
spencrr

This comment was marked as resolved.

@bashirpartovi

Copy link
Copy Markdown
Contributor

Great job @nina-msft, really a solid PR. The overall architecture makes sense to me, which is workers collect, the controller merges, and only the controller emits. I also like that the worker output path treats the boundary seriously rather than just passing objects around and hoping for the best. I feel like we should definitely keep that part even if some of the surrounding design changes.

The first thing I would fix before merge is ordering. build_report() sorts by r.metadata.get("test_name", ""), but test_name is derived from node.nodeid.split("::")[-1], so it is only the trailing test name. That collides for same-named tests in different files, parametrized cases, and trial clones. Once two results have the same sort key, insertion order wins, and under xdist insertion order is basically worker completion order. So the output can still move around between runs. I think the fix is just to keep the full nodeid and sort on that. Maybe it is worth adding a small regression test that merges the same two worker payloads in reverse order and gets identical report output.

I am a little bit worried about the sink discovery piece. discover_sinks_from_conftest() / _resolve_sink_candidate() end up doing a partial version of pytest's fixture resolution which is to find a rampart_sinks attribute, unwrap fixture internals, inspect the signature, and call it only if it is parameterless. That works for the happy path, but if someone has a normal fixture like rampart_sinks(tmp_path_factory) or rampart_sinks(my_config), xdist mode skips that sink with a warning. If that fixture is their only sink configuration, the controller has no sinks to emit to, so the report they expected would be missing. I think that's a pretty silent problem, the tests could all be green, but the report never gets offloaded.

This would be cleaner as an explicit hook instead of fixture reflection. The key difference is that the existing rampart_sinks is a fixture, so it only gets resolved when something inside the fixture system asks for it. That works in single-process mode because the bootstrap fixture can call request.getfixturevalue("rampart_sinks"). In xdist though, the controller is the process that needs to emit the final report, and the controller does not run tests or fixtures. That is why this PR has to scan conftest modules and manually unwrap the fixture function.

A hook would be different:

def pytest_rampart_sinks(config):
    return [JsonFileReportSink(output_dir=Path(".report"))]

That would be a hook implementation, not another fixture. RAMPART would register the hook spec from its pytest plugin, and users would implement the hook from a root conftest.py or an installed plugin, the same way they implement other pytest hooks. Then the controller can call config.pluginmanager.hook.pytest_rampart_sinks(config=config) without running any fixtures. It still would not support fixture dependencies, but at least that limitation would be explicit instead of discovered through reflection.

The one thing to be careful about is having two sink configuration surfaces. If both pytest_rampart_sinks and the existing rampart_sinks fixture are present, the plugin should have a clear rule, probably something like "the hook is authoritative, the fixture is the legacy single-process fallback", and defining both generates a warning rather than emitting it twice. Without that precedence rule, this could get confusing quickly. With that rule documented, I would much rather support the hook as the xdist-compatible path than maintain a partial fixture resolver. I think for the longer term, it may be cleaner to deprecate the fixture-based sink registration entirely and make the hook the one supported sink API, once users have had time to migrate.

I think we need to put more throughts into whether to use workeroutput transport. I am not saying it is wrong, there is a good argument for it, since it is the native xdist channel and can support remote workers without requiring a shared filesystem. But it does mean results only get to the controller at worker shutdown. If a worker gets killed before pytest_sessionfinish, the run is correctly marked incomplete, but all results from that worker are gone, including tests that already finished. For small suites that may be fine. For long agent suites, that could be a lot of data to lose.

The size cap has a similar issue. If one worker payload goes over --rampart-xdist-max-bytes, we replace the whole payload with a truncation marker. So one huge transcript can make us lose the other results from that worker too. I do not think that has to block this PR, but I would call it out clearly in the docs. Longer term, a per-test stream or per-worker shard file would be more durable. workeroutput is simpler and works better for remote workers, so I mostly just want that tradeoff to be explicit.

One maintainability, there are now two serializers for roughly the same data. _xdist.py has _serialize_result / _deserialize_result, while JsonFileReportSink still has its own report serialization. Since the xdist path already has to round-trip Result, it might be worth pulling the shared bits into something like rampart/reporting/serde.py and making the JSON sink use it too; otherwise every future Result field has to be remembered in two places.

Two small notes

  1. the onedrive.py pyright suppression looks unrelated to xdist, so I would probably split that out unless it is needed to get this PR green.
  2. I am glad incomplete / incomplete_reasons are in the metadata, but I would think about surfacing incomplete runs somewhere CI users are likely to notice. The awkward case is all tests passing while the report is marked incomplete.

I like the direction. The two things I would most like to see tightened are the full-nodeid ordering and replacing the controller-side sink fixture scanning with an explicit hook or some similar explicit controller-safe sink registration path.

@spencrr spencrr left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Nina! I like:

  • one (de)serialization spot in _xdist.py with sanitization and schme versioning.
  • using trial_specs over worker channel so that --dist-load works correctly
  • workers disable _emit_sinks and controller is the single source of truth

Other small AI Finds

  • Worker-side register_trial_spec is dead work. Workers run pytest_collection_modifyitems, register specs, ship them across, and never aggregate or evaluate. That's fine for correctness, but it means the spec is computed twice (once on the worker, once on the controller's own collection pass, which the merge then drops via setdefault). Worth a one-line comment in pytest_collection_modifyitems clarifying why we register on every process even though only the controller acts on it - otherwise future readers will be tempted to "optimize" the worker path and break the fallback.
  • xdist_group marker is added unconditionally to every trial clone. Correct behavior (xdist sees it; pytest without xdist treats it as an unknown marker). If the repo ever turns on --strict-markers you'll need a conditional addinivalue_line("markers", "xdist_group: ...") somewhere; not a problem today.
  • _session.py and _xdist.py "documented deviations" notes. The architecture doc was actually updated in this PR to describe _session.py/_xdist.py as first-class members of the plugin package - so the Note: ... documented deviation from the architecture blocks at the top of those modules are now stale and can come out. Minor.
  • Trial-spec aggregation runs on every controller pytest_sessionfinish call, including when only one worker reported. That's fine for correctness; just observing that worker-result completeness isn't a precondition for trial aggregation, which is the right call - missing-worker should mark incomplete, not block the report.
  • _emit_sinks background-task path. pre-existing - when an event loop is already running at session-finish (e.g., some pytest-asyncio configurations) the emission becomes fire-and-forget with no awaiter. Not changed by this PR, but the new xdist controller path is the first place I'd actually worry about this in production, because the controller's sink emission carries cross-worker data that didn't exist before. Worth at least filing as a follow-up.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move to unit tests

session: pytest.Session,
session: pytest.Session, # noqa: ARG001 — kept for hook signature symmetry
rampart_session: RampartSession,
) -> None:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just drop pytest session here. Is it true that we just stuff the results under the rampart key/rampart_session

Comment on lines +606 to 618
if is_xdist_controller(config=session.config):
_aggregate_trial_results(session=session, rampart_session=rampart_session)
_evaluate_gates(rampart_session=rampart_session)
_record_xdist_metadata(session=session, rampart_session=rampart_session)
controller_sinks = discover_sinks_from_conftest(config=session.config)
if controller_sinks:
rampart_session.add_sinks(sinks=controller_sinks)
_emit_sinks(rampart_session=rampart_session)
return

_aggregate_trial_results(session=session, rampart_session=rampart_session)
_evaluate_gates(rampart_session=rampart_session)
_emit_sinks(rampart_session=rampart_session)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: possible pull out just the xdist parts

Suggested change
if is_xdist_controller(config=session.config):
_aggregate_trial_results(session=session, rampart_session=rampart_session)
_evaluate_gates(rampart_session=rampart_session)
_record_xdist_metadata(session=session, rampart_session=rampart_session)
controller_sinks = discover_sinks_from_conftest(config=session.config)
if controller_sinks:
rampart_session.add_sinks(sinks=controller_sinks)
_emit_sinks(rampart_session=rampart_session)
return
_aggregate_trial_results(session=session, rampart_session=rampart_session)
_evaluate_gates(rampart_session=rampart_session)
_emit_sinks(rampart_session=rampart_session)
_aggregate_trial_results(session=session, rampart_session=rampart_session)
_evaluate_gates(rampart_session=rampart_session)
if is_xdist_controller(config=session.config):
_record_xdist_metadata(session=session, rampart_session=rampart_session)
controller_sinks = discover_sinks_from_conftest(config=session.config)
if controller_sinks:
rampart_session.add_sinks(sinks=controller_sinks)
_emit_sinks(rampart_session=rampart_session)

Comment on lines +99 to +114
def is_xdist_controller(*, config: pytest.Config) -> bool:
"""Return True when this process is the pytest-xdist controller.

The controller is defined as a process where xdist is active
(``--numprocesses`` is set) and which is NOT itself a worker.

Args:
config (pytest.Config): The pytest configuration object.

Returns:
bool: True if running in the xdist controller process.
"""
if is_xdist_worker(config=config):
return False
numprocesses = getattr(config.option, "numprocesses", None)
return numprocesses is not None and numprocesses != 0

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly reconsider from (xdist/plugin.py)

def is_xdist_controller(request_or_session) -> bool:
    return (
        not is_xdist_worker(request_or_session)
        and request_or_session.config.option.dist != "no"
    )

This PR uses numprocesses is not None and numprocesses != 0 instead. For -n N flows these agree (xdist's pytest_cmdline_main sets dist = "load" whenever numprocesses is truthy). They diverge for:

pytest -d --tx=... (the -d shortcut for --dist=load)
pytest --dist=loadgroup --tx=2*popen (any --tx-based setup without -n)
Any custom scheduler that sets dist without -n
In those cases, this PR's controller branch returns False, so pytest_sessionfinish falls into the non-xdist branch on the controller. The controller would then try to _aggregate_trial_results and _emit_sinks against its own (empty) _results_by_nodeid while workers still serialize into workeroutput. Therefore, the report would end up empty rather than incomplete.

Comment thread docs/usage/xdist.md
- **Arbitrary code execution** — strict JSON-safe primitives only; no `pickle`, `marshal`, or custom `__reduce__`.
- **Schema drift** — payloads with missing or unknown schema versions are rejected fail-closed.
- **Memory exhaustion** — worker payloads are capped at 64 MB by default.
- **Terminal/log injection** — ANSI escape sequences are stripped from free-form text at the deserialization boundary.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ai note:

ANSI stripping doesn't cover OSC, and the security notes claim it does.

_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") matches CSI only. It does not match:

  • OSC (\x1b]…BEL or \x1b]…ST) — used for terminal hyperlinks (\x1b]8;;https://attacker/\x1b\\text\x1b]8;;\x1b\\) and window-title sets. Windows Terminal, iTerm2, modern GNOME Terminal, and Konsole all render hyperlinks. So an attacker-controlled response can produce a clickable link in pytest's terminal output even though "ANSI was stripped".
  • DCS / SOS / PM / APC (\x1b[PXY^_]…ST)
  • 8-bit C1 (\x9b…) — less common but allowed by ECMA-48

The PR explicitly markets ANSI stripping as a defense against terminal injection from compromised workers, so this gap directly undermines a stated security property. A stricter pattern that handles the C1 introducers, or just re.sub(r"[\x00-\x08\x0b-\x1f\x7f-\x9f]", "", text) to strip all C0/C1 controls, would close it. Whichever you pick, also worth adding a couple of OSC-flavored test cases to TestStripAnsi.

Comment on lines +471 to +478
def _package_version() -> str:
"""Return the installed rampart package version (best-effort)."""
try:
from importlib.metadata import version # noqa: PLC0415

return version("rampart")
except Exception: # noqa: BLE001
return "unknown"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what case would this happen?

)


def discover_sinks_from_conftest(*, config: pytest.Config) -> list[ReportSink]:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ai find:

discover_sinks_from_conftest is a parallel sink-discovery path with a silent capability gap.

Two different mechanisms now resolve rampart_sinks:

Process Mechanism Can satisfy fixture deps?
Non-xdist / worker _rampart_sink_bootstrap autouse fixture → request.getfixturevalue("rampart_sinks") Yes
xdist controller discover_sinks_from_conftest → plugin attribute scan + inspect.signature() No (any param ≠ 0 ⇒ warn + skip)

So @pytest.fixture(scope="session") def rampart_sinks(tmp_path_factory): ... works perfectly under pytest -n0 and silently disappears under pytest -n2, leaving only a logger.warning(...).

Consider:

  • Hook-based contributor API: define a pytest_rampart_sinks(config) -> list[ReportSink] hookspec. Both controller and workers call it the same way; no fixture machinery dependency. Teams move their factory from a fixture into a hookimpl in conftest. The cost is asking users to update their conftest; the benefit is one unified path with no capability gap.

Also reaching into candidate._get_wrapped_function() is a private pytest API. The __wrapped__ fallback is fine; the _get_wrapped_function call is at risk on any pytest minor bump.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants