diff --git a/.github/scripts/sync_modal_secrets.py b/.github/scripts/sync_modal_secrets.py index fd5ae430f..272e13077 100644 --- a/.github/scripts/sync_modal_secrets.py +++ b/.github/scripts/sync_modal_secrets.py @@ -35,6 +35,19 @@ def main() -> None: ], ) create_secret_with_retry( + [ + "modal", + "secret", + "create", + "--env", + env_name, + "--force", + "modal-token", + f"MODAL_TOKEN_ID={os.environ['MODAL_TOKEN_ID']}", + f"MODAL_TOKEN_SECRET={os.environ['MODAL_TOKEN_SECRET']}", + ], + ) + subprocess.run( [ "modal", "secret", diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index ab17a5d01..04e54b427 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -132,5 +132,7 @@ jobs: PARALLEL_MATRIX: ${{ inputs.parallel_matrix || 'false' }} NUM_MATRIX_WORKERS: ${{ inputs.num_matrix_workers || '50' }} run: | + modal secret create --env="${MODAL_ENVIRONMENT}" --force modal-token MODAL_TOKEN_ID="${MODAL_TOKEN_ID}" MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}" + modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline_discovery.py modal deploy --env="${MODAL_ENVIRONMENT}" --name="${US_DATA_MODAL_APP_NAME}" --tag="${US_DATA_RUN_ID}" modal_app/pipeline.py python .github/scripts/spawn_modal_pipeline.py diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index f20889dea..0b95b7eb8 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -170,10 +170,12 @@ jobs: # already exist, then stop/delete the PR resources in cleanup steps. 
MODAL_ENVIRONMENT: main MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} + MODAL_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} MODAL_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} MODAL_H5_TEST_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_PIPELINE_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} + US_DATA_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_H5_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} US_DATA_PIPELINE_VOLUME_NAME: pipeline-artifacts-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }} @@ -188,6 +190,16 @@ jobs: - run: uv sync --dev - name: Install integration test deps run: uv pip install modal pytest numpy pandas + - name: Ensure PR Modal control-plane secret + run: > + uv run modal secret create + --env="${MODAL_ENVIRONMENT}" + --force + modal-token + MODAL_TOKEN_ID="${MODAL_TOKEN_ID}" + MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}" + - name: Deploy PR Modal pipeline discovery app + run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" --name="${MODAL_PIPELINE_DISCOVERY_APP_NAME}" modal_app/pipeline_discovery.py - name: Deploy PR Modal pipeline app run: uv run modal deploy 
--env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py - name: Deploy PR Modal local-area app @@ -216,6 +228,7 @@ jobs: for app_name in \ "${MODAL_H5_TEST_HARNESS_APP_NAME}" \ "${MODAL_LOCAL_AREA_APP_NAME}" \ + "${MODAL_PIPELINE_DISCOVERY_APP_NAME}" \ "${MODAL_APP_NAME}" do yes | uv run modal app stop \ diff --git a/AGENTS.md b/AGENTS.md index 73a77e779..57395d16e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -46,6 +46,10 @@ present in one path but missing from another. Read `docs/engineering/skills/github-prs.md` before opening, replacing, or sharing any pull request. +When asked to lint, before pushing PR updates, or before reporting that lint +passed, run the repository-wide lint target from the repository root. Do not +substitute file-scoped Ruff commands for whole-repository lint verification. + Never open `policyengine-us-data` PRs from forks. CI rejects fork-based PRs before running the real checks, which wastes the reviewer and agent loop. diff --git a/changelog.d/1049.added.md b/changelog.d/1049.added.md new file mode 100644 index 000000000..edca789d1 --- /dev/null +++ b/changelog.d/1049.added.md @@ -0,0 +1 @@ +Add a stable Modal discovery endpoint for deployed publication pipeline runs. diff --git a/docs/engineering/skills/github-prs.md b/docs/engineering/skills/github-prs.md index 6a319b8a1..3b7eb3723 100644 --- a/docs/engineering/skills/github-prs.md +++ b/docs/engineering/skills/github-prs.md @@ -20,8 +20,12 @@ Before creating or sharing a PR: 4. Add a Towncrier changelog fragment under `changelog.d/` using the issue number and the appropriate configured type, for example `changelog.d/ISSUE_NUMBER.added`. -5. Run the repository lint target: +5. Run the repository-wide lint target from the repository root: `make lint`. + Do not substitute file-scoped commands such as + `ruff check path/to/file.py` or `ruff format --check path/to/file.py` for + this step. 
CI runs whole-repository formatting checks, so agents must verify + the whole repository before pushing or sharing a PR update. 6. Push the current branch to the canonical repository: `make push-pr-branch`. 7. Create the PR as a draft from that same repository: diff --git a/docs/engineering/skills/pipeline_operations.md b/docs/engineering/skills/pipeline_operations.md index 80d018e7b..7622fa8e3 100644 --- a/docs/engineering/skills/pipeline_operations.md +++ b/docs/engineering/skills/pipeline_operations.md @@ -19,21 +19,43 @@ The status system reports: ## Status Surfaces -The structured status payload is canonical. The pipeline status sub-app exposes -run-level and run-index Modal functions: +The structured status payload is canonical. Run-scoped pipeline apps expose +status for their own mounted pipeline volume: - `get_pipeline_status`: Python-callable structured JSON for agents, scripts, dashboards, and tests. Prefer this for diagnosis and automation. - `pipeline_status_endpoint`: protected HTTP endpoint returning the same structured JSON for non-Python clients. Use Modal proxy auth headers. - `list_pipeline_runs`: Python-callable structured JSON index of recent runs. - Use this for dashboards that need to discover candidate run IDs. + This is volume-local and only lists runs visible to that app's mounted + `US_DATA_PIPELINE_VOLUME_NAME`. - `pipeline_runs_endpoint`: protected HTTP endpoint returning the same - structured recent-run index for non-Python clients. + volume-local recent-run index for non-Python clients. - `pipeline_status_snippet`: human-readable text used by `modal run modal_app/pipeline.py::main --action status`. This is for quick terminal inspection only and must not be treated as a schema. +Cross-run discovery normally lives in the stable +`policyengine-us-data-pipeline-status` app, not in a run-scoped pipeline app. 
+PR integration jobs deploy an isolated discovery app into `main` and export its +name as `MODAL_PIPELINE_DISCOVERY_APP_NAME` and +`US_DATA_PIPELINE_DISCOVERY_APP_NAME`; tests and scripts running inside PR CI +must use those environment values instead of hardcoding the stable production +app name: + +- `list_deployed_pipeline_runs`: Python-callable structured JSON index of + deployed publication pipeline apps. It discovers runs from Modal app names + containing a run ID that matches the pattern `usdata-gha\d+-a\d+` (for + example `usdata-gha123-a1`), then calls each app's + `get_pipeline_status`. +- `deployed_pipeline_runs_endpoint`: protected HTTP endpoint returning the same + cross-app discovery payload. Use this for dashboards that need to discover all + deployed publication runs. + +The stable discovery app requires the `modal-token` Modal Secret in its +environment. That Secret must contain `MODAL_TOKEN_ID` and +`MODAL_TOKEN_SECRET`, and it should be attached only to functions that need +Modal control-plane access. + ## Fetch Status First identify the run context from the GitHub Actions summary, workflow logs, or @@ -74,6 +96,23 @@ If the local environment cannot sync the full project environment, use the same snippet with a Modal-only temporary environment by replacing `uv run python` with `uv run --no-sync --with modal python`. +To discover deployed publication runs before choosing a run ID, call the stable +status app: + +```bash +uv run --no-sync --with modal python - <<'PY' +import json +import modal + +fn = modal.Function.from_name( + "policyengine-us-data-pipeline-status", + "list_deployed_pipeline_runs", + environment_name="main", +) +print(json.dumps(fn.remote(limit=25), indent=2)) +PY +``` + If using the HTTP endpoint, authenticate with Modal proxy auth headers. Do not publish or paste proxy auth values into PRs, issues, logs, or docs. 
diff --git a/docs/engineering/skills/testing.md b/docs/engineering/skills/testing.md index 3f7a69092..a5468d3b3 100644 --- a/docs/engineering/skills/testing.md +++ b/docs/engineering/skills/testing.md @@ -73,6 +73,17 @@ Use this skill whenever adding, moving, or reviewing tests. ## Quality Guards +When adding, moving, or reviewing tests, run the repository-wide lint target +before pushing: + +```bash +make lint +``` + +Do not treat file-scoped Ruff commands as sufficient verification. CI runs +whole-repository lint and formatting checks, and test edits can fail formatting +under the repo-wide command even when a narrower local command appears clean. + Run this before opening or updating a PR: ```bash diff --git a/modal_app/pipeline.py b/modal_app/pipeline.py index ab1bf9d3f..1a70d851a 100644 --- a/modal_app/pipeline.py +++ b/modal_app/pipeline.py @@ -543,6 +543,9 @@ def verify_runtime_seams() -> dict: "uv.lock", "modal_app/worker_script.py", "modal_app/local_area.py", + "modal_app/pipeline_discovery.py", + "modal_app/pipeline_discovery_core.py", + "modal_app/pipeline_discovery_schema.py", "modal_app/pipeline_status.py", "modal_app/h5_test_harness.py", "modal_app/step_manifests/specs.py", @@ -592,6 +595,9 @@ def verify_runtime_seams() -> dict: "modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_discovery", + "modal_app.pipeline_discovery_core", + "modal_app.pipeline_discovery_schema", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", diff --git a/modal_app/pipeline_discovery.py b/modal_app/pipeline_discovery.py new file mode 100644 index 000000000..9405be9ce --- /dev/null +++ b/modal_app/pipeline_discovery.py @@ -0,0 +1,164 @@ +"""Stable Modal app for discovering deployed US data pipeline runs.""" + +from __future__ import annotations + +import asyncio +import os +import sys +from pathlib import Path + +import modal + +_baked = "/root/policyengine-us-data" 
+_local = str(Path(__file__).resolve().parent.parent) +for _p in (_baked, _local): + if _p not in sys.path: + sys.path.insert(0, _p) + +from modal_app.images import cpu_image as image # noqa: E402 +from modal_app.pipeline_discovery_core import ( # noqa: E402 + RawRecord, + build_deployed_pipeline_runs_payload, +) +from modal_app.pipeline_discovery_schema import ( # noqa: E402 + DeployedPipelineRunsPayloadDict, + ModalAppRecord, +) +from policyengine_us_data.utils.run_context import ( # noqa: E402 + DEFAULT_MODAL_ENVIRONMENT, +) + + +DISCOVERY_APP_NAME = "policyengine-us-data-pipeline-status" +modal_token_secret = modal.Secret.from_name("modal-token") + +app = modal.App( + os.environ.get("US_DATA_PIPELINE_DISCOVERY_APP_NAME") or DISCOVERY_APP_NAME +) +status_image = image.pip_install("fastapi") + + +def _modal_environment(explicit: str = "") -> str: + return ( + explicit + or os.environ.get("US_DATA_MODAL_ENVIRONMENT") + or os.environ.get("MODAL_ENVIRONMENT") + or DEFAULT_MODAL_ENVIRONMENT + ) + + +def _modal_state_name(api_pb2, state: int) -> str: + try: + return api_pb2.AppState.Name(state).removeprefix("APP_STATE_").lower() + except ValueError: + return str(state) + + +async def _list_modal_app_records_async(environment_name: str) -> list[ModalAppRecord]: + from modal.client import _Client + from modal_proto import api_pb2 + + client = await _Client.from_env() + resp = await client.stub.AppList( + api_pb2.AppListRequest(environment_name=environment_name) + ) + return [ + { + "app_id": item.app_id, + "name": item.name or item.description, + "description": item.description, + "state": _modal_state_name(api_pb2, item.state), + "tasks": item.n_running_tasks, + "created_at": item.created_at, + "stopped_at": item.stopped_at, + } + for item in resp.apps + ] + + +def _list_modal_app_records(environment_name: str) -> list[ModalAppRecord]: + return asyncio.run(_list_modal_app_records_async(environment_name)) + + +def _pipeline_status_lookup( + *, + app_name: str, + run_id: 
str, + environment_name: str, +) -> RawRecord: + status_fn = modal.Function.from_name( + app_name, + "get_pipeline_status", + environment_name=environment_name, + ) + payload = status_fn.remote(run_id) + return payload if isinstance(payload, dict) else {} + + +def _build_deployed_pipeline_runs( + *, + limit: int | str | None, + status: str, + branch: str, + include_unreachable: bool, + modal_environment: str, +) -> DeployedPipelineRunsPayloadDict: + environment_name = _modal_environment(modal_environment) + app_records = _list_modal_app_records(environment_name) + payload = build_deployed_pipeline_runs_payload( + app_records, + lambda candidate: _pipeline_status_lookup( + app_name=candidate.app_name, + run_id=candidate.run_id, + environment_name=environment_name, + ), + limit=limit, + status=status, + branch=branch, + include_unreachable=include_unreachable, + modal_environment=environment_name, + ) + return payload.to_dict() + + +@app.function(image=image, timeout=180, secrets=[modal_token_secret]) +def list_deployed_pipeline_runs( + limit: int = 25, + status: str = "", + branch: str = "", + include_unreachable: bool = True, + modal_environment: str = "", +) -> DeployedPipelineRunsPayloadDict: + """Return deployed publication pipeline runs discovered from Modal app names.""" + + return _build_deployed_pipeline_runs( + limit=limit, + status=status, + branch=branch, + include_unreachable=include_unreachable, + modal_environment=modal_environment, + ) + + +@app.function(image=status_image, timeout=180, secrets=[modal_token_secret]) +@modal.fastapi_endpoint( + method="GET", + docs=False, + requires_proxy_auth=True, +) +def deployed_pipeline_runs_endpoint( + limit: int = 25, + status: str = "", + branch: str = "", + include_unreachable: bool = True, + modal_environment: str = "", +) -> DeployedPipelineRunsPayloadDict: + """Protected HTTP endpoint for deployed publication pipeline discovery.""" + + return _build_deployed_pipeline_runs( + limit=limit, + status=status, + 
branch=branch, + include_unreachable=include_unreachable, + modal_environment=modal_environment, + ) diff --git a/modal_app/pipeline_discovery_core.py b/modal_app/pipeline_discovery_core.py new file mode 100644 index 000000000..bb5edf131 --- /dev/null +++ b/modal_app/pipeline_discovery_core.py @@ -0,0 +1,372 @@ +"""Pure helpers for discovering deployed US data pipeline apps.""" + +from __future__ import annotations + +import re +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Callable, Iterable, Mapping + +from modal_app.pipeline_discovery_schema import ( + DeployedPipelineRunSummary, + DeployedPipelineRunsPayload, + LatestManifestSummary, + PipelineDiscoveryFilters, + PipelineLookupErrorSummary, + PipelineProgressSummary, +) + + +PIPELINE_DISCOVERY_SCHEMA_VERSION = "1" +PIPELINE_DISCOVERY_SOURCE = "modal_app_names" +DEFAULT_DISCOVERY_LIMIT = 25 +MAX_DISCOVERY_LIMIT = 100 +DEFAULT_DISCOVERY_WORKERS = 8 +RUN_ID_RE = re.compile(r"(usdata-gha\d+-a\d+)") +DEPLOYED_STATES = {"deployed", "app_state_deployed"} +EXCLUDED_APP_NAME_PATTERNS = ("-pipeline-pr-", "-local-area-pr-", "-h5-pr-") + +RawRecord = Mapping[str, object] + + +@dataclass(frozen=True) +class PipelineAppCandidate: + """A deployed Modal app that appears to represent one publication run.""" + + app_id: str + app_name: str + run_id: str + state: str + task_count: int + created_at: str | None = None + stopped_at: str | None = None + + +StatusLookup = Callable[[PipelineAppCandidate], RawRecord] + + +def derive_run_id_from_app_name(app_name: str) -> str: + """Extract the canonical US data run ID from a Modal app name.""" + + match = RUN_ID_RE.search(app_name or "") + return match.group(1) if match else "" + + +def is_publication_pipeline_app_name(app_name: str) -> bool: + """Return whether an app name belongs to a publication pipeline run.""" + + if not derive_run_id_from_app_name(app_name): + return 
def _bounded_limit(limit: int | str | None) -> int:
    """Clamp a caller-supplied limit into ``[0, MAX_DISCOVERY_LIMIT]``.

    ``None`` and values that ``int()`` cannot convert fall back to
    ``DEFAULT_DISCOVERY_LIMIT`` before clamping.
    """

    try:
        parsed = int(limit if limit is not None else DEFAULT_DISCOVERY_LIMIT)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); treat it like any other
        # unparseable limit instead of failing the whole discovery request.
        parsed = DEFAULT_DISCOVERY_LIMIT
    return max(0, min(parsed, MAX_DISCOVERY_LIMIT))


