From a1ad17eb4e2bae96bcd535e41a04e99db8665239 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 22:00:06 +0200 Subject: [PATCH 1/9] Add stable Modal pipeline discovery --- .github/workflows/pipeline.yaml | 1 + .github/workflows/pr.yaml | 5 + changelog.d/1049.added.md | 1 + .../engineering/skills/pipeline_operations.md | 37 +- modal_app/pipeline.py | 4 + modal_app/pipeline_discovery.py | 157 ++++++++ modal_app/pipeline_discovery_core.py | 357 ++++++++++++++++++ modal_app/pipeline_status.py | 6 +- policyengine_us_data/utils/run_context.py | 27 +- .../integration/test_modal_pipeline_seams.py | 56 +++ tests/unit/test_pipeline_discovery.py | 232 ++++++++++++ tests/unit/test_run_context.py | 11 + 12 files changed, 884 insertions(+), 10 deletions(-) create mode 100644 changelog.d/1049.added.md create mode 100644 modal_app/pipeline_discovery.py create mode 100644 modal_app/pipeline_discovery_core.py create mode 100644 tests/unit/test_pipeline_discovery.py diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index ab17a5d01..97fe46870 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -132,5 +132,6 @@ jobs: PARALLEL_MATRIX: ${{ inputs.parallel_matrix || 'false' }} NUM_MATRIX_WORKERS: ${{ inputs.num_matrix_workers || '50' }} run: | + modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline_discovery.py modal deploy --env="${MODAL_ENVIRONMENT}" --name="${US_DATA_MODAL_APP_NAME}" --tag="${US_DATA_RUN_ID}" modal_app/pipeline.py python .github/scripts/spawn_modal_pipeline.py diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index f20889dea..25ac1117b 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -170,10 +170,12 @@ jobs: # already exist, then stop/delete the PR resources in cleanup steps. 
MODAL_ENVIRONMENT: main MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} + MODAL_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} MODAL_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} MODAL_H5_TEST_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_PIPELINE_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} + US_DATA_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_H5_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_PIPELINE_VOLUME_NAME: pipeline-artifacts-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} @@ -188,6 +190,8 @@ jobs: - run: uv sync --dev - name: Install integration test deps run: uv pip install modal pytest numpy pandas + - name: Deploy PR Modal pipeline discovery app + run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" --name="${MODAL_PIPELINE_DISCOVERY_APP_NAME}" modal_app/pipeline_discovery.py - name: Deploy PR Modal pipeline app run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py - name: Deploy PR Modal local-area app @@ -216,6 +220,7 @@ jobs: for app_name in \ "${MODAL_H5_TEST_HARNESS_APP_NAME}" \ "${MODAL_LOCAL_AREA_APP_NAME}" \ + 
"${MODAL_PIPELINE_DISCOVERY_APP_NAME}" \ "${MODAL_APP_NAME}" do yes | uv run modal app stop \ diff --git a/changelog.d/1049.added.md b/changelog.d/1049.added.md new file mode 100644 index 000000000..edca789d1 --- /dev/null +++ b/changelog.d/1049.added.md @@ -0,0 +1 @@ +Add a stable Modal discovery endpoint for deployed publication pipeline runs. diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md index 80d018e7b..d2155b97d 100644 --- a/docs/engineering/skills/pipeline_operations.md +++ b/docs/engineering/skills/pipeline_operations.md @@ -19,21 +19,33 @@ The status system reports: ## Status Surfaces -The structured status payload is canonical. The pipeline status sub-app exposes -run-level and run-index Modal functions: +The structured status payload is canonical. Run-scoped pipeline apps expose +status for their own mounted pipeline volume: - `get_pipeline_status`: Python-callable structured JSON for agents, scripts, dashboards, and tests. Prefer this for diagnosis and automation. - `pipeline_status_endpoint`: protected HTTP endpoint returning the same structured JSON for non-Python clients. Use Modal proxy auth headers. - `list_pipeline_runs`: Python-callable structured JSON index of recent runs. - Use this for dashboards that need to discover candidate run IDs. + This is volume-local and only lists runs visible to that app's mounted + `US_DATA_PIPELINE_VOLUME_NAME`. - `pipeline_runs_endpoint`: protected HTTP endpoint returning the same - structured recent-run index for non-Python clients. + volume-local recent-run index for non-Python clients. - `pipeline_status_snippet`: human-readable text used by `modal run modal_app/pipeline.py::main --action status`. This is for quick terminal inspection only and must not be treated as a schema. 
+Cross-run discovery lives in the stable `policyengine-us-data-pipeline-status` +app, not in a run-scoped pipeline app: + +- `list_deployed_pipeline_runs`: Python-callable structured JSON index of + deployed publication pipeline apps. It discovers runs from Modal app names + matching `usdata-gha\d+-a\d+`, then calls each app's + `get_pipeline_status`. +- `deployed_pipeline_runs_endpoint`: protected HTTP endpoint returning the same + cross-app discovery payload. Use this for dashboards that need to discover all + deployed publication runs. + ## Fetch Status First identify the run context from the GitHub Actions summary, workflow logs, or @@ -74,6 +86,23 @@ If the local environment cannot sync the full project environment, use the same snippet with a Modal-only temporary environment by replacing `uv run python` with `uv run --no-sync --with modal python`. +To discover deployed publication runs before choosing a run ID, call the stable +status app: + +```bash +uv run --no-sync --with modal python - <<'PY' +import json +import modal + +fn = modal.Function.from_name( + "policyengine-us-data-pipeline-status", + "list_deployed_pipeline_runs", + environment_name="main", +) +print(json.dumps(fn.remote(limit=25), indent=2)) +PY +``` + If using the HTTP endpoint, authenticate with Modal proxy auth headers. Do not publish or paste proxy auth values into PRs, issues, logs, or docs. 
diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index ab1bf9d3f..228350cad 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -543,6 +543,8 @@ def verify_runtime_seams() -> dict: "uv.lock", "modal_app/worker_script.py", "modal_app/local_area.py", + "modal_app/pipeline_discovery.py", + "modal_app/pipeline_discovery_core.py", "modal_app/pipeline_status.py", "modal_app/h5_test_harness.py", "modal_app/step_manifests/specs.py", @@ -592,6 +594,8 @@ def verify_runtime_seams() -> dict: "modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_discovery", + "modal_app.pipeline_discovery_core", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", diff --git a/modal_app/pipeline_discovery.py b/modal_app/pipeline_discovery.py new file mode 100644 index 000000000..e422f2713 --- /dev/null +++ b/modal_app/pipeline_discovery.py @@ -0,0 +1,157 @@ +"""Stable Modal app for discovering deployed US data pipeline runs.""" + +from __future__ import annotations + +import asyncio +import os +import sys +from pathlib import Path +from typing import Any + +import modal + +_baked = "/root/policyengine-us-data" +_local = str(Path(__file__).resolve().parent.parent) +for _p in (_baked, _local): + if _p not in sys.path: + sys.path.insert(0, _p) + +from modal_app.images import cpu_image as image # noqa: E402 +from modal_app.pipeline_discovery_core import ( # noqa: E402 + build_deployed_pipeline_runs_payload, +) +from policyengine_us_data.utils.run_context import ( # noqa: E402 + DEFAULT_MODAL_ENVIRONMENT, +) + + +DISCOVERY_APP_NAME = "policyengine-us-data-pipeline-status" + +app = modal.App( + os.environ.get("US_DATA_PIPELINE_DISCOVERY_APP_NAME") or DISCOVERY_APP_NAME +) +status_image = image.pip_install("fastapi") + + +def _modal_environment(explicit: str = "") -> str: + return ( + explicit + or os.environ.get("US_DATA_MODAL_ENVIRONMENT") + or 
os.environ.get("MODAL_ENVIRONMENT") + or DEFAULT_MODAL_ENVIRONMENT + ) + + +def _modal_state_name(api_pb2, state: int) -> str: + try: + return api_pb2.AppState.Name(state).removeprefix("APP_STATE_").lower() + except ValueError: + return str(state) + + +async def _list_modal_app_records_async(environment_name: str) -> list[dict[str, Any]]: + from modal.client import _Client + from modal_proto import api_pb2 + + client = await _Client.from_env() + resp = await client.stub.AppList( + api_pb2.AppListRequest(environment_name=environment_name) + ) + return [ + { + "app_id": item.app_id, + "name": item.name or item.description, + "description": item.description, + "state": _modal_state_name(api_pb2, item.state), + "tasks": item.n_running_tasks, + "created_at": item.created_at, + "stopped_at": item.stopped_at, + } + for item in resp.apps + ] + + +def _list_modal_app_records(environment_name: str) -> list[dict[str, Any]]: + return asyncio.run(_list_modal_app_records_async(environment_name)) + + +def _pipeline_status_lookup( + *, + app_name: str, + run_id: str, + environment_name: str, +) -> dict[str, Any]: + status_fn = modal.Function.from_name( + app_name, + "get_pipeline_status", + environment_name=environment_name, + ) + return status_fn.remote(run_id) + + +def _build_deployed_pipeline_runs( + *, + limit: int | str | None, + status: str, + branch: str, + include_unreachable: bool, + modal_environment: str, +) -> dict[str, Any]: + environment_name = _modal_environment(modal_environment) + app_records = _list_modal_app_records(environment_name) + return build_deployed_pipeline_runs_payload( + app_records, + lambda candidate: _pipeline_status_lookup( + app_name=candidate.app_name, + run_id=candidate.run_id, + environment_name=environment_name, + ), + limit=limit, + status=status, + branch=branch, + include_unreachable=include_unreachable, + modal_environment=environment_name, + ) + + +@app.function(image=image, timeout=180) +def list_deployed_pipeline_runs( + limit: int = 
25, + status: str = "", + branch: str = "", + include_unreachable: bool = True, + modal_environment: str = "", +) -> dict[str, Any]: + """Return deployed publication pipeline runs discovered from Modal app names.""" + + return _build_deployed_pipeline_runs( + limit=limit, + status=status, + branch=branch, + include_unreachable=include_unreachable, + modal_environment=modal_environment, + ) + + +@app.function(image=status_image, timeout=180) +@modal.fastapi_endpoint( + method="GET", + docs=False, + requires_proxy_auth=True, +) +def deployed_pipeline_runs_endpoint( + limit: int = 25, + status: str = "", + branch: str = "", + include_unreachable: bool = True, + modal_environment: str = "", +) -> dict[str, Any]: + """Protected HTTP endpoint for deployed publication pipeline discovery.""" + + return _build_deployed_pipeline_runs( + limit=limit, + status=status, + branch=branch, + include_unreachable=include_unreachable, + modal_environment=modal_environment, + ) diff --git a/modal_app/pipeline_discovery_core.py b/modal_app/pipeline_discovery_core.py new file mode 100644 index 000000000..6c02e5c14 --- /dev/null +++ b/modal_app/pipeline_discovery_core.py @@ -0,0 +1,357 @@ +"""Pure helpers for discovering deployed US data pipeline apps.""" + +from __future__ import annotations + +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Callable, Iterable + +from policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_MESSAGE_MAX_CHARS, + redacted_bounded_error_text, +) + + +PIPELINE_DISCOVERY_SCHEMA_VERSION = "1" +PIPELINE_DISCOVERY_SOURCE = "modal_app_names" +DEFAULT_DISCOVERY_LIMIT = 25 +MAX_DISCOVERY_LIMIT = 100 +DEFAULT_DISCOVERY_WORKERS = 8 +PUBLICATION_APP_PREFIXES = ("us-data-", "policyengine-us-data-pub-") +RUN_ID_RE = re.compile(r"(usdata-gha\d+-a\d+)") +DEPLOYED_STATES = {"deployed", "app_state_deployed"} + + +@dataclass(frozen=True) 
+class PipelineAppCandidate: + """A deployed Modal app that appears to represent one publication run.""" + + app_id: str + app_name: str + run_id: str + state: str + task_count: int + created_at: str | None = None + stopped_at: str | None = None + + +StatusLookup = Callable[[PipelineAppCandidate], dict[str, Any]] + + +def derive_run_id_from_app_name(app_name: str) -> str: + """Extract the canonical US data run ID from a Modal app name.""" + + match = RUN_ID_RE.search(app_name or "") + return match.group(1) if match else "" + + +def is_publication_pipeline_app_name(app_name: str) -> bool: + """Return whether an app name belongs to a publication pipeline run.""" + + if not app_name.startswith(PUBLICATION_APP_PREFIXES): + return False + return bool(derive_run_id_from_app_name(app_name)) + + +def _bounded_limit(limit: int | str | None) -> int: + try: + parsed = int(limit if limit is not None else DEFAULT_DISCOVERY_LIMIT) + except (TypeError, ValueError): + parsed = DEFAULT_DISCOVERY_LIMIT + return max(0, min(parsed, MAX_DISCOVERY_LIMIT)) + + +def _int_value(value: Any) -> int: + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def _text_value(value: Any) -> str: + return "" if value is None else str(value) + + +def _timestamp_value(value: Any) -> str | None: + if value in (None, ""): + return None + if isinstance(value, (int, float)): + return datetime.fromtimestamp(float(value), timezone.utc).isoformat() + return str(value) + + +def _record_value(record: dict[str, Any], *keys: str) -> Any: + for key in keys: + if key in record: + return record[key] + return None + + +def pipeline_app_candidates( + app_records: Iterable[dict[str, Any]], +) -> list[PipelineAppCandidate]: + """Return deployed publication-pipeline app candidates from Modal records.""" + + candidates: list[PipelineAppCandidate] = [] + for record in app_records: + app_name = _text_value( + _record_value(record, "name", "description", "Description") + ) + state = 
_text_value(_record_value(record, "state", "State")).lower() + if state and state not in DEPLOYED_STATES: + continue + if not is_publication_pipeline_app_name(app_name): + continue + run_id = derive_run_id_from_app_name(app_name) + candidates.append( + PipelineAppCandidate( + app_id=_text_value(_record_value(record, "app_id", "App ID")), + app_name=app_name, + run_id=run_id, + state=state or "unknown", + task_count=_int_value(_record_value(record, "tasks", "Tasks")), + created_at=_timestamp_value( + _record_value(record, "created_at", "Created at") + ), + stopped_at=_timestamp_value( + _record_value(record, "stopped_at", "Stopped at") + ), + ) + ) + candidates.sort( + key=lambda candidate: ( + candidate.created_at or "", + candidate.app_name, + ), + reverse=True, + ) + return candidates + + +def _modal_fields(candidate: PipelineAppCandidate) -> dict[str, Any]: + return { + "modal_app_id": candidate.app_id, + "modal_app_name": candidate.app_name, + "modal_app_state": candidate.state, + "modal_task_count": candidate.task_count, + "modal_app_created_at": candidate.created_at, + "modal_app_stopped_at": candidate.stopped_at, + } + + +def _latest_manifest_payload( + stage_manifests: list[dict[str, Any]], +) -> dict[str, Any] | None: + if not stage_manifests: + return None + item = stage_manifests[-1] + manifest = item.get("manifest") or {} + return { + "step_id": item.get("step_id"), + "stage_id": item.get("stage_id"), + "substage_id": item.get("substage_id"), + "title": item.get("title"), + "status": item.get("status"), + "started_at": manifest.get("started_at"), + "completed_at": manifest.get("completed_at"), + "duration_s": manifest.get("duration_s"), + "reuse_decision": manifest.get("reuse_decision", "not_applicable"), + } + + +def _index_error_payload(error: dict[str, Any] | None) -> dict[str, Any] | None: + if error is None: + return None + allowed = ( + "stage_id", + "substage_id", + "surface", + "error_type", + "message", + "message_truncated", + "record_path", + 
"latest_path", + "traceback_available", + ) + return {key: error[key] for key in allowed if key in error} + + +def _status_item( + candidate: PipelineAppCandidate, + payload: dict[str, Any], +) -> dict[str, Any]: + run_manifest = payload.get("run_manifest") or {} + stage_manifests = payload.get("stage_manifests") or [] + missing = payload.get("missing_expected_manifest_ids") or [] + known_step_ids = run_manifest.get("known_step_ids") or [] + expected_count = max(len(known_step_ids), len(stage_manifests) + len(missing)) + status_lookup = "not_found" if payload.get("status") == "not_found" else "ok" + return { + **_modal_fields(candidate), + "run_id": payload.get("run_id") or candidate.run_id, + "status_lookup": status_lookup, + "status": payload.get("status", "unknown"), + "message": payload.get("message", ""), + "branch": run_manifest.get("branch"), + "sha": run_manifest.get("sha"), + "candidate_version": run_manifest.get("candidate_version"), + "release_version": run_manifest.get("release_version"), + "started_at": run_manifest.get("started_at"), + "updated_at": payload.get("updated_at"), + "completed_at": run_manifest.get("completed_at"), + "modal_environment": ( + payload.get("modal_environment") or run_manifest.get("modal_environment") + ), + "hf_staging_prefix": run_manifest.get("hf_staging_prefix"), + "github_run_url": (run_manifest.get("run_context") or {}).get("github_run_url"), + "latest_manifest": _latest_manifest_payload(stage_manifests), + "progress": { + "expected_manifests": expected_count, + "present_manifests": len(stage_manifests), + "missing_manifests": len(missing), + }, + "error": _index_error_payload(payload.get("error")), + } + + +def _lookup_error_item( + candidate: PipelineAppCandidate, + exc: BaseException, +) -> dict[str, Any]: + message = redacted_bounded_error_text( + f"{type(exc).__name__}: {exc}", + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text + return { + **_modal_fields(candidate), + "run_id": candidate.run_id, + 
"status_lookup": "unreachable", + "status": "unreachable", + "message": message, + "branch": None, + "sha": None, + "candidate_version": None, + "release_version": None, + "started_at": None, + "updated_at": None, + "completed_at": None, + "modal_environment": None, + "hf_staging_prefix": None, + "github_run_url": None, + "latest_manifest": None, + "progress": { + "expected_manifests": 0, + "present_manifests": 0, + "missing_manifests": 0, + }, + "error": { + "error_type": type(exc).__name__, + "message": message, + "traceback_available": False, + }, + } + + +def _passes_filters( + item: dict[str, Any], + *, + status: str, + branch: str, + include_unreachable: bool, +) -> bool: + if not include_unreachable and item.get("status_lookup") != "ok": + return False + if status and item.get("status") != status: + return False + if branch and item.get("branch") != branch: + return False + return True + + +def _lookup_items( + candidates: list[PipelineAppCandidate], + status_lookup: StatusLookup, + *, + max_workers: int, +) -> list[dict[str, Any]]: + if max_workers <= 1 or len(candidates) <= 1: + items = [] + for candidate in candidates: + try: + items.append(_status_item(candidate, status_lookup(candidate))) + except Exception as exc: + items.append(_lookup_error_item(candidate, exc)) + return items + + items: list[dict[str, Any]] = [] + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(status_lookup, candidate): candidate + for candidate in candidates + } + for future in as_completed(futures): + candidate = futures[future] + try: + items.append(_status_item(candidate, future.result())) + except Exception as exc: + items.append(_lookup_error_item(candidate, exc)) + items.sort( + key=lambda item: ( + item.get("updated_at") or item.get("modal_app_created_at") or "", + item.get("run_id") or "", + ), + reverse=True, + ) + return items + + +def build_deployed_pipeline_runs_payload( + app_records: Iterable[dict[str, Any]], + 
status_lookup: StatusLookup, + *, + limit: int | str | None = DEFAULT_DISCOVERY_LIMIT, + status: str = "", + branch: str = "", + include_unreachable: bool = True, + modal_environment: str = "main", + max_workers: int = DEFAULT_DISCOVERY_WORKERS, +) -> dict[str, Any]: + """Build a cross-app pipeline run index from Modal app names.""" + + bounded_limit = _bounded_limit(limit) + filters = { + "status": status or "", + "branch": branch or "", + "include_unreachable": bool(include_unreachable), + } + candidates = pipeline_app_candidates(app_records) + needs_filter_window = ( + bool(filters["status"] or filters["branch"]) + or not filters["include_unreachable"] + ) + selected_limit = MAX_DISCOVERY_LIMIT if needs_filter_window else bounded_limit + selected = candidates[:selected_limit] + items = _lookup_items(selected, status_lookup, max_workers=max_workers) + runs = [ + item + for item in items + if _passes_filters( + item, + status=filters["status"], + branch=filters["branch"], + include_unreachable=filters["include_unreachable"], + ) + ][:bounded_limit] + return { + "schema_version": PIPELINE_DISCOVERY_SCHEMA_VERSION, + "source": PIPELINE_DISCOVERY_SOURCE, + "modal_environment": modal_environment, + "discovered_count": len(candidates), + "queried_count": len(selected), + "count": len(runs), + "limit": bounded_limit, + "filters": filters, + "runs": runs, + } diff --git a/modal_app/pipeline_status.py b/modal_app/pipeline_status.py index 5224d8564..610cfedab 100644 --- a/modal_app/pipeline_status.py +++ b/modal_app/pipeline_status.py @@ -1,4 +1,8 @@ -"""Modal status functions for deployed pipeline runs.""" +"""Run-scoped Modal status functions for deployed pipeline runs. + +These functions read only the pipeline volume mounted into their app. Cross-app +discovery lives in ``modal_app.pipeline_discovery``. 
+""" from __future__ import annotations diff --git a/policyengine_us_data/utils/run_context.py b/policyengine_us_data/utils/run_context.py index 7efdd946a..f6d6a665f 100644 --- a/policyengine_us_data/utils/run_context.py +++ b/policyengine_us_data/utils/run_context.py @@ -153,12 +153,29 @@ def build_modal_app_name( prefix: str = DEFAULT_MODAL_APP_PREFIX, max_length: int = DEFAULT_MAX_RESOURCE_NAME_LENGTH, ) -> str: - """Build a safe Modal app name from candidate scope and run ID.""" - return build_modal_resource_name( - candidate_run_segment(run_id, candidate_version), - prefix=prefix, - max_length=max_length, + """Build a safe Modal app name while preserving the run ID suffix.""" + resolved_run_id = sanitize_run_id(run_id) + resolved_candidate_version = ( + sanitize_staging_version(candidate_version) if (candidate_version) else "" + ) + name_prefix = _slugify( + "-".join( + part + for part in ( + prefix, + resolved_candidate_version, + ) + if part + ) ) + candidate = _slugify(f"{name_prefix}-{resolved_run_id}") + if len(candidate) <= max_length: + return candidate + suffix = resolved_run_id + if len(suffix) >= max_length: + return _truncate_with_digest(suffix, max_length) + prefix_length = max_length - len(suffix) - 1 + return f"{_truncate_with_digest(name_prefix, prefix_length)}-{suffix}" def staging_prefix( diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 6cf73ba26..beca88f63 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -55,6 +55,8 @@ def test_pipeline_image_runtime_seams(): "uv.lock": True, "modal_app/worker_script.py": True, "modal_app/local_area.py": True, + "modal_app/pipeline_discovery.py": True, + "modal_app/pipeline_discovery_core.py": True, "modal_app/pipeline_status.py": True, "modal_app/h5_test_harness.py": True, "modal_app/step_manifests/specs.py": True, @@ -83,6 +85,8 @@ def test_pipeline_image_runtime_seams(): 
"modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_discovery", + "modal_app.pipeline_discovery_core", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", @@ -146,6 +150,29 @@ def test_pipeline_runs_callable_reports_structured_index(): assert result["filters"] == {"status": "", "branch": ""} +def test_pipeline_discovery_callable_reports_structured_index(): + _require_modal_tokens() + + fn = modal.Function.from_name( + "policyengine-us-data-pipeline-status", + "list_deployed_pipeline_runs", + environment_name=MODAL_ENVIRONMENT, + ) + result = fn.remote(limit=1, modal_environment=MODAL_ENVIRONMENT) + + assert result["schema_version"] == "1" + assert result["source"] == "modal_app_names" + assert result["modal_environment"] == MODAL_ENVIRONMENT + assert result["limit"] == 1 + assert result["count"] <= 1 + assert isinstance(result["runs"], list) + assert result["filters"] == { + "status": "", + "branch": "", + "include_unreachable": True, + } + + def test_pipeline_status_http_endpoint_reports_missing_run(): _require_modal_tokens() headers = _modal_proxy_auth_headers() @@ -201,6 +228,35 @@ def test_pipeline_runs_http_endpoint_reports_structured_index(): assert result["filters"] == {"status": "", "branch": ""} +def test_pipeline_discovery_http_endpoint_reports_structured_index(): + _require_modal_tokens() + headers = _modal_proxy_auth_headers() + + fn = modal.Function.from_name( + "policyengine-us-data-pipeline-status", + "deployed_pipeline_runs_endpoint", + environment_name=MODAL_ENVIRONMENT, + ) + endpoint = fn.get_web_url() + assert endpoint + + response = requests.get( + endpoint, + params={"limit": "1", "modal_environment": MODAL_ENVIRONMENT}, + headers=headers, + timeout=30, + ) + + assert response.status_code == 200, response.text[:500] + result = response.json() + assert result["schema_version"] == "1" + assert result["source"] == "modal_app_names" + assert 
result["modal_environment"] == MODAL_ENVIRONMENT + assert result["limit"] == 1 + assert result["count"] <= 1 + assert isinstance(result["runs"], list) + + def test_pipeline_status_cli_snippet_reports_missing_run(): _require_modal_tokens() diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py new file mode 100644 index 000000000..9a7409b06 --- /dev/null +++ b/tests/unit/test_pipeline_discovery.py @@ -0,0 +1,232 @@ +import pytest + +from modal_app.pipeline_discovery_core import ( + build_deployed_pipeline_runs_payload, + derive_run_id_from_app_name, + is_publication_pipeline_app_name, + pipeline_app_candidates, +) + + +def _app_record( + name: str, + *, + app_id: str = "ap-1", + state: str = "deployed", + tasks: int = 0, + created_at: str = "2026-05-19T12:00:00+00:00", +) -> dict: + return { + "app_id": app_id, + "name": name, + "state": state, + "tasks": tasks, + "created_at": created_at, + } + + +def _status_payload( + run_id: str, + *, + status: str = "running", + branch: str = "main", + updated_at: str = "2026-05-19T12:30:00+00:00", +) -> dict: + return { + "schema_version": "1", + "run_id": run_id, + "status": status, + "message": f"Pipeline {status}.", + "updated_at": updated_at, + "run_manifest": { + "run_id": run_id, + "branch": branch, + "sha": "abc123", + "candidate_version": "1.115.4-minor", + "release_version": "", + "started_at": "2026-05-19T12:00:00+00:00", + "completed_at": None, + "known_step_ids": ["1_build_datasets", "2_build_calibration_package"], + "hf_staging_prefix": f"staging/1.115.4-minor-{run_id}", + "run_context": { + "github_run_url": ( + "https://github.com/PolicyEngine/policyengine-us-data/" + "actions/runs/123" + ) + }, + }, + "stage_manifests": [ + { + "step_id": "1_build_datasets", + "stage_id": "1_build_datasets", + "substage_id": None, + "title": "Build datasets", + "status": "completed", + "manifest": { + "started_at": "2026-05-19T12:00:00+00:00", + "completed_at": "2026-05-19T12:15:00+00:00", + 
"duration_s": 900, + "reuse_decision": "not_applicable", + }, + } + ], + "missing_expected_manifest_ids": ["2_build_calibration_package"], + "error": None, + "modal_app_name": f"us-data-1-115-4-minor-{run_id}", + "modal_environment": "main", + } + + +def test_derives_run_id_from_current_and_legacy_publication_app_names(): + assert ( + derive_run_id_from_app_name("us-data-1-115-4-minor-usdata-gha26114604836-a1") + == "usdata-gha26114604836-a1" + ) + assert ( + derive_run_id_from_app_name("policyengine-us-data-pub-usdata-gha123-a2") + == "usdata-gha123-a2" + ) + + +@pytest.mark.parametrize( + ("app_name", "expected"), + [ + ("us-data-1-115-4-minor-usdata-gha26114604836-a1", True), + ("policyengine-us-data-pub-usdata-gha123-a1", True), + ("us-data-pipeline-pr-1035-26117326123-1", False), + ("us-data-local-area-pr-1035-26117326123-1", False), + ("policyengine-us-data-pipeline", False), + ("state-research-tracker", False), + ], +) +def test_identifies_publication_pipeline_app_names(app_name, expected): + assert is_publication_pipeline_app_name(app_name) is expected + + +def test_pipeline_app_candidates_filters_to_deployed_publication_apps(): + records = [ + _app_record( + "us-data-1-115-4-minor-usdata-gha26114604836-a1", + app_id="ap-new", + tasks=2, + created_at="2026-05-19T13:00:00+00:00", + ), + _app_record("us-data-pipeline-pr-1035-26117326123-1"), + _app_record( + "us-data-1-115-4-patch-usdata-gha26114905403-a1", + state="stopped", + ), + ] + + candidates = pipeline_app_candidates(records) + + assert len(candidates) == 1 + assert candidates[0].app_id == "ap-new" + assert candidates[0].app_name == ("us-data-1-115-4-minor-usdata-gha26114604836-a1") + assert candidates[0].run_id == "usdata-gha26114604836-a1" + assert candidates[0].task_count == 2 + + +def test_build_deployed_pipeline_runs_payload_queries_status_by_derived_run_id(): + app_name = "us-data-1-115-4-minor-usdata-gha26114604836-a1" + seen = [] + + def lookup(candidate): + seen.append((candidate.app_name, 
candidate.run_id)) + return _status_payload(candidate.run_id) + + payload = build_deployed_pipeline_runs_payload( + [_app_record(app_name, app_id="ap-run", tasks=4)], + lookup, + limit=10, + max_workers=1, + ) + + assert seen == [(app_name, "usdata-gha26114604836-a1")] + assert payload["schema_version"] == "1" + assert payload["source"] == "modal_app_names" + assert payload["discovered_count"] == 1 + assert payload["queried_count"] == 1 + assert payload["count"] == 1 + run = payload["runs"][0] + assert run["run_id"] == "usdata-gha26114604836-a1" + assert run["status_lookup"] == "ok" + assert run["status"] == "running" + assert run["branch"] == "main" + assert run["modal_app_id"] == "ap-run" + assert run["modal_task_count"] == 4 + assert run["latest_manifest"]["step_id"] == "1_build_datasets" + assert run["progress"] == { + "expected_manifests": 2, + "present_manifests": 1, + "missing_manifests": 1, + } + + +def test_deployed_pipeline_runs_payload_keeps_unreachable_apps_structured(): + def lookup(_candidate): + raise RuntimeError("lookup failed with TOKEN=secret") + + payload = build_deployed_pipeline_runs_payload( + [_app_record("us-data-1-115-4-minor-usdata-gha26114604836-a1")], + lookup, + max_workers=1, + ) + + assert payload["count"] == 1 + run = payload["runs"][0] + assert run["run_id"] == "usdata-gha26114604836-a1" + assert run["status_lookup"] == "unreachable" + assert run["status"] == "unreachable" + assert run["error"]["error_type"] == "RuntimeError" + + +def test_deployed_pipeline_runs_payload_applies_limit_after_filters(): + records = [ + _app_record( + "us-data-1-115-4-minor-usdata-gha1-a1", + created_at="2026-05-19T13:00:00+00:00", + ), + _app_record( + "us-data-1-115-4-minor-usdata-gha2-a1", + created_at="2026-05-19T12:00:00+00:00", + ), + ] + + def lookup(candidate): + branch = "feature" if candidate.run_id.endswith("2-a1") else "main" + return _status_payload(candidate.run_id, branch=branch) + + payload = build_deployed_pipeline_runs_payload( + 
records, + lookup, + limit=1, + branch="main", + max_workers=1, + ) + + assert payload["limit"] == 1 + assert payload["filters"] == { + "status": "", + "branch": "main", + "include_unreachable": True, + } + assert payload["queried_count"] == 2 + assert [run["run_id"] for run in payload["runs"]] == ["usdata-gha1-a1"] + + +def test_deployed_pipeline_runs_payload_can_exclude_unreachable_apps(): + def lookup(_candidate): + raise RuntimeError("unavailable") + + payload = build_deployed_pipeline_runs_payload( + [_app_record("us-data-1-115-4-minor-usdata-gha26114604836-a1")], + lookup, + include_unreachable=False, + max_workers=1, + ) + + assert payload["discovered_count"] == 1 + assert payload["queried_count"] == 1 + assert payload["count"] == 0 + assert payload["runs"] == [] diff --git a/tests/unit/test_run_context.py b/tests/unit/test_run_context.py index 4a579714e..af5f58eed 100644 --- a/tests/unit/test_run_context.py +++ b/tests/unit/test_run_context.py @@ -58,6 +58,17 @@ def test_modal_app_name_scopes_by_candidate_version_and_run_id() -> None: ) +def test_modal_app_name_preserves_run_id_suffix_when_truncated() -> None: + run_id = "usdata-gha123456789012345-a2" + name = build_modal_app_name( + run_id, + candidate_version="1.73.0-minor-with-an-extremely-long-candidate-label", + ) + + assert name.endswith(f"-{run_id}") + assert len(name) <= 64 + + def test_candidate_scope_uses_base_release_and_bump() -> None: assert build_candidate_scope("1.73.0", "minor") == "1.73.0-minor" assert release_version_from_bump("1.73.0", "minor") == "1.74.0" From 3fdd980877ded62eccb6ce36b04499900065d274 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 22:09:26 +0200 Subject: [PATCH 2/9] Type pipeline discovery results --- modal_app/pipeline_discovery.py | 21 +- modal_app/pipeline_discovery_core.py | 529 ++++++++++++++++++-------- tests/unit/test_pipeline_discovery.py | 62 +-- 3 files changed, 408 insertions(+), 204 deletions(-) diff --git a/modal_app/pipeline_discovery.py 
b/modal_app/pipeline_discovery.py index e422f2713..263176db1 100644 --- a/modal_app/pipeline_discovery.py +++ b/modal_app/pipeline_discovery.py @@ -6,7 +6,6 @@ import os import sys from pathlib import Path -from typing import Any import modal @@ -18,6 +17,8 @@ from modal_app.images import cpu_image as image # noqa: E402 from modal_app.pipeline_discovery_core import ( # noqa: E402 + JSONDict, + RawRecord, build_deployed_pipeline_runs_payload, ) from policyengine_us_data.utils.run_context import ( # noqa: E402 @@ -49,7 +50,7 @@ def _modal_state_name(api_pb2, state: int) -> str: return str(state) -async def _list_modal_app_records_async(environment_name: str) -> list[dict[str, Any]]: +async def _list_modal_app_records_async(environment_name: str) -> list[JSONDict]: from modal.client import _Client from modal_proto import api_pb2 @@ -71,7 +72,7 @@ async def _list_modal_app_records_async(environment_name: str) -> list[dict[str, ] -def _list_modal_app_records(environment_name: str) -> list[dict[str, Any]]: +def _list_modal_app_records(environment_name: str) -> list[JSONDict]: return asyncio.run(_list_modal_app_records_async(environment_name)) @@ -80,13 +81,14 @@ def _pipeline_status_lookup( app_name: str, run_id: str, environment_name: str, -) -> dict[str, Any]: +) -> RawRecord: status_fn = modal.Function.from_name( app_name, "get_pipeline_status", environment_name=environment_name, ) - return status_fn.remote(run_id) + payload = status_fn.remote(run_id) + return payload if isinstance(payload, dict) else {} def _build_deployed_pipeline_runs( @@ -96,10 +98,10 @@ def _build_deployed_pipeline_runs( branch: str, include_unreachable: bool, modal_environment: str, -) -> dict[str, Any]: +) -> JSONDict: environment_name = _modal_environment(modal_environment) app_records = _list_modal_app_records(environment_name) - return build_deployed_pipeline_runs_payload( + payload = build_deployed_pipeline_runs_payload( app_records, lambda candidate: _pipeline_status_lookup( 
app_name=candidate.app_name, @@ -112,6 +114,7 @@ def _build_deployed_pipeline_runs( include_unreachable=include_unreachable, modal_environment=environment_name, ) + return payload.to_dict() @app.function(image=image, timeout=180) @@ -121,7 +124,7 @@ def list_deployed_pipeline_runs( branch: str = "", include_unreachable: bool = True, modal_environment: str = "", -) -> dict[str, Any]: +) -> JSONDict: """Return deployed publication pipeline runs discovered from Modal app names.""" return _build_deployed_pipeline_runs( @@ -145,7 +148,7 @@ def deployed_pipeline_runs_endpoint( branch: str = "", include_unreachable: bool = True, modal_environment: str = "", -) -> dict[str, Any]: +) -> JSONDict: """Protected HTTP endpoint for deployed publication pipeline discovery.""" return _build_deployed_pipeline_runs( diff --git a/modal_app/pipeline_discovery_core.py b/modal_app/pipeline_discovery_core.py index 6c02e5c14..0fbd8d726 100644 --- a/modal_app/pipeline_discovery_core.py +++ b/modal_app/pipeline_discovery_core.py @@ -6,7 +6,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime, timezone -from typing import Any, Callable, Iterable +from typing import Callable, Iterable, Mapping from policyengine_us_data.utils.error_redaction import ( DEFAULT_ERROR_MESSAGE_MAX_CHARS, @@ -23,6 +23,9 @@ RUN_ID_RE = re.compile(r"(usdata-gha\d+-a\d+)") DEPLOYED_STATES = {"deployed", "app_state_deployed"} +JSONDict = dict[str, object] +RawRecord = Mapping[str, object] + @dataclass(frozen=True) class PipelineAppCandidate: @@ -37,7 +40,219 @@ class PipelineAppCandidate: stopped_at: str | None = None -StatusLookup = Callable[[PipelineAppCandidate], dict[str, Any]] +@dataclass(frozen=True) +class PipelineDiscoveryFilters: + """Filters applied to the deployed pipeline run index.""" + + status: str = "" + branch: str = "" + include_unreachable: bool = True + + def to_dict(self) -> JSONDict: + return { + "status": self.status, 
+ "branch": self.branch, + "include_unreachable": self.include_unreachable, + } + + +@dataclass(frozen=True) +class PipelineProgressSummary: + """Manifest progress for one discovered pipeline run.""" + + expected_manifests: int + present_manifests: int + missing_manifests: int + + def to_dict(self) -> JSONDict: + return { + "expected_manifests": self.expected_manifests, + "present_manifests": self.present_manifests, + "missing_manifests": self.missing_manifests, + } + + +@dataclass(frozen=True) +class LatestManifestSummary: + """Summary of the latest stage or substage manifest for one run.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + def to_dict(self) -> JSONDict: + return { + "step_id": self.step_id, + "stage_id": self.stage_id, + "substage_id": self.substage_id, + "title": self.title, + "status": self.status, + "started_at": self.started_at, + "completed_at": self.completed_at, + "duration_s": self.duration_s, + "reuse_decision": self.reuse_decision, + } + + +@dataclass(frozen=True) +class PipelineLookupErrorSummary: + """Bounded error surfaced while querying one discovered app.""" + + error_type: str + message: str + stage_id: str | None = None + substage_id: str | None = None + surface: str | None = None + message_truncated: bool | None = None + record_path: str | None = None + latest_path: str | None = None + traceback_available: bool = False + + def to_dict(self) -> JSONDict: + payload: JSONDict = { + "error_type": self.error_type, + "message": self.message, + "traceback_available": self.traceback_available, + } + optional: JSONDict = { + "stage_id": self.stage_id, + "substage_id": self.substage_id, + "surface": self.surface, + "message_truncated": self.message_truncated, + "record_path": self.record_path, + "latest_path": self.latest_path, + } + payload.update( + {key: value for 
key, value in optional.items() if value is not None} + ) + return payload + + +@dataclass(frozen=True) +class DeployedPipelineRunSummary: + """Status summary for one deployed publication pipeline app.""" + + run_id: str + status_lookup: str + status: str + message: str + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None = None + modal_app_stopped_at: str | None = None + branch: str | None = None + sha: str | None = None + candidate_version: str | None = None + release_version: str | None = None + started_at: str | None = None + updated_at: str | None = None + completed_at: str | None = None + modal_environment: str | None = None + hf_staging_prefix: str | None = None + github_run_url: str | None = None + latest_manifest: LatestManifestSummary | None = None + progress: PipelineProgressSummary | None = None + error: PipelineLookupErrorSummary | None = None + + @classmethod + def unreachable( + cls, + candidate: PipelineAppCandidate, + exc: BaseException, + ) -> DeployedPipelineRunSummary: + message = redacted_bounded_error_text( + f"{type(exc).__name__}: {exc}", + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text + return cls( + run_id=candidate.run_id, + status_lookup="unreachable", + status="unreachable", + message=message, + modal_app_id=candidate.app_id, + modal_app_name=candidate.app_name, + modal_app_state=candidate.state, + modal_task_count=candidate.task_count, + modal_app_created_at=candidate.created_at, + modal_app_stopped_at=candidate.stopped_at, + progress=PipelineProgressSummary( + expected_manifests=0, + present_manifests=0, + missing_manifests=0, + ), + error=PipelineLookupErrorSummary( + error_type=type(exc).__name__, + message=message, + ), + ) + + def to_dict(self) -> JSONDict: + return { + "modal_app_id": self.modal_app_id, + "modal_app_name": self.modal_app_name, + "modal_app_state": self.modal_app_state, + "modal_task_count": self.modal_task_count, + "modal_app_created_at": 
self.modal_app_created_at, + "modal_app_stopped_at": self.modal_app_stopped_at, + "run_id": self.run_id, + "status_lookup": self.status_lookup, + "status": self.status, + "message": self.message, + "branch": self.branch, + "sha": self.sha, + "candidate_version": self.candidate_version, + "release_version": self.release_version, + "started_at": self.started_at, + "updated_at": self.updated_at, + "completed_at": self.completed_at, + "modal_environment": self.modal_environment, + "hf_staging_prefix": self.hf_staging_prefix, + "github_run_url": self.github_run_url, + "latest_manifest": ( + self.latest_manifest.to_dict() if self.latest_manifest else None + ), + "progress": self.progress.to_dict() if self.progress else None, + "error": self.error.to_dict() if self.error else None, + } + + +@dataclass(frozen=True) +class DeployedPipelineRunsPayload: + """Typed cross-app pipeline run index discovered from Modal app names.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFilters + runs: tuple[DeployedPipelineRunSummary, ...] 
+ + def to_dict(self) -> JSONDict: + return { + "schema_version": self.schema_version, + "source": self.source, + "modal_environment": self.modal_environment, + "discovered_count": self.discovered_count, + "queried_count": self.queried_count, + "count": self.count, + "limit": self.limit, + "filters": self.filters.to_dict(), + "runs": [run.to_dict() for run in self.runs], + } + + +StatusLookup = Callable[[PipelineAppCandidate], RawRecord] def derive_run_id_from_app_name(app_name: str) -> str: @@ -63,18 +278,28 @@ def _bounded_limit(limit: int | str | None) -> int: return max(0, min(parsed, MAX_DISCOVERY_LIMIT)) -def _int_value(value: Any) -> int: +def _int_value(value: object) -> int: try: return int(value) except (TypeError, ValueError): return 0 -def _text_value(value: Any) -> str: +def _text_value(value: object) -> str: return "" if value is None else str(value) -def _timestamp_value(value: Any) -> str | None: +def _string_or_none(value: object) -> str | None: + if value in (None, ""): + return None + return str(value) + + +def _number_or_none(value: object) -> float | int | None: + return value if isinstance(value, (float, int)) else None + + +def _timestamp_value(value: object) -> str | None: if value in (None, ""): return None if isinstance(value, (int, float)): @@ -82,15 +307,27 @@ def _timestamp_value(value: Any) -> str | None: return str(value) -def _record_value(record: dict[str, Any], *keys: str) -> Any: +def _record_value(record: RawRecord, *keys: str) -> object: for key in keys: if key in record: return record[key] return None +def _mapping_value(record: RawRecord, key: str) -> RawRecord: + value = record.get(key) + return value if isinstance(value, Mapping) else {} + + +def _list_of_mappings(record: RawRecord, key: str) -> list[RawRecord]: + value = record.get(key) + if not isinstance(value, list): + return [] + return [item for item in value if isinstance(item, Mapping)] + + def pipeline_app_candidates( - app_records: Iterable[dict[str, Any]], + 
app_records: Iterable[RawRecord], ) -> list[PipelineAppCandidate]: """Return deployed publication-pipeline app candidates from Modal records.""" @@ -130,142 +367,108 @@ def pipeline_app_candidates( return candidates -def _modal_fields(candidate: PipelineAppCandidate) -> dict[str, Any]: - return { - "modal_app_id": candidate.app_id, - "modal_app_name": candidate.app_name, - "modal_app_state": candidate.state, - "modal_task_count": candidate.task_count, - "modal_app_created_at": candidate.created_at, - "modal_app_stopped_at": candidate.stopped_at, - } - - -def _latest_manifest_payload( - stage_manifests: list[dict[str, Any]], -) -> dict[str, Any] | None: +def _latest_manifest_summary( + stage_manifests: list[RawRecord], +) -> LatestManifestSummary | None: if not stage_manifests: return None item = stage_manifests[-1] - manifest = item.get("manifest") or {} - return { - "step_id": item.get("step_id"), - "stage_id": item.get("stage_id"), - "substage_id": item.get("substage_id"), - "title": item.get("title"), - "status": item.get("status"), - "started_at": manifest.get("started_at"), - "completed_at": manifest.get("completed_at"), - "duration_s": manifest.get("duration_s"), - "reuse_decision": manifest.get("reuse_decision", "not_applicable"), - } - - -def _index_error_payload(error: dict[str, Any] | None) -> dict[str, Any] | None: - if error is None: + manifest = _mapping_value(item, "manifest") + return LatestManifestSummary( + step_id=_string_or_none(item.get("step_id")), + stage_id=_string_or_none(item.get("stage_id")), + substage_id=_string_or_none(item.get("substage_id")), + title=_string_or_none(item.get("title")), + status=_string_or_none(item.get("status")), + started_at=_string_or_none(manifest.get("started_at")), + completed_at=_string_or_none(manifest.get("completed_at")), + duration_s=_number_or_none(manifest.get("duration_s")), + reuse_decision=_text_value(manifest.get("reuse_decision")) or "not_applicable", + ) + + +def _index_error_summary(error: 
RawRecord) -> PipelineLookupErrorSummary | None: + if not error: return None - allowed = ( - "stage_id", - "substage_id", - "surface", - "error_type", - "message", - "message_truncated", - "record_path", - "latest_path", - "traceback_available", + return PipelineLookupErrorSummary( + stage_id=_string_or_none(error.get("stage_id")), + substage_id=_string_or_none(error.get("substage_id")), + surface=_string_or_none(error.get("surface")), + error_type=_text_value(error.get("error_type")) or "Error", + message=_text_value(error.get("message")), + message_truncated=( + error.get("message_truncated") + if isinstance(error.get("message_truncated"), bool) + else None + ), + record_path=_string_or_none(error.get("record_path")), + latest_path=_string_or_none(error.get("latest_path")), + traceback_available=( + error.get("traceback_available") + if isinstance(error.get("traceback_available"), bool) + else False + ), ) - return {key: error[key] for key in allowed if key in error} -def _status_item( +def _status_summary( candidate: PipelineAppCandidate, - payload: dict[str, Any], -) -> dict[str, Any]: - run_manifest = payload.get("run_manifest") or {} - stage_manifests = payload.get("stage_manifests") or [] - missing = payload.get("missing_expected_manifest_ids") or [] - known_step_ids = run_manifest.get("known_step_ids") or [] - expected_count = max(len(known_step_ids), len(stage_manifests) + len(missing)) - status_lookup = "not_found" if payload.get("status") == "not_found" else "ok" - return { - **_modal_fields(candidate), - "run_id": payload.get("run_id") or candidate.run_id, - "status_lookup": status_lookup, - "status": payload.get("status", "unknown"), - "message": payload.get("message", ""), - "branch": run_manifest.get("branch"), - "sha": run_manifest.get("sha"), - "candidate_version": run_manifest.get("candidate_version"), - "release_version": run_manifest.get("release_version"), - "started_at": run_manifest.get("started_at"), - "updated_at": 
payload.get("updated_at"), - "completed_at": run_manifest.get("completed_at"), - "modal_environment": ( - payload.get("modal_environment") or run_manifest.get("modal_environment") + payload: RawRecord, +) -> DeployedPipelineRunSummary: + run_manifest = _mapping_value(payload, "run_manifest") + stage_manifests = _list_of_mappings(payload, "stage_manifests") + missing = payload.get("missing_expected_manifest_ids") + missing_count = len(missing) if isinstance(missing, list) else 0 + known_step_ids = run_manifest.get("known_step_ids") + expected_count = max( + len(known_step_ids) if isinstance(known_step_ids, list) else 0, + len(stage_manifests) + missing_count, + ) + status = _text_value(payload.get("status")) or "unknown" + status_lookup = "not_found" if status == "not_found" else "ok" + return DeployedPipelineRunSummary( + run_id=_text_value(payload.get("run_id")) or candidate.run_id, + status_lookup=status_lookup, + status=status, + message=_text_value(payload.get("message")), + modal_app_id=candidate.app_id, + modal_app_name=candidate.app_name, + modal_app_state=candidate.state, + modal_task_count=candidate.task_count, + modal_app_created_at=candidate.created_at, + modal_app_stopped_at=candidate.stopped_at, + branch=_string_or_none(run_manifest.get("branch")), + sha=_string_or_none(run_manifest.get("sha")), + candidate_version=_string_or_none(run_manifest.get("candidate_version")), + release_version=_string_or_none(run_manifest.get("release_version")), + started_at=_string_or_none(run_manifest.get("started_at")), + updated_at=_string_or_none(payload.get("updated_at")), + completed_at=_string_or_none(run_manifest.get("completed_at")), + modal_environment=_string_or_none(payload.get("modal_environment")) + or _string_or_none(run_manifest.get("modal_environment")), + hf_staging_prefix=_string_or_none(run_manifest.get("hf_staging_prefix")), + github_run_url=_string_or_none( + _mapping_value(run_manifest, "run_context").get("github_run_url") ), - "hf_staging_prefix": 
run_manifest.get("hf_staging_prefix"), - "github_run_url": (run_manifest.get("run_context") or {}).get("github_run_url"), - "latest_manifest": _latest_manifest_payload(stage_manifests), - "progress": { - "expected_manifests": expected_count, - "present_manifests": len(stage_manifests), - "missing_manifests": len(missing), - }, - "error": _index_error_payload(payload.get("error")), - } - - -def _lookup_error_item( - candidate: PipelineAppCandidate, - exc: BaseException, -) -> dict[str, Any]: - message = redacted_bounded_error_text( - f"{type(exc).__name__}: {exc}", - max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, - ).text - return { - **_modal_fields(candidate), - "run_id": candidate.run_id, - "status_lookup": "unreachable", - "status": "unreachable", - "message": message, - "branch": None, - "sha": None, - "candidate_version": None, - "release_version": None, - "started_at": None, - "updated_at": None, - "completed_at": None, - "modal_environment": None, - "hf_staging_prefix": None, - "github_run_url": None, - "latest_manifest": None, - "progress": { - "expected_manifests": 0, - "present_manifests": 0, - "missing_manifests": 0, - }, - "error": { - "error_type": type(exc).__name__, - "message": message, - "traceback_available": False, - }, - } + latest_manifest=_latest_manifest_summary(stage_manifests), + progress=PipelineProgressSummary( + expected_manifests=expected_count, + present_manifests=len(stage_manifests), + missing_manifests=missing_count, + ), + error=_index_error_summary(_mapping_value(payload, "error")), + ) def _passes_filters( - item: dict[str, Any], - *, - status: str, - branch: str, - include_unreachable: bool, + item: DeployedPipelineRunSummary, + filters: PipelineDiscoveryFilters, ) -> bool: - if not include_unreachable and item.get("status_lookup") != "ok": + if not filters.include_unreachable and item.status_lookup != "ok": return False - if status and item.get("status") != status: + if filters.status and item.status != filters.status: return False 
- if branch and item.get("branch") != branch: + if filters.branch and item.branch != filters.branch: return False return True @@ -275,17 +478,17 @@ def _lookup_items( status_lookup: StatusLookup, *, max_workers: int, -) -> list[dict[str, Any]]: +) -> list[DeployedPipelineRunSummary]: if max_workers <= 1 or len(candidates) <= 1: items = [] for candidate in candidates: try: - items.append(_status_item(candidate, status_lookup(candidate))) + items.append(_status_summary(candidate, status_lookup(candidate))) except Exception as exc: - items.append(_lookup_error_item(candidate, exc)) + items.append(DeployedPipelineRunSummary.unreachable(candidate, exc)) return items - items: list[dict[str, Any]] = [] + items: list[DeployedPipelineRunSummary] = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = { executor.submit(status_lookup, candidate): candidate @@ -294,13 +497,13 @@ def _lookup_items( for future in as_completed(futures): candidate = futures[future] try: - items.append(_status_item(candidate, future.result())) + items.append(_status_summary(candidate, future.result())) except Exception as exc: - items.append(_lookup_error_item(candidate, exc)) + items.append(DeployedPipelineRunSummary.unreachable(candidate, exc)) items.sort( key=lambda item: ( - item.get("updated_at") or item.get("modal_app_created_at") or "", - item.get("run_id") or "", + item.updated_at or item.modal_app_created_at or "", + item.run_id, ), reverse=True, ) @@ -308,7 +511,7 @@ def _lookup_items( def build_deployed_pipeline_runs_payload( - app_records: Iterable[dict[str, Any]], + app_records: Iterable[RawRecord], status_lookup: StatusLookup, *, limit: int | str | None = DEFAULT_DISCOVERY_LIMIT, @@ -317,41 +520,33 @@ def build_deployed_pipeline_runs_payload( include_unreachable: bool = True, modal_environment: str = "main", max_workers: int = DEFAULT_DISCOVERY_WORKERS, -) -> dict[str, Any]: - """Build a cross-app pipeline run index from Modal app names.""" +) -> 
DeployedPipelineRunsPayload: + """Build a typed cross-app pipeline run index from Modal app names.""" bounded_limit = _bounded_limit(limit) - filters = { - "status": status or "", - "branch": branch or "", - "include_unreachable": bool(include_unreachable), - } + filters = PipelineDiscoveryFilters( + status=status or "", + branch=branch or "", + include_unreachable=bool(include_unreachable), + ) candidates = pipeline_app_candidates(app_records) needs_filter_window = ( - bool(filters["status"] or filters["branch"]) - or not filters["include_unreachable"] + bool(filters.status or filters.branch) or not filters.include_unreachable ) selected_limit = MAX_DISCOVERY_LIMIT if needs_filter_window else bounded_limit selected = candidates[:selected_limit] items = _lookup_items(selected, status_lookup, max_workers=max_workers) - runs = [ - item - for item in items - if _passes_filters( - item, - status=filters["status"], - branch=filters["branch"], - include_unreachable=filters["include_unreachable"], - ) - ][:bounded_limit] - return { - "schema_version": PIPELINE_DISCOVERY_SCHEMA_VERSION, - "source": PIPELINE_DISCOVERY_SOURCE, - "modal_environment": modal_environment, - "discovered_count": len(candidates), - "queried_count": len(selected), - "count": len(runs), - "limit": bounded_limit, - "filters": filters, - "runs": runs, - } + runs = tuple(item for item in items if _passes_filters(item, filters))[ + :bounded_limit + ] + return DeployedPipelineRunsPayload( + schema_version=PIPELINE_DISCOVERY_SCHEMA_VERSION, + source=PIPELINE_DISCOVERY_SOURCE, + modal_environment=modal_environment, + discovered_count=len(candidates), + queried_count=len(selected), + count=len(runs), + limit=bounded_limit, + filters=filters, + runs=runs, + ) diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py index 9a7409b06..7bc0159e9 100644 --- a/tests/unit/test_pipeline_discovery.py +++ b/tests/unit/test_pipeline_discovery.py @@ -1,6 +1,7 @@ import pytest from 
modal_app.pipeline_discovery_core import ( + DeployedPipelineRunsPayload, build_deployed_pipeline_runs_payload, derive_run_id_from_app_name, is_publication_pipeline_app_name, @@ -143,24 +144,28 @@ def lookup(candidate): ) assert seen == [(app_name, "usdata-gha26114604836-a1")] - assert payload["schema_version"] == "1" - assert payload["source"] == "modal_app_names" - assert payload["discovered_count"] == 1 - assert payload["queried_count"] == 1 - assert payload["count"] == 1 - run = payload["runs"][0] - assert run["run_id"] == "usdata-gha26114604836-a1" - assert run["status_lookup"] == "ok" - assert run["status"] == "running" - assert run["branch"] == "main" - assert run["modal_app_id"] == "ap-run" - assert run["modal_task_count"] == 4 - assert run["latest_manifest"]["step_id"] == "1_build_datasets" - assert run["progress"] == { + assert isinstance(payload, DeployedPipelineRunsPayload) + assert payload.schema_version == "1" + assert payload.source == "modal_app_names" + assert payload.discovered_count == 1 + assert payload.queried_count == 1 + assert payload.count == 1 + run = payload.runs[0] + assert run.run_id == "usdata-gha26114604836-a1" + assert run.status_lookup == "ok" + assert run.status == "running" + assert run.branch == "main" + assert run.modal_app_id == "ap-run" + assert run.modal_task_count == 4 + assert run.latest_manifest is not None + assert run.latest_manifest.step_id == "1_build_datasets" + assert run.progress is not None + assert run.progress.to_dict() == { "expected_manifests": 2, "present_manifests": 1, "missing_manifests": 1, } + assert payload.to_dict()["runs"][0]["run_id"] == "usdata-gha26114604836-a1" def test_deployed_pipeline_runs_payload_keeps_unreachable_apps_structured(): @@ -173,12 +178,13 @@ def lookup(_candidate): max_workers=1, ) - assert payload["count"] == 1 - run = payload["runs"][0] - assert run["run_id"] == "usdata-gha26114604836-a1" - assert run["status_lookup"] == "unreachable" - assert run["status"] == "unreachable" - 
assert run["error"]["error_type"] == "RuntimeError" + assert payload.count == 1 + run = payload.runs[0] + assert run.run_id == "usdata-gha26114604836-a1" + assert run.status_lookup == "unreachable" + assert run.status == "unreachable" + assert run.error is not None + assert run.error.error_type == "RuntimeError" def test_deployed_pipeline_runs_payload_applies_limit_after_filters(): @@ -205,14 +211,14 @@ def lookup(candidate): max_workers=1, ) - assert payload["limit"] == 1 - assert payload["filters"] == { + assert payload.limit == 1 + assert payload.filters.to_dict() == { "status": "", "branch": "main", "include_unreachable": True, } - assert payload["queried_count"] == 2 - assert [run["run_id"] for run in payload["runs"]] == ["usdata-gha1-a1"] + assert payload.queried_count == 2 + assert [run.run_id for run in payload.runs] == ["usdata-gha1-a1"] def test_deployed_pipeline_runs_payload_can_exclude_unreachable_apps(): @@ -226,7 +232,7 @@ def lookup(_candidate): max_workers=1, ) - assert payload["discovered_count"] == 1 - assert payload["queried_count"] == 1 - assert payload["count"] == 0 - assert payload["runs"] == [] + assert payload.discovered_count == 1 + assert payload.queried_count == 1 + assert payload.count == 0 + assert payload.runs == () From 11d13b905d60316732dc33ed56c4b4ced98179bb Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 22:13:55 +0200 Subject: [PATCH 3/9] Use typed discovery payloads --- modal_app/pipeline_discovery.py | 13 +-- modal_app/pipeline_discovery_core.py | 117 ++++++++++++++++++++++++--- 2 files changed, 114 insertions(+), 16 deletions(-) diff --git a/modal_app/pipeline_discovery.py b/modal_app/pipeline_discovery.py index 263176db1..9ec118456 100644 --- a/modal_app/pipeline_discovery.py +++ b/modal_app/pipeline_discovery.py @@ -17,7 +17,8 @@ from modal_app.images import cpu_image as image # noqa: E402 from modal_app.pipeline_discovery_core import ( # noqa: E402 - JSONDict, + DeployedPipelineRunsPayloadDict, + 
ModalAppRecord, RawRecord, build_deployed_pipeline_runs_payload, ) @@ -50,7 +51,7 @@ def _modal_state_name(api_pb2, state: int) -> str: return str(state) -async def _list_modal_app_records_async(environment_name: str) -> list[JSONDict]: +async def _list_modal_app_records_async(environment_name: str) -> list[ModalAppRecord]: from modal.client import _Client from modal_proto import api_pb2 @@ -72,7 +73,7 @@ async def _list_modal_app_records_async(environment_name: str) -> list[JSONDict] ] -def _list_modal_app_records(environment_name: str) -> list[JSONDict]: +def _list_modal_app_records(environment_name: str) -> list[ModalAppRecord]: return asyncio.run(_list_modal_app_records_async(environment_name)) @@ -98,7 +99,7 @@ def _build_deployed_pipeline_runs( branch: str, include_unreachable: bool, modal_environment: str, -) -> JSONDict: +) -> DeployedPipelineRunsPayloadDict: environment_name = _modal_environment(modal_environment) app_records = _list_modal_app_records(environment_name) payload = build_deployed_pipeline_runs_payload( @@ -124,7 +125,7 @@ def list_deployed_pipeline_runs( branch: str = "", include_unreachable: bool = True, modal_environment: str = "", -) -> JSONDict: +) -> DeployedPipelineRunsPayloadDict: """Return deployed publication pipeline runs discovered from Modal app names.""" return _build_deployed_pipeline_runs( @@ -148,7 +149,7 @@ def deployed_pipeline_runs_endpoint( branch: str = "", include_unreachable: bool = True, modal_environment: str = "", -) -> JSONDict: +) -> DeployedPipelineRunsPayloadDict: """Protected HTTP endpoint for deployed publication pipeline discovery.""" return _build_deployed_pipeline_runs( diff --git a/modal_app/pipeline_discovery_core.py b/modal_app/pipeline_discovery_core.py index 0fbd8d726..ed6f2e6a3 100644 --- a/modal_app/pipeline_discovery_core.py +++ b/modal_app/pipeline_discovery_core.py @@ -6,7 +6,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import 
datetime, timezone -from typing import Callable, Iterable, Mapping +from typing import Callable, Iterable, Mapping, NotRequired, TypedDict from policyengine_us_data.utils.error_redaction import ( DEFAULT_ERROR_MESSAGE_MAX_CHARS, @@ -23,10 +23,107 @@ RUN_ID_RE = re.compile(r"(usdata-gha\d+-a\d+)") DEPLOYED_STATES = {"deployed", "app_state_deployed"} -JSONDict = dict[str, object] RawRecord = Mapping[str, object] +class PipelineDiscoveryFiltersPayload(TypedDict): + """Serialized discovery filters.""" + + status: str + branch: str + include_unreachable: bool + + +class PipelineProgressSummaryPayload(TypedDict): + """Serialized manifest progress for one run.""" + + expected_manifests: int + present_manifests: int + missing_manifests: int + + +class LatestManifestSummaryPayload(TypedDict): + """Serialized latest manifest summary.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + +class PipelineLookupErrorPayload(TypedDict): + """Serialized lookup or pipeline error summary.""" + + error_type: str + message: str + traceback_available: bool + stage_id: NotRequired[str] + substage_id: NotRequired[str] + surface: NotRequired[str] + message_truncated: NotRequired[bool] + record_path: NotRequired[str] + latest_path: NotRequired[str] + + +class DeployedPipelineRunPayload(TypedDict): + """Serialized status summary for one deployed publication pipeline app.""" + + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None + modal_app_stopped_at: str | None + run_id: str + status_lookup: str + status: str + message: str + branch: str | None + sha: str | None + candidate_version: str | None + release_version: str | None + started_at: str | None + updated_at: str | None + completed_at: str | None + modal_environment: str | None + 
hf_staging_prefix: str | None + github_run_url: str | None + latest_manifest: LatestManifestSummaryPayload | None + progress: PipelineProgressSummaryPayload | None + error: PipelineLookupErrorPayload | None + + +class DeployedPipelineRunsPayloadDict(TypedDict): + """Serialized cross-app pipeline run index.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFiltersPayload + runs: list[DeployedPipelineRunPayload] + + +class ModalAppRecord(TypedDict): + """Normalized Modal app-list record consumed by discovery core.""" + + app_id: str + name: str + description: str + state: str + tasks: int + created_at: float | str | None + stopped_at: float | str | None + + @dataclass(frozen=True) class PipelineAppCandidate: """A deployed Modal app that appears to represent one publication run.""" @@ -48,7 +145,7 @@ class PipelineDiscoveryFilters: branch: str = "" include_unreachable: bool = True - def to_dict(self) -> JSONDict: + def to_dict(self) -> PipelineDiscoveryFiltersPayload: return { "status": self.status, "branch": self.branch, @@ -64,7 +161,7 @@ class PipelineProgressSummary: present_manifests: int missing_manifests: int - def to_dict(self) -> JSONDict: + def to_dict(self) -> PipelineProgressSummaryPayload: return { "expected_manifests": self.expected_manifests, "present_manifests": self.present_manifests, @@ -86,7 +183,7 @@ class LatestManifestSummary: duration_s: float | int | None reuse_decision: str - def to_dict(self) -> JSONDict: + def to_dict(self) -> LatestManifestSummaryPayload: return { "step_id": self.step_id, "stage_id": self.stage_id, @@ -114,13 +211,13 @@ class PipelineLookupErrorSummary: latest_path: str | None = None traceback_available: bool = False - def to_dict(self) -> JSONDict: - payload: JSONDict = { + def to_dict(self) -> PipelineLookupErrorPayload: + payload: PipelineLookupErrorPayload = { "error_type": self.error_type, "message": 
self.message, "traceback_available": self.traceback_available, } - optional: JSONDict = { + optional = { "stage_id": self.stage_id, "substage_id": self.substage_id, "surface": self.surface, @@ -194,7 +291,7 @@ def unreachable( ), ) - def to_dict(self) -> JSONDict: + def to_dict(self) -> DeployedPipelineRunPayload: return { "modal_app_id": self.modal_app_id, "modal_app_name": self.modal_app_name, @@ -238,7 +335,7 @@ class DeployedPipelineRunsPayload: filters: PipelineDiscoveryFilters runs: tuple[DeployedPipelineRunSummary, ...] - def to_dict(self) -> JSONDict: + def to_dict(self) -> DeployedPipelineRunsPayloadDict: return { "schema_version": self.schema_version, "source": self.source, From dab8c260f2304355fdf140760f75c46a77a68118 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 19 May 2026 22:25:20 +0200 Subject: [PATCH 4/9] Attach Modal discovery credentials --- .github/scripts/sync_modal_secrets.py | 14 + .github/workflows/pipeline.yaml | 1 + .../engineering/skills/pipeline_operations.md | 5 + modal_app/pipeline.py | 2 + modal_app/pipeline_discovery.py | 5 +- modal_app/pipeline_discovery_core.py | 363 +++--------------- modal_app/pipeline_discovery_schema.py | 302 +++++++++++++++ .../integration/test_modal_pipeline_seams.py | 6 + tests/unit/test_pipeline_discovery.py | 6 +- 9 files changed, 381 insertions(+), 323 deletions(-) create mode 100644 modal_app/pipeline_discovery_schema.py diff --git a/.github/scripts/sync_modal_secrets.py b/.github/scripts/sync_modal_secrets.py index fd5ae430f..7a4cbc43e 100644 --- a/.github/scripts/sync_modal_secrets.py +++ b/.github/scripts/sync_modal_secrets.py @@ -35,6 +35,20 @@ def main() -> None: ], ) create_secret_with_retry( + [ + "modal", + "secret", + "create", + "--env", + env_name, + "--force", + "modal-token", + f"MODAL_TOKEN_ID={os.environ['MODAL_TOKEN_ID']}", + f"MODAL_TOKEN_SECRET={os.environ['MODAL_TOKEN_SECRET']}", + ], + check=True, + ) + subprocess.run( [ "modal", "secret", diff --git 
a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 97fe46870..04e54b427 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -132,6 +132,7 @@ jobs: PARALLEL_MATRIX: ${{ inputs.parallel_matrix || 'false' }} NUM_MATRIX_WORKERS: ${{ inputs.num_matrix_workers || '50' }} run: | + modal secret create --env="${MODAL_ENVIRONMENT}" --force modal-token MODAL_TOKEN_ID="${MODAL_TOKEN_ID}" MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}" modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline_discovery.py modal deploy --env="${MODAL_ENVIRONMENT}" --name="${US_DATA_MODAL_APP_NAME}" --tag="${US_DATA_RUN_ID}" modal_app/pipeline.py python .github/scripts/spawn_modal_pipeline.py diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md index d2155b97d..053e1105c 100644 --- a/docs/engineering/skills/pipeline_operations.md +++ b/docs/engineering/skills/pipeline_operations.md @@ -46,6 +46,11 @@ app, not in a run-scoped pipeline app: cross-app discovery payload. Use this for dashboards that need to discover all deployed publication runs. +The stable discovery app requires the `modal-token` Modal Secret in its +environment. That Secret must contain `MODAL_TOKEN_ID` and +`MODAL_TOKEN_SECRET`, and it should be attached only to functions that need +Modal control-plane access. 
+ ## Fetch Status First identify the run context from the GitHub Actions summary, workflow logs, or diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index 228350cad..1a70d851a 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -545,6 +545,7 @@ def verify_runtime_seams() -> dict: "modal_app/local_area.py", "modal_app/pipeline_discovery.py", "modal_app/pipeline_discovery_core.py", + "modal_app/pipeline_discovery_schema.py", "modal_app/pipeline_status.py", "modal_app/h5_test_harness.py", "modal_app/step_manifests/specs.py", @@ -596,6 +597,7 @@ def verify_runtime_seams() -> dict: "modal_app.local_area", "modal_app.pipeline_discovery", "modal_app.pipeline_discovery_core", + "modal_app.pipeline_discovery_schema", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", diff --git a/modal_app/pipeline_discovery.py b/modal_app/pipeline_discovery.py index 9ec118456..0b573b6b3 100644 --- a/modal_app/pipeline_discovery.py +++ b/modal_app/pipeline_discovery.py @@ -28,6 +28,7 @@ DISCOVERY_APP_NAME = "policyengine-us-data-pipeline-status" +modal_token_secret = modal.Secret.from_name("modal-token") app = modal.App( os.environ.get("US_DATA_PIPELINE_DISCOVERY_APP_NAME") or DISCOVERY_APP_NAME @@ -118,7 +119,7 @@ def _build_deployed_pipeline_runs( return payload.to_dict() -@app.function(image=image, timeout=180) +@app.function(image=image, timeout=180, secrets=[modal_token_secret]) def list_deployed_pipeline_runs( limit: int = 25, status: str = "", @@ -137,7 +138,7 @@ def list_deployed_pipeline_runs( ) -@app.function(image=status_image, timeout=180) +@app.function(image=status_image, timeout=180, secrets=[modal_token_secret]) @modal.fastapi_endpoint( method="GET", docs=False, diff --git a/modal_app/pipeline_discovery_core.py b/modal_app/pipeline_discovery_core.py index ed6f2e6a3..bb5edf131 100644 --- a/modal_app/pipeline_discovery_core.py +++ b/modal_app/pipeline_discovery_core.py @@ -6,11 +6,15 @@ from 
concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime, timezone -from typing import Callable, Iterable, Mapping, NotRequired, TypedDict - -from policyengine_us_data.utils.error_redaction import ( - DEFAULT_ERROR_MESSAGE_MAX_CHARS, - redacted_bounded_error_text, +from typing import Callable, Iterable, Mapping + +from modal_app.pipeline_discovery_schema import ( + DeployedPipelineRunSummary, + DeployedPipelineRunsPayload, + LatestManifestSummary, + PipelineDiscoveryFilters, + PipelineLookupErrorSummary, + PipelineProgressSummary, ) @@ -19,111 +23,13 @@ DEFAULT_DISCOVERY_LIMIT = 25 MAX_DISCOVERY_LIMIT = 100 DEFAULT_DISCOVERY_WORKERS = 8 -PUBLICATION_APP_PREFIXES = ("us-data-", "policyengine-us-data-pub-") RUN_ID_RE = re.compile(r"(usdata-gha\d+-a\d+)") DEPLOYED_STATES = {"deployed", "app_state_deployed"} +EXCLUDED_APP_NAME_PATTERNS = ("-pipeline-pr-", "-local-area-pr-", "-h5-pr-") RawRecord = Mapping[str, object] -class PipelineDiscoveryFiltersPayload(TypedDict): - """Serialized discovery filters.""" - - status: str - branch: str - include_unreachable: bool - - -class PipelineProgressSummaryPayload(TypedDict): - """Serialized manifest progress for one run.""" - - expected_manifests: int - present_manifests: int - missing_manifests: int - - -class LatestManifestSummaryPayload(TypedDict): - """Serialized latest manifest summary.""" - - step_id: str | None - stage_id: str | None - substage_id: str | None - title: str | None - status: str | None - started_at: str | None - completed_at: str | None - duration_s: float | int | None - reuse_decision: str - - -class PipelineLookupErrorPayload(TypedDict): - """Serialized lookup or pipeline error summary.""" - - error_type: str - message: str - traceback_available: bool - stage_id: NotRequired[str] - substage_id: NotRequired[str] - surface: NotRequired[str] - message_truncated: NotRequired[bool] - record_path: NotRequired[str] - latest_path: NotRequired[str] - 
- -class DeployedPipelineRunPayload(TypedDict): - """Serialized status summary for one deployed publication pipeline app.""" - - modal_app_id: str - modal_app_name: str - modal_app_state: str - modal_task_count: int - modal_app_created_at: str | None - modal_app_stopped_at: str | None - run_id: str - status_lookup: str - status: str - message: str - branch: str | None - sha: str | None - candidate_version: str | None - release_version: str | None - started_at: str | None - updated_at: str | None - completed_at: str | None - modal_environment: str | None - hf_staging_prefix: str | None - github_run_url: str | None - latest_manifest: LatestManifestSummaryPayload | None - progress: PipelineProgressSummaryPayload | None - error: PipelineLookupErrorPayload | None - - -class DeployedPipelineRunsPayloadDict(TypedDict): - """Serialized cross-app pipeline run index.""" - - schema_version: str - source: str - modal_environment: str - discovered_count: int - queried_count: int - count: int - limit: int - filters: PipelineDiscoveryFiltersPayload - runs: list[DeployedPipelineRunPayload] - - -class ModalAppRecord(TypedDict): - """Normalized Modal app-list record consumed by discovery core.""" - - app_id: str - name: str - description: str - state: str - tasks: int - created_at: float | str | None - stopped_at: float | str | None - - @dataclass(frozen=True) class PipelineAppCandidate: """A deployed Modal app that appears to represent one publication run.""" @@ -137,218 +43,6 @@ class PipelineAppCandidate: stopped_at: str | None = None -@dataclass(frozen=True) -class PipelineDiscoveryFilters: - """Filters applied to the deployed pipeline run index.""" - - status: str = "" - branch: str = "" - include_unreachable: bool = True - - def to_dict(self) -> PipelineDiscoveryFiltersPayload: - return { - "status": self.status, - "branch": self.branch, - "include_unreachable": self.include_unreachable, - } - - -@dataclass(frozen=True) -class PipelineProgressSummary: - """Manifest progress 
for one discovered pipeline run.""" - - expected_manifests: int - present_manifests: int - missing_manifests: int - - def to_dict(self) -> PipelineProgressSummaryPayload: - return { - "expected_manifests": self.expected_manifests, - "present_manifests": self.present_manifests, - "missing_manifests": self.missing_manifests, - } - - -@dataclass(frozen=True) -class LatestManifestSummary: - """Summary of the latest stage or substage manifest for one run.""" - - step_id: str | None - stage_id: str | None - substage_id: str | None - title: str | None - status: str | None - started_at: str | None - completed_at: str | None - duration_s: float | int | None - reuse_decision: str - - def to_dict(self) -> LatestManifestSummaryPayload: - return { - "step_id": self.step_id, - "stage_id": self.stage_id, - "substage_id": self.substage_id, - "title": self.title, - "status": self.status, - "started_at": self.started_at, - "completed_at": self.completed_at, - "duration_s": self.duration_s, - "reuse_decision": self.reuse_decision, - } - - -@dataclass(frozen=True) -class PipelineLookupErrorSummary: - """Bounded error surfaced while querying one discovered app.""" - - error_type: str - message: str - stage_id: str | None = None - substage_id: str | None = None - surface: str | None = None - message_truncated: bool | None = None - record_path: str | None = None - latest_path: str | None = None - traceback_available: bool = False - - def to_dict(self) -> PipelineLookupErrorPayload: - payload: PipelineLookupErrorPayload = { - "error_type": self.error_type, - "message": self.message, - "traceback_available": self.traceback_available, - } - optional = { - "stage_id": self.stage_id, - "substage_id": self.substage_id, - "surface": self.surface, - "message_truncated": self.message_truncated, - "record_path": self.record_path, - "latest_path": self.latest_path, - } - payload.update( - {key: value for key, value in optional.items() if value is not None} - ) - return payload - - 
-@dataclass(frozen=True) -class DeployedPipelineRunSummary: - """Status summary for one deployed publication pipeline app.""" - - run_id: str - status_lookup: str - status: str - message: str - modal_app_id: str - modal_app_name: str - modal_app_state: str - modal_task_count: int - modal_app_created_at: str | None = None - modal_app_stopped_at: str | None = None - branch: str | None = None - sha: str | None = None - candidate_version: str | None = None - release_version: str | None = None - started_at: str | None = None - updated_at: str | None = None - completed_at: str | None = None - modal_environment: str | None = None - hf_staging_prefix: str | None = None - github_run_url: str | None = None - latest_manifest: LatestManifestSummary | None = None - progress: PipelineProgressSummary | None = None - error: PipelineLookupErrorSummary | None = None - - @classmethod - def unreachable( - cls, - candidate: PipelineAppCandidate, - exc: BaseException, - ) -> DeployedPipelineRunSummary: - message = redacted_bounded_error_text( - f"{type(exc).__name__}: {exc}", - max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, - ).text - return cls( - run_id=candidate.run_id, - status_lookup="unreachable", - status="unreachable", - message=message, - modal_app_id=candidate.app_id, - modal_app_name=candidate.app_name, - modal_app_state=candidate.state, - modal_task_count=candidate.task_count, - modal_app_created_at=candidate.created_at, - modal_app_stopped_at=candidate.stopped_at, - progress=PipelineProgressSummary( - expected_manifests=0, - present_manifests=0, - missing_manifests=0, - ), - error=PipelineLookupErrorSummary( - error_type=type(exc).__name__, - message=message, - ), - ) - - def to_dict(self) -> DeployedPipelineRunPayload: - return { - "modal_app_id": self.modal_app_id, - "modal_app_name": self.modal_app_name, - "modal_app_state": self.modal_app_state, - "modal_task_count": self.modal_task_count, - "modal_app_created_at": self.modal_app_created_at, - "modal_app_stopped_at": 
self.modal_app_stopped_at, - "run_id": self.run_id, - "status_lookup": self.status_lookup, - "status": self.status, - "message": self.message, - "branch": self.branch, - "sha": self.sha, - "candidate_version": self.candidate_version, - "release_version": self.release_version, - "started_at": self.started_at, - "updated_at": self.updated_at, - "completed_at": self.completed_at, - "modal_environment": self.modal_environment, - "hf_staging_prefix": self.hf_staging_prefix, - "github_run_url": self.github_run_url, - "latest_manifest": ( - self.latest_manifest.to_dict() if self.latest_manifest else None - ), - "progress": self.progress.to_dict() if self.progress else None, - "error": self.error.to_dict() if self.error else None, - } - - -@dataclass(frozen=True) -class DeployedPipelineRunsPayload: - """Typed cross-app pipeline run index discovered from Modal app names.""" - - schema_version: str - source: str - modal_environment: str - discovered_count: int - queried_count: int - count: int - limit: int - filters: PipelineDiscoveryFilters - runs: tuple[DeployedPipelineRunSummary, ...] 
- - def to_dict(self) -> DeployedPipelineRunsPayloadDict: - return { - "schema_version": self.schema_version, - "source": self.source, - "modal_environment": self.modal_environment, - "discovered_count": self.discovered_count, - "queried_count": self.queried_count, - "count": self.count, - "limit": self.limit, - "filters": self.filters.to_dict(), - "runs": [run.to_dict() for run in self.runs], - } - - StatusLookup = Callable[[PipelineAppCandidate], RawRecord] @@ -362,9 +56,13 @@ def derive_run_id_from_app_name(app_name: str) -> str: def is_publication_pipeline_app_name(app_name: str) -> bool: """Return whether an app name belongs to a publication pipeline run.""" - if not app_name.startswith(PUBLICATION_APP_PREFIXES): + if not derive_run_id_from_app_name(app_name): + return False + if not app_name.startswith(("us-data-", "policyengine-us-data-")): return False - return bool(derive_run_id_from_app_name(app_name)) + if any(pattern in app_name for pattern in EXCLUDED_APP_NAME_PATTERNS): + return False + return True def _bounded_limit(limit: int | str | None) -> int: @@ -557,6 +255,31 @@ def _status_summary( ) +def _unreachable_summary( + candidate: PipelineAppCandidate, + exc: BaseException, +) -> DeployedPipelineRunSummary: + error = PipelineLookupErrorSummary.from_exception(exc) + return DeployedPipelineRunSummary( + run_id=candidate.run_id, + status_lookup="unreachable", + status="unreachable", + message=error.message, + modal_app_id=candidate.app_id, + modal_app_name=candidate.app_name, + modal_app_state=candidate.state, + modal_task_count=candidate.task_count, + modal_app_created_at=candidate.created_at, + modal_app_stopped_at=candidate.stopped_at, + progress=PipelineProgressSummary( + expected_manifests=0, + present_manifests=0, + missing_manifests=0, + ), + error=error, + ) + + def _passes_filters( item: DeployedPipelineRunSummary, filters: PipelineDiscoveryFilters, @@ -582,7 +305,7 @@ def _lookup_items( try: items.append(_status_summary(candidate, 
status_lookup(candidate))) except Exception as exc: - items.append(DeployedPipelineRunSummary.unreachable(candidate, exc)) + items.append(_unreachable_summary(candidate, exc)) return items items: list[DeployedPipelineRunSummary] = [] @@ -596,7 +319,7 @@ def _lookup_items( try: items.append(_status_summary(candidate, future.result())) except Exception as exc: - items.append(DeployedPipelineRunSummary.unreachable(candidate, exc)) + items.append(_unreachable_summary(candidate, exc)) items.sort( key=lambda item: ( item.updated_at or item.modal_app_created_at or "", diff --git a/modal_app/pipeline_discovery_schema.py b/modal_app/pipeline_discovery_schema.py new file mode 100644 index 000000000..5fbb02ba3 --- /dev/null +++ b/modal_app/pipeline_discovery_schema.py @@ -0,0 +1,302 @@ +"""Typed result schemas for deployed pipeline discovery.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import NotRequired, TypedDict + +from policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_MESSAGE_MAX_CHARS, + redacted_bounded_error_text, +) + + +class PipelineDiscoveryFiltersPayload(TypedDict): + """Serialized discovery filters.""" + + status: str + branch: str + include_unreachable: bool + + +class PipelineProgressSummaryPayload(TypedDict): + """Serialized manifest progress for one run.""" + + expected_manifests: int + present_manifests: int + missing_manifests: int + + +class LatestManifestSummaryPayload(TypedDict): + """Serialized latest manifest summary.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + +class PipelineLookupErrorPayload(TypedDict): + """Serialized lookup or pipeline error summary.""" + + error_type: str + message: str + traceback_available: bool + stage_id: NotRequired[str] + substage_id: NotRequired[str] + surface: NotRequired[str] 
+ message_truncated: NotRequired[bool] + record_path: NotRequired[str] + latest_path: NotRequired[str] + + +class DeployedPipelineRunPayload(TypedDict): + """Serialized status summary for one deployed publication pipeline app.""" + + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None + modal_app_stopped_at: str | None + run_id: str + status_lookup: str + status: str + message: str + branch: str | None + sha: str | None + candidate_version: str | None + release_version: str | None + started_at: str | None + updated_at: str | None + completed_at: str | None + modal_environment: str | None + hf_staging_prefix: str | None + github_run_url: str | None + latest_manifest: LatestManifestSummaryPayload | None + progress: PipelineProgressSummaryPayload | None + error: PipelineLookupErrorPayload | None + + +class DeployedPipelineRunsPayloadDict(TypedDict): + """Serialized cross-app pipeline run index.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFiltersPayload + runs: list[DeployedPipelineRunPayload] + + +class ModalAppRecord(TypedDict): + """Normalized Modal app-list record consumed by discovery core.""" + + app_id: str + name: str + description: str + state: str + tasks: int + created_at: float | str | None + stopped_at: float | str | None + + +@dataclass(frozen=True) +class PipelineDiscoveryFilters: + """Filters applied to the deployed pipeline run index.""" + + status: str = "" + branch: str = "" + include_unreachable: bool = True + + def to_dict(self) -> PipelineDiscoveryFiltersPayload: + return { + "status": self.status, + "branch": self.branch, + "include_unreachable": self.include_unreachable, + } + + +@dataclass(frozen=True) +class PipelineProgressSummary: + """Manifest progress for one discovered pipeline run.""" + + expected_manifests: int + present_manifests: int + 
missing_manifests: int + + def to_dict(self) -> PipelineProgressSummaryPayload: + return { + "expected_manifests": self.expected_manifests, + "present_manifests": self.present_manifests, + "missing_manifests": self.missing_manifests, + } + + +@dataclass(frozen=True) +class LatestManifestSummary: + """Summary of the latest stage or substage manifest for one run.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + def to_dict(self) -> LatestManifestSummaryPayload: + return { + "step_id": self.step_id, + "stage_id": self.stage_id, + "substage_id": self.substage_id, + "title": self.title, + "status": self.status, + "started_at": self.started_at, + "completed_at": self.completed_at, + "duration_s": self.duration_s, + "reuse_decision": self.reuse_decision, + } + + +@dataclass(frozen=True) +class PipelineLookupErrorSummary: + """Bounded error surfaced while querying one discovered app.""" + + error_type: str + message: str + stage_id: str | None = None + substage_id: str | None = None + surface: str | None = None + message_truncated: bool | None = None + record_path: str | None = None + latest_path: str | None = None + traceback_available: bool = False + + @classmethod + def from_exception(cls, exc: BaseException) -> PipelineLookupErrorSummary: + """Build a bounded lookup-error summary from an exception.""" + + message = redacted_bounded_error_text( + f"{type(exc).__name__}: {exc}", + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text + return cls( + error_type=type(exc).__name__, + message=message, + ) + + def to_dict(self) -> PipelineLookupErrorPayload: + payload: PipelineLookupErrorPayload = { + "error_type": self.error_type, + "message": self.message, + "traceback_available": self.traceback_available, + } + optional = { + "stage_id": self.stage_id, + "substage_id": self.substage_id, + 
"surface": self.surface, + "message_truncated": self.message_truncated, + "record_path": self.record_path, + "latest_path": self.latest_path, + } + payload.update( + {key: value for key, value in optional.items() if value is not None} + ) + return payload + + +@dataclass(frozen=True) +class DeployedPipelineRunSummary: + """Status summary for one deployed publication pipeline app.""" + + run_id: str + status_lookup: str + status: str + message: str + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None = None + modal_app_stopped_at: str | None = None + branch: str | None = None + sha: str | None = None + candidate_version: str | None = None + release_version: str | None = None + started_at: str | None = None + updated_at: str | None = None + completed_at: str | None = None + modal_environment: str | None = None + hf_staging_prefix: str | None = None + github_run_url: str | None = None + latest_manifest: LatestManifestSummary | None = None + progress: PipelineProgressSummary | None = None + error: PipelineLookupErrorSummary | None = None + + def to_dict(self) -> DeployedPipelineRunPayload: + return { + "modal_app_id": self.modal_app_id, + "modal_app_name": self.modal_app_name, + "modal_app_state": self.modal_app_state, + "modal_task_count": self.modal_task_count, + "modal_app_created_at": self.modal_app_created_at, + "modal_app_stopped_at": self.modal_app_stopped_at, + "run_id": self.run_id, + "status_lookup": self.status_lookup, + "status": self.status, + "message": self.message, + "branch": self.branch, + "sha": self.sha, + "candidate_version": self.candidate_version, + "release_version": self.release_version, + "started_at": self.started_at, + "updated_at": self.updated_at, + "completed_at": self.completed_at, + "modal_environment": self.modal_environment, + "hf_staging_prefix": self.hf_staging_prefix, + "github_run_url": self.github_run_url, + "latest_manifest": ( + 
self.latest_manifest.to_dict() if self.latest_manifest else None + ), + "progress": self.progress.to_dict() if self.progress else None, + "error": self.error.to_dict() if self.error else None, + } + + +@dataclass(frozen=True) +class DeployedPipelineRunsPayload: + """Typed cross-app pipeline run index discovered from Modal app names.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFilters + runs: tuple[DeployedPipelineRunSummary, ...] + + def to_dict(self) -> DeployedPipelineRunsPayloadDict: + return { + "schema_version": self.schema_version, + "source": self.source, + "modal_environment": self.modal_environment, + "discovered_count": self.discovered_count, + "queried_count": self.queried_count, + "count": self.count, + "limit": self.limit, + "filters": self.filters.to_dict(), + "runs": [run.to_dict() for run in self.runs], + } diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index beca88f63..f0a6033f8 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -57,6 +57,7 @@ def test_pipeline_image_runtime_seams(): "modal_app/local_area.py": True, "modal_app/pipeline_discovery.py": True, "modal_app/pipeline_discovery_core.py": True, + "modal_app/pipeline_discovery_schema.py": True, "modal_app/pipeline_status.py": True, "modal_app/h5_test_harness.py": True, "modal_app/step_manifests/specs.py": True, @@ -87,6 +88,7 @@ def test_pipeline_image_runtime_seams(): "modal_app.local_area", "modal_app.pipeline_discovery", "modal_app.pipeline_discovery_core", + "modal_app.pipeline_discovery_schema", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", @@ -163,6 +165,8 @@ def test_pipeline_discovery_callable_reports_structured_index(): assert result["schema_version"] == "1" assert result["source"] == 
"modal_app_names" assert result["modal_environment"] == MODAL_ENVIRONMENT + assert isinstance(result["discovered_count"], int) + assert isinstance(result["queried_count"], int) assert result["limit"] == 1 assert result["count"] <= 1 assert isinstance(result["runs"], list) @@ -252,6 +256,8 @@ def test_pipeline_discovery_http_endpoint_reports_structured_index(): assert result["schema_version"] == "1" assert result["source"] == "modal_app_names" assert result["modal_environment"] == MODAL_ENVIRONMENT + assert isinstance(result["discovered_count"], int) + assert isinstance(result["queried_count"], int) assert result["limit"] == 1 assert result["count"] <= 1 assert isinstance(result["runs"], list) diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py index 7bc0159e9..f9a0061f2 100644 --- a/tests/unit/test_pipeline_discovery.py +++ b/tests/unit/test_pipeline_discovery.py @@ -1,12 +1,14 @@ import pytest from modal_app.pipeline_discovery_core import ( - DeployedPipelineRunsPayload, build_deployed_pipeline_runs_payload, derive_run_id_from_app_name, is_publication_pipeline_app_name, pipeline_app_candidates, ) +from modal_app.pipeline_discovery_schema import ( + DeployedPipelineRunsPayload, +) def _app_record( @@ -94,8 +96,10 @@ def test_derives_run_id_from_current_and_legacy_publication_app_names(): [ ("us-data-1-115-4-minor-usdata-gha26114604836-a1", True), ("policyengine-us-data-pub-usdata-gha123-a1", True), + ("policyengine-us-data-1-115-4-minor-usdata-gha123-a1", True), ("us-data-pipeline-pr-1035-26117326123-1", False), ("us-data-local-area-pr-1035-26117326123-1", False), + ("us-data-h5-pr-1035-26117326123-1", False), ("policyengine-us-data-pipeline", False), ("state-research-tracker", False), ], From f994da6e01e2939a64cf3100d25853de73995780 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 15:55:27 +0200 Subject: [PATCH 5/9] Fix Modal secret sync retry call --- .github/scripts/sync_modal_secrets.py | 1 - 1 file 
changed, 1 deletion(-) diff --git a/.github/scripts/sync_modal_secrets.py b/.github/scripts/sync_modal_secrets.py index 7a4cbc43e..272e13077 100644 --- a/.github/scripts/sync_modal_secrets.py +++ b/.github/scripts/sync_modal_secrets.py @@ -46,7 +46,6 @@ def main() -> None: f"MODAL_TOKEN_ID={os.environ['MODAL_TOKEN_ID']}", f"MODAL_TOKEN_SECRET={os.environ['MODAL_TOKEN_SECRET']}", ], - check=True, ) subprocess.run( [ From e4f4e3c996b699fd9b7062a38895b591d2011f92 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 16:39:28 +0200 Subject: [PATCH 6/9] Fix pipeline discovery entrypoint imports --- modal_app/pipeline_discovery.py | 6 ++- tests/unit/test_pipeline_discovery.py | 60 +++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 2 deletions(-) diff --git a/modal_app/pipeline_discovery.py b/modal_app/pipeline_discovery.py index 0b573b6b3..9405be9ce 100644 --- a/modal_app/pipeline_discovery.py +++ b/modal_app/pipeline_discovery.py @@ -17,11 +17,13 @@ from modal_app.images import cpu_image as image # noqa: E402 from modal_app.pipeline_discovery_core import ( # noqa: E402 - DeployedPipelineRunsPayloadDict, - ModalAppRecord, RawRecord, build_deployed_pipeline_runs_payload, ) +from modal_app.pipeline_discovery_schema import ( # noqa: E402 + DeployedPipelineRunsPayloadDict, + ModalAppRecord, +) from policyengine_us_data.utils.run_context import ( # noqa: E402 DEFAULT_MODAL_ENVIRONMENT, ) diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py index f9a0061f2..d27bf2391 100644 --- a/tests/unit/test_pipeline_discovery.py +++ b/tests/unit/test_pipeline_discovery.py @@ -1,3 +1,7 @@ +import importlib +import sys +from types import SimpleNamespace + import pytest from modal_app.pipeline_discovery_core import ( @@ -11,6 +15,62 @@ ) +class _FakeModalImage: + def apt_install(self, *_args, **_kwargs): + return self + + def uv_sync(self, *_args, **_kwargs): + return self + + def add_local_dir(self, *_args, **_kwargs): + return 
self + + def workdir(self, *_args, **_kwargs): + return self + + def env(self, *_args, **_kwargs): + return self + + def pip_install(self, *_args, **_kwargs): + return self + + +class _FakeModalApp: + def __init__(self, *_args, **_kwargs): + pass + + def function(self, *_args, **_kwargs): + return lambda fn: fn + + +class _FakeModalSecret: + @staticmethod + def from_name(_name): + return object() + + +def _fake_modal_module(): + return SimpleNamespace( + App=_FakeModalApp, + Image=SimpleNamespace(debian_slim=lambda **_kwargs: _FakeModalImage()), + Secret=_FakeModalSecret, + fastapi_endpoint=lambda *_args, **_kwargs: (lambda fn: fn), + ) + + +def test_pipeline_discovery_entrypoint_imports_with_typed_schema(monkeypatch): + """Catch Modal deploy entrypoint import drift before deployment.""" + + monkeypatch.setitem(sys.modules, "modal", _fake_modal_module()) + monkeypatch.delitem(sys.modules, "modal_app.images", raising=False) + monkeypatch.delitem(sys.modules, "modal_app.pipeline_discovery", raising=False) + + module = importlib.import_module("modal_app.pipeline_discovery") + + assert module.list_deployed_pipeline_runs is not None + assert module.deployed_pipeline_runs_endpoint is not None + + def _app_record( name: str, *, From 1ac2d62198e4297e2ec9e852c509badc80eeeb0c Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 16:49:26 +0200 Subject: [PATCH 7/9] Rerun PR checks after lint verification From 2ae875eb66e36d67e4429a1ee427d234ba46daf2 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 16:59:13 +0200 Subject: [PATCH 8/9] Run repo-wide lint and document lint policy --- AGENTS.md | 4 ++++ docs/engineering/skills/github-prs.md | 6 +++++- docs/engineering/skills/testing.md | 11 +++++++++++ tests/unit/test_pipeline_discovery.py | 2 +- 4 files changed, 21 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 73a77e779..57395d16e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -46,6 +46,10 @@ present in one path but missing from 
another. Read `docs/engineering/skills/github-prs.md` before opening, replacing, or sharing any pull request. +When asked to lint, before pushing PR updates, or before reporting that lint +passed, run the repository-wide lint target from the repository root. Do not +substitute file-scoped Ruff commands for whole-repository lint verification. + Never open `policyengine-us-data` PRs from forks. CI rejects fork-based PRs before running the real checks, which wastes the reviewer and agent loop. diff --git a/docs/engineering/skills/github-prs.md b/docs/engineering/skills/github-prs.md index 6a319b8a1..3b7eb3723 100644 --- a/docs/engineering/skills/github-prs.md +++ b/docs/engineering/skills/github-prs.md @@ -20,8 +20,12 @@ Before creating or sharing a PR: 4. Add a Towncrier changelog fragment under `changelog.d/` using the issue number and the appropriate configured type, for example `changelog.d/ISSUE_NUMBER.added`. -5. Run the repository lint target: +5. Run the repository-wide lint target from the repository root: `make lint`. + Do not substitute file-scoped commands such as + `ruff check path/to/file.py` or `ruff format --check path/to/file.py` for + this step. CI runs whole-repository formatting checks, so agents must verify + the whole repository before pushing or sharing a PR update. 6. Push the current branch to the canonical repository: `make push-pr-branch`. 7. Create the PR as a draft from that same repository: diff --git a/docs/engineering/skills/testing.md b/docs/engineering/skills/testing.md index 3f7a69092..a5468d3b3 100644 --- a/docs/engineering/skills/testing.md +++ b/docs/engineering/skills/testing.md @@ -73,6 +73,17 @@ Use this skill whenever adding, moving, or reviewing tests. ## Quality Guards +When adding, moving, or reviewing tests, run the repository-wide lint target +before pushing: + +```bash +make lint +``` + +Do not treat file-scoped Ruff commands as sufficient verification. 
CI runs +whole-repository lint and formatting checks, and test edits can fail formatting +under the repo-wide command even when a narrower local command appears clean. + Run this before opening or updating a PR: ```bash diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py index d27bf2391..571797e31 100644 --- a/tests/unit/test_pipeline_discovery.py +++ b/tests/unit/test_pipeline_discovery.py @@ -54,7 +54,7 @@ def _fake_modal_module(): App=_FakeModalApp, Image=SimpleNamespace(debian_slim=lambda **_kwargs: _FakeModalImage()), Secret=_FakeModalSecret, - fastapi_endpoint=lambda *_args, **_kwargs: (lambda fn: fn), + fastapi_endpoint=lambda *_args, **_kwargs: lambda fn: fn, ) From 64c9b6570717f5a64cf62e2d5c947b41ab7e5d35 Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Wed, 20 May 2026 18:14:20 +0200 Subject: [PATCH 9/9] Fix PR discovery app seam lookup --- .github/workflows/pr.yaml | 8 ++++++++ docs/engineering/skills/pipeline_operations.md | 9 +++++++-- tests/integration/test_modal_pipeline_seams.py | 11 +++++++++-- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 25ac1117b..0b95b7eb8 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -190,6 +190,14 @@ jobs: - run: uv sync --dev - name: Install integration test deps run: uv pip install modal pytest numpy pandas + - name: Ensure PR Modal control-plane secret + run: > + uv run modal secret create + --env="${MODAL_ENVIRONMENT}" + --force + modal-token + MODAL_TOKEN_ID="${MODAL_TOKEN_ID}" + MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}" - name: Deploy PR Modal pipeline discovery app run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" --name="${MODAL_PIPELINE_DISCOVERY_APP_NAME}" modal_app/pipeline_discovery.py - name: Deploy PR Modal pipeline app diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md index 053e1105c..7622fa8e3 100644 --- 
a/docs/engineering/skills/pipeline_operations.md +++ b/docs/engineering/skills/pipeline_operations.md @@ -35,8 +35,13 @@ status for their own mounted pipeline volume: `modal run modal_app/pipeline.py::main --action status`. This is for quick terminal inspection only and must not be treated as a schema. -Cross-run discovery lives in the stable `policyengine-us-data-pipeline-status` -app, not in a run-scoped pipeline app: +Cross-run discovery normally lives in the stable +`policyengine-us-data-pipeline-status` app, not in a run-scoped pipeline app. +PR integration jobs deploy an isolated discovery app into `main` and export its +name as `MODAL_PIPELINE_DISCOVERY_APP_NAME` and +`US_DATA_PIPELINE_DISCOVERY_APP_NAME`; tests and scripts running inside PR CI +must use those environment values instead of hardcoding the stable production +app name: - `list_deployed_pipeline_runs`: Python-callable structured JSON index of deployed publication pipeline apps. It discovers runs from Modal app names diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index f0a6033f8..28445567b 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -14,6 +14,13 @@ modal = pytest.importorskip("modal") APP_NAME = os.environ.get("MODAL_APP_NAME", "policyengine-us-data-pipeline") +DISCOVERY_APP_NAME = os.environ.get( + "MODAL_PIPELINE_DISCOVERY_APP_NAME", + os.environ.get( + "US_DATA_PIPELINE_DISCOVERY_APP_NAME", + "policyengine-us-data-pipeline-status", + ), +) MODAL_ENVIRONMENT = os.environ.get("MODAL_ENVIRONMENT", "main") HTTP_SEAM_TIMEOUT_SECONDS = 90 @@ -156,7 +163,7 @@ def test_pipeline_discovery_callable_reports_structured_index(): _require_modal_tokens() fn = modal.Function.from_name( - "policyengine-us-data-pipeline-status", + DISCOVERY_APP_NAME, "list_deployed_pipeline_runs", environment_name=MODAL_ENVIRONMENT, ) @@ -237,7 +244,7 @@ def 
test_pipeline_discovery_http_endpoint_reports_structured_index(): headers = _modal_proxy_auth_headers() fn = modal.Function.from_name( - "policyengine-us-data-pipeline-status", + DISCOVERY_APP_NAME, "deployed_pipeline_runs_endpoint", environment_name=MODAL_ENVIRONMENT, )