diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index 474f5e236..48e43586a 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -378,6 +378,10 @@ jobs:
uses: rapidsai/shared-workflows/.github/workflows/checks.yaml@main
with:
enable_check_generated_files: false
+ # pr-test-summary is informational (posts the failure-classification
+ # comment) and intentionally not in pr-builder's needs, so the
+ # dependency checker must skip it.
+ ignored_pr_jobs: pr-test-summary
conda-cpp-build:
needs: [checks, compute-matrix-filters, changed-files]
# Consumed by conda-cpp-tests, conda-python-build, and (transitively) docs-build.
@@ -627,3 +631,19 @@ jobs:
with:
build_type: pull-request
script: ci/test_self_hosted_service.sh
+ pr-test-summary:
+ needs:
+ - conda-cpp-tests
+ - conda-python-tests
+ - wheel-tests-cuopt
+ - wheel-tests-cuopt-server
+ - test-self-hosted-server
+ if: always()
+ permissions:
+ contents: read
+ pull-requests: write
+ uses: ./.github/workflows/pr_test_summary.yaml
+ secrets:
+ CUOPT_AWS_ACCESS_KEY_ID: ${{ secrets.CUOPT_AWS_ACCESS_KEY_ID }}
+ CUOPT_AWS_SECRET_ACCESS_KEY: ${{ secrets.CUOPT_AWS_SECRET_ACCESS_KEY }}
+ CUOPT_S3_URI: ${{ secrets.CUOPT_S3_URI }}
diff --git a/.github/workflows/pr_test_summary.yaml b/.github/workflows/pr_test_summary.yaml
new file mode 100644
index 000000000..3d983ca99
--- /dev/null
+++ b/.github/workflows/pr_test_summary.yaml
@@ -0,0 +1,69 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+# Reusable workflow: aggregate per-matrix PR test summaries from S3,
+# classify every failure as NEW (introduced by this PR) vs. KNOWN
+# (recurring on nightly, known flaky on nightly, or flaked in this run),
+# and post (or update) a single sticky comment on the PR.
+#
+# Called from pr.yaml after the PR test jobs finish. Purely informational
+# — never gates the PR.
+
+name: pr-test-summary
+
+on:
+ workflow_call:
+ secrets:
+ CUOPT_AWS_ACCESS_KEY_ID:
+ required: true
+ CUOPT_AWS_SECRET_ACCESS_KEY:
+ required: true
+ CUOPT_S3_URI:
+ required: true
+
+jobs:
+ pr-test-summary:
+ runs-on: linux-amd64-cpu4
+ container:
+ image: python:3.14-slim
+ permissions:
+ contents: read
+ pull-requests: write
+ steps:
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ with:
+ persist-credentials: false
+ - name: Install dependencies
+ run: |
+ apt-get update
+ apt-get install -y --no-install-recommends curl
+ pip install awscli
+ - name: Aggregate per-matrix summaries and post sticky comment
+ env:
+ CUOPT_AWS_ACCESS_KEY_ID: ${{ secrets.CUOPT_AWS_ACCESS_KEY_ID }}
+ CUOPT_AWS_SECRET_ACCESS_KEY: ${{ secrets.CUOPT_AWS_SECRET_ACCESS_KEY }}
+ CUOPT_S3_URI: ${{ secrets.CUOPT_S3_URI }}
+ GITHUB_REPOSITORY: ${{ github.repository }}
+ GITHUB_RUN_ID: ${{ github.run_id }}
+ GITHUB_SHA: ${{ github.sha }}
+ GITHUB_TOKEN: ${{ github.token }}
+ run: |
+ set -euo pipefail
+ # Resolve PR number from the pull-request/{N} branch ref.
+ # Read $GITHUB_REF via env, not a workflow expression — zizmor
+ # flags inlining github.ref into a shell as a code-injection
+ # vector.
+ PR_NUMBER=$(echo "$GITHUB_REF" | sed 's|refs/heads/pull-request/||')
+ if ! [[ "${PR_NUMBER}" =~ ^[0-9]+$ ]]; then
+ echo "ERROR: could not parse PR number from $GITHUB_REF" >&2
+ exit 1
+ fi
+ export PR_NUMBER
+
+ # Push events don't expose the PR target branch; ask the API.
+ GITHUB_BASE_REF=$(python3 ci/utils/pr_comment_helper.py base-ref \
+ --repo "${GITHUB_REPOSITORY}" --pr "${PR_NUMBER}")
+ export GITHUB_BASE_REF
+ echo "PR #${PR_NUMBER} → target branch: ${GITHUB_BASE_REF}"
+
+ bash ci/pr_summary.sh
diff --git a/ci/pr_summary.sh b/ci/pr_summary.sh
new file mode 100755
index 000000000..ade470d5e
--- /dev/null
+++ b/ci/pr_summary.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Aggregate per-matrix PR test summaries from S3 and post (or update)
+# the sticky PR classification comment. See ci/utils/aggregate_pr.py
+# (content) and ci/utils/pr_comment_helper.py (GitHub API).
+
+set -euo pipefail
+
+: "${PR_NUMBER:?required}"
+: "${GITHUB_REPOSITORY:?required}"
+: "${GITHUB_RUN_ID:?required}"
+: "${GITHUB_BASE_REF:?required}"
+: "${GITHUB_SHA:?required}"
+: "${GITHUB_TOKEN:?required}"
+: "${CUOPT_S3_URI:?required}"
+: "${CUOPT_AWS_ACCESS_KEY_ID:?required}"
+: "${CUOPT_AWS_SECRET_ACCESS_KEY:?required}"
+
+SCRIPT_DIR="$(dirname "$(realpath "${BASH_SOURCE[0]}")")"
+OUTPUT_DIR="${PWD}/pr-aggregate-output"
+mkdir -p "${OUTPUT_DIR}"
+
+# aws CLI reads the standard AWS_* env vars; map the cuOpt-prefixed
+# secrets onto them.
+export AWS_ACCESS_KEY_ID="${CUOPT_AWS_ACCESS_KEY_ID}"
+export AWS_SECRET_ACCESS_KEY="${CUOPT_AWS_SECRET_ACCESS_KEY}"
+unset AWS_SESSION_TOKEN
+
+GITHUB_RUN_URL="https://github.com/${GITHUB_REPOSITORY}/actions/runs/${GITHUB_RUN_ID}"
+S3_PR_SUMMARIES_PREFIX="${CUOPT_S3_URI}ci_test_reports/pr/run-${GITHUB_RUN_ID}/"
+COMMENT_FILE="${OUTPUT_DIR}/pr_comment.md"
+
+echo "Aggregating PR per-matrix summaries from ${S3_PR_SUMMARIES_PREFIX}"
+python3 "${SCRIPT_DIR}/utils/aggregate_pr.py" \
+ --s3-pr-summaries-prefix "${S3_PR_SUMMARIES_PREFIX}" \
+ --output-dir "${OUTPUT_DIR}" \
+ --target-branch "${GITHUB_BASE_REF}" \
+ --sha "${GITHUB_SHA}" \
+ --github-run-url "${GITHUB_RUN_URL}" \
+ --run-date "$(date +%F)"
+
+if [ ! -s "${COMMENT_FILE}" ]; then
+ echo "No failures or flakes; not posting a PR comment."
+ exit 0
+fi
+
+python3 "${SCRIPT_DIR}/utils/pr_comment_helper.py" post \
+ --repo "${GITHUB_REPOSITORY}" \
+ --pr "${PR_NUMBER}" \
+ --body-file "${COMMENT_FILE}"
diff --git a/ci/utils/aggregate_common.py b/ci/utils/aggregate_common.py
new file mode 100644
index 000000000..1c000a4bc
--- /dev/null
+++ b/ci/utils/aggregate_common.py
@@ -0,0 +1,248 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Shared helpers for the nightly and PR aggregators.
+
+Both aggregators consume per-matrix summary JSONs produced by
+``nightly_report.py`` and merge them into a single view. The merge logic,
+S3 listing, and HTML escaping are identical in both cases and live here.
+
+Renderers (HTML dashboard for nightly; Markdown comment for PRs) stay in the
+respective aggregator scripts since their output formats diverge.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+from pathlib import Path
+from typing import Any
+
+# Ensure ci/utils is importable when invoked from a sibling script
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from s3_helpers import s3_download, s3_list # noqa: E402
+
+
+def download_summaries(
+ s3_prefix: str,
+ local_dir: str | os.PathLike[str],
+ s3_fallback_prefix: str = "",
+) -> list[dict[str, Any]]:
+ """Download all JSON summaries from an S3 prefix into a local directory.
+
+ If ``s3_fallback_prefix`` is set and no summaries are found at
+ ``s3_prefix``, retries with the fallback (used when the run-scoped
+ path is empty because uploads landed under the branch-scoped path).
+
+ Args:
+ s3_prefix: Primary S3 URI prefix to list (e.g.,
+ ``s3://bucket/ci_test_reports/pr/run-12345/``).
+ local_dir: Local directory to download into. Created if absent.
+ s3_fallback_prefix: Optional secondary prefix to try when
+ ``s3_prefix`` yields no ``*.json`` summaries.
+
+ Returns:
+ List of loaded summary dicts. Files that fail to parse are
+ skipped with a warning to stderr; the list contains only
+ successfully loaded entries.
+
+ Raises:
+ This function does not raise. Underlying S3 / IO / JSON parse
+ errors are caught and logged.
+ """
+ local_dir = Path(local_dir)
+ local_dir.mkdir(parents=True, exist_ok=True)
+
+ uris = s3_list(s3_prefix)
+ json_uris = [
+ u
+ for u in uris
+ if u.endswith(".json") and not u.endswith("/consolidated.json")
+ ]
+
+ if (
+ not json_uris
+ and s3_fallback_prefix
+ and s3_fallback_prefix != s3_prefix
+ ):
+ print(
+ f"No summaries at {s3_prefix}, trying fallback: {s3_fallback_prefix}"
+ )
+ uris = s3_list(s3_fallback_prefix)
+ json_uris = [
+ u
+ for u in uris
+ if u.endswith(".json") and not u.endswith("/consolidated.json")
+ ]
+ if json_uris:
+ s3_prefix = s3_fallback_prefix
+
+ print(f"Found {len(json_uris)} summary file(s) at {s3_prefix}")
+
+ summaries = []
+ for uri in json_uris:
+ filename = uri.rsplit("/", 1)[-1]
+ local_path = str(local_dir / filename)
+ if s3_download(uri, local_path):
+ try:
+ with open(local_path) as f:
+ summaries.append(json.load(f))
+ except (json.JSONDecodeError, OSError) as exc:
+ print(
+ f"WARNING: Failed to parse {local_path}: {exc}",
+ file=sys.stderr,
+ )
+ return summaries
+
+
+def load_local_summaries(
+ local_dir: str | os.PathLike[str],
+) -> list[dict[str, Any]]:
+ """Load JSON summaries from a local directory (for testing without S3).
+
+ Args:
+ local_dir: Directory containing ``*.json`` per-matrix summaries.
+
+ Returns:
+ List of loaded summary dicts. Files that fail to parse are
+ skipped with a warning to stderr.
+
+ Raises:
+ This function does not raise. IO / JSON parse errors are caught
+ and logged.
+ """
+ local_dir = Path(local_dir)
+ summaries = []
+ for json_file in sorted(local_dir.glob("*.json")):
+ try:
+ with open(json_file) as f:
+ summaries.append(json.load(f))
+ except (json.JSONDecodeError, OSError) as exc:
+ print(
+ f"WARNING: Failed to parse {json_file}: {exc}", file=sys.stderr
+ )
+ return summaries
+
+
+def aggregate_summaries(
+ summaries: list[dict[str, Any]],
+) -> dict[str, Any]:
+ """Merge per-matrix summaries into a single consolidated view.
+
+ Args:
+ summaries: List of per-matrix summary dicts as produced by
+ ``nightly_report.py`` (either nightly or PR mode). Each
+ dict is expected to provide at least ``test_type``,
+ ``matrix_label``, ``counts``, and the per-bucket failure
+ lists; missing fields default to safe values.
+
+ Returns:
+ Consolidated dict with keys:
+
+ - ``matrix_grid``: list of per-matrix status dicts (sorted
+ by ``test_type`` then ``matrix_label``).
+ - ``totals``: aggregate test counts across all matrices.
+ - ``all_new_failures``, ``all_recurring_failures``,
+ ``all_flaky_tests``, ``all_resolved_tests``: merged failure
+ lists with per-entry ``test_type`` / ``matrix_label``
+ context added.
+ - ``has_new_flaky``: True iff any summary flagged a new flaky.
+
+ Raises:
+ This function does not raise. Malformed entries are tolerated.
+ """
+ grid = []
+ totals = {
+ "total": 0,
+ "passed": 0,
+ "failed": 0,
+ "flaky": 0,
+ "skipped": 0,
+ "resolved": 0,
+ }
+ all_new_failures = []
+ all_recurring_failures = []
+ all_flaky_tests = []
+ all_resolved_tests = []
+ any_new_flaky = False
+
+ for s in summaries:
+ test_type = s.get("test_type", "unknown")
+ matrix_label = s.get("matrix_label", "unknown")
+ counts = s.get("counts", {})
+
+ failed = counts.get("failed", 0)
+ flaky = counts.get("flaky", 0)
+ has_new = s.get("has_new_failures", False)
+ if s.get("has_new_flaky", False):
+ any_new_flaky = True
+
+ if failed > 0:
+ status = "failed-new" if has_new else "failed-recurring"
+ elif flaky > 0:
+ status = "flaky"
+ elif counts.get("total", 0) == 0:
+ status = "no-results"
+ else:
+ status = "passed"
+
+ grid.append(
+ {
+ "test_type": test_type,
+ "matrix_label": matrix_label,
+ "status": status,
+ "counts": counts,
+ "sha": s.get("sha", ""),
+ }
+ )
+
+ for key in totals:
+ totals[key] += counts.get(key, 0)
+
+ ctx = {"test_type": test_type, "matrix_label": matrix_label}
+ for entry in s.get("new_failures", []):
+ all_new_failures.append({**entry, **ctx})
+ for entry in s.get("recurring_failures", []):
+ all_recurring_failures.append({**entry, **ctx})
+ for entry in s.get("flaky_tests", []):
+ all_flaky_tests.append({**entry, **ctx})
+ for entry in s.get("resolved_tests", []):
+ all_resolved_tests.append({**entry, **ctx})
+
+ grid.sort(key=lambda g: (g["test_type"], g["matrix_label"]))
+
+ return {
+ "matrix_grid": grid,
+ "totals": totals,
+ "all_new_failures": all_new_failures,
+ "all_recurring_failures": all_recurring_failures,
+ "all_flaky_tests": all_flaky_tests,
+ "all_resolved_tests": all_resolved_tests,
+ "has_new_flaky": any_new_flaky,
+ }
+
+
+def html_escape(text: Any) -> str:
+ """Escape HTML special characters in ``text``.
+
+ Args:
+ text: Any value; converted to ``str`` before escaping.
+
+ Returns:
+ ``str(text)`` with ``&``, ``<``, ``>`` and ``"`` replaced by
+ their HTML entity equivalents. Safe for inclusion in HTML
+ attribute values and element bodies.
+
+ Raises:
+ This function does not raise.
+ """
+    return (
+        str(text)
+        .replace("&", "&amp;")
+        .replace("<", "&lt;")
+        .replace(">", "&gt;")
+        .replace('"', "&quot;")
+    )
diff --git a/ci/utils/aggregate_nightly.py b/ci/utils/aggregate_nightly.py
index 4901fab7c..8b62a5ff9 100644
--- a/ci/utils/aggregate_nightly.py
+++ b/ci/utils/aggregate_nightly.py
@@ -30,168 +30,13 @@
# Ensure ci/utils is importable when invoked as a script
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
-from s3_helpers import s3_download, s3_upload, s3_list # noqa: E402
-
-
-# ---------------------------------------------------------------------------
-# Download and merge summaries
-# ---------------------------------------------------------------------------
-
-
-def download_summaries(s3_prefix, local_dir, s3_fallback_prefix=""):
- """Download all JSON summaries from S3 prefix into local_dir.
- If s3_fallback_prefix is set and no summaries found at s3_prefix,
- retries with the fallback (used when RAPIDS_BRANCH in rapidsai
- containers doesn't match the branch input).
- Returns list of loaded summary dicts."""
- local_dir = Path(local_dir)
- local_dir.mkdir(parents=True, exist_ok=True)
-
- uris = s3_list(s3_prefix)
- json_uris = [
- u
- for u in uris
- if u.endswith(".json") and not u.endswith("/consolidated.json")
- ]
-
- # Fallback: search the parent date prefix if branch-specific path is empty
- if (
- not json_uris
- and s3_fallback_prefix
- and s3_fallback_prefix != s3_prefix
- ):
- print(
- f"No summaries at {s3_prefix}, trying fallback: {s3_fallback_prefix}"
- )
- uris = s3_list(s3_fallback_prefix)
- json_uris = [
- u
- for u in uris
- if u.endswith(".json") and not u.endswith("/consolidated.json")
- ]
- if json_uris:
- s3_prefix = s3_fallback_prefix
-
- print(f"Found {len(json_uris)} summary file(s) at {s3_prefix}")
-
- summaries = []
- for uri in json_uris:
- filename = uri.rsplit("/", 1)[-1]
- local_path = str(local_dir / filename)
- if s3_download(uri, local_path):
- try:
- with open(local_path) as f:
- summaries.append(json.load(f))
- except (json.JSONDecodeError, OSError) as exc:
- print(
- f"WARNING: Failed to parse {local_path}: {exc}",
- file=sys.stderr,
- )
- return summaries
-
-
-def load_local_summaries(local_dir):
- """Load summaries from a local directory (for testing without S3)."""
- local_dir = Path(local_dir)
- summaries = []
- for json_file in sorted(local_dir.glob("*.json")):
- try:
- with open(json_file) as f:
- summaries.append(json.load(f))
- except (json.JSONDecodeError, OSError) as exc:
- print(
- f"WARNING: Failed to parse {json_file}: {exc}", file=sys.stderr
- )
- return summaries
-
-
-# ---------------------------------------------------------------------------
-# Aggregation
-# ---------------------------------------------------------------------------
-
-
-def aggregate_summaries(summaries):
- """Merge per-matrix summaries into a consolidated view.
-
- Returns a dict with:
- - matrix_grid: list of {test_type, matrix_label, status, counts, ...}
- - totals: aggregate counts
- - all_new_failures, all_recurring_failures, all_flaky_tests,
- all_resolved_tests: merged lists with matrix context added
- """
- grid = []
- totals = {
- "total": 0,
- "passed": 0,
- "failed": 0,
- "flaky": 0,
- "skipped": 0,
- "resolved": 0,
- }
- all_new_failures = []
- all_recurring_failures = []
- all_flaky_tests = []
- all_resolved_tests = []
- any_new_flaky = False
-
- for s in summaries:
- test_type = s.get("test_type", "unknown")
- matrix_label = s.get("matrix_label", "unknown")
- counts = s.get("counts", {})
-
- # Determine job status
- failed = counts.get("failed", 0)
- flaky = counts.get("flaky", 0)
- has_new = s.get("has_new_failures", False)
- if s.get("has_new_flaky", False):
- any_new_flaky = True
-
- if failed > 0:
- status = "failed-new" if has_new else "failed-recurring"
- elif flaky > 0:
- status = "flaky"
- elif counts.get("total", 0) == 0:
- status = "no-results"
- else:
- status = "passed"
-
- grid.append(
- {
- "test_type": test_type,
- "matrix_label": matrix_label,
- "status": status,
- "counts": counts,
- "sha": s.get("sha", ""),
- }
- )
-
- # Accumulate totals
- for key in totals:
- totals[key] += counts.get(key, 0)
-
- # Merge failure lists with matrix context
- ctx = {"test_type": test_type, "matrix_label": matrix_label}
- for entry in s.get("new_failures", []):
- all_new_failures.append({**entry, **ctx})
- for entry in s.get("recurring_failures", []):
- all_recurring_failures.append({**entry, **ctx})
- for entry in s.get("flaky_tests", []):
- all_flaky_tests.append({**entry, **ctx})
- for entry in s.get("resolved_tests", []):
- all_resolved_tests.append({**entry, **ctx})
-
- # Sort grid for consistent display
- grid.sort(key=lambda g: (g["test_type"], g["matrix_label"]))
-
- return {
- "matrix_grid": grid,
- "totals": totals,
- "all_new_failures": all_new_failures,
- "all_recurring_failures": all_recurring_failures,
- "all_flaky_tests": all_flaky_tests,
- "all_resolved_tests": all_resolved_tests,
- "has_new_flaky": any_new_flaky,
- }
+from aggregate_common import ( # noqa: E402
+ aggregate_summaries,
+ download_summaries,
+ html_escape as _html_escape,
+ load_local_summaries,
+)
+from s3_helpers import s3_download, s3_upload # noqa: E402
# ---------------------------------------------------------------------------
@@ -298,16 +143,6 @@ def generate_consolidated_json(
# ---------------------------------------------------------------------------
-def _html_escape(text):
-    return (
-        str(text)
-        .replace("&", "&amp;")
-        .replace("<", "&lt;")
-        .replace(">", "&gt;")
-        .replace('"', "&quot;")
-    )
-
-
def _status_badge(status):
"""Return an HTML badge for a matrix cell status."""
colors = {
diff --git a/ci/utils/aggregate_pr.py b/ci/utils/aggregate_pr.py
new file mode 100644
index 000000000..1981a628c
--- /dev/null
+++ b/ci/utils/aggregate_pr.py
@@ -0,0 +1,513 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Aggregate per-matrix PR test summaries into a Markdown body for the PR
+classification comment.
+
+Each PR test job runs ``nightly_report.py --mode pr`` which writes a
+per-matrix summary JSON to::
+
+ s3://bucket/ci_test_reports/pr/run-${GITHUB_RUN_ID}/{test_type}-{matrix}.json
+
+This script downloads them, merges with the shared aggregator helpers, and
+emits two Markdown sections:
+
+ - **NEW failures** — failures introduced by this PR (not in nightly
+ history, or only present as resolved-and-not-flaky).
+ - **KNOWN issues** — pre-existing breakage (active on nightly) or known
+ flakes (flagged on nightly, or flaked in this PR run).
+
+The output Markdown is prefixed with a hidden marker comment so the
+comment poster (``ci/pr_summary.sh``) can find and update an existing
+comment in place.
+
+If nothing failed or flaked across the run, this script writes an empty
+file and the poster skips commenting.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import re
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from aggregate_common import ( # noqa: E402
+ aggregate_summaries,
+ download_summaries,
+ load_local_summaries,
+)
+from pr_comment_helper import COMMENT_MARKER # noqa: E402
+
+# Maximum total comment body size we are willing to post. GitHub allows
+# ~65k characters per comment, but we cap earlier and truncate the failure
+# tables so the comment stays readable.
+MAX_BODY_CHARS = 60000
+MAX_ROWS_PER_BUCKET = 80
+# Crash entries get their full message in a code block, capped only at a
+# generous limit since the diagnostic line is the whole point of the entry.
+MAX_CRASH_MESSAGE_CHARS = 2000
+
+# Crashes write a JUnit case named "PROCESS_CRASH" with a message
+# containing "crashed with SIG..." (see ci/utils/crash_helpers.sh from
+# PR #1191). Match either fingerprint defensively.
+_CRASH_NAME = "PROCESS_CRASH"
+_CRASH_MESSAGE_RE = re.compile(r"crashed with SIG[A-Z]+", re.IGNORECASE)
+
+
+def _is_crash(entry):
+ if entry.get("name") == _CRASH_NAME:
+ return True
+ return bool(_CRASH_MESSAGE_RE.search(entry.get("message", "") or ""))
+
+
+def _split_crashes(failures):
+ """Partition a failures list into ``(crashes, non_crash)``."""
+ crashes = []
+ non_crash = []
+ for entry in failures:
+ (crashes if _is_crash(entry) else non_crash).append(entry)
+ return crashes, non_crash
+
+
+def _short_msg(msg, limit=300):
+ """Single-line summary of an error message for table cells."""
+ if not msg:
+ return ""
+ lines = [ln for ln in msg.splitlines() if ln.strip()]
+ summary = lines[-1] if lines else ""
+ if len(summary) > limit:
+ summary = summary[: limit - 1] + "…"
+ return summary.replace("|", "\\|")
+
+
+def _classify_known_subgroups(recurring, flaky):
+ """Split the KNOWN bucket into the three sub-groups for the comment.
+
+ Returns ``(broken_on_nightly, known_flaky_nightly, flaked_in_pr_run)``.
+ Each entry retains its full per-matrix context.
+ """
+ broken_on_nightly = []
+ known_flaky_nightly = []
+ flaked_in_pr_run = []
+
+ for entry in recurring:
+ cls = entry.get("pr_classification", "")
+ if cls == "known_recurring":
+ broken_on_nightly.append(entry)
+ elif cls == "known_flaky_nightly":
+ known_flaky_nightly.append(entry)
+ else:
+ broken_on_nightly.append(entry)
+
+ for entry in flaky:
+ cls = entry.get("pr_classification", "")
+ if cls == "known_flaky_nightly":
+ known_flaky_nightly.append(entry)
+ elif cls == "known_recurring":
+ broken_on_nightly.append(entry)
+ else:
+ flaked_in_pr_run.append(entry)
+
+ return broken_on_nightly, known_flaky_nightly, flaked_in_pr_run
+
+
+def _matrix_grid_table(grid):
+ if not grid:
+ return ""
+ lines = [
+ "| Test type | Matrix | Status | Passed | Failed | Flaky | Skipped |",
+ "|-----------|--------|--------|--------|--------|-------|---------|",
+ ]
+ badge_for = {
+ "passed": "PASS",
+ "failed-new": "NEW FAIL",
+ "failed-recurring": "RECURRING",
+ "flaky": "FLAKY",
+ "no-results": "NO DATA",
+ }
+ for g in grid:
+ c = g.get("counts", {})
+ lines.append(
+ f"| {g['test_type']} | `{g['matrix_label']}` | "
+ f"{badge_for.get(g['status'], g['status'])} | "
+ f"{c.get('passed', 0)} | {c.get('failed', 0)} | "
+ f"{c.get('flaky', 0)} | {c.get('skipped', 0)} |"
+ )
+ return "\n".join(lines)
+
+
+def _failure_table(entries, columns, row_fn, cap=MAX_ROWS_PER_BUCKET):
+ if not entries:
+ return ""
+ lines = ["| " + " | ".join(columns) + " |"]
+ lines.append("|" + "|".join(["---"] * len(columns)) + "|")
+ for entry in entries[:cap]:
+ lines.append(row_fn(entry))
+ if len(entries) > cap:
+ lines.append(f"\n_…and {len(entries) - cap} more not shown._")
+ return "\n".join(lines)
+
+
+def build_comment_body(
+ agg: dict[str, Any],
+ target_branch: str,
+ github_run_url: str,
+ sha: str = "",
+ run_date: str = "",
+) -> str:
+ """Build the Markdown body for the sticky PR test-classification comment.
+
+ Renders a CAUTION callout for crashes (with collapsible details),
+ a CAUTION callout + table for NEW failures, and plain sub-sections
+ for KNOWN issues (recurring on nightly, known flaky on nightly,
+ flaked in this PR run only).
+
+ Args:
+ agg: Output of ``aggregate_summaries(...)``. Must contain
+ ``all_new_failures``, ``all_recurring_failures``,
+ ``all_flaky_tests``, and ``matrix_grid``.
+ target_branch: PR target branch (e.g., ``main``); surfaced in
+ the comment meta line.
+ github_run_url: Workflow run URL — linked from the meta line.
+ sha: PR head SHA (truncated to 12 chars for display).
+ run_date: ``YYYY-MM-DD`` run date string.
+
+ Returns:
+ The full Markdown body, prefixed with ``COMMENT_MARKER`` and
+ capped at ``MAX_BODY_CHARS`` (with a "comment truncated" note
+ appended when the cap is hit). Empty string when there are
+ no failures or flakes — callers must skip posting in that case.
+
+ Raises:
+ This function does not raise.
+ """
+ new_failures = agg["all_new_failures"]
+ recurring = agg["all_recurring_failures"]
+ flaky = agg["all_flaky_tests"]
+
+ if not new_failures and not recurring and not flaky:
+ return ""
+
+ # Pulled out so crashes render in their own CAUTION block and don't
+ # get drowned out by ordinary assertion failures.
+ new_crashes, new_failures = _split_crashes(new_failures)
+ recurring_crashes, recurring = _split_crashes(recurring)
+ all_crashes = new_crashes + recurring_crashes
+
+ broken_on_nightly, known_flaky_nightly, flaked_in_pr_run = (
+ _classify_known_subgroups(recurring, flaky)
+ )
+
+ parts = [COMMENT_MARKER]
+ parts.append("## PR Test Classification")
+ parts.append("")
+
+ headline = []
+ if all_crashes:
+ headline.append(f"**{len(all_crashes)} CRASH(es)**")
+ if new_failures:
+ headline.append(f"**{len(new_failures)} NEW** failure(s)")
+ known_total = (
+ len(broken_on_nightly)
+ + len(known_flaky_nightly)
+ + len(flaked_in_pr_run)
+ )
+ if known_total:
+ headline.append(f"**{known_total} KNOWN** issue(s)")
+ if headline:
+ parts.append(" • ".join(headline))
+ parts.append("")
+
+ meta = []
+ if target_branch:
+ meta.append(f"Compared against nightly history for `{target_branch}`")
+ if sha:
+ meta.append(f"PR head: `{sha[:12]}`")
+ if run_date:
+ meta.append(f"Run date: {run_date}")
+ if github_run_url:
+ meta.append(f"[Workflow run]({github_run_url})")
+ if meta:
+ parts.append(" · ".join(meta))
+ parts.append("")
+
+ grid_md = _matrix_grid_table(agg["matrix_grid"])
+ if grid_md:
+        parts.append("<details>\n<summary>Per-matrix status</summary>\n")
+        parts.append(grid_md)
+        parts.append("\n</details>")
+ parts.append("")
+
+ if all_crashes:
+ parts.append("> [!CAUTION]")
+ parts.append(
+ "> **CRASHES detected — a test process was terminated by a signal mid-run.**"
+ )
+ parts.append(
+ "> These need urgent investigation. The JUnit XML was not "
+ "finalized, so the specific test that triggered the crash "
+ "may not be identified; check the workflow run log for the "
+ "last test invoked before the signal."
+ )
+ parts.append("")
+ crash_word = "crash" if len(all_crashes) == 1 else "crashes"
+ parts.append("")
+        parts.append(
+            "<details>\n<summary>"
+            f"{len(all_crashes)} {crash_word}"
+            " — click to expand details</summary>\n"
+        )
+ parts.append("") # blank line so the body renders as Markdown
+ for entry in all_crashes:
+ heading_tag = (
+ "NEW" if entry.get("pr_classification") == "new" else "KNOWN"
+ )
+ parts.append(
+ f"#### `{entry.get('suite', '?')}` — "
+ f"`{entry.get('name', 'PROCESS_CRASH')}` "
+ f"_[{entry['test_type']} / {entry['matrix_label']}]_ "
+ f"— {heading_tag}"
+ )
+ msg = (entry.get("message") or "").strip()
+ if msg:
+ if len(msg) > MAX_CRASH_MESSAGE_CHARS:
+ msg = msg[:MAX_CRASH_MESSAGE_CHARS] + "\n…[truncated]"
+ parts.append("")
+ parts.append("```")
+ parts.append(msg)
+ parts.append("```")
+ parts.append("")
+        parts.append("</details>")
+ parts.append("")
+
+ if new_failures:
+ parts.append("> [!CAUTION]")
+ parts.append(
+ f"> **NEW failures ({len(new_failures)}) — likely introduced by this PR**"
+ )
+ parts.append("")
+ parts.append(
+ _failure_table(
+ new_failures,
+ ["Test type", "Matrix", "Suite", "Test", "Error"],
+ lambda e: (
+ f"| {e['test_type']} | `{e['matrix_label']}` | "
+ f"{e['suite']} | `{e['name']}` | "
+ f"{_short_msg(e.get('message', ''))} |"
+ ),
+ )
+ )
+ parts.append("")
+
+ if known_total:
+ parts.append("### KNOWN issues (pre-existing, not caused by this PR)")
+ parts.append("")
+
+ if broken_on_nightly:
+ parts.append("**Already broken on nightly** (recurring)")
+ parts.append("")
+ parts.append(
+ _failure_table(
+ broken_on_nightly,
+ [
+ "Test type",
+ "Matrix",
+ "Suite",
+ "Test",
+ "First seen",
+ "Failure count",
+ "Error",
+ ],
+ lambda e: (
+ f"| {e['test_type']} | `{e['matrix_label']}` | "
+ f"{e['suite']} | `{e['name']}` | "
+ f"{e.get('first_seen', 'unknown')} | "
+ f"{e.get('failure_count', '?')} | "
+ f"{_short_msg(e.get('message', ''))} |"
+ ),
+ )
+ )
+ parts.append("")
+
+ if known_flaky_nightly:
+ parts.append("**Known flaky on nightly**")
+ parts.append("")
+ parts.append(
+ _failure_table(
+ known_flaky_nightly,
+ [
+ "Test type",
+ "Matrix",
+ "Suite",
+ "Test",
+ "First seen",
+ "Error",
+ ],
+ lambda e: (
+ f"| {e['test_type']} | `{e['matrix_label']}` | "
+ f"{e['suite']} | `{e['name']}` | "
+ f"{e.get('first_seen', 'unknown')} | "
+ f"{_short_msg(e.get('message', ''))} |"
+ ),
+ )
+ )
+ parts.append("")
+
+ if flaked_in_pr_run:
+ parts.append(
+ "**Flaked in this PR run** (passed on retry; not previously known to flake)"
+ )
+ parts.append("")
+ parts.append(
+ _failure_table(
+ flaked_in_pr_run,
+ [
+ "Test type",
+ "Matrix",
+ "Suite",
+ "Test",
+ "Retries",
+ "Error",
+ ],
+ lambda e: (
+ f"| {e['test_type']} | `{e['matrix_label']}` | "
+ f"{e['suite']} | `{e['name']}` | "
+ f"{e.get('retry_count', '?')} | "
+ f"{_short_msg(e.get('message', ''))} |"
+ ),
+ )
+ )
+ parts.append("")
+
+ parts.append(
+ "_Classification compares each failure against the most recent "
+ "nightly history for the target branch. Tests passed on retry "
+ "via `pytest-rerunfailures` are reported as flaky._"
+ )
+
+ body = "\n".join(parts)
+ if len(body) > MAX_BODY_CHARS:
+ body = body[: MAX_BODY_CHARS - 200] + (
+ "\n\n…_comment truncated; see workflow run for full details._"
+ )
+ return body
+
+
+def main() -> int:
+ """CLI entry point — aggregate PR summaries and write the comment body.
+
+ Reads per-matrix summary JSONs (from S3 or a local directory),
+ classifies failures, writes ``pr_comment.md`` and
+ ``pr_consolidated.json`` to ``--output-dir``. When all tests
+ pass, writes an empty ``pr_comment.md`` so the poster skips
+ commenting.
+
+ Returns:
+ ``0`` on success (including the all-green / empty-body case).
+
+ Raises:
+ SystemExit: Indirectly via ``argparse`` if argument parsing
+ fails (missing required arguments).
+ """
+ parser = argparse.ArgumentParser(
+ description="Aggregate per-matrix PR test summaries into a Markdown PR comment."
+ )
+ src = parser.add_mutually_exclusive_group(required=True)
+ src.add_argument(
+ "--s3-pr-summaries-prefix",
+ help=(
+ "S3 prefix where ``nightly_report.py --mode pr`` uploaded "
+ "per-matrix summaries for this run. Example: "
+ "s3://bucket/ci_test_reports/pr/run-12345/"
+ ),
+ )
+ src.add_argument(
+ "--local-summaries-dir",
+ help="Local directory of summaries (for testing without S3).",
+ )
+ parser.add_argument(
+ "--output-dir",
+ default="aggregate-output",
+ help="Directory to write pr_comment.md and consolidated.json into.",
+ )
+ parser.add_argument(
+ "--target-branch",
+ required=True,
+ help="PR target branch — surfaced in the comment for context.",
+ )
+ parser.add_argument(
+ "--sha",
+ required=True,
+ help="PR head SHA — surfaced in the comment for context.",
+ )
+ parser.add_argument(
+ "--github-run-url",
+ required=True,
+ help="Workflow run URL — linked from the comment footer.",
+ )
+ parser.add_argument(
+ "--run-date",
+ default=datetime.now(timezone.utc).strftime("%Y-%m-%d"),
+ help="Date the run started (YYYY-MM-DD). Defaults to today (UTC).",
+ )
+ args = parser.parse_args()
+
+ output_dir = Path(args.output_dir)
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ if args.local_summaries_dir:
+ summaries = load_local_summaries(args.local_summaries_dir)
+ else:
+ summaries = download_summaries(
+ args.s3_pr_summaries_prefix, output_dir / "summaries"
+ )
+
+ if not summaries:
+ print("No PR per-matrix summaries found; nothing to comment on.")
+ (output_dir / "pr_comment.md").write_text("")
+ return 0
+
+ agg = aggregate_summaries(summaries)
+ body = build_comment_body(
+ agg,
+ target_branch=args.target_branch,
+ github_run_url=args.github_run_url,
+ sha=args.sha,
+ run_date=args.run_date,
+ )
+
+ (output_dir / "pr_comment.md").write_text(body)
+ consolidated = {
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ "target_branch": args.target_branch,
+ "sha": args.sha,
+ "run_date": args.run_date,
+ "totals": agg["totals"],
+ "matrix_grid": agg["matrix_grid"],
+ "new_failures": agg["all_new_failures"],
+ "recurring_failures": agg["all_recurring_failures"],
+ "flaky_tests": agg["all_flaky_tests"],
+ }
+ (output_dir / "pr_consolidated.json").write_text(
+ json.dumps(consolidated, indent=2) + "\n"
+ )
+
+ if not body:
+ print("All tests passed (no failures or flakes); skipping PR comment.")
+ else:
+ print(
+ f"PR comment body written to {output_dir / 'pr_comment.md'} "
+ f"({len(body)} chars)."
+ )
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/ci/utils/nightly_report.py b/ci/utils/nightly_report.py
index 674245858..aad46304f 100755
--- a/ci/utils/nightly_report.py
+++ b/ci/utils/nightly_report.py
@@ -34,6 +34,8 @@
--s3-summary-uri s3://bucket/ci_test_reports/nightly/summaries/2026-04-13/python-cuda12.9-py3.12-x86_64.json
"""
+from __future__ import annotations
+
import argparse
import json
import os
@@ -41,6 +43,7 @@
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
+from typing import Any
from xml.etree import ElementTree
# Ensure ci/utils is importable when invoked as a script
@@ -368,6 +371,119 @@ def update_history(history, classified, sha, date_str):
)
+# ---------------------------------------------------------------------------
+# PR-mode classification (read-only against nightly history)
+# ---------------------------------------------------------------------------
+
+
+def classify_pr_against_history(
+ classified: dict[str, list[dict[str, Any]]],
+ history: dict[str, Any],
+) -> tuple[
+ list[dict[str, Any]],
+ list[dict[str, Any]],
+ list[dict[str, Any]],
+]:
+ """Classify PR run results against the nightly failure history.
+
+ Read-only: never mutates ``history``. Each failure is annotated with a
+ ``pr_classification`` field used by the PR comment renderer.
+
+ Routing into the existing summary lists so ``aggregate_summaries``
+ consumes PR summaries without changes:
+
+ - ``new_failures``: hard failures the PR introduced. ``pr_classification=new``.
+ - ``recurring_failures``: hard failures known to nightly.
+ ``pr_classification`` is ``known_flaky_nightly`` when the history
+ flagged the test as cross-run flaky (checked first, since it is a
+ more specific signal than mere "currently active"), or
+ ``known_recurring`` when the test is active on nightly but not
+ flagged flaky.
+ - ``flaky_tests``: tests that passed on retry within the PR run.
+ ``pr_classification`` is ``known_flaky_nightly`` (already known
+ flaky), ``known_recurring`` (hard-failing on nightly but flaked
+ here), or ``known_flaky_pr`` (only flaked in this PR run).
+
+ Args:
+ classified: Output of ``classify_failures(...)`` — a mapping with
+ keys ``passed``, ``failed``, ``error``, ``flaky``, ``skipped``
+ whose values are lists of per-testcase dicts.
+ history: Loaded nightly history JSON. Expected to contain a
+ ``tests`` mapping keyed by ``suite::classname::name``; absent
+ or malformed input is treated as empty history.
+
+ Returns:
+ Tuple ``(new_failures, recurring_failures, flaky_tests)``. Each
+ list contains the original testcase dict augmented with a
+ ``pr_classification`` string and, where applicable, ``first_seen``
+ and ``failure_count`` keys pulled from history.
+
+ Raises:
+ This function does not raise. Malformed ``history`` (missing
+ ``tests`` mapping, missing per-test fields) is tolerated.
+ """
+ tests_history = history.get("tests", {})
+
+ new_failures: list[dict[str, Any]] = []
+ recurring_failures: list[dict[str, Any]] = []
+ flaky_tests: list[dict[str, Any]] = []
+
+ def _key(entry: dict[str, Any]) -> str:
+ return f"{entry['suite']}::{entry['classname']}::{entry['name']}"
+
+ # Hard failures: failed/errored in PR run, did NOT pass on retry.
+ # Check ``is_flaky`` before ``status == 'active'`` so a test marked
+ # both active and flaky lands in ``known_flaky_nightly`` (the more
+ # specific signal). Matches the precedence in the flaky-in-run loop
+ # below.
+ for entry in classified["failed"] + classified["error"]:
+ rec = tests_history.get(_key(entry))
+ if rec and rec.get("is_flaky"):
+ recurring_failures.append(
+ {
+ **entry,
+ "first_seen": rec.get("first_seen_date", "unknown"),
+ "failure_count": rec.get("failure_count", 0),
+ "pr_classification": "known_flaky_nightly",
+ }
+ )
+ elif rec and rec.get("status") == "active":
+ recurring_failures.append(
+ {
+ **entry,
+ "first_seen": rec.get("first_seen_date", "unknown"),
+ "failure_count": rec.get("failure_count", 0),
+ "pr_classification": "known_recurring",
+ }
+ )
+ else:
+ # Not in history, or history says resolved-and-not-flaky:
+ # this PR is the cause.
+ new_failures.append({**entry, "pr_classification": "new"})
+
+ # Flaky in PR run: passed on retry within the same run.
+ for entry in classified["flaky"]:
+ rec = tests_history.get(_key(entry))
+ if rec and rec.get("is_flaky"):
+ classification = "known_flaky_nightly"
+ first_seen = rec.get("first_seen_date", "unknown")
+ elif rec and rec.get("status") == "active":
+ classification = "known_recurring"
+ first_seen = rec.get("first_seen_date", "unknown")
+ else:
+ classification = "known_flaky_pr"
+ first_seen = "unknown"
+ flaky_tests.append(
+ {
+ **entry,
+ "first_seen": first_seen,
+ "pr_classification": classification,
+ }
+ )
+
+ return new_failures, recurring_failures, flaky_tests
+
+
def save_history(history, history_path):
"""Write history to a local JSON file."""
with open(history_path, "w") as f:
@@ -542,19 +658,35 @@ def generate_json_summary(
recurring_failures,
resolved_tests,
new_flaky_tests=None,
+ flaky_tests=None,
test_type="",
matrix_label="",
sha="",
date_str="",
+ mode="nightly",
):
- """Generate a JSON summary for downstream tools (Slack notifier, dashboard)."""
+ """Generate a JSON summary for downstream tools (Slack notifier, dashboard, PR comment).
+
+ ``flaky_tests`` lets PR mode pass its own annotated list (with
+ ``pr_classification`` and ``first_seen``). In nightly mode it defaults
+ to ``classified["flaky"]`` to preserve existing behavior.
+ """
if new_flaky_tests is None:
new_flaky_tests = []
+ if flaky_tests is None:
+ flaky_tests = classified["flaky"]
new_flaky_keys = {
f"{e['classname']}::{e['name']}" for e in new_flaky_tests
}
+
+ def _opt(d, key):
+ if key in d:
+ return {key: d[key]}
+ return {}
+
return {
"timestamp": datetime.now(timezone.utc).isoformat(),
+ "mode": mode,
"test_type": test_type,
"matrix_label": matrix_label,
"sha": sha,
@@ -575,6 +707,7 @@ def generate_json_summary(
"name": e["name"],
"classname": e["classname"],
"message": e.get("message", ""),
+ **_opt(e, "pr_classification"),
}
for e in new_failures
],
@@ -585,6 +718,8 @@ def generate_json_summary(
"classname": e["classname"],
"first_seen": e.get("first_seen", "unknown"),
"message": e.get("message", ""),
+ **_opt(e, "pr_classification"),
+ **_opt(e, "failure_count"),
}
for e in recurring_failures
],
@@ -596,8 +731,10 @@ def generate_json_summary(
"retry_count": e.get("retry_count", 0),
"message": e.get("message", ""),
"is_new": f"{e['classname']}::{e['name']}" in new_flaky_keys,
+ **_opt(e, "pr_classification"),
+ **_opt(e, "first_seen"),
}
- for e in classified["flaky"]
+ for e in flaky_tests
],
"resolved_tests": [
{
@@ -945,12 +1082,24 @@ def main():
default=os.environ.get("GITHUB_STEP_SUMMARY", ""),
help="Path to write GitHub Actions step summary",
)
+ parser.add_argument(
+ "--mode",
+ choices=["nightly", "pr"],
+ default="nightly",
+ help=(
+ "nightly: update and upload history; classify against and "
+ "evolve the long-term failure state. "
+ "pr: read history but never write it; classify each PR failure "
+ "as new vs. known (recurring or flaky) for the PR comment."
+ ),
+ )
args = parser.parse_args()
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
local_history_path = str(output_dir / "test_failure_history.json")
+ pr_mode = args.mode == "pr"
# ---- Step 1: Download history from S3 ----
if args.s3_history_uri:
@@ -982,31 +1131,45 @@ def main():
f"{len(classified['skipped'])} skipped"
)
- # ---- Step 3: Update history ----
+ # ---- Step 3: Classify against history ----
history = load_history(local_history_path)
- (
- history,
- new_failures,
- recurring_failures,
- resolved_tests,
- new_flaky_tests,
- ) = update_history(history, classified, args.sha, args.date)
+ pr_flaky_tests = None # populated only in PR mode
- if new_flaky_tests:
- print(
- f"NEW FLAKY: {len(new_flaky_tests)} test(s) flaky for the first time"
+ if pr_mode:
+ new_failures, recurring_failures, pr_flaky_tests = (
+ classify_pr_against_history(classified, history)
)
- if resolved_tests:
+ resolved_tests = []
+ new_flaky_tests = []
print(
- f"Stabilized: {len(resolved_tests)} previously-failing test(s) now pass"
+ f"PR classification: {len(new_failures)} new, "
+ f"{len(recurring_failures)} known recurring/flaky-on-nightly, "
+ f"{len(pr_flaky_tests)} flaky in run"
)
+ else:
+ (
+ history,
+ new_failures,
+ recurring_failures,
+ resolved_tests,
+ new_flaky_tests,
+ ) = update_history(history, classified, args.sha, args.date)
+
+ if new_flaky_tests:
+ print(
+ f"NEW FLAKY: {len(new_flaky_tests)} test(s) flaky for the first time"
+ )
+ if resolved_tests:
+ print(
+ f"Stabilized: {len(resolved_tests)} previously-failing test(s) now pass"
+ )
- save_history(history, local_history_path)
- print(f"Updated local history at {local_history_path}")
+ save_history(history, local_history_path)
+ print(f"Updated local history at {local_history_path}")
- # ---- Step 4: Upload history back to S3 ----
- if args.s3_history_uri:
- s3_upload(local_history_path, args.s3_history_uri)
+ # ---- Step 4: Upload history back to S3 ----
+ if args.s3_history_uri:
+ s3_upload(local_history_path, args.s3_history_uri)
# ---- Step 5: Generate reports ----
report_kwargs = dict(
@@ -1046,6 +1209,8 @@ def main():
recurring_failures,
resolved_tests,
new_flaky_tests,
+ flaky_tests=pr_flaky_tests,
+ mode=args.mode,
**report_kwargs,
)
json_path = output_dir / "nightly_summary.json"
@@ -1097,7 +1262,10 @@ def main():
print(
f"\nFAILED: {genuine_failures} genuine test failure(s) detected."
)
- return 1
+ # PR mode is reporting-only; the underlying test job already conveys
+ # pass/fail. Returning 0 keeps post-test summary jobs uncoupled from
+ # this script's exit code.
+ return 0 if pr_mode else 1
if classified["flaky"]:
print(
f"\nWARNING: All tests passed but {len(classified['flaky'])} flaky test(s) detected."
diff --git a/ci/utils/nightly_report_helper.sh b/ci/utils/nightly_report_helper.sh
index c65fc22f0..113253af3 100755
--- a/ci/utils/nightly_report_helper.sh
+++ b/ci/utils/nightly_report_helper.sh
@@ -2,7 +2,8 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
-# Shared helper for generating nightly test reports with matrix-aware S3 paths.
+# Shared helper for generating nightly and PR test reports with
+# matrix-aware S3 paths.
#
# Usage (source from any test script):
#
@@ -15,6 +16,16 @@
# # For wheel tests:
# generate_nightly_report "wheel-python" --with-python-version
#
+# Behavior depends on RAPIDS_BUILD_TYPE:
+# - "nightly": update and upload the long-term failure history,
+# upload a per-matrix summary under summaries/, plus HTML.
+# - "pull-request" (PR mode): read the target branch's nightly history
+# (no writes), classify each PR failure as new vs. known
+# (recurring or flaky), and upload only a run-scoped
+# per-matrix summary under ci_test_reports/pr/run-${GITHUB_RUN_ID}/
+# for the PR comment aggregator to consume.
+# - other: local report only; no S3 reads or writes.
+#
# Prerequisites (set before calling):
# RAPIDS_TESTS_DIR - directory containing JUnit XML test results
#
@@ -22,10 +33,11 @@
# RAPIDS_CUDA_VERSION - CUDA version (e.g., "12.9")
# RAPIDS_PY_VERSION - Python version (e.g., "3.12"), used with --with-python-version
# RAPIDS_BRANCH - branch name (e.g., "main")
-# RAPIDS_BUILD_TYPE - build type; S3 history/summary/HTML uploads are
-# only enabled when this equals "nightly"
-# CUOPT_S3_URI - S3 bucket root (e.g., s3://cuopt-datasets/);
-# only consulted when RAPIDS_BUILD_TYPE=nightly
+# RAPIDS_BUILD_TYPE - build type ("nightly", "pull-request", ...)
+# GITHUB_BASE_REF - PR target branch; in PR mode the helper reads
+# the nightly history from this branch. Falls
+# back to RAPIDS_BRANCH or "main".
+# CUOPT_S3_URI - S3 bucket root (e.g., s3://cuopt-datasets/)
# GITHUB_SHA - commit SHA
# GITHUB_RUN_ID - GitHub Actions run ID (scopes summaries to this run)
# GITHUB_STEP_SUMMARY - path for GitHub Actions step summary
@@ -78,10 +90,8 @@ generate_nightly_report() {
local s3_summary_uri=""
local s3_summary_branch_uri=""
local s3_html_uri=""
+ local mode="nightly"
- # Only upload to S3 for nightly runs. For PRs and other build types we
- # still generate the local report and GitHub Step Summary, but skip S3
- # so PR runs don't pollute the nightly history/summary/report buckets.
if [ "${RAPIDS_BUILD_TYPE:-}" = "nightly" ] && [ -n "${CUOPT_S3_URI:-}" ]; then
local s3_base="${CUOPT_S3_URI}ci_test_reports/nightly"
s3_history_uri="${s3_base}/history/${branch_slug}/${test_type}-${matrix_label}.json"
@@ -102,10 +112,45 @@ generate_nightly_report() {
fi
s3_summary_branch_uri="${s3_base}/summaries/${run_date}/${branch_slug}/${summary_filename}"
s3_html_uri="${s3_base}/reports/${run_date}/${branch_slug}/${test_type}-${matrix_label}.html"
+ elif [ "${RAPIDS_BUILD_TYPE:-}" = "pull-request" ] && [ -n "${CUOPT_S3_URI:-}" ]; then
+ # PR mode: read the target branch's nightly history (never write
+ # back), and write a run-scoped per-matrix summary that the PR
+ # comment aggregator picks up.
+ mode="pr"
+
+ # GITHUB_BASE_REF is unset for the `push` events the PR workflow
+ # triggers on (GHA only populates it for `pull_request` events).
+ # The shared rapidsai test workflows don't propagate a target
+ # branch into the test container, so we fall back to RAPIDS_BRANCH
+ # then "main". Follow-up: centralize PR classification in the
+ # pr-test-summary job so this fallback is no longer needed
+ # (see PR #1194 description).
+ local target_branch="${GITHUB_BASE_REF:-${RAPIDS_BRANCH:-main}}"
+ local target_branch_slug
+ target_branch_slug=$(echo "${target_branch}" | tr '/' '-')
+
+ local s3_nightly_base="${CUOPT_S3_URI}ci_test_reports/nightly"
+ s3_history_uri="${s3_nightly_base}/history/${target_branch_slug}/${test_type}-${matrix_label}.json"
+ # Fall back to main's history when the target branch has no history yet
+ # (e.g. PRs into a fresh release branch).
+ if [ "${target_branch_slug}" != "main" ]; then
+ s3_history_seed_uri="${s3_nightly_base}/history/main/${test_type}-${matrix_label}.json"
+ fi
+
+ # PR summaries live under a separate prefix so they never mix with
+ # nightly data. Scoped by GITHUB_RUN_ID so each workflow run is
+ # isolated; cleaned up via bucket lifecycle policy.
+ local pr_base="${CUOPT_S3_URI}ci_test_reports/pr"
+ if [ -n "${GITHUB_RUN_ID:-}" ]; then
+ s3_summary_uri="${pr_base}/run-${GITHUB_RUN_ID}/${test_type}-${matrix_label}.json"
+ else
+ echo "WARNING: GITHUB_RUN_ID unset; skipping PR summary upload" >&2
+ fi
fi
# --- Run nightly report ---
python3 "${_HELPER_DIR}/nightly_report.py" \
+ --mode "${mode}" \
--results-dir "${RAPIDS_TESTS_DIR}" \
--output-dir "${report_output_dir}" \
--sha "${GITHUB_SHA:-unknown}" \
diff --git a/ci/utils/pr_comment_helper.py b/ci/utils/pr_comment_helper.py
new file mode 100644
index 000000000..c07fc9727
--- /dev/null
+++ b/ci/utils/pr_comment_helper.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""GitHub PR helpers for the PR test-summary workflow.
+
+Two subcommands:
+
+ base-ref Print the PR's target branch (e.g., ``main``).
+ post Post (or update) a single sticky comment identified by a
+ hidden HTML-comment marker.
+
+Stdlib only (urllib + json) so this runs in slim CI containers without
+extra installs. Both ``ci/pr_summary.sh`` and ``pr_test_summary.yaml``
+dispatch into this module rather than embedding inline Python.
+
+The hidden marker is defined here as the single source of truth and
+re-used by ``aggregate_pr.py`` when it builds the comment body.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+from urllib import error, request
+
+GITHUB_API = "https://api.github.com"
+
+# Imported by aggregate_pr.py so the body it writes and the marker the
+# poster searches for stay in sync.
+COMMENT_MARKER = ""
+
+
+def _gh_request(method, url, token, payload=None, timeout=30):
+    """Issue a GitHub API request and return parsed JSON (or ``None``).
+
+    Args:
+        method: HTTP verb ("GET", "POST", "PATCH", ...).
+        url: Fully-qualified API URL.
+        token: Bearer token placed in the ``Authorization`` header.
+        payload: Optional JSON-serializable request body; when given it
+            is serialized and sent as ``application/json``.
+        timeout: Socket timeout in seconds for the request.
+
+    Returns:
+        The response body parsed as JSON, or ``None`` when the body is
+        empty or is not valid JSON.
+
+    Raises:
+        RuntimeError: On any HTTP error status or network-level failure.
+    """
+    headers = {
+        "Authorization": f"Bearer {token}",
+        "Accept": "application/vnd.github+json",
+        "X-GitHub-Api-Version": "2022-11-28",
+    }
+    data = None
+    if payload is not None:
+        data = json.dumps(payload).encode()
+        headers["Content-Type"] = "application/json"
+
+    req = request.Request(url, data=data, headers=headers, method=method)
+    try:
+        with request.urlopen(req, timeout=timeout) as resp:
+            body = resp.read().decode()
+    except error.HTTPError as exc:
+        # Cap the error detail so a large error page doesn't flood logs.
+        detail = exc.read().decode()[:300]
+        raise RuntimeError(
+            f"GitHub API {method} {url} failed: {exc.code} {detail}"
+        ) from exc
+    except error.URLError as exc:
+        raise RuntimeError(f"GitHub API {method} {url} failed: {exc}") from exc
+
+    if not body:
+        return None
+    try:
+        return json.loads(body)
+    except json.JSONDecodeError:
+        # A non-JSON body is treated the same as no result.
+        return None
+
+
+def resolve_base_ref(repo: str, pr_number: int, token: str) -> str:
+ """Return the PR's target branch (e.g. ``main``).
+
+ Args:
+ repo: GitHub ``owner/name`` slug.
+ pr_number: Pull-request number.
+ token: GitHub token with at least ``pull-requests: read``.
+
+ Returns:
+ The PR's base ref, or ``"main"`` if the API response lacks one.
+
+ Raises:
+ RuntimeError: If the underlying GitHub API call fails.
+ """
+ data = _gh_request(
+ "GET", f"{GITHUB_API}/repos/{repo}/pulls/{pr_number}", token
+ )
+ return ((data or {}).get("base") or {}).get("ref", "main")
+
+
+def find_existing_comment_id(
+ repo: str, pr_number: int, token: str, marker: str = COMMENT_MARKER
+) -> int | None:
+ """Find the id of a PR comment whose body starts with ``marker``.
+
+ Paginates through issue comments (100 per page) until a match is
+ found or all pages are exhausted.
+
+ Args:
+ repo: GitHub ``owner/name`` slug.
+ pr_number: Pull-request number.
+ token: GitHub token with ``pull-requests: read``.
+ marker: Hidden HTML-comment marker that identifies the sticky
+ comment (matched after stripping leading whitespace).
+
+ Returns:
+ The integer comment id, or ``None`` if no comment matches.
+
+ Raises:
+ RuntimeError: If a GitHub API call fails.
+ """
+ page = 1
+ while True:
+ url = (
+ f"{GITHUB_API}/repos/{repo}/issues/{pr_number}/comments"
+ f"?per_page=100&page={page}"
+ )
+ comments = _gh_request("GET", url, token) or []
+ for c in comments:
+ body = (c.get("body") or "").lstrip()
+ if body.startswith(marker):
+ return c["id"]
+ if len(comments) < 100:
+ return None
+ page += 1
+
+
+def post_or_update_comment(
+ repo: str,
+ pr_number: int,
+ token: str,
+ body: str,
+ marker: str = COMMENT_MARKER,
+) -> str:
+ """Update the existing sticky PR comment if present; otherwise create one.
+
+ Looks up an existing comment by ``marker``; if found, ``PATCH``es it
+ in place; otherwise ``POST``s a new one.
+
+ Args:
+ repo: GitHub ``owner/name`` slug.
+ pr_number: Pull-request number.
+ token: GitHub token with ``pull-requests: write``.
+ body: Full Markdown body to post (must already include
+ ``marker`` somewhere near the top for future lookups).
+ marker: Hidden HTML-comment marker that identifies the sticky
+ comment.
+
+ Returns:
+ The created/updated comment's ``html_url``, or ``""`` if the
+ API response lacked one.
+
+ Raises:
+ RuntimeError: If a GitHub API call fails.
+ """
+ existing_id = find_existing_comment_id(repo, pr_number, token, marker)
+ payload = {"body": body}
+ if existing_id is not None:
+ resp = _gh_request(
+ "PATCH",
+ f"{GITHUB_API}/repos/{repo}/issues/comments/{existing_id}",
+ token,
+ payload=payload,
+ )
+ action = "Updated"
+ else:
+ resp = _gh_request(
+ "POST",
+ f"{GITHUB_API}/repos/{repo}/issues/{pr_number}/comments",
+ token,
+ payload=payload,
+ )
+ action = "Created"
+ url = (resp or {}).get("html_url", "")
+ print(f"{action} PR comment: {url}")
+ return url
+
+
+def _cmd_base_ref(args: argparse.Namespace, token: str) -> int:
+ print(resolve_base_ref(args.repo, args.pr, token))
+ return 0
+
+
+def _cmd_post(args: argparse.Namespace, token: str) -> int:
+ with open(args.body_file) as f:
+ body = f.read()
+ if not body.strip():
+ print("Empty body; nothing to post.")
+ return 0
+ post_or_update_comment(args.repo, args.pr, token, body)
+ return 0
+
+
+def _add_common_args(sp: argparse.ArgumentParser) -> None:
+    """Attach the ``--repo``/``--pr`` options shared by every subcommand."""
+    sp.add_argument("--repo", required=True, help="owner/name")
+    sp.add_argument("--pr", required=True, type=int, help="PR number")
+
+
+def main() -> int:
+ """Dispatch to the requested subcommand.
+
+ Reads ``GITHUB_TOKEN`` from the environment (the GitHub convention);
+ there is no ``--token`` CLI flag so configuration comes from a
+ single source.
+
+ Returns:
+ ``0`` on success, ``1`` if a GitHub API call failed, or ``2``
+ if ``GITHUB_TOKEN`` is not set in the environment.
+
+ Raises:
+ SystemExit: Indirectly via ``argparse`` if argument parsing
+ fails.
+ """
+ p = argparse.ArgumentParser(description=__doc__)
+ sub = p.add_subparsers(dest="cmd", required=True)
+
+ sp_base = sub.add_parser("base-ref", help="Print the PR's target branch.")
+ _add_common_args(sp_base)
+ sp_base.set_defaults(func=_cmd_base_ref)
+
+ sp_post = sub.add_parser(
+ "post", help="Post or update a sticky PR comment."
+ )
+ _add_common_args(sp_post)
+ sp_post.add_argument(
+ "--body-file",
+ required=True,
+ help="File whose contents are the comment body.",
+ )
+ sp_post.set_defaults(func=_cmd_post)
+
+ args = p.parse_args()
+ token = os.environ.get("GITHUB_TOKEN")
+ if not token:
+ print("ERROR: GITHUB_TOKEN env var must be set.", file=sys.stderr)
+ return 2
+ try:
+ return args.func(args, token)
+ except RuntimeError as exc:
+ print(f"ERROR: {exc}", file=sys.stderr)
+ return 1
+
+
+if __name__ == "__main__":
+ sys.exit(main())