From f39b2ada57fc985c2dfb052059f6f58ae3b1f43f Mon Sep 17 00:00:00 2001 From: Anthony Volk Date: Tue, 17 Mar 2026 23:21:44 +0100 Subject: [PATCH 1/2] Add stage-organized pipeline artifact uploads to HuggingFace MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New utility module (pipeline_artifacts.py) mirrors existing build artifacts to policyengine/policyengine-us-data-pipeline with a stage-organized folder structure. Each stage gets a manifest.json with SHA256 checksums and git provenance. Hook points added at 4 existing upload sites: - upload_completed_datasets.py: stage_0_raw + stage_1_base - remote_calibration_runner.py: stage_4_source_imputed + stage_6_weights - local_area.py: stage_7_local_area (manifest-only) All mirror uploads are additive and failure-tolerant — they never block the main pipeline. Co-Authored-By: Claude Opus 4.6 --- modal_app/local_area.py | 22 ++ modal_app/remote_calibration_runner.py | 63 +++++ .../storage/upload_completed_datasets.py | 21 ++ .../utils/pipeline_artifacts.py | 233 ++++++++++++++++++ tests/unit/test_pipeline_artifacts.py | 167 +++++++++++++ 5 files changed, 506 insertions(+) create mode 100644 policyengine_us_data/utils/pipeline_artifacts.py create mode 100644 tests/unit/test_pipeline_artifacts.py diff --git a/modal_app/local_area.py b/modal_app/local_area.py index 036f069a..bce9ef8b 100644 --- a/modal_app/local_area.py +++ b/modal_app/local_area.py @@ -608,6 +608,28 @@ def promote_publish(branch: str = "main", version: str = "", run_id: str = "") - if result.returncode != 0: raise RuntimeError(f"Promote failed: {result.stderr}") + # Mirror manifest-only to pipeline artifact repo (files + # are too large to double-upload; checksums are recorded). 
def _upload_source_imputed(lines):
    """Upload the source-imputed dataset announced in captured output.

    Scans ``lines`` for a ``SOURCE_IMPUTED_PATH:`` marker (the last match
    wins), uploads the referenced file to the production Hugging Face repo
    in a ``uv run python`` subprocess, then mirrors it to the pipeline
    artifact repo as ``stage_4_source_imputed``.

    Args:
        lines: Iterable of output lines to scan for the marker.

    Returns:
        None. Returns early, without uploading or mirroring, when no
        marker is present or the referenced path does not exist.
    """
    source_path = None
    for line in lines:
        if "SOURCE_IMPUTED_PATH:" in line:
            raw = line.split("SOURCE_IMPUTED_PATH:")[1].strip()
            # Keep only the text after the last "]" when one is present
            # (presumably a bracketed log prefix -- confirm with producer).
            source_path = raw.split("]")[-1].strip() if "]" in raw else raw
    if not source_path or not os.path.exists(source_path):
        return
    print(f"Uploading source-imputed dataset: {source_path}", flush=True)
    # The path travels as argv[1] rather than being interpolated into the
    # ``-c`` program text, so quotes or backslashes in the path can neither
    # break the snippet nor inject code into it.
    rc, _ = _run_streaming(
        [
            "uv",
            "run",
            "python",
            "-c",
            "import sys; "
            "from policyengine_us_data.utils.huggingface import upload; "
            "upload(sys.argv[1], "
            "'policyengine/policyengine-us-data', "
            "'calibration/"
            "source_imputed_stratified_extended_cps.h5')",
            source_path,
        ],
        env=os.environ.copy(),
        label="upload-source-imputed",
    )
    if rc != 0:
        print(
            "WARNING: Failed to upload source-imputed dataset",
            flush=True,
        )
    else:
        print("Source-imputed dataset uploaded to HF", flush=True)

    # Mirror to pipeline artifact repo. Best-effort: a failure to import or
    # run the mirror must not abort the calibration run.
    try:
        from pathlib import Path

        from policyengine_us_data.utils.pipeline_artifacts import (
            mirror_to_pipeline,
        )

        mirror_to_pipeline(
            "stage_4_source_imputed",
            [Path(source_path)],
        )
    except Exception as exc:
        print(
            "WARNING: Failed to mirror stage_4_source_imputed to "
            f"pipeline repo: {exc!r}",
            flush=True,
        )
