Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1041.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stage 2 calibration package manifests now track the explicit target config identity and contract artifact path.
1 change: 1 addition & 0 deletions changelog.d/1065.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stage 2 calibration package construction now resolves its inputs and outputs through run-scoped artifact bundles.
1 change: 1 addition & 0 deletions changelog.d/1073.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add typed Stage 2 calibration package payload reader and writer helpers.
1 change: 1 addition & 0 deletions changelog.d/1083.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Stage 2 target catalog selection artifacts for calibration packages.
188 changes: 186 additions & 2 deletions docs/pipeline_map.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -805,16 +805,41 @@ stages:
label: run_calibration()
description: 'Build phase: resolve targets and constraints, assemble clone values, and package the sparse calibration matrix'
node_ids:
- stage2_input_bundle
- stage2_build_context
- stage2_target_config_identity
- stage2_target_catalog_load
- stage2_target_catalog_reader
- stage2_target_selection_policy
- stage2_target_selection_result
- target_resolve
- stage2_target_config_apply
- target_uprate
- geo_build
- constraint_resolve
- state_precomp
- clone_assembly
- takeup_rerand
- sparse_build
- build_matrix
- build_matrix_chunked
- stage2_artifact_specs
- stage2_payload_boundary
- stage2_payload_writer
- stage2_payload_reader
- stage2_calibration_package_writer
- out_pkg
- out_metadata
- out_targets
- out_target_facets
- stage2_calibration_package_contract_writer
- out_contract
- stage2_calibration_package_contract_validator
extra_nodes:
- id: in_stage1_contract_s2
label: dataset_build_output.json
node_type: artifact
description: Stage 1 handoff contract preferred for Stage 2 input resolution
- id: in_cps_s5
label: source_imputed_stratified_extended_cps.h5
node_type: artifact
Expand Down Expand Up @@ -859,6 +884,22 @@ stages:
label: calibration_package.pkl
node_type: artifact
description: X_sparse CSR matrix, targets_df, initial_weights, metadata
- id: out_metadata
label: calibration_package_meta.json
node_type: artifact
description: Metadata sidecar generated from the typed package payload and Stage 2 contract
- id: out_targets
label: calibration_targets.jsonl
node_type: artifact
description: Row-level selected target metadata with stable target_id and target_index join keys
- id: out_target_facets
label: calibration_target_facets.json
node_type: artifact
description: Compact target counts by variable, geography level, target name, period, and constraint key
- id: out_contract
label: calibration_package_contract.json
node_type: artifact
description: Stage 2 package handoff contract written next to calibration_package.pkl
- id: util_sql
label: sqlalchemy
node_type: utility
Expand All @@ -876,20 +917,82 @@ stages:
node_type: utility
description: CSR/COO matrix construction
edges:
- source: in_stage1_contract_s2
target: stage2_input_bundle
edge_type: data_flow
label: preferred input contract
- source: in_cps_s5
target: stage2_input_bundle
edge_type: data_flow
label: compatibility fallback
- source: in_db_s5
target: stage2_input_bundle
edge_type: external_source
label: compatibility fallback
- source: stage2_input_bundle
target: stage2_build_context
edge_type: data_flow
label: validated inputs
- source: stage2_artifact_specs
target: stage2_build_context
edge_type: uses_utility
label: output bundle paths
- source: stage2_build_context
target: target_resolve
edge_type: data_flow
label: dataset and database paths
- source: stage2_build_context
target: stage2_calibration_package_writer
edge_type: uses_utility
label: package output bundle
- source: in_db_s5
target: target_resolve
edge_type: external_source
label: SQL targets
- source: in_config_s5
target: target_resolve
target: stage2_target_config_identity
edge_type: data_flow
label: config file
- source: stage2_target_config_identity
target: stage2_target_catalog_load
edge_type: data_flow
label: resolved path and checksum
- source: in_db_s5
target: stage2_target_catalog_reader
edge_type: external_source
label: active and disabled targets
- source: stage2_target_catalog_reader
target: stage2_target_selection_policy
edge_type: data_flow
label: target catalog
- source: stage2_target_catalog_load
target: stage2_target_selection_policy
edge_type: data_flow
label: include/exclude rules
- source: stage2_target_selection_policy
target: stage2_target_selection_result
edge_type: data_flow
label: selected targets
- source: stage2_target_selection_result
target: build_matrix
edge_type: data_flow
label: include list
label: matrix target order
- source: stage2_target_selection_result
target: build_matrix_chunked
edge_type: data_flow
label: matrix target order
- source: stage2_target_catalog_load
target: stage2_target_config_apply
edge_type: data_flow
label: include/exclude rules
- source: target_resolve
target: stage2_target_config_apply
edge_type: data_flow
label: candidate targets
- source: stage2_target_config_apply
target: target_uprate
edge_type: data_flow
label: selected targets
- source: target_uprate
target: geo_build
edge_type: data_flow
Expand Down Expand Up @@ -917,8 +1020,89 @@ stages:
target: sparse_build
edge_type: data_flow
- source: sparse_build
target: build_matrix
edge_type: uses_library
label: non-chunked path
- source: sparse_build
target: build_matrix_chunked
edge_type: uses_library
label: chunked path
- source: build_matrix
target: stage2_payload_boundary
edge_type: data_flow
- source: build_matrix_chunked
target: stage2_payload_boundary
edge_type: data_flow
- source: stage2_payload_boundary
target: stage2_payload_writer
edge_type: data_flow
label: typed pickle payload
- source: stage2_payload_writer
target: stage2_calibration_package_writer
edge_type: uses_library
label: pickle write
- source: stage2_artifact_specs
target: stage2_calibration_package_writer
edge_type: uses_utility
label: package path
- source: stage2_calibration_package_writer
target: out_pkg
edge_type: produces_artifact
- source: stage2_payload_writer
target: out_metadata
edge_type: produces_artifact
label: sidecar metadata
- source: stage2_target_selection_result
target: out_targets
edge_type: produces_artifact
label: row-level selected targets
- source: stage2_target_selection_result
target: out_target_facets
edge_type: produces_artifact
label: derived facets
- source: out_pkg
target: stage2_payload_reader
edge_type: data_flow
- source: out_pkg
target: stage2_calibration_package_contract_writer
edge_type: data_flow
- source: stage2_payload_reader
target: stage2_calibration_package_contract_writer
edge_type: uses_library
label: summary and checksum
- source: out_targets
target: stage2_calibration_package_contract_writer
edge_type: data_flow
label: target metadata artifact
- source: out_target_facets
target: stage2_calibration_package_contract_writer
edge_type: data_flow
label: target facet artifact
- source: stage2_artifact_specs
target: stage2_calibration_package_contract_writer
edge_type: uses_utility
label: contract path
- source: stage2_calibration_package_contract_writer
target: out_contract
edge_type: produces_artifact
- source: stage2_calibration_package_contract_writer
target: out_targets
edge_type: validates
- source: stage2_calibration_package_contract_writer
target: out_target_facets
edge_type: validates
- source: out_pkg
target: stage2_calibration_package_contract_validator
edge_type: validates
- source: out_contract
target: stage2_calibration_package_contract_validator
edge_type: validates
- source: in_cps_s5
target: stage2_calibration_package_contract_validator
edge_type: validates
- source: in_db_s5
target: stage2_calibration_package_contract_validator
edge_type: validates
- source: util_sql
target: target_resolve
edge_type: uses_utility
Expand Down
40 changes: 32 additions & 8 deletions modal_app/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
write_run_meta,
)
from policyengine_us_data.utils.run_context import RunContext, resolve_run_id # noqa: E402
from policyengine_us_data.calibration_package.specs import ( # noqa: E402
Stage2InputBundleError,
resolve_target_config_identity,
stage2_build_context_for_run,
)
from policyengine_us_data.utils.error_redaction import ( # noqa: E402
redacted_bounded_error_text,
redact_error_text,
Expand Down Expand Up @@ -162,18 +167,25 @@ def _calibration_package_parameters(
workers: int,
n_clones: int,
target_config: str | None,
all_active_targets: bool = False,
skip_county: bool,
chunked_matrix: bool,
chunk_size: int,
parallel_matrix: bool,
num_matrix_workers: int,
) -> dict:
"""Return manifest parameters that affect package construction."""
target_config_identity = resolve_target_config_identity(
target_config,
all_active_targets=all_active_targets,
)
effective_parallel = bool(chunked_matrix and parallel_matrix)
params = {
"workers": workers if not chunked_matrix else None,
"n_clones": n_clones,
"target_config": target_config,
"target_config": target_config_identity.path,
"target_config_sha256": target_config_identity.sha256,
"target_config_mode": target_config_identity.mode,
"skip_county": skip_county,
"chunked_matrix": bool(chunked_matrix),
"chunk_size": chunk_size if chunked_matrix else None,
Expand Down Expand Up @@ -547,6 +559,7 @@ def verify_runtime_seams() -> dict:
"modal_app/step_manifests/errors.py",
"modal_app/step_manifests/status.py",
"modal_app/fixtures/h5_cases.py",
"policyengine_us_data/calibration_package/specs.py",
"tests/integration/test_fixture_50hh.h5",
"policyengine_us_data/calibration/target_config.yaml",
"policyengine_us_data/calibration/target_config_full.yaml",
Expand Down Expand Up @@ -1231,13 +1244,13 @@ def run_pipeline(
print(f" Completed in {completed_build_manifest.duration_s}s")

# ── Step 2: Build calibration package ──
package_context = stage2_build_context_for_run(PIPELINE_MOUNT, run_id)
package_input_validation = package_context.input_bundle.validation_report()
package_inputs = _artifact_identities(
{
"dataset": _artifacts_dir(run_id)
/ "source_imputed_stratified_extended_cps.h5",
"database": _artifacts_dir(run_id) / "policy_data.db",
}
package_context.input_bundle.manifest_inputs
)
package_inputs["input_validation"] = package_input_validation.to_dict()
package_artifacts = package_context.output_bundle
package_parameters = _calibration_package_parameters(
workers=num_workers,
n_clones=n_clones,
Expand All @@ -1248,6 +1261,18 @@ def run_pipeline(
parallel_matrix=parallel_matrix,
num_matrix_workers=num_matrix_workers,
)
if package_input_validation.status != "pass":
active_step_manifest = _start_step_manifest(
meta,
BUILD_CALIBRATION_PACKAGE,
parameters=package_parameters,
input_identities=package_inputs,
vol=pipeline_volume,
)
raise Stage2InputBundleError(
package_context.input_bundle,
package_input_validation,
)
package_reuse = _step_reusable(
meta,
BUILD_CALIBRATION_PACKAGE,
Expand Down Expand Up @@ -1302,8 +1327,7 @@ def run_pipeline(
completed_package_manifest = _complete_step_manifest(
active_step_manifest,
outputs=collect_artifacts(
[_artifacts_dir(run_id) / "calibration_package.pkl"],
missing_ok=True,
package_artifacts.manifest_outputs,
),
vol=pipeline_volume,
)
Expand Down
Loading