Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/617.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added stage-organized Hugging Face pipeline artifact mirroring for base datasets, calibration outputs, and local-area publish manifests.
29 changes: 29 additions & 0 deletions modal_app/data_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,30 @@ def validate_and_maybe_upload_datasets(
)


def mirror_source_imputed_artifact(
    artifacts_dir: Path,
    *,
    run_id: str = "",
) -> None:
    """Mirror the stage-4 source-imputed artifact, if it exists.

    No-op when the expected h5 file is absent from
    ``artifacts_dir``. The pipeline-artifacts helper is imported
    lazily so this module stays cheap to import when mirroring is
    never triggered.

    Args:
        artifacts_dir: Directory that may contain the artifact.
        run_id: Shared pipeline run identifier; when empty, the
            mirror helper generates its own.
    """
    candidate = artifacts_dir / "source_imputed_stratified_extended_cps.h5"
    if not candidate.exists():
        return

    from policyengine_us_data.utils.pipeline_artifacts import (
        mirror_to_pipeline,
    )

    # Only forward run_id when the caller actually supplied one, so
    # the helper's own default generation still applies otherwise.
    extra = {"run_id": run_id} if run_id else {}
    mirror_to_pipeline(
        "stage_4_source_imputed",
        [candidate],
        **extra,
    )