def _int_value(value: object) -> int:
    """Return ``int(value)``, or ``0`` when the value cannot be converted."""

    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: infinite floats cannot be converted to int.
        return 0


def _text_value(value: object) -> str:
    """Return ``str(value)``, mapping ``None`` to the empty string."""

    return "" if value is None else str(value)


def _string_or_none(value: object) -> str | None:
    """Return ``str(value)``, mapping ``None`` and ``""`` to ``None``."""

    if value in (None, ""):
        return None
    return str(value)


def _number_or_none(value: object) -> float | int | None:
    """Return ``value`` when it is a real int or float, otherwise ``None``.

    ``bool`` is rejected explicitly: it subclasses ``int`` but is never a
    meaningful numeric measurement such as a duration.
    """

    if isinstance(value, bool):
        return None
    return value if isinstance(value, (float, int)) else None


def _timestamp_value(value: object) -> str | None:
    """Normalize a timestamp to text, or ``None`` when it is unset.

    Positive numbers are interpreted as epoch seconds and rendered as a UTC
    ISO-8601 string. Non-numeric values are returned via ``str()``.
    """

    if value in (None, ""):
        return None
    if isinstance(value, bool):
        # bool is an int subclass; True/False are not timestamps.
        return None
    if isinstance(value, (int, float)):
        # An unset protobuf numeric field reads as 0, which would otherwise be
        # reported as 1970-01-01. ``not value > 0`` also rejects NaN.
        # NOTE(review): confirm Modal leaves ``stopped_at`` at 0 for apps that
        # have not stopped.
        if not value > 0:
            return None
        try:
            return datetime.fromtimestamp(float(value), timezone.utc).isoformat()
        except (OverflowError, OSError, ValueError):
            # Out-of-range epoch values cannot be represented; report unset
            # rather than failing the whole discovery listing.
            return None
    return str(value)