def get_pipeline_run_id() -> str:
    """Return a UTC timestamp identifier for this pipeline run.

    Checks the ``PIPELINE_RUN_ID`` environment variable first so
    that a single identifier can be shared across processes (e.g.
    separate Modal containers). Surrounding whitespace is stripped
    and a blank value is treated as unset, because the identifier
    is used as a path segment in the pipeline repo. If unset,
    generates a new timestamp.

    Returns:
        String like ``'20260317T143000Z'``.
    """
    env_id = os.environ.get("PIPELINE_RUN_ID", "").strip()
    if env_id:
        return env_id
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def _git_output(*args: str) -> Optional[str]:
    """Run ``git <args>`` and return stripped stdout, or None on failure."""
    try:
        raw = subprocess.check_output(
            ["git", *args],
            stderr=subprocess.DEVNULL,
        )
    except (subprocess.SubprocessError, OSError) as exc:
        # git missing, not a repository, or the command exited non-zero.
        logger.debug("git %s failed: %r", " ".join(args), exc)
        return None
    return raw.decode(errors="replace").strip()


def _get_git_info() -> Dict[str, object]:
    """Capture lightweight git provenance.

    Reimplemented here instead of importing
    ``unified_calibration.get_git_provenance`` to avoid pulling
    in torch / l0-python and other heavy calibration deps.

    Each field is probed independently, so one failing git command
    leaves only its own field as ``None``.

    Returns:
        Dict with ``git_commit`` (HEAD hash), ``git_branch``
        (``rev-parse --abbrev-ref HEAD`` output) and ``git_dirty``
        (True when ``git status --porcelain`` prints anything).
        Any field whose command failed is ``None``.
    """
    status = _git_output("status", "--porcelain")
    return {
        "git_commit": _git_output("rev-parse", "HEAD"),
        "git_branch": _git_output("rev-parse", "--abbrev-ref", "HEAD"),
        "git_dirty": None if status is None else bool(status),
    }
def generate_stage_manifest(
    stage_name: str,
    run_id: str,
    files: List[Path],
) -> Dict:
    """Build a manifest dict for a pipeline stage.

    Args:
        stage_name: Stage identifier (e.g. ``'stage_1_base'``).
        run_id: Pipeline run identifier (UTC timestamp).
        files: Paths to the artifact files. Missing paths are
            skipped with a warning.

    Returns:
        Dict with ``stage``, ``run_id``, ``created_at`` (UTC ISO
        timestamp), the git provenance fields, and ``files``: a
        mapping from file basename to ``sha256`` / ``size_bytes``.
    """
    entries: Dict[str, Dict[str, object]] = {}
    manifest: Dict[str, object] = {
        "stage": stage_name,
        "run_id": run_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        **_get_git_info(),
        "files": entries,
    }
    for f in files:
        p = Path(f)
        if not p.exists():
            logger.warning("Skipping missing file for manifest: %s", p)
            continue
        if p.name in entries:
            # Entries are keyed by basename, so two inputs from different
            # directories collide. Surface it instead of overwriting silently.
            logger.warning(
                "Duplicate artifact name %s in %s; manifest keeps the later file",
                p.name,
                stage_name,
            )
        entries[p.name] = {
            "sha256": compute_file_checksum(p),
            "size_bytes": p.stat().st_size,
        }
    return manifest


