diff --git a/changelog.d/1086.changed b/changelog.d/1086.changed new file mode 100644 index 000000000..03c2c4612 --- /dev/null +++ b/changelog.d/1086.changed @@ -0,0 +1 @@ +Add Stage 2 geography assignment summary artifacts for calibration packages. diff --git a/docs/pipeline_map.yaml b/docs/pipeline_map.yaml index 223638262..e4ac337e2 100644 --- a/docs/pipeline_map.yaml +++ b/docs/pipeline_map.yaml @@ -812,6 +812,8 @@ stages: - stage2_target_catalog_reader - stage2_target_selection_policy - stage2_target_selection_result + - stage2_geography_assignment_spec + - stage2_geography_assignment_result - target_resolve - stage2_target_config_apply - target_uprate @@ -832,6 +834,7 @@ stages: - out_metadata - out_targets - out_target_facets + - out_geography_summary - stage2_calibration_package_contract_writer - out_contract - stage2_calibration_package_contract_validator @@ -896,6 +899,10 @@ stages: label: calibration_target_facets.json node_type: artifact description: Compact target counts by variable, geography level, target name, period, and constraint key + - id: out_geography_summary + label: geography_assignment_summary.json + node_type: artifact + description: Compact clone-major geography identity summary for block, county, state, and congressional district arrays - id: out_contract label: calibration_package_contract.json node_type: artifact @@ -981,6 +988,30 @@ stages: target: build_matrix_chunked edge_type: data_flow label: matrix target order + - source: in_cps_s5 + target: stage2_geography_assignment_spec + edge_type: data_flow + label: household AGI and fixed state overrides + - source: in_db_s5 + target: stage2_geography_assignment_spec + edge_type: external_source + label: district AGI targets + - source: stage2_geography_assignment_spec + target: stage2_geography_assignment_result + edge_type: data_flow + label: deterministic assignment inputs + - source: in_blocks_s5 + target: stage2_geography_assignment_result + edge_type: data_flow + label: block populations + - source: stage2_geography_assignment_result + target: build_matrix + edge_type: data_flow + label: clone-major geography + - source: stage2_geography_assignment_result + target: build_matrix_chunked + edge_type: data_flow + label: clone-major geography - source: stage2_target_catalog_load target: stage2_target_config_apply edge_type: data_flow @@ -1060,6 +1091,10 @@ stages: target: out_target_facets edge_type: produces_artifact label: derived facets + - source: stage2_geography_assignment_result + target: out_geography_summary + edge_type: produces_artifact + label: geography identity - source: out_pkg target: stage2_payload_reader edge_type: data_flow @@ -1078,6 +1113,10 @@ stages: target: stage2_calibration_package_contract_writer edge_type: data_flow label: target facet artifact + - source: out_geography_summary + target: stage2_calibration_package_contract_writer + edge_type: data_flow + label: geography summary artifact - source: stage2_artifact_specs target: stage2_calibration_package_contract_writer edge_type: uses_utility @@ -1091,6 +1130,9 @@ stages: - source: stage2_calibration_package_contract_writer target: out_target_facets edge_type: validates + - source: stage2_calibration_package_contract_writer + target: out_geography_summary + edge_type: validates - source: out_pkg target: stage2_calibration_package_contract_validator edge_type: validates diff --git a/policyengine_us_data/calibration/unified_calibration.py b/policyengine_us_data/calibration/unified_calibration.py index 467be0ade..f4ea62e95 100644 --- a/policyengine_us_data/calibration/unified_calibration.py +++ b/policyengine_us_data/calibration/unified_calibration.py @@ -49,10 +49,12 @@ CalibrationPackageReader, CalibrationPackageWriter, ) +from policyengine_us_data.calibration_package.geography import GeographyAssignmentSpec from policyengine_us_data.calibration_package.specs import ( DEFAULT_TARGET_CONFIG_PATH as DEFAULT_TARGET_CONFIG_RELATIVE_PATH, CALIBRATION_TARGET_FACETS_FILENAME, CALIBRATION_TARGETS_FILENAME, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, TargetConfigIdentity, resolve_target_config_identity, ) @@ -1505,9 +1507,6 @@ def run_calibration( from policyengine_us import Microsimulation - from policyengine_us_data.calibration.clone_and_assign import ( - assign_random_geography, - ) from policyengine_us_data.calibration.unified_matrix_builder import ( UnifiedMatrixBuilder, ) @@ -1555,6 +1554,14 @@ def run_calibration( time_period=time_period, n_records=n_records, ) + geography_spec = GeographyAssignmentSpec.from_runtime_inputs( + n_records=n_records, + n_clones=n_clones, + seed=seed, + household_agi=base_agi, + cd_agi_targets=cd_agi_targets, + fixed_state_fips=fixed_state_fips, + ) # Step 2: Clone and assign geography logger.info( @@ -1563,14 +1570,21 @@ def run_calibration( n_clones, n_records * n_clones, ) - geography = assign_random_geography( - n_records=n_records, - n_clones=n_clones, - seed=seed, + geography_result = geography_spec.assign( household_agi=base_agi, cd_agi_targets=cd_agi_targets, fixed_state_fips=fixed_state_fips, ) + from policyengine_us_data.calibration.clone_and_assign import GeographyAssignment + + geography = GeographyAssignment( + block_geoid=geography_result.block_geoid, + cd_geoid=geography_result.cd_geoid, + county_fips=geography_result.county_fips, + state_fips=geography_result.state_fips, + n_records=n_records, + n_clones=n_clones, + ) # Step 3: Source imputation (if requested) dataset_for_matrix = dataset_path @@ -1722,6 +1736,9 @@ def run_calibration( "matrix_builder": "chunked" if chunked_matrix else "precompute", "chunk_size": chunk_size if chunked_matrix else None, "chunk_dir": chunk_dir if chunked_matrix else None, + "geography_assignment_spec": geography_spec.to_dict(), + "geography_assignment_sha256": geography_result.canonical_geography_sha256, + "geography_assignment_status": geography_result.status, "target_selection_sha256": target_selection.checksum, "target_selection_n_targets": target_selection.n_selected_targets, } @@ -1736,7 +1753,11 @@ def run_calibration( package_path = Path(package_output_path) targets_path = package_path.with_name(CALIBRATION_TARGETS_FILENAME) target_facets_path = package_path.with_name(CALIBRATION_TARGET_FACETS_FILENAME) + geography_summary_path = package_path.with_name( + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME + ) target_selection.write_artifacts(targets_path, target_facets_path) + geography_result.write_summary(geography_summary_path) package_payload = CalibrationPackagePayload( X_sparse=X_sparse, targets_df=targets_df, @@ -1790,6 +1811,8 @@ def run_calibration( target_metadata_path=targets_path, target_facets_path=target_facets_path, target_selection_summary=target_selection.summary(), + geography_summary_path=geography_summary_path, + geography_assignment_summary=geography_result.summary(), ) validate_calibration_package_contract( package_path=package_path, diff --git a/policyengine_us_data/calibration_package/__init__.py b/policyengine_us_data/calibration_package/__init__.py index 438a694b8..9b2e6b560 100644 --- a/policyengine_us_data/calibration_package/__init__.py +++ b/policyengine_us_data/calibration_package/__init__.py @@ -10,6 +10,7 @@ CALIBRATION_REPORTS_DIRNAME, DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, DEFAULT_TARGET_CONFIG_PATH, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, MATRIX_BUILD_DIRNAME, SOURCE_DATASET_FILENAME, TARGET_CONFIG_IDENTITY_MODES, @@ -28,6 +29,14 @@ stage2_input_bundle_from_stage1_contract, stage2_input_bundle_from_stage1_contract_path, ) +from .geography import ( + GEOGRAPHY_ASSIGNMENT_ORDERING, + GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION, + GeographyAssignmentResult, + GeographyAssignmentSpec, + geography_spec_from_metadata, + geography_summary_from_package, +) from .payload import ( LEGACY_MISSING_GEOGRAPHY_WARNING, REQUIRED_PACKAGE_KEYS, @@ -53,6 +62,9 @@ "CALIBRATION_REPORTS_DIRNAME", "DATASET_BUILD_OUTPUT_CONTRACT_FILENAME", "DEFAULT_TARGET_CONFIG_PATH", + "GEOGRAPHY_ASSIGNMENT_ORDERING", + "GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION", + "GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME", "MATRIX_BUILD_DIRNAME", "SOURCE_DATASET_FILENAME", "TARGET_CONFIG_IDENTITY_MODES", @@ -62,6 +74,8 @@ "CalibrationPackagePayload", "CalibrationPackageReader", "CalibrationPackageWriter", + "GeographyAssignmentResult", + "GeographyAssignmentSpec", "LEGACY_MISSING_GEOGRAPHY_WARNING", "Stage2BuildContext", "Stage2InputBundle", @@ -74,6 +88,8 @@ "TargetSelectionPolicy", "TargetSelectionResult", "calibration_package_artifact_paths", + "geography_spec_from_metadata", + "geography_summary_from_package", "resolve_target_config_identity", "stage2_build_context_for_run", "stage2_input_bundle_from_artifacts_dir", diff --git a/policyengine_us_data/calibration_package/geography.py b/policyengine_us_data/calibration_package/geography.py new file mode 100644 index 000000000..2f70f402e --- /dev/null +++ b/policyengine_us_data/calibration_package/geography.py @@ -0,0 +1,685 @@ +"""Typed Stage 2 geography assignment boundary and summary writer.""" + +from __future__ import annotations + +import hashlib +import json +from collections.abc import Callable, Mapping +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np + +from policyengine_us_data.pipeline_metadata import pipeline_node +from policyengine_us_data.pipeline_schema import PipelineNode +from policyengine_us_data.utils.geography_checksum import ( + canonical_geography_checksum, + hash_string_array, +) + +from .specs import GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME + +GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION = 1 +GEOGRAPHY_ASSIGNMENT_ORDERING = "clone_major" + + +@pipeline_node( + PipelineNode( + id="stage2_geography_assignment_spec", + label="Stage 2 Geography Assignment Spec", + node_type="library", + description="Capture deterministic inputs for Stage 2 geography assignment before clone-level sampling runs.", + source_file="policyengine_us_data/calibration_package/geography.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_in=["source_imputed_stratified_extended_cps.h5", "policy_data.db"], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_geography.py" + ], + ) +) +@dataclass(frozen=True, kw_only=True) +class GeographyAssignmentSpec: + """Deterministic inputs that control Stage 2 geography assignment.""" + + n_records: int + n_clones: int + seed: int | None + agi_threshold_pctile: float = 90.0 + household_agi_sha256: str | None = None + household_agi_length: int | None = None + cd_agi_targets_sha256: str | None = None + cd_agi_target_count: int = 0 + fixed_state_fips_sha256: str | None = None + fixed_state_fips_length: int | None = None + fixed_state_fips_present_count: int = 0 + + def __post_init__(self) -> None: + _validate_positive_int(self.n_records, "n_records") + _validate_positive_int(self.n_clones, "n_clones") + if self.seed is not None: + _validate_non_negative_int(self.seed, "seed") + if self.agi_threshold_pctile < 0 or self.agi_threshold_pctile > 100: + raise ValueError("agi_threshold_pctile must be between 0 and 100") + _validate_optional_non_negative_int( + self.household_agi_length, + "household_agi_length", + ) + if ( + self.household_agi_length is not None + and self.household_agi_length != self.n_records + ): + raise ValueError("household_agi_length must equal n_records") + _validate_non_negative_int(self.cd_agi_target_count, "cd_agi_target_count") + _validate_optional_non_negative_int( + self.fixed_state_fips_length, + "fixed_state_fips_length", + ) + if ( + self.fixed_state_fips_length is not None + and self.fixed_state_fips_length != self.n_records + ): + raise ValueError("fixed_state_fips_length must equal n_records") + _validate_non_negative_int( + self.fixed_state_fips_present_count, + "fixed_state_fips_present_count", + ) + if self.fixed_state_fips_length is None: + if self.fixed_state_fips_present_count != 0: + raise ValueError( + "fixed_state_fips_present_count requires fixed_state_fips_length" + ) + elif self.fixed_state_fips_present_count > self.fixed_state_fips_length: + raise ValueError( + "fixed_state_fips_present_count cannot exceed fixed_state_fips_length" + ) + for key in ( + "household_agi_sha256", + "cd_agi_targets_sha256", + "fixed_state_fips_sha256", + ): + _validate_optional_sha256(getattr(self, key), key) + + @classmethod + def from_runtime_inputs( + cls, + *, + n_records: int, + n_clones: int, + seed: int, + household_agi: Any | None, + cd_agi_targets: Mapping[str, Any] | None, + fixed_state_fips: Any | None, + agi_threshold_pctile: float = 90.0, + ) -> "GeographyAssignmentSpec": + """Build a spec from the runtime inputs that affect assignment.""" + + household_agi_array = _optional_float_array(household_agi) + fixed_state_array = _optional_int_array(fixed_state_fips) + target_payload = _normalise_cd_agi_targets(cd_agi_targets) + return cls( + n_records=n_records, + n_clones=n_clones, + seed=seed, + agi_threshold_pctile=agi_threshold_pctile, + household_agi_sha256=( + _hash_float_array(household_agi_array) + if household_agi_array is not None + else None + ), + household_agi_length=( + int(len(household_agi_array)) + if household_agi_array is not None + else None + ), + cd_agi_targets_sha256=( + _hash_json_payload(target_payload) if target_payload else None + ), + cd_agi_target_count=len(target_payload), + fixed_state_fips_sha256=( + _hash_int_array(fixed_state_array) + if fixed_state_array is not None + else None + ), + fixed_state_fips_length=( + int(len(fixed_state_array)) if fixed_state_array is not None else None + ), + fixed_state_fips_present_count=( + int(np.count_nonzero(fixed_state_array > 0)) + if fixed_state_array is not None + else 0 + ), + ) + + @classmethod + def from_dict(cls, data: Mapping[str, Any]) -> "GeographyAssignmentSpec": + """Parse a JSON-compatible assignment spec.""" + + if not isinstance(data, Mapping): + raise ValueError("geography assignment spec must be a mapping") + return cls( + n_records=_required_int(data, "n_records"), + n_clones=_required_int(data, "n_clones"), + seed=_optional_int(data, "seed"), + agi_threshold_pctile=float(data.get("agi_threshold_pctile", 90.0)), + household_agi_sha256=_optional_string(data, "household_agi_sha256"), + household_agi_length=_optional_int(data, "household_agi_length"), + cd_agi_targets_sha256=_optional_string(data, "cd_agi_targets_sha256"), + cd_agi_target_count=_required_int(data, "cd_agi_target_count"), + fixed_state_fips_sha256=_optional_string( + data, + "fixed_state_fips_sha256", + ), + fixed_state_fips_length=_optional_int(data, "fixed_state_fips_length"), + fixed_state_fips_present_count=_required_int( + data, + "fixed_state_fips_present_count", + ), + ) + + def to_dict(self) -> dict[str, Any]: + """Return deterministic JSON-compatible spec material.""" + + return { + "agi_threshold_pctile": self.agi_threshold_pctile, + "cd_agi_target_count": self.cd_agi_target_count, + "cd_agi_targets_sha256": self.cd_agi_targets_sha256, + "fixed_state_fips_length": self.fixed_state_fips_length, + "fixed_state_fips_present_count": self.fixed_state_fips_present_count, + "fixed_state_fips_sha256": self.fixed_state_fips_sha256, + "household_agi_length": self.household_agi_length, + "household_agi_sha256": self.household_agi_sha256, + "n_clones": self.n_clones, + "n_records": self.n_records, + "seed": self.seed, + } + + def assign( + self, + *, + household_agi: Any | None, + cd_agi_targets: Mapping[str, Any] | None, + fixed_state_fips: Any | None, + assigner: Callable[..., Any] | None = None, + ) -> "GeographyAssignmentResult": + """Run geography assignment through the current production assigner.""" + + if assigner is None: + from policyengine_us_data.calibration.clone_and_assign import ( + assign_random_geography, + ) + + assigner = assign_random_geography + runtime_spec = GeographyAssignmentSpec.from_runtime_inputs( + n_records=self.n_records, + n_clones=self.n_clones, + seed=self.seed if self.seed is not None else 42, + household_agi=household_agi, + cd_agi_targets=cd_agi_targets, + fixed_state_fips=fixed_state_fips, + agi_threshold_pctile=self.agi_threshold_pctile, + ) + if runtime_spec != self: + raise ValueError("Geography assignment runtime inputs do not match spec") + assignment = assigner( + n_records=self.n_records, + n_clones=self.n_clones, + seed=self.seed if self.seed is not None else 42, + household_agi=household_agi, + cd_agi_targets=cd_agi_targets, + agi_threshold_pctile=self.agi_threshold_pctile, + fixed_state_fips=fixed_state_fips, + ) + return GeographyAssignmentResult.from_assignment(assignment, spec=self) + + +@pipeline_node( + PipelineNode( + id="stage2_geography_assignment_result", + label="Stage 2 Geography Assignment Result", + node_type="library", + description="Summarize assigned block, county, state, and congressional district arrays without requiring consumers to load the package pickle.", + source_file="policyengine_us_data/calibration_package/geography.py", + status="current", + stability="moving", + pathways=["calibration_package"], + artifacts_out=[GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME], + validation_commands=[ + "uv run pytest tests/unit/calibration_package/test_geography.py" + ], + ) +) +@dataclass(frozen=True, kw_only=True) +class GeographyAssignmentResult: + """Clone-major Stage 2 geography arrays and compact identity summary.""" + + spec: GeographyAssignmentSpec + block_geoid: Any + cd_geoid: Any + county_fips: Any + state_fips: Any + status: str = "completed" + validation_errors: tuple[str, ...] = () + + @classmethod + def from_assignment( + cls, + assignment: Any, + *, + spec: GeographyAssignmentSpec, + ) -> "GeographyAssignmentResult": + """Wrap an existing `GeographyAssignment` object.""" + + return cls.from_arrays( + spec=spec, + block_geoid=getattr(assignment, "block_geoid"), + cd_geoid=getattr(assignment, "cd_geoid"), + county_fips=getattr(assignment, "county_fips"), + state_fips=getattr(assignment, "state_fips"), + ) + + @classmethod + def from_arrays( + cls, + *, + spec: GeographyAssignmentSpec, + block_geoid: Any, + cd_geoid: Any, + county_fips: Any | None = None, + state_fips: Any | None = None, + fail_on_validation: bool = True, + ) -> "GeographyAssignmentResult": + """Build a result from arrays, deriving county/state when needed.""" + + block_array = _string_array(block_geoid, "block_geoid") + cd_array = _string_array(cd_geoid, "cd_geoid") + county_array = ( + _string_array(county_fips, "county_fips") + if county_fips is not None + else np.fromiter( + (str(block)[:5] for block in block_array), + dtype="U5", + count=len(block_array), + ) + ) + state_array = ( + _int_array(state_fips, "state_fips") + if state_fips is not None + else np.fromiter( + (int(str(block)[:2]) for block in block_array), + dtype=np.int32, + count=len(block_array), + ) + ) + errors = _geography_validation_errors( + spec=spec, + block_geoid=block_array, + cd_geoid=cd_array, + county_fips=county_array, + state_fips=state_array, + ) + if errors and fail_on_validation: + raise ValueError("; ".join(errors)) + return cls( + spec=spec, + block_geoid=block_array, + cd_geoid=cd_array, + county_fips=county_array, + state_fips=state_array, + status="failed" if errors else "completed", + validation_errors=tuple(errors), + ) + + @classmethod + def from_package( + cls, + *, + metadata: Mapping[str, Any], + block_geoid: Any, + cd_geoid: Any, + ) -> "GeographyAssignmentResult": + """Reconstruct summary material from package geography keys.""" + + spec = geography_spec_from_metadata(metadata) + return cls.from_arrays( + spec=spec, + block_geoid=block_geoid, + cd_geoid=cd_geoid, + ) + + @property + def n_rows(self) -> int: + """Return the clone-level row count.""" + + return int(len(self.block_geoid)) + + @property + def canonical_geography_sha256(self) -> str: + """Return the canonical cross-stage geography checksum.""" + + return canonical_geography_checksum( + block_geoid=self.block_geoid, + cd_geoid=self.cd_geoid, + county_fips=self.county_fips, + state_fips=self.state_fips, + n_records=self.spec.n_records, + n_clones=self.spec.n_clones, + ) + + def summary(self) -> dict[str, Any]: + """Return compact JSON-compatible geography identity material.""" + + return { + "block_geoid_length": self.n_rows, + "block_geoid_sha256": hash_string_array(self.block_geoid), + "block_geoid_unique_count": _unique_count(self.block_geoid), + "canonical_geography_sha256": self.canonical_geography_sha256, + "cd_geoid_length": int(len(self.cd_geoid)), + "cd_geoid_sha256": hash_string_array(self.cd_geoid), + "cd_geoid_unique_count": _unique_count(self.cd_geoid), + "county_fips_length": int(len(self.county_fips)), + "county_fips_sha256": hash_string_array(self.county_fips), + "county_fips_unique_count": _unique_count(self.county_fips), + "has_block_geoid": True, + "has_cd_geoid": True, + "has_county_fips": True, + "has_state_fips": True, + "n_clones": self.spec.n_clones, + "n_records": self.spec.n_records, + "n_rows": self.n_rows, + "ordering": GEOGRAPHY_ASSIGNMENT_ORDERING, + "schema_version": GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION, + "source_kind": "calibration_package", + "spec": self.spec.to_dict(), + "state_fips_length": int(len(self.state_fips)), + "state_fips_sha256": hash_string_array( + np.asarray(self.state_fips, dtype=str) + ), + "state_fips_unique_count": _unique_count(self.state_fips), + "status": self.status, + "validation_errors": list(self.validation_errors), + } + + def to_contract_summary(self) -> Any: + """Return the typed contract geography summary.""" + + from policyengine_us_data.stage_contracts.calibration_package_schema import ( + GeographyAssignmentSummary, + ) + + return GeographyAssignmentSummary.from_dict(self.summary()) + + def write_summary(self, path: str | Path) -> Path: + """Write `geography_assignment_summary.json` and return its path.""" + + output_path = Path(path) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text( + json.dumps(self.summary(), indent=2, sort_keys=True), + encoding="utf-8", + ) + return output_path + + +def geography_spec_from_metadata( + metadata: Mapping[str, Any], +) -> GeographyAssignmentSpec: + """Return a geography spec from package metadata, using legacy fallbacks.""" + + spec = metadata.get("geography_assignment_spec") + if isinstance(spec, Mapping): + return GeographyAssignmentSpec.from_dict(spec) + return GeographyAssignmentSpec( + n_records=_metadata_int(metadata, "base_n_records"), + n_clones=_metadata_int(metadata, "n_clones"), + seed=_metadata_optional_int(metadata, "seed"), + ) + + +def geography_summary_from_package( + *, + metadata: Mapping[str, Any], + block_geoid: Any | None, + cd_geoid: Any | None, +) -> Any: + """Return the typed geography summary for package-backed arrays.""" + + from policyengine_us_data.stage_contracts.calibration_package_schema import ( + GeographyAssignmentSummary, + ) + + spec = geography_spec_from_metadata(metadata) + if block_geoid is None and cd_geoid is None: + return GeographyAssignmentSummary.from_dict( + unavailable_geography_summary(spec=spec) + ) + if block_geoid is None or cd_geoid is None: + raise ValueError( + "Calibration package geography requires both block_geoid and cd_geoid" + ) + return GeographyAssignmentResult.from_arrays( + spec=spec, + block_geoid=block_geoid, + cd_geoid=cd_geoid, + ).to_contract_summary() + + +def unavailable_geography_summary( + *, + spec: GeographyAssignmentSpec, +) -> dict[str, Any]: + """Return JSON-compatible material for legacy packages without geography.""" + + return { + "block_geoid_length": None, + "block_geoid_sha256": None, + "block_geoid_unique_count": None, + "canonical_geography_sha256": None, + "cd_geoid_length": None, + "cd_geoid_sha256": None, + "cd_geoid_unique_count": None, + "county_fips_length": None, + "county_fips_sha256": None, + "county_fips_unique_count": None, + "has_block_geoid": False, + "has_cd_geoid": False, + "has_county_fips": False, + "has_state_fips": False, + "n_clones": spec.n_clones, + "n_records": spec.n_records, + "n_rows": None, + "ordering": None, + "schema_version": GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION, + "source_kind": "unavailable", + "spec": spec.to_dict(), + "state_fips_length": None, + "state_fips_sha256": None, + "state_fips_unique_count": None, + "status": "unavailable", + "validation_errors": [], + } + + +def _geography_validation_errors( + *, + spec: GeographyAssignmentSpec, + block_geoid: np.ndarray, + cd_geoid: np.ndarray, + county_fips: np.ndarray, + state_fips: np.ndarray, +) -> list[str]: + errors: list[str] = [] + expected_rows = spec.n_records * spec.n_clones + lengths = { + "block_geoid": len(block_geoid), + "cd_geoid": len(cd_geoid), + "county_fips": len(county_fips), + "state_fips": len(state_fips), + } + for key, length in lengths.items(): + if length != expected_rows: + errors.append(f"{key} length {length} does not match {expected_rows}") + if len(set(lengths.values())) != 1: + errors.append("geography arrays have mismatched lengths") + if len(block_geoid) == len(county_fips): + block_counties = np.fromiter( + (str(block)[:5] for block in block_geoid), + dtype="U5", + count=len(block_geoid), + ) + if np.any(block_counties != county_fips.astype(str)): + errors.append("county_fips must match block_geoid prefixes") + if len(block_geoid) == len(state_fips): + block_states = np.fromiter( + (int(str(block)[:2]) for block in block_geoid), + dtype=np.int32, + count=len(block_geoid), + ) + if np.any(block_states != state_fips.astype(np.int32)): + errors.append("state_fips must match block_geoid prefixes") + return errors + + +def _string_array(value: Any, key: str) -> np.ndarray: + array = np.asarray(value, dtype=str) + if array.ndim != 1: + raise ValueError(f"{key} must be one-dimensional") + if len(array) and np.any(array == ""): + raise ValueError(f"{key} contains empty values") + return array + + +def _int_array(value: Any, key: str) -> np.ndarray: + array = np.asarray(value, dtype=np.int32) + if array.ndim != 1: + raise ValueError(f"{key} must be one-dimensional") + return array + + +def _optional_float_array(value: Any | None) -> np.ndarray | None: + if value is None: + return None + array = np.asarray(value, dtype=np.float64) + if array.ndim != 1: + raise ValueError("household_agi must be one-dimensional") + return array + + +def _optional_int_array(value: Any | None) -> np.ndarray | None: + if value is None: + return None + return _int_array(value, "fixed_state_fips") + + +def _hash_float_array(values: np.ndarray) -> str: + digest = hashlib.sha256() + digest.update(b"policyengine-us-data:float-array:v1") + digest.update(len(values).to_bytes(8, byteorder="big", signed=False)) + digest.update(np.ascontiguousarray(values.astype(np.float64)).tobytes()) + return f"sha256:{digest.hexdigest()}" + + +def _hash_int_array(values: np.ndarray) -> str: + digest = hashlib.sha256() + digest.update(b"policyengine-us-data:int-array:v1") + digest.update(len(values).to_bytes(8, byteorder="big", signed=False)) + digest.update(np.ascontiguousarray(values.astype(np.int32)).tobytes()) + return f"sha256:{digest.hexdigest()}" + + +def _hash_json_payload(payload: Any) -> str: + encoded = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + return f"sha256:{hashlib.sha256(encoded).hexdigest()}" + + +def _normalise_cd_agi_targets( + targets: Mapping[str, Any] | None, +) -> dict[str, float]: + if not targets: + return {} + return { + str(key): float(targets[key]) + for key in sorted(targets, key=lambda item: str(item)) + } + + +def _unique_count(values: Any) -> int: + return int(len(np.unique(np.asarray(values)))) + + +def _metadata_int(metadata: Mapping[str, Any], key: str) -> int: + value = metadata.get(key) + if value is None: + raise ValueError(f"Calibration package metadata {key!r} is required") + return int(value) + + +def _metadata_optional_int(metadata: Mapping[str, Any], key: str) -> int | None: + value = metadata.get(key) + if value is None: + return None + return int(value) + + +def _required_int(data: Mapping[str, Any], key: str) -> int: + value = data[key] + if isinstance(value, bool) or not isinstance(value, int): + raise ValueError(f"geography assignment spec {key!r} must be an integer") + return value + + +def _optional_int(data: Mapping[str, Any], key: str) -> int | None: + value = data.get(key) + if value is None: + return None + if isinstance(value, bool) or not isinstance(value, int): + raise ValueError( + f"geography assignment spec {key!r} must be an integer or None" + ) + return value + + +def _optional_string(data: Mapping[str, Any], key: str) -> str | None: + value = data.get(key) + if value is None: + return None + if not isinstance(value, str): + raise ValueError(f"geography assignment spec {key!r} must be a string or None") + return value + + +def _validate_positive_int(value: Any, key: str) -> None: + if isinstance(value, bool) or not isinstance(value, int) or value <= 0: + raise ValueError(f"{key} must be a positive integer") + + +def _validate_non_negative_int(value: Any, key: str) -> None: + if isinstance(value, bool) or not isinstance(value, int) or value < 0: + raise ValueError(f"{key} must be a non-negative integer") + + +def _validate_optional_non_negative_int(value: Any, key: str) -> None: + if value is not None: + _validate_non_negative_int(value, key) + + +def _validate_optional_sha256(value: Any, key: str) -> None: + if value is None: + return + if not isinstance(value, str) or not value.startswith("sha256:"): + raise ValueError(f"{key} must be a SHA-256 digest") + if len(value) != len("sha256:") + 64: + raise ValueError(f"{key} must be a SHA-256 digest") + + +__all__ = [ + "GEOGRAPHY_ASSIGNMENT_ORDERING", + "GEOGRAPHY_ASSIGNMENT_SCHEMA_VERSION", + "GeographyAssignmentResult", + "GeographyAssignmentSpec", + "geography_spec_from_metadata", + "geography_summary_from_package", + "unavailable_geography_summary", +] diff --git a/policyengine_us_data/calibration_package/payload.py b/policyengine_us_data/calibration_package/payload.py index 5108e0b9e..ae50d6d56 100644 --- a/policyengine_us_data/calibration_package/payload.py +++ b/policyengine_us_data/calibration_package/payload.py @@ -11,12 +11,9 @@ from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode -from policyengine_us_data.utils.geography_checksum import ( - canonical_geography_checksum, - hash_string_array, -) from policyengine_us_data.utils.step_manifest import sha256_file +from .geography import geography_summary_from_package from .specs import CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_METADATA_FILENAME if TYPE_CHECKING: @@ -157,75 +154,10 @@ def summary(self) -> CalibrationPackageSummary: def geography_summary(self) -> GeographyAssignmentSummary: """Return the contract-safe geography assignment summary.""" - from policyengine_us_data.stage_contracts.calibration_package_schema import ( - GeographyAssignmentSummary, - ) - - n_records = self.metadata_int("base_n_records") - n_clones = self.metadata_int("n_clones") - has_blocks = self.block_geoid is not None - has_cds = self.cd_geoid is not None - - if not has_blocks and not has_cds: - return GeographyAssignmentSummary( - source_kind="unavailable", - n_records=n_records, - n_clones=n_clones, - n_rows=None, - has_block_geoid=False, - has_cd_geoid=False, - block_geoid_length=None, - cd_geoid_length=None, - block_geoid_sha256=None, - cd_geoid_sha256=None, - canonical_geography_sha256=None, - ) - if not has_blocks or not has_cds: - raise ValueError( - "Calibration package geography requires both block_geoid and cd_geoid" - ) - if n_records is None or n_clones is None: - raise ValueError( - "Calibration package geography requires metadata base_n_records and n_clones" - ) - if n_records <= 0 or n_clones <= 0: - raise ValueError( - "Calibration package geography requires positive base_n_records and n_clones" - ) - - block_geoids = _one_dimensional_string_array(self.block_geoid, "block_geoid") - cd_geoids = _one_dimensional_string_array(self.cd_geoid, "cd_geoid") - n_rows = int(len(block_geoids)) - if n_rows == 0: - raise ValueError("Calibration package geography arrays must be non-empty") - if len(cd_geoids) != n_rows: - raise ValueError( - "Calibration package geography has mismatched block_geoid and cd_geoid " - f"lengths: {n_rows} != {len(cd_geoids)}" - ) - if n_records * n_clones != n_rows: - raise ValueError( - "Calibration package geography length does not match metadata: " - f"{n_rows} rows for {n_records} records x {n_clones} clones" - ) - - return GeographyAssignmentSummary( - source_kind="calibration_package", - n_records=n_records, - n_clones=n_clones, - n_rows=n_rows, - has_block_geoid=True, - has_cd_geoid=True, - block_geoid_length=n_rows, - cd_geoid_length=int(len(cd_geoids)), - block_geoid_sha256=hash_string_array(block_geoids), - cd_geoid_sha256=hash_string_array(cd_geoids), - canonical_geography_sha256=canonical_geography_checksum( - block_geoid=block_geoids, - cd_geoid=cd_geoids, - n_records=n_records, - n_clones=n_clones, - ), + return geography_summary_from_package( + metadata=self.metadata, + block_geoid=self.block_geoid, + cd_geoid=self.cd_geoid, ) def metadata_string(self, key: str) -> str | None: @@ -367,17 +299,6 @@ def _optional_len(value: Any) -> int | None: return int(len(value)) -def _one_dimensional_string_array(value: Any, key: str) -> Any: - import numpy as np - - array = np.asarray(value, dtype=str) - if array.ndim != 1: - raise ValueError(f"Calibration package geography {key} must be one-dimensional") - if np.any(array == ""): - raise ValueError(f"Calibration package geography {key} contains empty values") - return array - - __all__ = [ "LEGACY_MISSING_GEOGRAPHY_WARNING", "REQUIRED_PACKAGE_KEYS", diff --git a/policyengine_us_data/calibration_package/specs.py b/policyengine_us_data/calibration_package/specs.py index 4f0c2bcb5..712882d58 100644 --- a/policyengine_us_data/calibration_package/specs.py +++ b/policyengine_us_data/calibration_package/specs.py @@ -23,6 +23,7 @@ CALIBRATION_PACKAGE_CONTRACT_FILENAME = "calibration_package_contract.json" CALIBRATION_TARGETS_FILENAME = "calibration_targets.jsonl" CALIBRATION_TARGET_FACETS_FILENAME = "calibration_target_facets.json" +GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME = "geography_assignment_summary.json" CALIBRATION_REPORTS_DIRNAME = "calibration_reports" MATRIX_BUILD_DIRNAME = "matrix_build" CALIBRATION_PACKAGE_SUBSTAGE_ID = "2a_matrix_build_calibration_target_construction" @@ -189,14 +190,21 @@ class CalibrationPackageOutputBundle: contract: Path targets: Path target_facets: Path + geography_summary: Path reports_dir: Path matrix_build_dir: Path @property - def manifest_outputs(self) -> tuple[Path, Path, Path, Path]: + def manifest_outputs(self) -> tuple[Path, Path, Path, Path, Path]: """Return the durable Stage 2 outputs recorded in step manifests.""" - return (self.package, self.contract, self.targets, self.target_facets) + return ( + self.package, + self.contract, + self.targets, + self.target_facets, + self.geography_summary, + ) CalibrationPackageArtifactPaths = CalibrationPackageOutputBundle @@ -318,6 +326,7 @@ def stage2_input_bundle_from_stage1_contract_path( artifacts_out=[ CALIBRATION_PACKAGE_FILENAME, CALIBRATION_PACKAGE_CONTRACT_FILENAME, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, ], validation_commands=[ "uv run pytest tests/unit/calibration_package/test_specs.py" @@ -375,6 +384,7 @@ def stage2_build_context_for_run( CALIBRATION_PACKAGE_CONTRACT_FILENAME, CALIBRATION_TARGETS_FILENAME, CALIBRATION_TARGET_FACETS_FILENAME, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, ], validation_commands=[ "uv run pytest tests/unit/calibration_package/test_specs.py" @@ -394,6 +404,7 @@ def calibration_package_artifact_paths( contract=root / CALIBRATION_PACKAGE_CONTRACT_FILENAME, targets=root / CALIBRATION_TARGETS_FILENAME, target_facets=root / CALIBRATION_TARGET_FACETS_FILENAME, + geography_summary=root / GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, reports_dir=root / CALIBRATION_REPORTS_DIRNAME, matrix_build_dir=root / MATRIX_BUILD_DIRNAME, ) @@ -507,6 +518,7 @@ def _artifact_uri_to_path(uri: str) -> Path: "CALIBRATION_REPORTS_DIRNAME", "DATASET_BUILD_OUTPUT_CONTRACT_FILENAME", "DEFAULT_TARGET_CONFIG_PATH", + "GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME", "MATRIX_BUILD_DIRNAME", "SOURCE_DATASET_FILENAME", "TARGET_CONFIG_IDENTITY_MODES", diff --git a/policyengine_us_data/stage_contracts/calibration_package.py b/policyengine_us_data/stage_contracts/calibration_package.py index a235068c1..91cdfd0b4 100644 --- a/policyengine_us_data/stage_contracts/calibration_package.py +++ b/policyengine_us_data/stage_contracts/calibration_package.py @@ -2,9 +2,11 @@ from __future__ import annotations +import json from collections.abc import Mapping from pathlib import Path from typing import Any +from urllib.parse import unquote, urlparse from policyengine_us_data.calibration_package.payload import ( CalibrationPackagePayload, @@ -15,6 +17,7 @@ CALIBRATION_PACKAGE_SUBSTAGE_ID, CALIBRATION_TARGET_FACETS_FILENAME, CALIBRATION_TARGETS_FILENAME, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, ) from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode @@ -70,6 +73,10 @@ def build_calibration_package_contract( target_metadata_path: Path | None = None, target_facets_path: Path | None = None, target_selection_summary: Mapping[str, Any] | None = None, + geography_summary_path: Path | None = None, + geography_assignment_summary: GeographyAssignmentSummary + | Mapping[str, Any] + | None = None, ) -> StageContract: """Build the Stage 2 handoff contract from a calibration package.""" @@ -88,7 +95,9 @@ def build_calibration_package_contract( metadata, ) package_summary = payload.summary().to_dict() - geography_summary = payload.geography_summary().to_dict() + geography_summary = _geography_assignment_summary( + geography_assignment_summary or payload.geography_summary() + ).to_dict() inputs = ( _artifact_ref_from_path( logical_name="source_imputed_stratified_extended_cps", @@ -147,6 +156,22 @@ def build_calibration_package_contract( }, ) ) + if geography_summary_path is not None: + _require_existing_file(geography_summary_path, "geography assignment summary") + outputs.append( + _artifact_ref_from_path( + logical_name="geography_assignment_summary", + path=Path(geography_summary_path), + media_type="application/json", + metadata={ + "artifact_family": "geography_assignment", + "substage_id": CALIBRATION_PACKAGE_SUBSTAGE_ID, + "canonical_geography_sha256": geography_summary.get( + "canonical_geography_sha256" + ), + }, + ) + ) outputs = tuple(outputs) code_sha = code_sha or _optional_metadata_string(metadata, "git_commit") package_version = package_version or _optional_metadata_string( @@ -226,6 +251,7 @@ def build_calibration_package_contract( CALIBRATION_PACKAGE_CONTRACT_FILENAME, CALIBRATION_TARGETS_FILENAME, CALIBRATION_TARGET_FACETS_FILENAME, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, ], validation_commands=[ "uv run pytest tests/unit/test_calibration_package_stage_contract.py" @@ -249,6 +275,10 @@ def write_calibration_package_contract( target_metadata_path: Path | None = None, target_facets_path: Path | None = None, target_selection_summary: Mapping[str, Any] | None = None, + geography_summary_path: Path | None = None, + geography_assignment_summary: GeographyAssignmentSummary + | Mapping[str, Any] + | None = None, ) -> StageContract: """Write and return the Stage 2 calibration-package contract.""" @@ -268,6 +298,8 @@ def write_calibration_package_contract( target_metadata_path=target_metadata_path, target_facets_path=target_facets_path, target_selection_summary=target_selection_summary, + geography_summary_path=geography_summary_path, + geography_assignment_summary=geography_assignment_summary, ) write_contract( contract, @@ -359,6 +391,22 @@ def validate_calibration_package_contract( raise ValueError( "Calibration package contract geography assignment does not match pickle" ) + geography_artifact = _optional_artifact( + contract.outputs, + "geography_assignment_summary", + ) + if geography_artifact is not None: + geography_summary_path = _artifact_uri_to_path(geography_artifact.uri) + _assert_artifact_matches_file(geography_artifact, geography_summary_path) + persisted_geography = canonicalize_for_fingerprint( + GeographyAssignmentSummary.from_dict( + json.loads(geography_summary_path.read_text(encoding="utf-8")) + ).to_dict() + ) + if persisted_geography != expected_geography: + raise ValueError( + "Calibration package geography summary artifact does not match pickle" + ) return contract @@ -433,6 +481,14 @@ def _calibration_package_parameters( return CalibrationPackageParameters.from_dict(parameters) +def _geography_assignment_summary( + summary: GeographyAssignmentSummary | Mapping[str, Any], +) -> GeographyAssignmentSummary: + if isinstance(summary, GeographyAssignmentSummary): + return summary + return GeographyAssignmentSummary.from_dict(summary) + + def _parameters_with_package_identity( parameters: Mapping[str, Any], metadata: Mapping[str, Any], @@ -526,6 +582,20 @@ def _single_artifact( return matches[0] +def _optional_artifact( + artifacts: tuple[ArtifactRef, ...], + logical_name: str, +) -> ArtifactRef | None: + matches = [ + artifact for artifact in artifacts if artifact.logical_name == logical_name + ] + if len(matches) > 1: + raise ValueError( + f"Expected at most one artifact named {logical_name!r}, found {len(matches)}" + ) + return matches[0] if matches else None + + def _assert_artifact_matches_file(artifact: ArtifactRef, path: Path) -> None: _require_existing_file(path, artifact.logical_name) expected_sha = f"sha256:{sha256_file(path)}" @@ -539,3 +609,12 @@ def _assert_artifact_matches_file(artifact: ArtifactRef, path: Path) -> None: f"Artifact {artifact.logical_name!r} size mismatch: " f"{artifact.size_bytes!r} != {path.stat().st_size!r}" ) + + +def _artifact_uri_to_path(uri: str) -> Path: + parsed = urlparse(uri) + if parsed.scheme == "file": + return Path(unquote(parsed.path)) + if not parsed.scheme: + return Path(uri) + raise ValueError(f"Unsupported artifact URI scheme: {uri}") diff --git a/policyengine_us_data/stage_contracts/calibration_package_schema.py b/policyengine_us_data/stage_contracts/calibration_package_schema.py index 7d00b9800..e3c332aab 100644 --- a/policyengine_us_data/stage_contracts/calibration_package_schema.py +++ b/policyengine_us_data/stage_contracts/calibration_package_schema.py @@ -17,19 +17,41 @@ "unavailable", } ) +GEOGRAPHY_ASSIGNMENT_STATUSES = frozenset( + { + "completed", + "failed", + "unavailable", + } +) GEOGRAPHY_ASSIGNMENT_SUMMARY_KEYS = frozenset( { "block_geoid_length", "block_geoid_sha256", + "block_geoid_unique_count", "canonical_geography_sha256", "cd_geoid_length", "cd_geoid_sha256", + "cd_geoid_unique_count", + "county_fips_length", + "county_fips_sha256", + "county_fips_unique_count", "has_block_geoid", "has_cd_geoid", + "has_county_fips", + "has_state_fips", "n_clones", "n_records", "n_rows", + "ordering", + "schema_version", "source_kind", + "spec", + "state_fips_length", + "state_fips_sha256", + "state_fips_unique_count", + "status", + "validation_errors", } ) CALIBRATION_PACKAGE_PARAMETER_KEYS = frozenset( @@ -80,24 +102,52 @@ class GeographyAssignmentSummary: """Canonical summary of package-backed Stage 2 geography assignment.""" + schema_version: int source_kind: str + status: str + ordering: str | None n_records: int | None n_clones: int | None n_rows: int | None has_block_geoid: bool has_cd_geoid: bool + has_county_fips: bool + has_state_fips: bool block_geoid_length: int | None cd_geoid_length: int | None + county_fips_length: int | None + state_fips_length: int | None + block_geoid_unique_count: int | None + cd_geoid_unique_count: int | None + county_fips_unique_count: int | None + state_fips_unique_count: int | None block_geoid_sha256: str | None cd_geoid_sha256: str | None + county_fips_sha256: str | None + state_fips_sha256: str | None canonical_geography_sha256: str | None + spec: Mapping[str, Any] + validation_errors: tuple[str, ...] def __post_init__(self) -> None: + _validate_positive_int(self.schema_version, "schema_version") if self.source_kind not in GEOGRAPHY_ASSIGNMENT_SOURCE_KINDS: raise ValueError( "source_kind must be one of " f"{sorted(GEOGRAPHY_ASSIGNMENT_SOURCE_KINDS)}" ) + if self.status not in GEOGRAPHY_ASSIGNMENT_STATUSES: + raise ValueError( + f"status must be one of {sorted(GEOGRAPHY_ASSIGNMENT_STATUSES)}" + ) + if self.ordering is not None and not isinstance(self.ordering, str): + raise ValueError("ordering must be a string or None") + if not isinstance(self.spec, Mapping): + raise ValueError("spec must be a mapping") + if not isinstance(self.validation_errors, tuple) or not all( + isinstance(item, str) for item in self.validation_errors + ): + raise ValueError("validation_errors must be a tuple of strings") _validate_optional_non_negative_int(self.n_records, "n_records") _validate_optional_non_negative_int(self.n_clones, "n_clones") _validate_optional_non_negative_int(self.n_rows, "n_rows") @@ -109,10 +159,29 @@ def __post_init__(self) -> None: self.cd_geoid_length, "cd_geoid_length", ) + _validate_optional_non_negative_int( + self.county_fips_length, + "county_fips_length", + ) + _validate_optional_non_negative_int( + self.state_fips_length, + "state_fips_length", + ) + for key in ( + "block_geoid_unique_count", + "cd_geoid_unique_count", + "county_fips_unique_count", + "state_fips_unique_count", + ): + _validate_optional_non_negative_int(getattr(self, key), key) _validate_bool(self.has_block_geoid, "has_block_geoid") _validate_bool(self.has_cd_geoid, "has_cd_geoid") + _validate_bool(self.has_county_fips, "has_county_fips") + _validate_bool(self.has_state_fips, "has_state_fips") _validate_optional_sha256(self.block_geoid_sha256, "block_geoid_sha256") _validate_optional_sha256(self.cd_geoid_sha256, "cd_geoid_sha256") + _validate_optional_sha256(self.county_fips_sha256, "county_fips_sha256") + _validate_optional_sha256(self.state_fips_sha256, "state_fips_sha256") _validate_optional_sha256( self.canonical_geography_sha256, "canonical_geography_sha256", @@ -124,17 +193,30 @@ def __post_init__(self) -> None: "n_rows", "block_geoid_length", "cd_geoid_length", + "county_fips_length", + "state_fips_length", + "block_geoid_unique_count", + "cd_geoid_unique_count", + "county_fips_unique_count", + "state_fips_unique_count", "block_geoid_sha256", "cd_geoid_sha256", + "county_fips_sha256", + "state_fips_sha256", "canonical_geography_sha256", ): if getattr(self, key) is None: raise ValueError( f"{key} is required for calibration_package geography" ) - if not self.has_block_geoid or not self.has_cd_geoid: + if not ( + self.has_block_geoid + and self.has_cd_geoid + and self.has_county_fips + and self.has_state_fips + ): raise ValueError( - "calibration_package geography requires block and CD arrays" + "calibration_package geography requires all geography arrays" ) if self.n_records * self.n_clones != self.n_rows: raise ValueError( @@ -144,21 +226,40 @@ def __post_init__(self) -> None: raise ValueError("block_geoid_length must equal n_rows") if self.cd_geoid_length != self.n_rows: raise ValueError("cd_geoid_length must equal n_rows") + if self.county_fips_length != self.n_rows: + raise ValueError("county_fips_length must equal n_rows") + if self.state_fips_length != self.n_rows: + raise ValueError("state_fips_length must equal n_rows") else: - if self.has_block_geoid or self.has_cd_geoid: + if ( + self.has_block_geoid + or self.has_cd_geoid + or self.has_county_fips + or self.has_state_fips + ): raise ValueError("unavailable geography cannot report present arrays") for key in ( "n_rows", "block_geoid_length", "cd_geoid_length", + "county_fips_length", + "state_fips_length", + "block_geoid_unique_count", + "cd_geoid_unique_count", + "county_fips_unique_count", + "state_fips_unique_count", "block_geoid_sha256", "cd_geoid_sha256", + "county_fips_sha256", + "state_fips_sha256", "canonical_geography_sha256", ): if getattr(self, key) is not None: raise ValueError( f"{key} must be None when geography is unavailable" ) + if self.status != "unavailable": + raise ValueError("unavailable geography must have unavailable status") @classmethod def from_dict(cls, data: Mapping[str, Any]) -> "GeographyAssignmentSummary": @@ -172,20 +273,47 @@ def from_dict(cls, data: Mapping[str, Any]) -> "GeographyAssignmentSummary": GEOGRAPHY_ASSIGNMENT_SUMMARY_KEYS, ) return cls( + schema_version=_required_int_field(data, "schema_version"), source_kind=_required_string_field(data, "source_kind"), + status=_required_string_field(data, "status"), + ordering=_optional_string_field(data, "ordering"), n_records=_optional_int_field(data, "n_records"), n_clones=_optional_int_field(data, "n_clones"), n_rows=_optional_int_field(data, "n_rows"), has_block_geoid=_required_bool_field(data, "has_block_geoid"), has_cd_geoid=_required_bool_field(data, "has_cd_geoid"), + has_county_fips=_required_bool_field(data, "has_county_fips"), + has_state_fips=_required_bool_field(data, "has_state_fips"), block_geoid_length=_optional_int_field(data, "block_geoid_length"), cd_geoid_length=_optional_int_field(data, "cd_geoid_length"), + county_fips_length=_optional_int_field(data, "county_fips_length"), + state_fips_length=_optional_int_field(data, "state_fips_length"), + block_geoid_unique_count=_optional_int_field( + data, + "block_geoid_unique_count", + ), + cd_geoid_unique_count=_optional_int_field( + data, + "cd_geoid_unique_count", + ), + county_fips_unique_count=_optional_int_field( + data, + "county_fips_unique_count", + ), + state_fips_unique_count=_optional_int_field( + data, + "state_fips_unique_count", + ), block_geoid_sha256=_optional_string_field(data, "block_geoid_sha256"), cd_geoid_sha256=_optional_string_field(data, "cd_geoid_sha256"), + county_fips_sha256=_optional_string_field(data, "county_fips_sha256"), + state_fips_sha256=_optional_string_field(data, "state_fips_sha256"), canonical_geography_sha256=_optional_string_field( data, "canonical_geography_sha256", ), + spec=_mapping_field(data, "spec"), + validation_errors=_string_tuple_field(data, "validation_errors"), ) def to_dict(self) -> dict[str, Any]: @@ -194,15 +322,30 @@ def to_dict(self) -> dict[str, Any]: return { "block_geoid_length": self.block_geoid_length, "block_geoid_sha256": self.block_geoid_sha256, + "block_geoid_unique_count": self.block_geoid_unique_count, "canonical_geography_sha256": self.canonical_geography_sha256, "cd_geoid_length": self.cd_geoid_length, "cd_geoid_sha256": self.cd_geoid_sha256, + "cd_geoid_unique_count": self.cd_geoid_unique_count, + "county_fips_length": self.county_fips_length, + "county_fips_sha256": self.county_fips_sha256, + "county_fips_unique_count": self.county_fips_unique_count, "has_block_geoid": self.has_block_geoid, "has_cd_geoid": self.has_cd_geoid, + "has_county_fips": self.has_county_fips, + "has_state_fips": self.has_state_fips, "n_clones": self.n_clones, "n_records": self.n_records, "n_rows": self.n_rows, + "ordering": self.ordering, + "schema_version": self.schema_version, "source_kind": self.source_kind, + "spec": dict(self.spec), + "state_fips_length": self.state_fips_length, + "state_fips_sha256": self.state_fips_sha256, + "state_fips_unique_count": self.state_fips_unique_count, + "status": self.status, + "validation_errors": list(self.validation_errors), } @@ -571,6 +714,22 @@ def _required_string_field(data: Mapping[str, Any], key: str) -> str: return value +def _mapping_field(data: Mapping[str, Any], key: str) -> Mapping[str, Any]: + value = data[key] + if not isinstance(value, Mapping): + raise ValueError(f"Calibration package field {key!r} must be a mapping") + return dict(value) + + +def _string_tuple_field(data: Mapping[str, Any], key: str) -> tuple[str, ...]: + value = data[key] + if not isinstance(value, tuple | list) or not all( + isinstance(item, str) for item in value + ): + raise ValueError(f"Calibration package field {key!r} must be a list of strings") + return tuple(value) + + def _required_float_field(data: Mapping[str, Any], key: str) -> float: value = data[key] if isinstance(value, bool) or not isinstance(value, int | float): diff --git a/tests/unit/calibration_package/test_geography.py b/tests/unit/calibration_package/test_geography.py new file mode 100644 index 000000000..c24826454 --- /dev/null +++ b/tests/unit/calibration_package/test_geography.py @@ -0,0 +1,139 @@ +import json +from types import SimpleNamespace + +import numpy as np +import pytest + +from policyengine_us_data.calibration_package.geography import ( + GEOGRAPHY_ASSIGNMENT_ORDERING, + GeographyAssignmentResult, + GeographyAssignmentSpec, +) +from policyengine_us_data.stage_contracts.calibration_package_schema import ( + GeographyAssignmentSummary, +) + + +def _assignment_for_seed(**kwargs): + seed = int(kwargs["seed"]) + n_records = int(kwargs["n_records"]) + n_clones = int(kwargs["n_clones"]) + n_rows = n_records * n_clones + start_state = (seed % 50) + 1 + states = np.array( + [((start_state + index) % 50) + 1 for index in range(n_rows)], + dtype=np.int32, + ) + blocks = np.array( + [f"{state:02d}0010001001000" for state in states], + dtype=" GeographyAssignmentSpec: + return GeographyAssignmentSpec.from_runtime_inputs( + n_records=2, + n_clones=2, + seed=seed, + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + ) + + +def test_geography_assignment_spec_records_runtime_input_identity(): + spec = _spec() + payload = spec.to_dict() + + assert payload["n_records"] == 2 + assert payload["n_clones"] == 2 + assert payload["seed"] == 42 + assert payload["household_agi_sha256"].startswith("sha256:") + assert payload["cd_agi_targets_sha256"].startswith("sha256:") + assert payload["fixed_state_fips_present_count"] == 1 + assert GeographyAssignmentSpec.from_dict(payload) == spec + + +def test_geography_assignment_is_deterministic_for_fixed_seed(): + first = _spec(seed=42).assign( + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + assigner=_assignment_for_seed, + ) + second = _spec(seed=42).assign( + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + assigner=_assignment_for_seed, + ) + + assert first.canonical_geography_sha256 == second.canonical_geography_sha256 + assert first.summary() == second.summary() + + +def test_geography_assignment_checksum_changes_with_seed(): + first = _spec(seed=42).assign( + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + assigner=_assignment_for_seed, + ) + second = _spec(seed=43).assign( + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + assigner=_assignment_for_seed, + ) + + assert first.canonical_geography_sha256 != second.canonical_geography_sha256 + + +def test_geography_assignment_validates_row_count_and_ordering(): + spec = _spec() + + with pytest.raises(ValueError, match="block_geoid length 3"): + GeographyAssignmentResult.from_arrays( + spec=spec, + block_geoid=np.array(["010010001", "010010002", "010010003"]), + cd_geoid=np.array(["0101", "0101", "0101"]), + ) + + result = GeographyAssignmentResult.from_arrays( + spec=spec, + block_geoid=np.array( + [ + "010010001001000", + "020010001001000", + "010010001001001", + "020010001001001", + ] + ), + cd_geoid=np.array(["0101", "0201", "0101", "0201"]), + ) + + assert result.summary()["ordering"] == GEOGRAPHY_ASSIGNMENT_ORDERING + assert result.summary()["n_rows"] == 4 + + +def test_geography_summary_json_round_trips_through_contract_schema(tmp_path): + result = _spec().assign( + household_agi=np.array([10_000.0, 250_000.0]), + cd_agi_targets={"0101": 1_000.0, "0201": 2_000.0}, + fixed_state_fips=np.array([1, 0], dtype=np.int32), + assigner=_assignment_for_seed, + ) + + summary_path = result.write_summary(tmp_path / "geography_assignment_summary.json") + summary = json.loads(summary_path.read_text(encoding="utf-8")) + + assert GeographyAssignmentSummary.from_dict(summary).to_dict() == summary + assert summary["block_geoid_unique_count"] == 4 + assert summary["county_fips_sha256"].startswith("sha256:") + assert summary["state_fips_sha256"].startswith("sha256:") diff --git a/tests/unit/calibration_package/test_specs.py b/tests/unit/calibration_package/test_specs.py index 2f124d835..177267f65 100644 --- a/tests/unit/calibration_package/test_specs.py +++ b/tests/unit/calibration_package/test_specs.py @@ -12,6 +12,7 @@ CALIBRATION_REPORTS_DIRNAME, DATASET_BUILD_OUTPUT_CONTRACT_FILENAME, DEFAULT_TARGET_CONFIG_PATH, + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME, MATRIX_BUILD_DIRNAME, SOURCE_DATASET_FILENAME, TARGET_DATABASE_FILENAME, @@ -104,6 +105,9 @@ def test_calibration_package_artifact_paths(): assert paths.target_facets == Path("/pipeline/artifacts/run-a") / ( CALIBRATION_TARGET_FACETS_FILENAME ) + assert paths.geography_summary == Path("/pipeline/artifacts/run-a") / ( + GEOGRAPHY_ASSIGNMENT_SUMMARY_FILENAME + ) assert paths.reports_dir == Path("/pipeline/artifacts/run-a") / ( CALIBRATION_REPORTS_DIRNAME ) @@ -115,6 +119,7 @@ def test_calibration_package_artifact_paths(): paths.contract, paths.targets, paths.target_facets, + paths.geography_summary, ) diff --git a/tests/unit/test_calibration_package_stage_contract.py b/tests/unit/test_calibration_package_stage_contract.py index 444b88062..fad52cf21 100644 --- a/tests/unit/test_calibration_package_stage_contract.py +++ b/tests/unit/test_calibration_package_stage_contract.py @@ -1,3 +1,5 @@ +import json + from tests.unit.fixtures.calibration_package_stage_contract import ( TARGET_CONFIG_PATH, calibration_package_contract, @@ -63,7 +65,7 @@ def test_calibration_package_contract_references_target_metadata_artifacts(tmp_p targets_path.write_text('{"target_id":1,"target_index":0}\n', encoding="utf-8") facets_path.write_text('{"target_count":1}\n', encoding="utf-8") - contract = build_calibration_package_contract( + contract = write_calibration_package_contract( package_path=package_path, dataset_path=dataset_path, db_path=db_path, @@ -83,6 +85,46 @@ def test_calibration_package_contract_references_target_metadata_artifacts(tmp_p assert contract.execution.reuse_summary.expected_outputs == 3 +def test_calibration_package_contract_references_geography_summary_artifact(tmp_path): + dataset_path, db_path, package_path = contract_input_paths(tmp_path) + package = write_calibration_package_payload(package_path) + geography_summary = summarize_geography_assignment(package) + geography_path = tmp_path / "geography_assignment_summary.json" + geography_path.write_text( + json.dumps(geography_summary.to_dict(), sort_keys=True), + encoding="utf-8", + ) + + contract = write_calibration_package_contract( + package_path=package_path, + dataset_path=dataset_path, + db_path=db_path, + package=package, + parameters=calibration_package_parameters(), + run_id="run-a", + completed_at="2026-05-08T12:02:00Z", + geography_summary_path=geography_path, + geography_assignment_summary=geography_summary, + ) + + outputs = {artifact.logical_name: artifact for artifact in contract.outputs} + assert outputs["geography_assignment_summary"].media_type == "application/json" + assert ( + outputs["geography_assignment_summary"].metadata["canonical_geography_sha256"] + == geography_summary.canonical_geography_sha256 + ) + assert contract.execution.reuse_summary.expected_outputs == 2 + assert ( + validate_calibration_package_contract( + package_path=package_path, + package=package, + dataset_path=dataset_path, + db_path=db_path, + ) + == contract + ) + + def test_calibration_package_parameters_parse_runtime_args(): params = CalibrationPackageParameters.from_runtime_args( workers=8, @@ -149,12 +191,17 @@ def test_geography_assignment_summary_round_trips_through_schema(): assert isinstance(summary, GeographyAssignmentSummary) assert GeographyAssignmentSummary.from_dict(summary.to_dict()) == summary + assert summary.schema_version == 1 assert summary.source_kind == "calibration_package" + assert summary.status == "completed" + assert summary.ordering == "clone_major" assert summary.n_records == 1 assert summary.n_clones == 3 assert summary.n_rows == 3 assert summary.block_geoid_sha256.startswith("sha256:") assert summary.cd_geoid_sha256.startswith("sha256:") + assert summary.county_fips_sha256.startswith("sha256:") + assert summary.state_fips_sha256.startswith("sha256:") assert summary.canonical_geography_sha256.startswith("sha256:") @@ -184,6 +231,7 @@ def test_geography_assignment_summary_allows_unavailable_package_geography(): summary = summarize_geography_assignment(package) assert summary.source_kind == "unavailable" + assert summary.status == "unavailable" assert summary.n_records == 1 assert summary.n_clones == 3 assert summary.n_rows is None diff --git a/tests/unit/test_pipeline_docs_extractor.py b/tests/unit/test_pipeline_docs_extractor.py index 85d140e6d..e3bf5b242 100644 --- a/tests/unit/test_pipeline_docs_extractor.py +++ b/tests/unit/test_pipeline_docs_extractor.py @@ -136,6 +136,8 @@ def test_pipeline_map_manifest_validates(): "stage2_target_catalog_reader", "stage2_target_selection_policy", "stage2_target_selection_result", + "stage2_geography_assignment_spec", + "stage2_geography_assignment_result", "build_matrix", "build_matrix_chunked", "stage2_payload_boundary",