def _record_value(record: RawRecord, *keys: str) -> object:
    """Return the value of the first key in ``keys`` present in ``record``.

    Returns ``None`` when none of the keys is present.
    """

    for key in keys:
        if key in record:
            return record[key]
    return None


def _mapping_value(record: RawRecord, key: str) -> RawRecord:
    """Return ``record[key]`` when it is a mapping, otherwise an empty dict."""

    value = record.get(key)
    return value if isinstance(value, Mapping) else {}


def _list_of_mappings(record: RawRecord, key: str) -> list[RawRecord]:
    """Return the mapping items of the list at ``record[key]``.

    Non-list values yield an empty list; non-mapping items are dropped.
    """

    value = record.get(key)
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, Mapping)]
_text_value(_record_value(record, "state", "State")).lower() + if state and state not in DEPLOYED_STATES: + continue + if not is_publication_pipeline_app_name(app_name): + continue + run_id = derive_run_id_from_app_name(app_name) + candidates.append( + PipelineAppCandidate( + app_id=_text_value(_record_value(record, "app_id", "App ID")), + app_name=app_name, + run_id=run_id, + state=state or "unknown", + task_count=_int_value(_record_value(record, "tasks", "Tasks")), + created_at=_timestamp_value( + _record_value(record, "created_at", "Created at") + ), + stopped_at=_timestamp_value( + _record_value(record, "stopped_at", "Stopped at") + ), + ) + ) + candidates.sort( + key=lambda candidate: ( + candidate.created_at or "", + candidate.app_name, + ), + reverse=True, + ) + return candidates + + +def _latest_manifest_summary( + stage_manifests: list[RawRecord], +) -> LatestManifestSummary | None: + if not stage_manifests: + return None + item = stage_manifests[-1] + manifest = _mapping_value(item, "manifest") + return LatestManifestSummary( + step_id=_string_or_none(item.get("step_id")), + stage_id=_string_or_none(item.get("stage_id")), + substage_id=_string_or_none(item.get("substage_id")), + title=_string_or_none(item.get("title")), + status=_string_or_none(item.get("status")), + started_at=_string_or_none(manifest.get("started_at")), + completed_at=_string_or_none(manifest.get("completed_at")), + duration_s=_number_or_none(manifest.get("duration_s")), + reuse_decision=_text_value(manifest.get("reuse_decision")) or "not_applicable", + ) + + +def _index_error_summary(error: RawRecord) -> PipelineLookupErrorSummary | None: + if not error: + return None + return PipelineLookupErrorSummary( + stage_id=_string_or_none(error.get("stage_id")), + substage_id=_string_or_none(error.get("substage_id")), + surface=_string_or_none(error.get("surface")), + error_type=_text_value(error.get("error_type")) or "Error", + message=_text_value(error.get("message")), + message_truncated=( 
+ error.get("message_truncated") + if isinstance(error.get("message_truncated"), bool) + else None + ), + record_path=_string_or_none(error.get("record_path")), + latest_path=_string_or_none(error.get("latest_path")), + traceback_available=( + error.get("traceback_available") + if isinstance(error.get("traceback_available"), bool) + else False + ), + ) + + +def _status_summary( + candidate: PipelineAppCandidate, + payload: RawRecord, +) -> DeployedPipelineRunSummary: + run_manifest = _mapping_value(payload, "run_manifest") + stage_manifests = _list_of_mappings(payload, "stage_manifests") + missing = payload.get("missing_expected_manifest_ids") + missing_count = len(missing) if isinstance(missing, list) else 0 + known_step_ids = run_manifest.get("known_step_ids") + expected_count = max( + len(known_step_ids) if isinstance(known_step_ids, list) else 0, + len(stage_manifests) + missing_count, + ) + status = _text_value(payload.get("status")) or "unknown" + status_lookup = "not_found" if status == "not_found" else "ok" + return DeployedPipelineRunSummary( + run_id=_text_value(payload.get("run_id")) or candidate.run_id, + status_lookup=status_lookup, + status=status, + message=_text_value(payload.get("message")), + modal_app_id=candidate.app_id, + modal_app_name=candidate.app_name, + modal_app_state=candidate.state, + modal_task_count=candidate.task_count, + modal_app_created_at=candidate.created_at, + modal_app_stopped_at=candidate.stopped_at, + branch=_string_or_none(run_manifest.get("branch")), + sha=_string_or_none(run_manifest.get("sha")), + candidate_version=_string_or_none(run_manifest.get("candidate_version")), + release_version=_string_or_none(run_manifest.get("release_version")), + started_at=_string_or_none(run_manifest.get("started_at")), + updated_at=_string_or_none(payload.get("updated_at")), + completed_at=_string_or_none(run_manifest.get("completed_at")), + modal_environment=_string_or_none(payload.get("modal_environment")) + or 
def _unreachable_summary(
    candidate: PipelineAppCandidate,
    exc: BaseException,
) -> DeployedPipelineRunSummary:
    """Summarize a candidate whose status lookup raised ``exc``."""

    error = PipelineLookupErrorSummary.from_exception(exc)
    return DeployedPipelineRunSummary(
        run_id=candidate.run_id,
        status_lookup="unreachable",
        status="unreachable",
        message=error.message,
        modal_app_id=candidate.app_id,
        modal_app_name=candidate.app_name,
        modal_app_state=candidate.state,
        modal_task_count=candidate.task_count,
        modal_app_created_at=candidate.created_at,
        modal_app_stopped_at=candidate.stopped_at,
        progress=PipelineProgressSummary(
            expected_manifests=0,
            present_manifests=0,
            missing_manifests=0,
        ),
        error=error,
    )


def _passes_filters(
    item: DeployedPipelineRunSummary,
    filters: PipelineDiscoveryFilters,
) -> bool:
    """Return whether ``item`` satisfies every active filter."""

    if not filters.include_unreachable and item.status_lookup != "ok":
        return False
    if filters.status and item.status != filters.status:
        return False
    if filters.branch and item.branch != filters.branch:
        return False
    return True


def _lookup_items(
    candidates: list[PipelineAppCandidate],
    status_lookup: StatusLookup,
    *,
    max_workers: int,
) -> list[DeployedPipelineRunSummary]:
    """Look up status for each candidate and return summaries, newest first.

    A lookup that raises is reported as an ``unreachable`` summary instead of
    aborting the listing. Lookups run on a thread pool when ``max_workers`` is
    greater than 1 and there is more than one candidate. The result is sorted
    the same way on both paths, so ordering does not depend on ``max_workers``.
    """

    items: list[DeployedPipelineRunSummary] = []
    if max_workers <= 1 or len(candidates) <= 1:
        for candidate in candidates:
            try:
                items.append(_status_summary(candidate, status_lookup(candidate)))
            except Exception as exc:
                # Deliberate best-effort: one broken app must not hide the rest.
                items.append(_unreachable_summary(candidate, exc))
    else:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(status_lookup, candidate): candidate
                for candidate in candidates
            }
            # as_completed yields in completion order, which is arbitrary.
            for future in as_completed(futures):
                candidate = futures[future]
                try:
                    items.append(_status_summary(candidate, future.result()))
                except Exception as exc:
                    # Deliberate best-effort, as on the sequential path.
                    items.append(_unreachable_summary(candidate, exc))
    items.sort(
        key=lambda item: (
            item.updated_at or item.modal_app_created_at or "",
            item.run_id,
        ),
        reverse=True,
    )
    return items