def mirror_to_pipeline(
    stage_name: str,
    files: List[Path],
    run_id: Optional[str] = None,
    manifest_only: bool = False,
    repo: str = PIPELINE_REPO,
) -> str:
    """Upload artifacts and manifest to the pipeline repo.

    This is the single-call interface used at every hook point.
    It generates a manifest, optionally uploads the files
    themselves, and commits everything to the pipeline repo
    under ``{run_id}/{stage_name}/``.

    Failures are logged as warnings and never propagated so
    that mirror uploads cannot block the main pipeline. The
    temporary manifest file is removed whether or not the
    commit succeeds.

    Args:
        stage_name: Stage identifier (e.g. ``'stage_6_weights'``).
        files: Paths to the artifact files.
        run_id: Pipeline run identifier. If ``None``, one is
            generated via :func:`get_pipeline_run_id`.
        manifest_only: If ``True``, upload only the manifest
            (with checksums) but not the actual files. Used
            for stage_7 where files are too large to
            double-upload.
        repo: HuggingFace repo ID.

    Returns:
        The ``run_id`` that was used (so callers can pass it
        to subsequent stages for consistency).
    """
    if run_id is None:
        run_id = get_pipeline_run_id()

    manifest_path: Optional[str] = None
    try:
        # ``files`` is walked twice (manifest, then upload operations);
        # materialise it so a one-shot iterable is not exhausted early.
        files = list(files)
        manifest = generate_stage_manifest(stage_name, run_id, files)

        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".json",
            delete=False,
        ) as tmp:
            # Record the path before writing so ``finally`` can remove the
            # file even if serialisation fails part-way through.
            manifest_path = tmp.name
            json.dump(manifest, tmp, indent=2)

        prefix = f"{run_id}/{stage_name}"
        operations = [
            CommitOperationAdd(
                path_in_repo=f"{prefix}/manifest.json",
                path_or_fileobj=manifest_path,
            )
        ]

        if not manifest_only:
            for f in files:
                p = Path(f)
                if not p.exists():
                    logger.warning("Skipping missing file: %s", p)
                    continue
                operations.append(
                    CommitOperationAdd(
                        path_in_repo=f"{prefix}/{p.name}",
                        path_or_fileobj=str(p),
                    )
                )

        token = os.environ.get("HUGGING_FACE_TOKEN")
        api = HfApi()

        hf_create_commit_with_retry(
            api=api,
            operations=operations,
            repo_id=repo,
            repo_type=PIPELINE_REPO_TYPE,
            token=token,
            commit_message=(f"Upload {stage_name} artifacts for run {run_id}"),
        )

        n_files = len(operations) - 1  # exclude manifest
        mode = "manifest-only" if manifest_only else "with files"
        logger.info(
            "Mirrored %s to pipeline repo (%s, %d files)",
            stage_name,
            mode,
            n_files,
        )

    except Exception:
        # Deliberate best-effort boundary: log with traceback, never raise.
        logger.warning(
            "Failed to mirror %s to pipeline repo — continuing without blocking",
            stage_name,
            exc_info=True,
        )
    finally:
        # Clean up the temp manifest on success and on failure alike.
        if manifest_path is not None:
            try:
                os.unlink(manifest_path)
            except OSError:
                pass

    return run_id
class TestGetPipelineRunId:
    """Run-id generation and the ``PIPELINE_RUN_ID`` override."""

    def test_format(self, monkeypatch):
        # Clear any inherited override so the generated format is what
        # gets checked, not whatever the surrounding job exported.
        monkeypatch.delenv("PIPELINE_RUN_ID", raising=False)
        run_id = get_pipeline_run_id()
        assert re.match(r"^\d{8}T\d{6}Z$", run_id)

    def test_env_var_override(self, monkeypatch):
        monkeypatch.setenv("PIPELINE_RUN_ID", "20260101T000000Z")
        assert get_pipeline_run_id() == "20260101T000000Z"

    def test_env_var_not_set(self, monkeypatch):
        monkeypatch.delenv("PIPELINE_RUN_ID", raising=False)
        run_id = get_pipeline_run_id()
        assert re.match(r"^\d{8}T\d{6}Z$", run_id)