def run_script_with_checkpoint(
script_path: str,
output_files: str | list[str],
Expand Down Expand Up @@ -413,6 +437,9 @@ def build_datasets(
checkpoint_volume.commit()

env = os.environ.copy()
if run_id:
env["PIPELINE_RUN_ID"] = run_id
os.environ["PIPELINE_RUN_ID"] = run_id

# Open persistent build log with provenance header
commit = get_current_commit()
Expand Down Expand Up @@ -646,6 +673,8 @@ def build_datasets(
if si.exists():
shutil.copy2(si, artifacts_dir / "source_imputed_stratified_extended_cps.h5")

mirror_source_imputed_artifact(artifacts_dir, run_id=run_id)

shutil.copy2(
"policyengine_us_data/storage/calibration/policy_data.db",
artifacts_dir / "policy_data.db",
Expand Down
22 changes: 22 additions & 0 deletions modal_app/local_area.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,28 @@ def promote_publish(branch: str = "main", version: str = "", run_id: str = "") -
if result.returncode != 0:
raise RuntimeError(f"Promote failed: {result.stderr}")

# Mirror manifest-only to pipeline artifact repo (files
# are too large to double-upload; checksums are recorded).
try:
from policyengine_us_data.utils.pipeline_artifacts import (
mirror_to_pipeline,
)

version_dir = Path(VOLUME_MOUNT) / version
h5_files = [
version_dir / rp for rp in manifest["files"] if (version_dir / rp).exists()
]
mirror_to_pipeline(
"stage_7_local_area",
h5_files,
manifest_only=True,
)
except Exception:
print(
"WARNING: Failed to mirror stage_7 manifest to pipeline repo",
flush=True,
)

return (
f"Successfully promoted version {version} with {len(manifest['files'])} files"
)
Expand Down
17 changes: 17 additions & 0 deletions modal_app/remote_calibration_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,23 @@ def main(
prefix=prefix,
)

# Mirror to pipeline artifact repo.
from pathlib import Path

from policyengine_us_data.utils.pipeline_artifacts import (
mirror_to_pipeline,
)

stage_6_files = [Path(output)]
for name in [
cal_log_output,
log_output,
config_output,
]:
if os.path.exists(name):
stage_6_files.append(Path(name))
mirror_to_pipeline("stage_6_weights", stage_6_files)

if trigger_publish:
_trigger_repository_dispatch()

Expand Down
21 changes: 21 additions & 0 deletions policyengine_us_data/storage/upload_completed_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from policyengine_us_data.datasets import EnhancedCPS_2024
from policyengine_us_data.datasets.cps.cps import CPS_2024
from policyengine_us_data.datasets.cps.enhanced_cps import clone_diagnostics_path
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.utils.data_upload import upload_data_files
from policyengine_us_data.utils.dataset_validation import (
Expand Down Expand Up @@ -224,6 +225,26 @@ def upload_datasets(require_enhanced_cps: bool = True):
gcs_bucket_name="policyengine-us-data",
)

# Mirror to pipeline artifact repo (stage-organized).
from policyengine_us_data.utils.pipeline_artifacts import (
mirror_to_pipeline,
)

run_id = mirror_to_pipeline(
"stage_0_raw",
[STORAGE_FOLDER / "calibration" / "policy_data.db"],
)
mirror_to_pipeline(
"stage_1_base",
[
EnhancedCPS_2024.file_path,
clone_diagnostics_path(EnhancedCPS_2024.file_path),
CPS_2024.file_path,
STORAGE_FOLDER / "small_enhanced_cps_2024.h5",
],
run_id=run_id,
)


def validate_all_datasets():
"""Validate all main datasets in storage. Called by `make validate-data`."""
Expand Down
233 changes: 233 additions & 0 deletions policyengine_us_data/utils/pipeline_artifacts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""Upload pipeline artifacts to the stage-organized pipeline repo.

Mirrors existing build artifacts to
``policyengine/policyengine-us-data-pipeline`` with a folder
structure that groups files by pipeline stage and timestamps
each run. All operations are additive — the production repo
(``policyengine/policyengine-us-data``) is never modified.

Failures are logged but never raised so that mirror uploads
cannot block the main pipeline.
"""

import json
import logging
import os
import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

from huggingface_hub import CommitOperationAdd, HfApi

from policyengine_us_data.utils.data_upload import (
hf_create_commit_with_retry,
)
from policyengine_us_data.utils.manifest import (
compute_file_checksum,
)

# Module-level logger; messages use lazy %-style arguments throughout.
logger = logging.getLogger(__name__)

# Destination repo for stage-organized mirror uploads. Per the module
# docstring, the production repo (policyengine/policyengine-us-data)
# is never written by this module.
PIPELINE_REPO = "policyengine/policyengine-us-data-pipeline"
# Hugging Face repo_type passed to the commit call in mirror_to_pipeline.
PIPELINE_REPO_TYPE = "model"


def get_pipeline_run_id() -> str:
    """Return a UTC timestamp identifier for this pipeline run.

    A non-empty ``PIPELINE_RUN_ID`` environment variable takes
    precedence so one identifier can be shared across processes
    (e.g. separate Modal containers); otherwise a fresh timestamp
    is generated.

    Returns:
        String like ``'20260317T143000Z'``.
    """
    shared = os.environ.get("PIPELINE_RUN_ID")
    if shared:
        return shared
    now = datetime.now(timezone.utc)
    return now.strftime("%Y%m%dT%H%M%SZ")


def _get_git_info() -> Dict[str, object]:
"""Capture lightweight git provenance.

Reimplemented here (~15 lines) instead of importing
``unified_calibration.get_git_provenance`` to avoid pulling
in torch / l0-python and other heavy calibration deps.
"""
info: Dict[str, object] = {
"git_commit": None,
"git_branch": None,
"git_dirty": None,
}
try:
info["git_commit"] = (
subprocess.check_output(
["git", "rev-parse", "HEAD"],
stderr=subprocess.DEVNULL,
)
.decode()
.strip()
)
info["git_branch"] = (
subprocess.check_output(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
stderr=subprocess.DEVNULL,
)
.decode()
.strip()
)
status = (
subprocess.check_output(
["git", "status", "--porcelain"],
stderr=subprocess.DEVNULL,
)
.decode()
.strip()
)
info["git_dirty"] = len(status) > 0
except Exception:
pass
return info


def generate_stage_manifest(
    stage_name: str,
    run_id: str,
    files: List[Path],
) -> Dict:
    """Build a manifest dict for a pipeline stage.

    Missing files are skipped with a warning rather than failing
    the whole manifest.

    Args:
        stage_name: Stage identifier (e.g. ``'stage_1_base'``).
        run_id: Pipeline run identifier (UTC timestamp).
        files: Paths to the artifact files.

    Returns:
        Manifest dictionary matching the schema documented in
        ``pipeline_improvements.md``.
    """
    entries: Dict[str, object] = {}
    for candidate in files:
        path = Path(candidate)
        if not path.exists():
            logger.warning("Skipping missing file for manifest: %s", path)
            continue
        entries[path.name] = {
            "sha256": compute_file_checksum(path),
            "size_bytes": path.stat().st_size,
        }
    return {
        "stage": stage_name,
        "run_id": run_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        **_get_git_info(),
        "files": entries,
    }


def mirror_to_pipeline(
    stage_name: str,
    files: List[Path],
    run_id: Optional[str] = None,
    manifest_only: bool = False,
    repo: str = PIPELINE_REPO,
) -> str:
    """Upload artifacts and manifest to the pipeline repo.

    This is the single-call interface used at every hook point.
    It generates a manifest, optionally uploads the files
    themselves, and commits everything to the pipeline repo
    under ``{run_id}/{stage_name}/``.

    Failures are logged as warnings and never propagated so
    that mirror uploads cannot block the main pipeline.

    Args:
        stage_name: Stage identifier (e.g. ``'stage_6_weights'``).
        files: Paths to the artifact files.
        run_id: Pipeline run identifier. If ``None``, one is
            generated via :func:`get_pipeline_run_id`.
        manifest_only: If ``True``, upload only the manifest
            (with checksums) but not the actual files. Used
            for stage_7 where files are too large to
            double-upload.
        repo: HuggingFace repo ID.

    Returns:
        The ``run_id`` that was used (so callers can pass it
        to subsequent stages for consistency).
    """
    if run_id is None:
        run_id = get_pipeline_run_id()

    # Track the temp manifest path so the finally block can remove it
    # on BOTH success and failure (previously it leaked on error).
    manifest_path: Optional[str] = None
    try:
        manifest = generate_stage_manifest(stage_name, run_id, files)

        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".json",
            delete=False,
        ) as tmp:
            json.dump(manifest, tmp, indent=2)
            manifest_path = tmp.name

        prefix = f"{run_id}/{stage_name}"
        # The manifest is always the first commit operation.
        operations = [
            CommitOperationAdd(
                path_in_repo=f"{prefix}/manifest.json",
                path_or_fileobj=manifest_path,
            )
        ]

        if not manifest_only:
            for f in files:
                p = Path(f)
                if not p.exists():
                    logger.warning("Skipping missing file: %s", p)
                    continue
                operations.append(
                    CommitOperationAdd(
                        path_in_repo=f"{prefix}/{p.name}",
                        path_or_fileobj=str(p),
                    )
                )

        token = os.environ.get("HUGGING_FACE_TOKEN")
        api = HfApi()

        hf_create_commit_with_retry(
            api=api,
            operations=operations,
            repo_id=repo,
            repo_type=PIPELINE_REPO_TYPE,
            token=token,
            commit_message=(f"Upload {stage_name} artifacts for run {run_id}"),
        )

        n_files = len(operations) - 1  # exclude manifest
        mode = "manifest-only" if manifest_only else "with files"
        logger.info(
            "Mirrored %s to pipeline repo (%s, %d files)",
            stage_name,
            mode,
            n_files,
        )

    except Exception:
        # Never block the main pipeline on mirror failures.
        logger.warning(
            "Failed to mirror %s to pipeline repo — continuing without blocking",
            stage_name,
            exc_info=True,
        )
    finally:
        # Clean up temp manifest file regardless of outcome.
        if manifest_path is not None:
            try:
                os.unlink(manifest_path)
            except OSError:
                pass

    return run_id
Loading
Loading