def build_deployed_pipeline_runs_payload(
    app_records: Iterable[RawRecord],
    status_lookup: StatusLookup,
    *,
    limit: int | str | None = DEFAULT_DISCOVERY_LIMIT,
    status: str = "",
    branch: str = "",
    include_unreachable: bool = True,
    modal_environment: str = "main",
    max_workers: int = DEFAULT_DISCOVERY_WORKERS,
) -> DeployedPipelineRunsPayload:
    """Build a typed cross-app pipeline run index from Modal app names.

    ``app_records`` are reduced to publication-pipeline candidates, a window
    of them is queried through ``status_lookup``, and the summaries that pass
    the ``status`` / ``branch`` / ``include_unreachable`` filters are truncated
    to the bounded ``limit``.

    The payload reports ``discovered_count`` (all candidates),
    ``queried_count`` (candidates actually looked up) and ``count`` (runs
    returned).
    """

    bounded_limit = _bounded_limit(limit)
    filters = PipelineDiscoveryFilters(
        status=status or "",
        branch=branch or "",
        include_unreachable=bool(include_unreachable),
    )
    candidates = pipeline_app_candidates(app_records)
    needs_filter_window = (
        bool(filters.status or filters.branch) or not filters.include_unreachable
    )
    if bounded_limit == 0:
        # Nothing can be returned, so skip the remote lookups entirely.
        selected_limit = 0
    elif needs_filter_window:
        # Filters are applied after lookup, so query the widest allowed window
        # to leave enough matches to fill the requested limit.
        selected_limit = MAX_DISCOVERY_LIMIT
    else:
        selected_limit = bounded_limit
    selected = candidates[:selected_limit]
    items = _lookup_items(selected, status_lookup, max_workers=max_workers)
    runs = tuple(item for item in items if _passes_filters(item, filters))[
        :bounded_limit
    ]
    return DeployedPipelineRunsPayload(
        schema_version=PIPELINE_DISCOVERY_SCHEMA_VERSION,
        source=PIPELINE_DISCOVERY_SOURCE,
        modal_environment=modal_environment,
        discovered_count=len(candidates),
        queried_count=len(selected),
        count=len(runs),
        limit=bounded_limit,
        filters=filters,
        runs=runs,
    )