class TestGenerateStageManifest:
    """Manifest schema, checksums, and missing-file handling."""

    def test_schema(self, tmp_path):
        f1 = tmp_path / "data.h5"
        f1.write_bytes(b"fake h5 content")
        f2 = tmp_path / "weights.npy"
        f2.write_bytes(b"fake weights")

        manifest = generate_stage_manifest("stage_1_base", "20260317T143000Z", [f1, f2])

        assert manifest["stage"] == "stage_1_base"
        assert manifest["run_id"] == "20260317T143000Z"
        assert "created_at" in manifest
        assert "git_commit" in manifest
        assert "git_branch" in manifest
        assert "git_dirty" in manifest
        assert "data.h5" in manifest["files"]
        assert "weights.npy" in manifest["files"]

    def test_file_checksums(self, tmp_path):
        f1 = tmp_path / "test.bin"
        f1.write_bytes(b"deterministic content")

        manifest = generate_stage_manifest("stage_0_raw", "20260317T143000Z", [f1])

        entry = manifest["files"]["test.bin"]
        assert "sha256" in entry
        assert len(entry["sha256"]) == 64
        assert entry["size_bytes"] == len(b"deterministic content")

    def test_missing_file_skipped(self, tmp_path):
        existing = tmp_path / "exists.h5"
        existing.write_bytes(b"data")
        missing = tmp_path / "missing.h5"

        manifest = generate_stage_manifest(
            "stage_1_base",
            "20260317T143000Z",
            [existing, missing],
        )

        assert "exists.h5" in manifest["files"]
        assert "missing.h5" not in manifest["files"]

    def test_empty_files_list(self):
        manifest = generate_stage_manifest("stage_0_raw", "20260317T143000Z", [])
        assert manifest["files"] == {}


class TestMirrorToPipeline:
    """``mirror_to_pipeline`` with the Hugging Face commit call patched out."""

    @patch("policyengine_us_data.utils.pipeline_artifacts.hf_create_commit_with_retry")
    def test_uploads_files_and_manifest(self, mock_commit, tmp_path):
        f1 = tmp_path / "cps_2024.h5"
        f1.write_bytes(b"cps data")

        run_id = mirror_to_pipeline(
            "stage_1_base",
            [f1],
            run_id="20260317T143000Z",
        )

        assert run_id == "20260317T143000Z"
        mock_commit.assert_called_once()
        # The commit helper is invoked with keyword arguments only.
        kwargs = mock_commit.call_args.kwargs

        paths = [op.path_in_repo for op in kwargs["operations"]]
        assert any("manifest.json" in p for p in paths)
        assert any("cps_2024.h5" in p for p in paths)

        assert kwargs["repo_id"] == PIPELINE_REPO
        assert kwargs["repo_type"] == PIPELINE_REPO_TYPE

    @patch("policyengine_us_data.utils.pipeline_artifacts.hf_create_commit_with_retry")
    def test_manifest_only(self, mock_commit, tmp_path):
        f1 = tmp_path / "SC.h5"
        f1.write_bytes(b"state data")

        mirror_to_pipeline(
            "stage_7_local_area",
            [f1],
            run_id="20260317T143000Z",
            manifest_only=True,
        )

        ops = mock_commit.call_args.kwargs["operations"]

        paths = [op.path_in_repo for op in ops]
        assert len(ops) == 1
        assert "manifest.json" in paths[0]

    @patch("policyengine_us_data.utils.pipeline_artifacts.hf_create_commit_with_retry")
    def test_returns_run_id_when_none(self, mock_commit, tmp_path, monkeypatch):
        # Without this, an exported PIPELINE_RUN_ID (e.g. "run-123") would
        # be returned verbatim and fail the timestamp pattern below.
        monkeypatch.delenv("PIPELINE_RUN_ID", raising=False)
        f1 = tmp_path / "test.bin"
        f1.write_bytes(b"data")

        run_id = mirror_to_pipeline("stage_0_raw", [f1])
        assert re.match(r"^\d{8}T\d{6}Z$", run_id)

    @patch(
        "policyengine_us_data.utils.pipeline_artifacts.hf_create_commit_with_retry",
        side_effect=Exception("No token"),
    )
    def test_error_does_not_raise(self, mock_commit, tmp_path):
        f1 = tmp_path / "test.bin"
        f1.write_bytes(b"data")

        # Should not raise.
        run_id = mirror_to_pipeline(
            "stage_0_raw",
            [f1],
            run_id="20260317T143000Z",
        )
        assert run_id == "20260317T143000Z"

    @patch("policyengine_us_data.utils.pipeline_artifacts.hf_create_commit_with_retry")
    def test_folder_structure(self, mock_commit, tmp_path):
        f1 = tmp_path / "weights.npy"
        f1.write_bytes(b"weights")

        mirror_to_pipeline(
            "stage_6_weights",
            [f1],
            run_id="20260317T143000Z",
        )

        ops = mock_commit.call_args.kwargs["operations"]

        for op in ops:
            assert op.path_in_repo.startswith("20260317T143000Z/stage_6_weights/")
