Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/scripts/sync_modal_secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ def main() -> None:
],
)
create_secret_with_retry(
[
"modal",
"secret",
"create",
"--env",
env_name,
"--force",
"modal-token",
f"MODAL_TOKEN_ID={os.environ['MODAL_TOKEN_ID']}",
f"MODAL_TOKEN_SECRET={os.environ['MODAL_TOKEN_SECRET']}",
],
)
subprocess.run(
[
"modal",
"secret",
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,7 @@ jobs:
PARALLEL_MATRIX: ${{ inputs.parallel_matrix || 'false' }}
NUM_MATRIX_WORKERS: ${{ inputs.num_matrix_workers || '50' }}
run: |
modal secret create --env="${MODAL_ENVIRONMENT}" --force modal-token MODAL_TOKEN_ID="${MODAL_TOKEN_ID}" MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}"
modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline_discovery.py
modal deploy --env="${MODAL_ENVIRONMENT}" --name="${US_DATA_MODAL_APP_NAME}" --tag="${US_DATA_RUN_ID}" modal_app/pipeline.py
python .github/scripts/spawn_modal_pipeline.py
13 changes: 13 additions & 0 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ jobs:
# already exist, then stop/delete the PR resources in cleanup steps.
MODAL_ENVIRONMENT: main
MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
MODAL_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
MODAL_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
MODAL_H5_TEST_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_PIPELINE_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_MODAL_APP_NAME: us-data-pipeline-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_PIPELINE_DISCOVERY_APP_NAME: us-data-pipeline-discovery-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_LOCAL_AREA_APP_NAME: us-data-local-area-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_H5_HARNESS_APP_NAME: us-data-h5-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
US_DATA_PIPELINE_VOLUME_NAME: pipeline-artifacts-pr-${{ github.event.pull_request.number }}-${{ github.run_id }}-${{ github.run_attempt }}
Expand All @@ -188,6 +190,16 @@ jobs:
- run: uv sync --dev
- name: Install integration test deps
run: uv pip install modal pytest numpy pandas
- name: Ensure PR Modal control-plane secret
run: >
uv run modal secret create
--env="${MODAL_ENVIRONMENT}"
--force
modal-token
MODAL_TOKEN_ID="${MODAL_TOKEN_ID}"
MODAL_TOKEN_SECRET="${MODAL_TOKEN_SECRET}"
- name: Deploy PR Modal pipeline discovery app
run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" --name="${MODAL_PIPELINE_DISCOVERY_APP_NAME}" modal_app/pipeline_discovery.py
- name: Deploy PR Modal pipeline app
run: uv run modal deploy --env="${MODAL_ENVIRONMENT}" modal_app/pipeline.py
- name: Deploy PR Modal local-area app
Expand Down Expand Up @@ -216,6 +228,7 @@ jobs:
for app_name in \
"${MODAL_H5_TEST_HARNESS_APP_NAME}" \
"${MODAL_LOCAL_AREA_APP_NAME}" \
"${MODAL_PIPELINE_DISCOVERY_APP_NAME}" \
"${MODAL_APP_NAME}"
do
yes | uv run modal app stop \
Expand Down
4 changes: 4 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ present in one path but missing from another.
Read `docs/engineering/skills/github-prs.md` before opening, replacing, or
sharing any pull request.

When asked to lint, before pushing PR updates, or before reporting that lint
passed, run the repository-wide lint target from the repository root. Do not
substitute file-scoped Ruff commands for whole-repository lint verification.

Never open `policyengine-us-data` PRs from forks. CI rejects fork-based PRs
before running the real checks, which wastes the reviewer and agent loop.

Expand Down
1 change: 1 addition & 0 deletions changelog.d/1049.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a stable Modal discovery endpoint for deployed publication pipeline runs.
6 changes: 5 additions & 1 deletion docs/engineering/skills/github-prs.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ Before creating or sharing a PR:
4. Add a Towncrier changelog fragment under `changelog.d/` using the issue
number and the appropriate configured type, for example
`changelog.d/ISSUE_NUMBER.added`.
5. Run the repository-wide lint target from the repository root:
`make lint`.
Do not substitute file-scoped commands such as
`ruff check path/to/file.py` or `ruff format --check path/to/file.py` for
this step. CI runs whole-repository formatting checks, so agents must verify
the whole repository before pushing or sharing a PR update.
6. Push the current branch to the canonical repository:
`make push-pr-branch`.
7. Create the PR as a draft from that same repository:
Expand Down
47 changes: 43 additions & 4 deletions docs/engineering/skills/pipeline_operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,43 @@ The status system reports:

## Status Surfaces

The structured status payload is canonical. Run-scoped pipeline apps expose
run-level and run-index Modal functions that report status for their own
mounted pipeline volume:

- `get_pipeline_status`: Python-callable structured JSON for agents, scripts,
dashboards, and tests. Prefer this for diagnosis and automation.
- `pipeline_status_endpoint`: protected HTTP endpoint returning the same
structured JSON for non-Python clients. Use Modal proxy auth headers.
- `list_pipeline_runs`: Python-callable structured JSON index of recent runs.
Use this for dashboards that need to discover candidate run IDs.
This is volume-local and only lists runs visible to that app's mounted
`US_DATA_PIPELINE_VOLUME_NAME`.
- `pipeline_runs_endpoint`: protected HTTP endpoint returning the same
structured, volume-local recent-run index for non-Python clients.
- `pipeline_status_snippet`: human-readable text used by
`modal run modal_app/pipeline.py::main --action status`. This is for quick
terminal inspection only and must not be treated as a schema.

Cross-run discovery normally lives in the stable
`policyengine-us-data-pipeline-status` app, not in a run-scoped pipeline app.
PR integration jobs deploy an isolated discovery app into `main` and export its
name as `MODAL_PIPELINE_DISCOVERY_APP_NAME` and
`US_DATA_PIPELINE_DISCOVERY_APP_NAME`; tests and scripts running inside PR CI
must use those environment values instead of hardcoding the stable production
app name:

- `list_deployed_pipeline_runs`: Python-callable structured JSON index of
deployed publication pipeline apps. It discovers runs from Modal app names
matching `usdata-gha<github_run_id>-a<attempt>`, then calls each app's
`get_pipeline_status`.
- `deployed_pipeline_runs_endpoint`: protected HTTP endpoint returning the same
cross-app discovery payload. Use this for dashboards that need to discover all
deployed publication runs.

The stable discovery app requires the `modal-token` Modal Secret in its
environment. That Secret must contain `MODAL_TOKEN_ID` and
`MODAL_TOKEN_SECRET`, and it should be attached only to functions that need
Modal control-plane access.

## Fetch Status

First identify the run context from the GitHub Actions summary, workflow logs, or
Expand Down Expand Up @@ -74,6 +96,23 @@ If the local environment cannot sync the full project environment, use the same
snippet with a Modal-only temporary environment by replacing `uv run python`
with `uv run --no-sync --with modal python`.

To discover deployed publication runs before choosing a run ID, call the stable
status app:

```bash
uv run --no-sync --with modal python - <<'PY'
import json
import modal

fn = modal.Function.from_name(
"policyengine-us-data-pipeline-status",
"list_deployed_pipeline_runs",
environment_name="main",
)
print(json.dumps(fn.remote(limit=25), indent=2))
PY
```

If using the HTTP endpoint, authenticate with Modal proxy auth headers. Do not
publish or paste proxy auth values into PRs, issues, logs, or docs.

Expand Down
11 changes: 11 additions & 0 deletions docs/engineering/skills/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ Use this skill whenever adding, moving, or reviewing tests.

## Quality Guards

When adding, moving, or reviewing tests, run the repository-wide lint target
before pushing:

```bash
make lint
```

Do not treat file-scoped Ruff commands as sufficient verification. CI runs
whole-repository lint and formatting checks, and test edits can fail formatting
under the repo-wide command even when a narrower local command appears clean.

Run this before opening or updating a PR:

```bash
Expand Down
6 changes: 6 additions & 0 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ def verify_runtime_seams() -> dict:
"uv.lock",
"modal_app/worker_script.py",
"modal_app/local_area.py",
"modal_app/pipeline_discovery.py",
"modal_app/pipeline_discovery_core.py",
"modal_app/pipeline_discovery_schema.py",
"modal_app/pipeline_status.py",
"modal_app/h5_test_harness.py",
"modal_app/step_manifests/specs.py",
Expand Down Expand Up @@ -592,6 +595,9 @@ def verify_runtime_seams() -> dict:
"modal_app.fixtures.h5_cases",
"modal_app.h5_test_harness",
"modal_app.local_area",
"modal_app.pipeline_discovery",
"modal_app.pipeline_discovery_core",
"modal_app.pipeline_discovery_schema",
"modal_app.pipeline_status",
"modal_app.remote_calibration_runner",
"modal_app.step_manifests.specs",
Expand Down
164 changes: 164 additions & 0 deletions modal_app/pipeline_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Stable Modal app for discovering deployed US data pipeline runs."""

from __future__ import annotations

import asyncio
import os
import sys
from pathlib import Path

import modal

# Put the repository root on sys.path so the `modal_app` / `policyengine_us_data`
# imports that follow resolve. `_baked` is a fixed absolute path (presumably
# where the repo lives inside the Modal image -- confirm against the image
# definition); `_local` is the directory two levels above this file.
_baked = "/root/policyengine-us-data"
_local = str(Path(__file__).resolve().parents[1])
for _p in (_baked, _local):
    if _p in sys.path:
        continue
    # Each missing entry goes to the front, so when both are added the
    # later one (`_local`) ends up ahead of `_baked`.
    sys.path.insert(0, _p)

from modal_app.images import cpu_image as image # noqa: E402
from modal_app.pipeline_discovery_core import ( # noqa: E402
RawRecord,
build_deployed_pipeline_runs_payload,
)
from modal_app.pipeline_discovery_schema import ( # noqa: E402
DeployedPipelineRunsPayloadDict,
ModalAppRecord,
)
from policyengine_us_data.utils.run_context import ( # noqa: E402
DEFAULT_MODAL_ENVIRONMENT,
)


# Name the discovery app is created under when no override is provided.
DISCOVERY_APP_NAME = "policyengine-us-data-pipeline-status"
# Handle to the Modal Secret named "modal-token"; the functions in this module
# attach it via `secrets=[...]`.
modal_token_secret = modal.Secret.from_name("modal-token")

# An unset *or empty* US_DATA_PIPELINE_DISCOVERY_APP_NAME falls back to the
# default name above.
_discovery_app_name = (
    os.environ.get("US_DATA_PIPELINE_DISCOVERY_APP_NAME") or DISCOVERY_APP_NAME
)
app = modal.App(_discovery_app_name)
# Variant of the shared CPU image with FastAPI installed, used by the HTTP
# endpoint function.
status_image = image.pip_install("fastapi")


def _modal_environment(explicit: str = "") -> str:
    """Resolve the Modal environment name to operate on.

    Precedence (first non-empty value wins):

    1. ``explicit`` argument
    2. ``US_DATA_MODAL_ENVIRONMENT`` environment variable
    3. ``MODAL_ENVIRONMENT`` environment variable
    4. ``DEFAULT_MODAL_ENVIRONMENT``

    Empty strings are treated the same as "not set" at every step.
    """
    if explicit:
        return explicit
    for variable in ("US_DATA_MODAL_ENVIRONMENT", "MODAL_ENVIRONMENT"):
        from_env = os.environ.get(variable)
        if from_env:
            return from_env
    return DEFAULT_MODAL_ENVIRONMENT


def _modal_state_name(api_pb2, state: int) -> str:
    """Convert a numeric app state into a short lowercase label.

    Args:
        api_pb2: Module-like object exposing ``AppState.Name(value)``.
        state: Numeric state value to translate.

    Returns:
        The enum name with a leading ``APP_STATE_`` stripped and lowercased.
        If ``AppState.Name`` raises ``ValueError`` for the value, the number
        itself is returned as a string.
    """
    try:
        enum_label = api_pb2.AppState.Name(state)
    except ValueError:
        # Value not known to the enum: fall back to the raw number.
        return str(state)
    return enum_label.removeprefix("APP_STATE_").lower()


async def _list_modal_app_records_async(environment_name: str) -> list[ModalAppRecord]:
    """List the apps in a Modal environment as plain dictionaries.

    Sends a single ``AppList`` request for ``environment_name`` using a client
    built from the ambient Modal credentials (``_Client.from_env``) and copies
    the fields of interest from every returned app.

    Args:
        environment_name: Modal environment whose apps should be listed.

    Returns:
        One record per app in the response, in response order.
    """
    # Imported inside the function, so they are resolved only when it runs.
    from modal.client import _Client
    from modal_proto import api_pb2

    modal_client = await _Client.from_env()
    request = api_pb2.AppListRequest(environment_name=environment_name)
    response = await modal_client.stub.AppList(request)

    records: list[ModalAppRecord] = []
    for entry in response.apps:
        records.append(
            {
                "app_id": entry.app_id,
                # Use the description as the name when the name is empty.
                "name": entry.name or entry.description,
                "description": entry.description,
                "state": _modal_state_name(api_pb2, entry.state),
                "tasks": entry.n_running_tasks,
                "created_at": entry.created_at,
                "stopped_at": entry.stopped_at,
            }
        )
    return records


def _list_modal_app_records(environment_name: str) -> list[ModalAppRecord]:
    """Blocking wrapper around ``_list_modal_app_records_async``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result.
    """
    listing = _list_modal_app_records_async(environment_name)
    return asyncio.run(listing)


def _pipeline_status_lookup(
    *,
    app_name: str,
    run_id: str,
    environment_name: str,
) -> RawRecord:
    """Ask one deployed app for the status of one run.

    Looks up the ``get_pipeline_status`` function of ``app_name`` in
    ``environment_name`` and invokes it remotely with ``run_id``.

    Returns:
        The remote result when it is a ``dict``; otherwise an empty dict.
        Exceptions raised by the lookup or the remote call are not caught
        here.
    """
    remote_status = modal.Function.from_name(
        app_name,
        "get_pipeline_status",
        environment_name=environment_name,
    ).remote(run_id)
    if isinstance(remote_status, dict):
        return remote_status
    return {}


def _build_deployed_pipeline_runs(
    *,
    limit: int | str | None,
    status: str,
    branch: str,
    include_unreachable: bool,
    modal_environment: str,
) -> DeployedPipelineRunsPayloadDict:
    """Assemble the deployed-runs discovery payload as a plain dict.

    Resolves the Modal environment, lists its apps, and hands the app records
    plus a per-candidate status lookup to
    ``build_deployed_pipeline_runs_payload``. ``limit``, ``status``,
    ``branch`` and ``include_unreachable`` are forwarded to that builder
    unchanged (their exact filtering semantics live there).

    Args:
        modal_environment: Explicit environment name; an empty string defers
            to ``_modal_environment``'s fallbacks.

    Returns:
        The builder's payload converted with ``to_dict()``.
    """
    resolved_env = _modal_environment(modal_environment)

    def fetch_status(candidate) -> RawRecord:
        # The builder calls this per candidate; the candidate carries the
        # app name and run id to query.
        return _pipeline_status_lookup(
            app_name=candidate.app_name,
            run_id=candidate.run_id,
            environment_name=resolved_env,
        )

    runs_payload = build_deployed_pipeline_runs_payload(
        _list_modal_app_records(resolved_env),
        fetch_status,
        limit=limit,
        status=status,
        branch=branch,
        include_unreachable=include_unreachable,
        modal_environment=resolved_env,
    )
    return runs_payload.to_dict()


@app.function(image=image, timeout=180, secrets=[modal_token_secret])
def list_deployed_pipeline_runs(
    limit: int = 25,
    status: str = "",
    branch: str = "",
    include_unreachable: bool = True,
    modal_environment: str = "",
) -> DeployedPipelineRunsPayloadDict:
    """Return deployed publication pipeline runs discovered from Modal app names.

    Python-callable Modal function. Every argument is passed straight through
    to ``_build_deployed_pipeline_runs``; an empty ``modal_environment`` lets
    that helper resolve the environment from its fallbacks.
    """

    query = {
        "limit": limit,
        "status": status,
        "branch": branch,
        "include_unreachable": include_unreachable,
        "modal_environment": modal_environment,
    }
    return _build_deployed_pipeline_runs(**query)


@app.function(image=status_image, timeout=180, secrets=[modal_token_secret])
@modal.fastapi_endpoint(
    method="GET",
    docs=False,
    requires_proxy_auth=True,
)
def deployed_pipeline_runs_endpoint(
    limit: int = 25,
    status: str = "",
    branch: str = "",
    include_unreachable: bool = True,
    modal_environment: str = "",
) -> DeployedPipelineRunsPayloadDict:
    """Protected HTTP endpoint for deployed publication pipeline discovery.

    GET endpoint registered with ``requires_proxy_auth=True``. The parameters
    are forwarded unchanged to ``_build_deployed_pipeline_runs``, and its
    result is returned as the response payload.
    """

    query = {
        "limit": limit,
        "status": status,
        "branch": branch,
        "include_unreachable": include_unreachable,
        "modal_environment": modal_environment,
    }
    return _build_deployed_pipeline_runs(**query)
Loading
Loading