b/modal_app/pipeline_discovery_schema.py new file mode 100644 index 000000000..5fbb02ba3 --- /dev/null +++ b/modal_app/pipeline_discovery_schema.py @@ -0,0 +1,302 @@ +"""Typed result schemas for deployed pipeline discovery.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import NotRequired, TypedDict + +from policyengine_us_data.utils.error_redaction import ( + DEFAULT_ERROR_MESSAGE_MAX_CHARS, + redacted_bounded_error_text, +) + + +class PipelineDiscoveryFiltersPayload(TypedDict): + """Serialized discovery filters.""" + + status: str + branch: str + include_unreachable: bool + + +class PipelineProgressSummaryPayload(TypedDict): + """Serialized manifest progress for one run.""" + + expected_manifests: int + present_manifests: int + missing_manifests: int + + +class LatestManifestSummaryPayload(TypedDict): + """Serialized latest manifest summary.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + +class PipelineLookupErrorPayload(TypedDict): + """Serialized lookup or pipeline error summary.""" + + error_type: str + message: str + traceback_available: bool + stage_id: NotRequired[str] + substage_id: NotRequired[str] + surface: NotRequired[str] + message_truncated: NotRequired[bool] + record_path: NotRequired[str] + latest_path: NotRequired[str] + + +class DeployedPipelineRunPayload(TypedDict): + """Serialized status summary for one deployed publication pipeline app.""" + + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None + modal_app_stopped_at: str | None + run_id: str + status_lookup: str + status: str + message: str + branch: str | None + sha: str | None + candidate_version: str | None + release_version: str | None + started_at: str | None + updated_at: str | None + 
completed_at: str | None + modal_environment: str | None + hf_staging_prefix: str | None + github_run_url: str | None + latest_manifest: LatestManifestSummaryPayload | None + progress: PipelineProgressSummaryPayload | None + error: PipelineLookupErrorPayload | None + + +class DeployedPipelineRunsPayloadDict(TypedDict): + """Serialized cross-app pipeline run index.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFiltersPayload + runs: list[DeployedPipelineRunPayload] + + +class ModalAppRecord(TypedDict): + """Normalized Modal app-list record consumed by discovery core.""" + + app_id: str + name: str + description: str + state: str + tasks: int + created_at: float | str | None + stopped_at: float | str | None + + +@dataclass(frozen=True) +class PipelineDiscoveryFilters: + """Filters applied to the deployed pipeline run index.""" + + status: str = "" + branch: str = "" + include_unreachable: bool = True + + def to_dict(self) -> PipelineDiscoveryFiltersPayload: + return { + "status": self.status, + "branch": self.branch, + "include_unreachable": self.include_unreachable, + } + + +@dataclass(frozen=True) +class PipelineProgressSummary: + """Manifest progress for one discovered pipeline run.""" + + expected_manifests: int + present_manifests: int + missing_manifests: int + + def to_dict(self) -> PipelineProgressSummaryPayload: + return { + "expected_manifests": self.expected_manifests, + "present_manifests": self.present_manifests, + "missing_manifests": self.missing_manifests, + } + + +@dataclass(frozen=True) +class LatestManifestSummary: + """Summary of the latest stage or substage manifest for one run.""" + + step_id: str | None + stage_id: str | None + substage_id: str | None + title: str | None + status: str | None + started_at: str | None + completed_at: str | None + duration_s: float | int | None + reuse_decision: str + + def to_dict(self) -> 
LatestManifestSummaryPayload: + return { + "step_id": self.step_id, + "stage_id": self.stage_id, + "substage_id": self.substage_id, + "title": self.title, + "status": self.status, + "started_at": self.started_at, + "completed_at": self.completed_at, + "duration_s": self.duration_s, + "reuse_decision": self.reuse_decision, + } + + +@dataclass(frozen=True) +class PipelineLookupErrorSummary: + """Bounded error surfaced while querying one discovered app.""" + + error_type: str + message: str + stage_id: str | None = None + substage_id: str | None = None + surface: str | None = None + message_truncated: bool | None = None + record_path: str | None = None + latest_path: str | None = None + traceback_available: bool = False + + @classmethod + def from_exception(cls, exc: BaseException) -> PipelineLookupErrorSummary: + """Build a bounded lookup-error summary from an exception.""" + + message = redacted_bounded_error_text( + f"{type(exc).__name__}: {exc}", + max_chars=DEFAULT_ERROR_MESSAGE_MAX_CHARS, + ).text + return cls( + error_type=type(exc).__name__, + message=message, + ) + + def to_dict(self) -> PipelineLookupErrorPayload: + payload: PipelineLookupErrorPayload = { + "error_type": self.error_type, + "message": self.message, + "traceback_available": self.traceback_available, + } + optional = { + "stage_id": self.stage_id, + "substage_id": self.substage_id, + "surface": self.surface, + "message_truncated": self.message_truncated, + "record_path": self.record_path, + "latest_path": self.latest_path, + } + payload.update( + {key: value for key, value in optional.items() if value is not None} + ) + return payload + + +@dataclass(frozen=True) +class DeployedPipelineRunSummary: + """Status summary for one deployed publication pipeline app.""" + + run_id: str + status_lookup: str + status: str + message: str + modal_app_id: str + modal_app_name: str + modal_app_state: str + modal_task_count: int + modal_app_created_at: str | None = None + modal_app_stopped_at: str | None = 
None + branch: str | None = None + sha: str | None = None + candidate_version: str | None = None + release_version: str | None = None + started_at: str | None = None + updated_at: str | None = None + completed_at: str | None = None + modal_environment: str | None = None + hf_staging_prefix: str | None = None + github_run_url: str | None = None + latest_manifest: LatestManifestSummary | None = None + progress: PipelineProgressSummary | None = None + error: PipelineLookupErrorSummary | None = None + + def to_dict(self) -> DeployedPipelineRunPayload: + return { + "modal_app_id": self.modal_app_id, + "modal_app_name": self.modal_app_name, + "modal_app_state": self.modal_app_state, + "modal_task_count": self.modal_task_count, + "modal_app_created_at": self.modal_app_created_at, + "modal_app_stopped_at": self.modal_app_stopped_at, + "run_id": self.run_id, + "status_lookup": self.status_lookup, + "status": self.status, + "message": self.message, + "branch": self.branch, + "sha": self.sha, + "candidate_version": self.candidate_version, + "release_version": self.release_version, + "started_at": self.started_at, + "updated_at": self.updated_at, + "completed_at": self.completed_at, + "modal_environment": self.modal_environment, + "hf_staging_prefix": self.hf_staging_prefix, + "github_run_url": self.github_run_url, + "latest_manifest": ( + self.latest_manifest.to_dict() if self.latest_manifest else None + ), + "progress": self.progress.to_dict() if self.progress else None, + "error": self.error.to_dict() if self.error else None, + } + + +@dataclass(frozen=True) +class DeployedPipelineRunsPayload: + """Typed cross-app pipeline run index discovered from Modal app names.""" + + schema_version: str + source: str + modal_environment: str + discovered_count: int + queried_count: int + count: int + limit: int + filters: PipelineDiscoveryFilters + runs: tuple[DeployedPipelineRunSummary, ...] 
+ + def to_dict(self) -> DeployedPipelineRunsPayloadDict: + return { + "schema_version": self.schema_version, + "source": self.source, + "modal_environment": self.modal_environment, + "discovered_count": self.discovered_count, + "queried_count": self.queried_count, + "count": self.count, + "limit": self.limit, + "filters": self.filters.to_dict(), + "runs": [run.to_dict() for run in self.runs], + } diff --git a/modal_app/pipeline_status.py b/modal_app/pipeline_status.py index 5224d8564..610cfedab 100644 --- a/modal_app/pipeline_status.py +++ b/modal_app/pipeline_status.py @@ -1,4 +1,8 @@ -"""Modal status functions for deployed pipeline runs.""" +"""Run-scoped Modal status functions for deployed pipeline runs. + +These functions read only the pipeline volume mounted into their app. Cross-app +discovery lives in ``modal_app.pipeline_discovery``. +""" from __future__ import annotations diff --git a/policyengine_us_data/utils/run_context.py b/policyengine_us_data/utils/run_context.py index 7efdd946a..f6d6a665f 100644 --- a/policyengine_us_data/utils/run_context.py +++ b/policyengine_us_data/utils/run_context.py @@ -153,12 +153,29 @@ def build_modal_app_name( prefix: str = DEFAULT_MODAL_APP_PREFIX, max_length: int = DEFAULT_MAX_RESOURCE_NAME_LENGTH, ) -> str: - """Build a safe Modal app name from candidate scope and run ID.""" - return build_modal_resource_name( - candidate_run_segment(run_id, candidate_version), - prefix=prefix, - max_length=max_length, + """Build a safe Modal app name while preserving the run ID suffix.""" + resolved_run_id = sanitize_run_id(run_id) + resolved_candidate_version = ( + sanitize_staging_version(candidate_version) if (candidate_version) else "" + ) + name_prefix = _slugify( + "-".join( + part + for part in ( + prefix, + resolved_candidate_version, + ) + if part + ) ) + candidate = _slugify(f"{name_prefix}-{resolved_run_id}") + if len(candidate) <= max_length: + return candidate + suffix = resolved_run_id + if len(suffix) >= max_length: + 
return _truncate_with_digest(suffix, max_length) + prefix_length = max_length - len(suffix) - 1 + return f"{_truncate_with_digest(name_prefix, prefix_length)}-{suffix}" def staging_prefix( diff --git a/tests/integration/test_modal_pipeline_seams.py b/tests/integration/test_modal_pipeline_seams.py index 6cf73ba26..28445567b 100644 --- a/tests/integration/test_modal_pipeline_seams.py +++ b/tests/integration/test_modal_pipeline_seams.py @@ -14,6 +14,13 @@ modal = pytest.importorskip("modal") APP_NAME = os.environ.get("MODAL_APP_NAME", "policyengine-us-data-pipeline") +DISCOVERY_APP_NAME = os.environ.get( + "MODAL_PIPELINE_DISCOVERY_APP_NAME", + os.environ.get( + "US_DATA_PIPELINE_DISCOVERY_APP_NAME", + "policyengine-us-data-pipeline-status", + ), +) MODAL_ENVIRONMENT = os.environ.get("MODAL_ENVIRONMENT", "main") HTTP_SEAM_TIMEOUT_SECONDS = 90 @@ -55,6 +62,9 @@ def test_pipeline_image_runtime_seams(): "uv.lock": True, "modal_app/worker_script.py": True, "modal_app/local_area.py": True, + "modal_app/pipeline_discovery.py": True, + "modal_app/pipeline_discovery_core.py": True, + "modal_app/pipeline_discovery_schema.py": True, "modal_app/pipeline_status.py": True, "modal_app/h5_test_harness.py": True, "modal_app/step_manifests/specs.py": True, @@ -83,6 +93,9 @@ def test_pipeline_image_runtime_seams(): "modal_app.fixtures.h5_cases", "modal_app.h5_test_harness", "modal_app.local_area", + "modal_app.pipeline_discovery", + "modal_app.pipeline_discovery_core", + "modal_app.pipeline_discovery_schema", "modal_app.pipeline_status", "modal_app.remote_calibration_runner", "modal_app.step_manifests.specs", @@ -146,6 +159,31 @@ def test_pipeline_runs_callable_reports_structured_index(): assert result["filters"] == {"status": "", "branch": ""} +def test_pipeline_discovery_callable_reports_structured_index(): + _require_modal_tokens() + + fn = modal.Function.from_name( + DISCOVERY_APP_NAME, + "list_deployed_pipeline_runs", + environment_name=MODAL_ENVIRONMENT, + ) + result = 
fn.remote(limit=1, modal_environment=MODAL_ENVIRONMENT) + + assert result["schema_version"] == "1" + assert result["source"] == "modal_app_names" + assert result["modal_environment"] == MODAL_ENVIRONMENT + assert isinstance(result["discovered_count"], int) + assert isinstance(result["queried_count"], int) + assert result["limit"] == 1 + assert result["count"] <= 1 + assert isinstance(result["runs"], list) + assert result["filters"] == { + "status": "", + "branch": "", + "include_unreachable": True, + } + + def test_pipeline_status_http_endpoint_reports_missing_run(): _require_modal_tokens() headers = _modal_proxy_auth_headers() @@ -201,6 +239,37 @@ def test_pipeline_runs_http_endpoint_reports_structured_index(): assert result["filters"] == {"status": "", "branch": ""} +def test_pipeline_discovery_http_endpoint_reports_structured_index(): + _require_modal_tokens() + headers = _modal_proxy_auth_headers() + + fn = modal.Function.from_name( + DISCOVERY_APP_NAME, + "deployed_pipeline_runs_endpoint", + environment_name=MODAL_ENVIRONMENT, + ) + endpoint = fn.get_web_url() + assert endpoint + + response = requests.get( + endpoint, + params={"limit": "1", "modal_environment": MODAL_ENVIRONMENT}, + headers=headers, + timeout=30, + ) + + assert response.status_code == 200, response.text[:500] + result = response.json() + assert result["schema_version"] == "1" + assert result["source"] == "modal_app_names" + assert result["modal_environment"] == MODAL_ENVIRONMENT + assert isinstance(result["discovered_count"], int) + assert isinstance(result["queried_count"], int) + assert result["limit"] == 1 + assert result["count"] <= 1 + assert isinstance(result["runs"], list) + + def test_pipeline_status_cli_snippet_reports_missing_run(): _require_modal_tokens() diff --git a/tests/unit/test_pipeline_discovery.py b/tests/unit/test_pipeline_discovery.py new file mode 100644 index 000000000..571797e31 --- /dev/null +++ b/tests/unit/test_pipeline_discovery.py @@ -0,0 +1,302 @@ +import 
importlib +import sys +from types import SimpleNamespace + +import pytest + +from modal_app.pipeline_discovery_core import ( + build_deployed_pipeline_runs_payload, + derive_run_id_from_app_name, + is_publication_pipeline_app_name, + pipeline_app_candidates, +) +from modal_app.pipeline_discovery_schema import ( + DeployedPipelineRunsPayload, +) + + +class _FakeModalImage: + def apt_install(self, *_args, **_kwargs): + return self + + def uv_sync(self, *_args, **_kwargs): + return self + + def add_local_dir(self, *_args, **_kwargs): + return self + + def workdir(self, *_args, **_kwargs): + return self + + def env(self, *_args, **_kwargs): + return self + + def pip_install(self, *_args, **_kwargs): + return self + + +class _FakeModalApp: + def __init__(self, *_args, **_kwargs): + pass + + def function(self, *_args, **_kwargs): + return lambda fn: fn + + +class _FakeModalSecret: + @staticmethod + def from_name(_name): + return object() + + +def _fake_modal_module(): + return SimpleNamespace( + App=_FakeModalApp, + Image=SimpleNamespace(debian_slim=lambda **_kwargs: _FakeModalImage()), + Secret=_FakeModalSecret, + fastapi_endpoint=lambda *_args, **_kwargs: lambda fn: fn, + ) + + +def test_pipeline_discovery_entrypoint_imports_with_typed_schema(monkeypatch): + """Catch Modal deploy entrypoint import drift before deployment.""" + + monkeypatch.setitem(sys.modules, "modal", _fake_modal_module()) + monkeypatch.delitem(sys.modules, "modal_app.images", raising=False) + monkeypatch.delitem(sys.modules, "modal_app.pipeline_discovery", raising=False) + + module = importlib.import_module("modal_app.pipeline_discovery") + + assert module.list_deployed_pipeline_runs is not None + assert module.deployed_pipeline_runs_endpoint is not None + + +def _app_record( + name: str, + *, + app_id: str = "ap-1", + state: str = "deployed", + tasks: int = 0, + created_at: str = "2026-05-19T12:00:00+00:00", +) -> dict: + return { + "app_id": app_id, + "name": name, + "state": state, + "tasks": 
tasks, + "created_at": created_at, + } + + +def _status_payload( + run_id: str, + *, + status: str = "running", + branch: str = "main", + updated_at: str = "2026-05-19T12:30:00+00:00", +) -> dict: + return { + "schema_version": "1", + "run_id": run_id, + "status": status, + "message": f"Pipeline {status}.", + "updated_at": updated_at, + "run_manifest": { + "run_id": run_id, + "branch": branch, + "sha": "abc123", + "candidate_version": "1.115.4-minor", + "release_version": "", + "started_at": "2026-05-19T12:00:00+00:00", + "completed_at": None, + "known_step_ids": ["1_build_datasets", "2_build_calibration_package"], + "hf_staging_prefix": f"staging/1.115.4-minor-{run_id}", + "run_context": { + "github_run_url": ( + "https://github.com/PolicyEngine/policyengine-us-data/" + "actions/runs/123" + ) + }, + }, + "stage_manifests": [ + { + "step_id": "1_build_datasets", + "stage_id": "1_build_datasets", + "substage_id": None, + "title": "Build datasets", + "status": "completed", + "manifest": { + "started_at": "2026-05-19T12:00:00+00:00", + "completed_at": "2026-05-19T12:15:00+00:00", + "duration_s": 900, + "reuse_decision": "not_applicable", + }, + } + ], + "missing_expected_manifest_ids": ["2_build_calibration_package"], + "error": None, + "modal_app_name": f"us-data-1-115-4-minor-{run_id}", + "modal_environment": "main", + } + + +def test_derives_run_id_from_current_and_legacy_publication_app_names(): + assert ( + derive_run_id_from_app_name("us-data-1-115-4-minor-usdata-gha26114604836-a1") + == "usdata-gha26114604836-a1" + ) + assert ( + derive_run_id_from_app_name("policyengine-us-data-pub-usdata-gha123-a2") + == "usdata-gha123-a2" + ) + + +@pytest.mark.parametrize( + ("app_name", "expected"), + [ + ("us-data-1-115-4-minor-usdata-gha26114604836-a1", True), + ("policyengine-us-data-pub-usdata-gha123-a1", True), + ("policyengine-us-data-1-115-4-minor-usdata-gha123-a1", True), + ("us-data-pipeline-pr-1035-26117326123-1", False), + 
("us-data-local-area-pr-1035-26117326123-1", False), + ("us-data-h5-pr-1035-26117326123-1", False), + ("policyengine-us-data-pipeline", False), + ("state-research-tracker", False), + ], +) +def test_identifies_publication_pipeline_app_names(app_name, expected): + assert is_publication_pipeline_app_name(app_name) is expected + + +def test_pipeline_app_candidates_filters_to_deployed_publication_apps(): + records = [ + _app_record( + "us-data-1-115-4-minor-usdata-gha26114604836-a1", + app_id="ap-new", + tasks=2, + created_at="2026-05-19T13:00:00+00:00", + ), + _app_record("us-data-pipeline-pr-1035-26117326123-1"), + _app_record( + "us-data-1-115-4-patch-usdata-gha26114905403-a1", + state="stopped", + ), + ] + + candidates = pipeline_app_candidates(records) + + assert len(candidates) == 1 + assert candidates[0].app_id == "ap-new" + assert candidates[0].app_name == ("us-data-1-115-4-minor-usdata-gha26114604836-a1") + assert candidates[0].run_id == "usdata-gha26114604836-a1" + assert candidates[0].task_count == 2 + + +def test_build_deployed_pipeline_runs_payload_queries_status_by_derived_run_id(): + app_name = "us-data-1-115-4-minor-usdata-gha26114604836-a1" + seen = [] + + def lookup(candidate): + seen.append((candidate.app_name, candidate.run_id)) + return _status_payload(candidate.run_id) + + payload = build_deployed_pipeline_runs_payload( + [_app_record(app_name, app_id="ap-run", tasks=4)], + lookup, + limit=10, + max_workers=1, + ) + + assert seen == [(app_name, "usdata-gha26114604836-a1")] + assert isinstance(payload, DeployedPipelineRunsPayload) + assert payload.schema_version == "1" + assert payload.source == "modal_app_names" + assert payload.discovered_count == 1 + assert payload.queried_count == 1 + assert payload.count == 1 + run = payload.runs[0] + assert run.run_id == "usdata-gha26114604836-a1" + assert run.status_lookup == "ok" + assert run.status == "running" + assert run.branch == "main" + assert run.modal_app_id == "ap-run" + assert 
run.modal_task_count == 4 + assert run.latest_manifest is not None + assert run.latest_manifest.step_id == "1_build_datasets" + assert run.progress is not None + assert run.progress.to_dict() == { + "expected_manifests": 2, + "present_manifests": 1, + "missing_manifests": 1, + } + assert payload.to_dict()["runs"][0]["run_id"] == "usdata-gha26114604836-a1" + + +def test_deployed_pipeline_runs_payload_keeps_unreachable_apps_structured(): + def lookup(_candidate): + raise RuntimeError("lookup failed with TOKEN=secret") + + payload = build_deployed_pipeline_runs_payload( + [_app_record("us-data-1-115-4-minor-usdata-gha26114604836-a1")], + lookup, + max_workers=1, + ) + + assert payload.count == 1 + run = payload.runs[0] + assert run.run_id == "usdata-gha26114604836-a1" + assert run.status_lookup == "unreachable" + assert run.status == "unreachable" + assert run.error is not None + assert run.error.error_type == "RuntimeError" + + +def test_deployed_pipeline_runs_payload_applies_limit_after_filters(): + records = [ + _app_record( + "us-data-1-115-4-minor-usdata-gha1-a1", + created_at="2026-05-19T13:00:00+00:00", + ), + _app_record( + "us-data-1-115-4-minor-usdata-gha2-a1", + created_at="2026-05-19T12:00:00+00:00", + ), + ] + + def lookup(candidate): + branch = "feature" if candidate.run_id.endswith("2-a1") else "main" + return _status_payload(candidate.run_id, branch=branch) + + payload = build_deployed_pipeline_runs_payload( + records, + lookup, + limit=1, + branch="main", + max_workers=1, + ) + + assert payload.limit == 1 + assert payload.filters.to_dict() == { + "status": "", + "branch": "main", + "include_unreachable": True, + } + assert payload.queried_count == 2 + assert [run.run_id for run in payload.runs] == ["usdata-gha1-a1"] + + +def test_deployed_pipeline_runs_payload_can_exclude_unreachable_apps(): + def lookup(_candidate): + raise RuntimeError("unavailable") + + payload = build_deployed_pipeline_runs_payload( + 
[_app_record("us-data-1-115-4-minor-usdata-gha26114604836-a1")], + lookup, + include_unreachable=False, + max_workers=1, + ) + + assert payload.discovered_count == 1 + assert payload.queried_count == 1 + assert payload.count == 0 + assert payload.runs == () diff --git a/tests/unit/test_run_context.py b/tests/unit/test_run_context.py index 4a579714e..af5f58eed 100644 --- a/tests/unit/test_run_context.py +++ b/tests/unit/test_run_context.py @@ -58,6 +58,17 @@ def test_modal_app_name_scopes_by_candidate_version_and_run_id() -> None: ) +def test_modal_app_name_preserves_run_id_suffix_when_truncated() -> None: + run_id = "usdata-gha123456789012345-a2" + name = build_modal_app_name( + run_id, + candidate_version="1.73.0-minor-with-an-extremely-long-candidate-label", + ) + + assert name.endswith(f"-{run_id}") + assert len(name) <= 64 + + def test_candidate_scope_uses_base_release_and_bump() -> None: assert build_candidate_scope("1.73.0", "minor") == "1.73.0-minor" assert release_version_from_bump("1.73.0", "minor") == "1.74.0"