def mirror_source_imputed_artifact(
    artifacts_dir: Path,
    *,
    run_id: str = "",
) -> None:
    """Mirror the source-imputed dataset to the pipeline artifact repo.

    Looks for ``source_imputed_stratified_extended_cps.h5`` inside
    ``artifacts_dir`` and, when present, hands it to
    ``mirror_to_pipeline`` as stage ``stage_4_source_imputed``.

    The mirror is best-effort: a failure to import or run it is
    reported as a warning and never propagated, so it cannot abort
    the dataset build.

    Args:
        artifacts_dir: Directory holding the build artifacts.
        run_id: Pipeline run identifier. When empty, no ``run_id``
            is forwarded and ``mirror_to_pipeline`` picks its own.
    """
    source_imputed_path = (
        Path(artifacts_dir) / "source_imputed_stratified_extended_cps.h5"
    )
    if not source_imputed_path.exists():
        return

    # Forward run_id only when the caller supplied one.
    mirror_kwargs = {}
    if run_id:
        mirror_kwargs["run_id"] = run_id

    try:
        # Imported lazily, inside the guard, so an import failure in the
        # mirror module is reported as a warning instead of failing the build.
        from policyengine_us_data.utils.pipeline_artifacts import (
            mirror_to_pipeline,
        )

        mirror_to_pipeline(
            "stage_4_source_imputed",
            [source_imputed_path],
            **mirror_kwargs,
        )
    except Exception as exc:
        print(
            "WARNING: Failed to mirror stage_4_source_imputed to "
            f"pipeline repo: {exc!r}",
            flush=True,
        )
def _install_fake_pipeline_artifacts(monkeypatch):
    """Register a stub ``pipeline_artifacts`` module and return its call log.

    Each call to the stub ``mirror_to_pipeline`` appends
    ``(stage_name, files, kwargs)`` to the returned list.
    """
    calls = []

    def fake_mirror(stage_name, files, **kwargs):
        calls.append((stage_name, files, kwargs))

    fake_pipeline_artifacts = ModuleType(
        "policyengine_us_data.utils.pipeline_artifacts"
    )
    fake_pipeline_artifacts.mirror_to_pipeline = fake_mirror
    monkeypatch.setitem(
        sys.modules,
        "policyengine_us_data.utils.pipeline_artifacts",
        fake_pipeline_artifacts,
    )
    return calls


def test_mirror_source_imputed_artifact_uploads_when_present(monkeypatch, tmp_path):
    data_build = _load_data_build_module()
    artifacts_dir = tmp_path / "artifacts"
    artifacts_dir.mkdir()
    source_imputed = artifacts_dir / "source_imputed_stratified_extended_cps.h5"
    source_imputed.write_text("placeholder")

    calls = _install_fake_pipeline_artifacts(monkeypatch)

    data_build.mirror_source_imputed_artifact(artifacts_dir, run_id="run-123")

    assert calls == [
        (
            "stage_4_source_imputed",
            [source_imputed],
            {"run_id": "run-123"},
        )
    ]


def test_mirror_source_imputed_artifact_omits_run_id_when_unset(monkeypatch, tmp_path):
    data_build = _load_data_build_module()
    artifacts_dir = tmp_path / "artifacts"
    artifacts_dir.mkdir()
    source_imputed = artifacts_dir / "source_imputed_stratified_extended_cps.h5"
    source_imputed.write_text("placeholder")

    calls = _install_fake_pipeline_artifacts(monkeypatch)

    data_build.mirror_source_imputed_artifact(artifacts_dir)

    # With no run_id supplied, no run_id keyword is forwarded at all.
    assert calls == [("stage_4_source_imputed", [source_imputed], {})]


def test_mirror_source_imputed_artifact_skips_when_missing(monkeypatch, tmp_path):
    data_build = _load_data_build_module()
    artifacts_dir = tmp_path / "artifacts"
    artifacts_dir.mkdir()

    calls = _install_fake_pipeline_artifacts(monkeypatch)

    data_build.mirror_source_imputed_artifact(Path(artifacts_dir))

    assert calls == []