From 29209f7ea747c8c6daa1e1b368c55168127d3e55 Mon Sep 17 00:00:00 2001 From: Daphne Hansell <128793799+daphnehanse11@users.noreply.github.com> Date: Wed, 20 May 2026 13:28:01 -0400 Subject: [PATCH 1/3] Add ACS primary residence value imputation --- changelog.d/1079.added | 1 + docs/appendix.md | 3 +- docs/data.md | 8 +- docs/pipeline_map.yaml | 8 +- .../calibration/source_impute.py | 31 ++++- policyengine_us_data/datasets/acs/acs.py | 7 +- .../datasets/acs/census_acs.py | 1 + policyengine_us_data/datasets/cps/cps.py | 33 ++++- tests/integration/support/tiny_stage_1.py | 2 + tests/integration/support/tiny_stage_2.py | 3 + tests/integration/test_cps_generation.py | 16 ++- tests/unit/calibration/test_source_impute.py | 115 ++++++++++++++++++ .../test_acs_tax_unit_construction.py | 37 ++++++ tests/unit/datasets/test_cps_file_handles.py | 3 + 14 files changed, 245 insertions(+), 23 deletions(-) create mode 100644 changelog.d/1079.added diff --git a/changelog.d/1079.added b/changelog.d/1079.added new file mode 100644 index 000000000..76dc0baed --- /dev/null +++ b/changelog.d/1079.added @@ -0,0 +1 @@ +Added ACS VALP-backed primary_residence_value imputation to CPS and source-imputed outputs. diff --git a/docs/appendix.md b/docs/appendix.md index d1c84bcf8..7d8f12364 100644 --- a/docs/appendix.md +++ b/docs/appendix.md @@ -152,7 +152,8 @@ within the same record. - auto_loan_balance - auto_loan_interest -#### Variables Imputed from American Community Survey (2 variables) +#### Variables Imputed from American Community Survey (3 variables) - rent - real_estate_taxes +- primary_residence_value diff --git a/docs/data.md b/docs/data.md index d8c16d3a4..cea12b83e 100644 --- a/docs/data.md +++ b/docs/data.md @@ -9,7 +9,7 @@ sources. | ------------------- | ----------------------- | ---------------------------------------------------------------------- | | CPS ASEC | 2024 (income year 2023) | Base microdata; pipeline ages values to target policy year | | IRS PUF | 2015 | Pipeline ages values to target policy year using income growth indices | -| ACS | 2022 | Provides rent and real estate tax imputation targets | +| ACS | 2022 | Provides rent, real estate tax, and primary residence value targets | | SCF | 2022 | Provides wealth and debt variable imputation targets | | SIPP | 2023 | Provides tip income and asset imputation targets | | Calibration targets | Primarily 2023–2024 | Varies by source; see calibration data sources below | @@ -93,8 +93,10 @@ proper matching. The ACS provides housing and geographic data that supplements the CPS housing information. For homeowners, we impute property taxes based on state of residence, household income, and demographic -characteristics. We also impute rent values for specific tenure types where CPS data is incomplete, -along with additional housing characteristics not captured in the CPS. These imputations use +characteristics. We also impute owner-occupied primary residence market value from ACS property +value records, with non-owner households set to zero. Rent values are imputed for specific tenure +types where CPS data is incomplete, along with additional housing characteristics not captured in +the CPS. These imputations use Quantile Regression Forests to preserve distributional characteristics while accounting for household heterogeneity. diff --git a/docs/pipeline_map.yaml b/docs/pipeline_map.yaml index a496a8266..1426180f5 100644 --- a/docs/pipeline_map.yaml +++ b/docs/pipeline_map.yaml @@ -205,7 +205,7 @@ stages: - id: in_acs label: ACS 2022 node_type: artifact - description: Training data for rent QRF + description: Training data for housing QRF - id: in_sipp label: SIPP 2023 node_type: artifact @@ -653,7 +653,7 @@ stages: legacy_stage_id: '4' manifest_step_ids: - 01_build_datasets - description: Impute wealth/assets from external surveys onto stratified CPS via QRF + description: Impute housing, wealth/assets, and labor-market variables from external surveys onto stratified CPS via QRF country: us extra_nodes: - id: in_strat_s4 @@ -663,7 +663,7 @@ stages: - id: in_acs_s4 label: ACS_2022 node_type: artifact - description: American Community Survey - has state_fips predictor + description: American Community Survey - has state_fips predictor and housing targets - id: in_sipp_s4 label: SIPP 2023 node_type: external @@ -679,7 +679,7 @@ stages: - id: out_imputed label: source_imputed_stratified_extended_cps.h5 node_type: artifact - description: Enriched with ACS/SIPP/SCF vars - uploaded to HuggingFace + description: Enriched with ACS/SIPP/ORG/SCF vars - uploaded to HuggingFace - id: util_clone_assign label: clone_and_assign.py node_type: utility diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index e2989e325..02b1c7274 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -6,7 +6,8 @@ financial predictors. Sources and variables: - ACS -> rent, real_estate_taxes (with state predictor) + ACS -> rent, real_estate_taxes, primary_residence_value + (with state predictor) SIPP -> tip_income, bank_account_assets, stock_assets, bond_assets, household_vehicles_owned, household_vehicles_value (no state predictor) @@ -29,6 +30,7 @@ import logging from typing import Dict, Optional +import h5py import numpy as np import pandas as pd from policyengine_us_data.datasets.cps.tipped_occupation import ( @@ -72,6 +74,12 @@ ACS_IMPUTED_VARIABLES = [ "rent", "real_estate_taxes", + "primary_residence_value", +] + +ACS_CALCULATED_IMPUTED_VARIABLES = [ + "rent", + "real_estate_taxes", ] SIPP_IMPUTED_VARIABLES = [ @@ -150,6 +158,7 @@ "RENTED": 2, "NONE": 0, } +OWNER_TENURE_CODE = 1 SIPP_JOB_OCCUPATION_COLUMNS = [f"TJB{i}_OCC" for i in range(1, 8)] @@ -321,7 +330,7 @@ def _person_state_fips( id="acs_qrf", label="ACS QRF Imputation", node_type="library", - description="Impute rent and real estate tax variables from ACS donor data.", + description="Impute housing value, rent, and real estate tax variables from ACS donor data.", source_file="policyengine_us_data/calibration/source_impute.py", status="current", stability="moving", @@ -337,7 +346,7 @@ def _impute_acs( time_period: int, dataset_path: Optional[str] = None, ) -> Dict[str, Dict[int, np.ndarray]]: - """Impute rent and real_estate_taxes from ACS with state. + """Impute rent, real_estate_taxes, and primary_residence_value from ACS. Args: data: CPS data dict. @@ -357,11 +366,17 @@ def _impute_acs( predictors = ACS_PREDICTORS + ["state_fips"] acs_df = acs.calculate_dataframe( - ACS_PREDICTORS + ACS_IMPUTED_VARIABLES, map_to="person" + ACS_PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, + map_to="person", ) acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype( np.float32 ) + with h5py.File(ACS_2022.file_path, "r") as acs_h5: + acs_df["primary_residence_value"] = np.asarray( + acs_h5["primary_residence_value"], + dtype=np.float32, + ) train_df = acs_df[acs_df.is_household_head].sample(10_000, random_state=42) train_df = _encode_tenure_type(train_df) @@ -402,18 +417,22 @@ def _impute_acs( imputed_variables=ACS_IMPUTED_VARIABLES, ) predictions = fitted.predict(X_test=cps_heads) + owner_head_mask = cps_heads["tenure_type"].to_numpy() == OWNER_TENURE_CODE n_persons = len(data["person_id"][time_period]) for var in ACS_IMPUTED_VARIABLES: values = np.zeros(n_persons, dtype=np.float32) - values[mask] = predictions[var].values + predicted_values = predictions[var].values + if var == "primary_residence_value": + predicted_values = np.where(owner_head_mask, predicted_values, 0) + values[mask] = predicted_values data[var] = {time_period: values} data["pre_subsidy_rent"] = {time_period: data["rent"][time_period].copy()} del fitted, predictions gc.collect() - logger.info("ACS imputation complete: rent, real_estate_taxes") + logger.info("ACS imputation complete: %s", ", ".join(ACS_IMPUTED_VARIABLES)) return data diff --git a/policyengine_us_data/datasets/acs/acs.py b/policyengine_us_data/datasets/acs/acs.py index 8bbc39af9..f382acf9b 100644 --- a/policyengine_us_data/datasets/acs/acs.py +++ b/policyengine_us_data/datasets/acs/acs.py @@ -71,17 +71,20 @@ def add_person_variables( acs["self_employment_income"] = person.SEMP acs["social_security"] = person.SSP acs["taxable_private_pension_income"] = person.RETP - person[["rent", "real_estate_taxes"]] = ( + person[["rent", "real_estate_taxes", "primary_residence_value", "TEN"]] = ( household.set_index("household_id") - .loc[person["household_id"]][["RNTP", "TAXAMT"]] + .loc[person["household_id"]][["RNTP", "TAXAMT", "VALP", "TEN"]] .values ) acs["is_household_head"] = person.SPORDER == 1 factor = person.SPORDER == 1 + owner_occupied = person.TEN.astype(int).isin([1, 2]) person.rent *= factor * 12 person.real_estate_taxes *= factor + person.primary_residence_value *= factor * owner_occupied acs["rent"] = person.rent acs["real_estate_taxes"] = person.real_estate_taxes + acs["primary_residence_value"] = person.primary_residence_value acs["tenure_type"] = ( household.TEN.astype(int) .map( diff --git a/policyengine_us_data/datasets/acs/census_acs.py b/policyengine_us_data/datasets/acs/census_acs.py index cc913115c..ed152a05d 100644 --- a/policyengine_us_data/datasets/acs/census_acs.py +++ b/policyengine_us_data/datasets/acs/census_acs.py @@ -57,6 +57,7 @@ "RMSP", # Number of rooms "RNTP", # Monthly rent "TEN", # Tenure + "VALP", # Property value "VEH", # Number of vehicles "FINCP", # Total income "GRNTP", # Gross rent diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 641e2f2d8..79fdf8c28 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -73,6 +73,10 @@ from policyengine_us_data.pipeline_metadata import pipeline_node from policyengine_us_data.pipeline_schema import PipelineNode +ACS_CALCULATED_IMPUTED_VARIABLES = ["rent", "real_estate_taxes"] +ACS_IMPUTED_VARIABLES = [*ACS_CALCULATED_IMPUTED_VARIABLES, "primary_residence_value"] +OWNER_TENURE_TYPES = {"OWNED_WITH_MORTGAGE", "OWNED_OUTRIGHT"} + CURRENT_HEALTH_COVERAGE_REPORTED_VAR_MAP = { "reported_has_direct_purchase_health_coverage_at_interview": "NOW_DIR", "reported_has_marketplace_health_coverage_at_interview": "NOW_MRK", @@ -341,7 +345,7 @@ def downsample(self, frac: float) -> None: id="add_rent", label="Rent Imputation", node_type="library", - description="Impute rent and real estate taxes using ACS donor data.", + description="Impute housing values, rent, and real estate taxes using ACS donor data.", source_file="policyengine_us_data/datasets/cps/cps.py", status="legacy", stability="moving", @@ -398,8 +402,10 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): "state_code_str", "household_size", ] - IMPUTATIONS = ["rent", "real_estate_taxes"] - train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS, map_to="person") + train_df = acs.calculate_dataframe( + PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, + map_to="person", + ) # TODO(PolicyEngine/policyengine-core#482): policyengine-core 3.24.0+ # silently drops user-supplied ETERNITY inputs on dataset reload because # _user_input_keys records the user-supplied period instead of the @@ -413,6 +419,10 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): train_df["is_household_head"] = np.asarray( acs_h5["is_household_head"], dtype=bool ) + train_df["primary_residence_value"] = np.asarray( + acs_h5["primary_residence_value"], + dtype=float, + ) train_df.tenure_type = train_df.tenure_type.map( { "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", @@ -424,15 +434,16 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): inference_df["is_household_head"] = np.asarray(cps["is_household_head"], dtype=bool) mask = inference_df.is_household_head.values inference_df = inference_df[mask] + owner_head_mask = inference_df.tenure_type.astype(str).isin(OWNER_TENURE_TYPES) qrf = QRF() - logging.info("Training imputation model for rent and real estate taxes.") + logging.info("Training imputation model for ACS housing variables.") fitted_model = qrf.fit( X_train=train_df, predictors=PREDICTORS, - imputed_variables=IMPUTATIONS, + imputed_variables=ACS_IMPUTED_VARIABLES, ) - logging.info("Imputing rent and real estate taxes.") + logging.info("Imputing ACS housing variables.") imputed_values = fitted_model.predict(X_test=inference_df) logging.info("Imputation complete.") # ``cps["age"]`` has an integer dtype, so ``np.zeros_like(cps["age"])`` @@ -444,6 +455,16 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): cps["pre_subsidy_rent"] = cps["rent"] cps["real_estate_taxes"] = np.zeros(len(cps["age"]), dtype=float) cps["real_estate_taxes"][mask] = imputed_values["real_estate_taxes"] + primary_residence_values = np.asarray( + imputed_values["primary_residence_value"], + dtype=float, + ) + cps["primary_residence_value"] = np.zeros(len(cps["age"]), dtype=float) + cps["primary_residence_value"][mask] = np.where( + owner_head_mask, + primary_residence_values, + 0, + ) TEMPORARY_TAKEUP_SOURCE_ANCHORS = ("snap_reported", "ssi_reported") diff --git a/tests/integration/support/tiny_stage_1.py b/tests/integration/support/tiny_stage_1.py index c177523a2..34bed0545 100644 --- a/tests/integration/support/tiny_stage_1.py +++ b/tests/integration/support/tiny_stage_1.py @@ -39,6 +39,7 @@ "is_household_head", "rent", "real_estate_taxes", + "primary_residence_value", ) ACS_HOUSEHOLD_ARRAYS = ( @@ -229,6 +230,7 @@ def write_tiny_acs(path: Path) -> None: "is_household_head": np.array([True, False, True], dtype=np.bool_), "rent": np.array([0, 0, 14_400], dtype=np.float32), "real_estate_taxes": np.array([2_400, 0, 0], dtype=np.float32), + "primary_residence_value": np.array([275_000, 0, 0], dtype=np.float32), "tenure_type": np.array([b"OWNED_WITH_MORTGAGE", b"RENTED"]), "household_vehicles_owned": np.array([2, 1], dtype=np.int16), "state_fips": np.array([37, 37], dtype=np.int16), diff --git a/tests/integration/support/tiny_stage_2.py b/tests/integration/support/tiny_stage_2.py index 6a51cf288..ed896c7ea 100644 --- a/tests/integration/support/tiny_stage_2.py +++ b/tests/integration/support/tiny_stage_2.py @@ -51,6 +51,7 @@ "non_qualified_dividend_income", "rent", "real_estate_taxes", + "primary_residence_value", "deductible_mortgage_interest", "is_tax_unit_head", "is_tax_unit_spouse", @@ -160,6 +161,7 @@ def write_tiny_cps( "non_qualified_dividend_income": np.array([10, 5, 0], dtype=np.float32), "rent": acs["rent"][:], "real_estate_taxes": acs["real_estate_taxes"][:], + "primary_residence_value": acs["primary_residence_value"][:], "deductible_mortgage_interest": np.array([1_800, 0, 0], dtype=np.float32), "is_tax_unit_head": np.array([True, False, True], dtype=np.bool_), "is_tax_unit_spouse": np.array([False, True, False], dtype=np.bool_), @@ -239,6 +241,7 @@ def write_tiny_puf( ), "rent": np.zeros(person_count, dtype=np.float32), "real_estate_taxes": raw["E18500"].to_numpy(dtype=np.float32), + "primary_residence_value": np.zeros(person_count, dtype=np.float32), "deductible_mortgage_interest": raw["E19200"].to_numpy(dtype=np.float32), "is_tax_unit_head": np.ones(person_count, dtype=np.bool_), "is_tax_unit_spouse": np.zeros(person_count, dtype=np.bool_), diff --git a/tests/integration/test_cps_generation.py b/tests/integration/test_cps_generation.py index cfba2c92a..55654c2a4 100644 --- a/tests/integration/test_cps_generation.py +++ b/tests/integration/test_cps_generation.py @@ -51,6 +51,7 @@ def calculate(self, variable_name): "receives_wic": [False, False], "hud_income_level": ["VERY_LOW"], "spm_unit_tenure_type": ["RENTER"], + "is_eligible_for_housing_assistance": [True], "tax_unit_child_dependents": [0], "age_head": [40], } @@ -258,6 +259,10 @@ def test_add_rent_requests_person_level_frames(monkeypatch, tmp_path): "is_household_head", data=np.ones(10_050, dtype=bool), ) + fake_acs_h5.create_dataset( + "primary_residence_value", + data=np.full(10_050, 300_000.0, dtype=np.float32), + ) class FakeACSDataset: file_path = fake_acs_path @@ -324,6 +329,7 @@ def predict(self, X_test): { "rent": [1_200.0, 0.0], "real_estate_taxes": [0.0, 4_000.0], + "primary_residence_value": [250_000.0, 600_000.0], } ) @@ -331,7 +337,11 @@ class FakeQRF: def fit(self, X_train, predictors, imputed_variables): assert len(X_train) == 10_000 assert predictors[-1] == "household_size" - assert imputed_variables == ["rent", "real_estate_taxes"] + assert imputed_variables == [ + "rent", + "real_estate_taxes", + "primary_residence_value", + ] return FakeQRFModel() monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) @@ -357,6 +367,10 @@ def fit(self, X_train, predictors, imputed_variables): cps["real_estate_taxes"], np.array([0, 0, 4000], dtype=np.int32), ) + np.testing.assert_array_equal( + cps["primary_residence_value"], + np.array([0, 0, 600_000], dtype=np.int32), + ) assert not dataset.file_path.exists() diff --git a/tests/unit/calibration/test_source_impute.py b/tests/unit/calibration/test_source_impute.py index 188141753..33c0af505 100644 --- a/tests/unit/calibration/test_source_impute.py +++ b/tests/unit/calibration/test_source_impute.py @@ -5,6 +5,7 @@ import numpy as np import pandas as pd +import h5py from policyengine_us_data.calibration.source_impute import ( ACS_IMPUTED_VARIABLES, @@ -53,6 +54,7 @@ def _make_data_dict(n_persons=20, time_period=2024): }, "rent": {time_period: np.zeros(n_persons)}, "real_estate_taxes": {time_period: np.zeros(n_persons)}, + "primary_residence_value": {time_period: np.zeros(n_persons)}, "tip_income": {time_period: np.zeros(n_persons)}, "bank_account_assets": {time_period: np.zeros(n_persons)}, "stock_assets": {time_period: np.zeros(n_persons)}, @@ -74,6 +76,7 @@ class TestConstants: def test_acs_variables_defined(self): assert "rent" in ACS_IMPUTED_VARIABLES assert "real_estate_taxes" in ACS_IMPUTED_VARIABLES + assert "primary_residence_value" in ACS_IMPUTED_VARIABLES def test_sipp_variables_defined(self): assert "tip_income" in SIPP_IMPUTED_VARIABLES @@ -172,6 +175,7 @@ def test_skip_flags_preserve_data(self): for var in [ "rent", "real_estate_taxes", + "primary_residence_value", "tip_income", "hourly_wage", "is_union_member_or_covered", @@ -238,6 +242,117 @@ class TestSubfunctions: def test_impute_acs_exists(self): assert callable(_impute_acs) + def test_impute_acs_sets_primary_residence_value_only_for_owner_heads( + self, monkeypatch, tmp_path + ): + import microimpute.models.qrf as qrf_module + import policyengine_us + import policyengine_us_data.datasets.acs.acs as acs_module + + fake_acs_path = tmp_path / "acs.h5" + rows = 10_050 + with h5py.File(fake_acs_path, mode="w") as fake_acs: + fake_acs.create_dataset( + "primary_residence_value", + data=np.full(rows, 300_000, dtype=np.float32), + ) + + class FakeStateValues: + values = np.ones(rows, dtype=np.float32) * 6 + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe(self, variables, map_to=None): + if self.dataset is acs_module.ACS_2022: + return pd.DataFrame( + { + "is_household_head": np.ones(rows, dtype=bool), + "age": np.full(rows, 55, dtype=np.float32), + "is_male": np.zeros(rows, dtype=bool), + "tenure_type": ["OWNED_WITH_MORTGAGE"] * rows, + "employment_income": np.full( + rows, 75_000, dtype=np.float32 + ), + "self_employment_income": np.zeros(rows, dtype=np.float32), + "social_security": np.zeros(rows, dtype=np.float32), + "pension_income": np.zeros(rows, dtype=np.float32), + "household_size": np.full(rows, 2, dtype=np.float32), + "rent": np.zeros(rows, dtype=np.float32), + "real_estate_taxes": np.full(rows, 4_000, dtype=np.float32), + } + ) + return pd.DataFrame( + { + "is_household_head": [True, False, True], + "age": [55, 53, 31], + "is_male": [True, False, False], + "tenure_type": [ + "OWNED_WITH_MORTGAGE", + "OWNED_WITH_MORTGAGE", + "RENTED", + ], + "employment_income": [80_000, 30_000, 45_000], + "self_employment_income": [0, 0, 0], + "social_security": [0, 0, 0], + "pension_income": [0, 0, 0], + "household_size": [2, 2, 1], + } + ) + + def calculate(self, variable, map_to=None): + assert variable == "state_fips" + return FakeStateValues() + + class FakeQRFModel: + def predict(self, X_test): + assert len(X_test) == 2 + return pd.DataFrame( + { + "rent": [0, 1_200], + "real_estate_taxes": [4_000, 0], + "primary_residence_value": [500_000, 700_000], + } + ) + + class FakeQRF: + def fit(self, X_train, predictors, imputed_variables): + assert len(X_train) == 10_000 + assert "primary_residence_value" in X_train + assert imputed_variables == ACS_IMPUTED_VARIABLES + return FakeQRFModel() + + monkeypatch.setattr(acs_module.ACS_2022, "file_path", fake_acs_path) + monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) + monkeypatch.setattr(qrf_module, "QRF", FakeQRF) + + data = { + "person_id": {2024: np.arange(3)}, + "household_id": {2024: np.array([0, 1])}, + "person_household_id": {2024: np.array([0, 0, 1])}, + } + + result = _impute_acs( + data, + state_fips=np.array([6, 48], dtype=np.int32), + time_period=2024, + dataset_path="fake-cps.h5", + ) + + np.testing.assert_array_equal( + result["rent"][2024], + np.array([0, 0, 1_200], dtype=np.float32), + ) + np.testing.assert_array_equal( + result["real_estate_taxes"][2024], + np.array([4_000, 0, 0], dtype=np.float32), + ) + np.testing.assert_array_equal( + result["primary_residence_value"][2024], + np.array([500_000, 0, 0], dtype=np.float32), + ) + def test_impute_sipp_exists(self): assert callable(_impute_sipp) diff --git a/tests/unit/datasets/test_acs_tax_unit_construction.py b/tests/unit/datasets/test_acs_tax_unit_construction.py index 61bbf986c..988c4f057 100644 --- a/tests/unit/datasets/test_acs_tax_unit_construction.py +++ b/tests/unit/datasets/test_acs_tax_unit_construction.py @@ -196,6 +196,43 @@ def test_acs_add_id_variables_writes_tax_unit_ids(): assert tax_unit_id.tolist() == [1, 2] +def test_acs_add_person_variables_writes_primary_residence_value_for_owner_heads(): + person = pd.DataFrame( + { + "household_id": [0, 0, 1], + "SPORDER": [1, 2, 1], + "AGEP": [45, 43, 30], + "SEX": [1, 2, 1], + "WAGP": [60_000, 40_000, 50_000], + "SEMP": [0, 0, 0], + "SSP": [0, 0, 0], + "RETP": [0, 0, 0], + } + ) + household = pd.DataFrame( + { + "household_id": [0, 1], + "RNTP": [0, 1_000], + "TAXAMT": [2_400, 0], + "VALP": [300_000, 500_000], + "TEN": [1, 3], + } + ) + + with h5py.File("memory", mode="w", driver="core", backing_store=False) as acs: + ACS.add_person_variables(acs, person, household) + rent = acs["rent"][:] + real_estate_taxes = acs["real_estate_taxes"][:] + primary_residence_value = acs["primary_residence_value"][:] + + np.testing.assert_array_equal(rent, np.array([0, 0, 12_000])) + np.testing.assert_array_equal(real_estate_taxes, np.array([2_400, 0, 0])) + np.testing.assert_array_equal( + primary_residence_value, + np.array([300_000, 0, 0]), + ) + + def test_acs_add_id_variables_handles_duplicate_person_index_labels(): person = _acs_person_fixture( SERIALNO=["1", "2"], diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index 9d500953f..6a465fbd1 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -390,6 +390,7 @@ def recording_hdfstore(path, mode="a", *args, **kwargs): acs_fixture_path = tmp_path / "acs_fixture.h5" with h5py.File(acs_fixture_path, "w") as acs_fixture: acs_fixture["is_household_head"] = np.ones(10_000, dtype=bool) + acs_fixture["primary_residence_value"] = np.full(10_000, 250_000.0) real_h5py_file = cps_module.h5py.File opened_h5_paths = [] @@ -411,6 +412,7 @@ def predict(self, X_test): { "rent": np.full(len(X_test), 1_000.0), "real_estate_taxes": np.full(len(X_test), 250.0), + "primary_residence_value": np.full(len(X_test), 500_000.0), } ) @@ -494,3 +496,4 @@ class FakeACS_2022: assert not existing_path.exists() np.testing.assert_array_equal(cps["rent"], np.array([1_000.0])) np.testing.assert_array_equal(cps["real_estate_taxes"], np.array([250.0])) + np.testing.assert_array_equal(cps["primary_residence_value"], np.array([0.0])) From 681437685a6662cd9a9dd0199341e225728b981c Mon Sep 17 00:00:00 2001 From: Daphne Hansell <128793799+daphnehanse11@users.noreply.github.com> Date: Wed, 20 May 2026 14:15:09 -0400 Subject: [PATCH 2/3] Harden primary residence value imputation --- docs/pipeline_map.yaml | 2 +- policyengine_us_data/calibration/source_impute.py | 7 ++++++- policyengine_us_data/datasets/cps/cps.py | 9 +++++++-- tests/integration/test_cps_generation.py | 9 +++++++-- tests/unit/calibration/test_source_impute.py | 3 ++- 5 files changed, 23 insertions(+), 7 deletions(-) diff --git a/docs/pipeline_map.yaml b/docs/pipeline_map.yaml index 1426180f5..0e2d3a1ee 100644 --- a/docs/pipeline_map.yaml +++ b/docs/pipeline_map.yaml @@ -648,7 +648,7 @@ stages: stability: moving - id: 1f_source_imputation label: 1f - title: 'Substage 1f: Source Imputation (ACS + SIPP + SCF)' + title: 'Substage 1f: Source Imputation (ACS + SIPP + ORG + SCF)' canonical_stage_id: 1_build_datasets legacy_stage_id: '4' manifest_step_ids: diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index 02b1c7274..cab5717ce 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -368,6 +368,7 @@ def _impute_acs( acs_df = acs.calculate_dataframe( ACS_PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, map_to="person", + use_weights=False, ) acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype( np.float32 @@ -384,7 +385,11 @@ def _impute_acs( if dataset_path is not None: cps_sim = Microsimulation(dataset=dataset_path) - cps_df = cps_sim.calculate_dataframe(ACS_PREDICTORS, map_to="person") + cps_df = cps_sim.calculate_dataframe( + ACS_PREDICTORS, + map_to="person", + use_weights=False, + ) del cps_sim else: cps_df = pd.DataFrame() diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 79fdf8c28..66ce5ffda 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -343,7 +343,7 @@ def downsample(self, frac: float) -> None: @pipeline_node( PipelineNode( id="add_rent", - label="Rent Imputation", + label="ACS Housing Imputation", node_type="library", description="Impute housing values, rent, and real estate taxes using ACS donor data.", source_file="policyengine_us_data/datasets/cps/cps.py", @@ -405,6 +405,7 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): train_df = acs.calculate_dataframe( PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, map_to="person", + use_weights=False, ) # TODO(PolicyEngine/policyengine-core#482): policyengine-core 3.24.0+ # silently drops user-supplied ETERNITY inputs on dataset reload because @@ -430,7 +431,11 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): na_action="ignore", ).fillna(train_df.tenure_type) train_df = train_df[train_df.is_household_head].sample(10_000) - inference_df = cps_sim.calculate_dataframe(PREDICTORS, map_to="person") + inference_df = cps_sim.calculate_dataframe( + PREDICTORS, + map_to="person", + use_weights=False, + ) inference_df["is_household_head"] = np.asarray(cps["is_household_head"], dtype=bool) mask = inference_df.is_household_head.values inference_df = inference_df[mask] diff --git a/tests/integration/test_cps_generation.py b/tests/integration/test_cps_generation.py index 55654c2a4..a9f790473 100644 --- a/tests/integration/test_cps_generation.py +++ b/tests/integration/test_cps_generation.py @@ -287,7 +287,9 @@ def __init__(self, dataset): def calculate_dataframe( self, columns, period=None, map_to=None, use_weights=True ): - FakeMicrosimulation.calls.append((self.dataset, tuple(columns), map_to)) + FakeMicrosimulation.calls.append( + (self.dataset, tuple(columns), map_to, use_weights) + ) if self.dataset is fake_acs_dataset: rows = 10_050 return pd.DataFrame( @@ -361,7 +363,10 @@ def fit(self, X_train, predictors, imputed_variables): add_rent(dataset, cps, person, household) - assert [call[2] for call in FakeMicrosimulation.calls] == ["person", "person"] + assert [(call[2], call[3]) for call in FakeMicrosimulation.calls] == [ + ("person", False), + ("person", False), + ] np.testing.assert_array_equal(cps["rent"], np.array([1200, 0, 0], dtype=np.int32)) np.testing.assert_array_equal( cps["real_estate_taxes"], diff --git a/tests/unit/calibration/test_source_impute.py b/tests/unit/calibration/test_source_impute.py index 33c0af505..f3d735812 100644 --- a/tests/unit/calibration/test_source_impute.py +++ b/tests/unit/calibration/test_source_impute.py @@ -264,7 +264,8 @@ class FakeMicrosimulation: def __init__(self, dataset): self.dataset = dataset - def calculate_dataframe(self, variables, map_to=None): + def calculate_dataframe(self, variables, map_to=None, use_weights=True): + assert use_weights is False if self.dataset is acs_module.ACS_2022: return pd.DataFrame( { From efae45d283db916ad6a24a4d7ac551bdb1a2d541 Mon Sep 17 00:00:00 2001 From: Daphne Hansell <128793799+daphnehanse11@users.noreply.github.com> Date: Thu, 21 May 2026 13:52:25 -0400 Subject: [PATCH 3/3] Remove ACS imputation weight flag cleanup --- policyengine_us_data/calibration/source_impute.py | 2 -- policyengine_us_data/datasets/cps/cps.py | 2 -- tests/integration/test_cps_generation.py | 4 ++-- tests/unit/calibration/test_source_impute.py | 1 - 4 files changed, 2 insertions(+), 7 deletions(-) diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index cab5717ce..0f8bb8bab 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -368,7 +368,6 @@ def _impute_acs( acs_df = acs.calculate_dataframe( ACS_PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, map_to="person", - use_weights=False, ) acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype( np.float32 @@ -388,7 +387,6 @@ def _impute_acs( cps_df = cps_sim.calculate_dataframe( ACS_PREDICTORS, map_to="person", - use_weights=False, ) del cps_sim else: diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 66ce5ffda..1beedd5a3 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -405,7 +405,6 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): train_df = acs.calculate_dataframe( PREDICTORS + ACS_CALCULATED_IMPUTED_VARIABLES, map_to="person", - use_weights=False, ) # TODO(PolicyEngine/policyengine-core#482): policyengine-core 3.24.0+ # silently drops user-supplied ETERNITY inputs on dataset reload because @@ -434,7 +433,6 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): inference_df = cps_sim.calculate_dataframe( PREDICTORS, map_to="person", - use_weights=False, ) inference_df["is_household_head"] = np.asarray(cps["is_household_head"], dtype=bool) mask = inference_df.is_household_head.values diff --git a/tests/integration/test_cps_generation.py b/tests/integration/test_cps_generation.py index a9f790473..690a6ecce 100644 --- a/tests/integration/test_cps_generation.py +++ b/tests/integration/test_cps_generation.py @@ -364,8 +364,8 @@ def fit(self, X_train, predictors, imputed_variables): add_rent(dataset, cps, person, household) assert [(call[2], call[3]) for call in FakeMicrosimulation.calls] == [ - ("person", False), - ("person", False), + ("person", True), + ("person", True), ] np.testing.assert_array_equal(cps["rent"], np.array([1200, 0, 0], dtype=np.int32)) np.testing.assert_array_equal( diff --git a/tests/unit/calibration/test_source_impute.py b/tests/unit/calibration/test_source_impute.py index f3d735812..89729ea62 100644 --- a/tests/unit/calibration/test_source_impute.py +++ b/tests/unit/calibration/test_source_impute.py @@ -265,7 +265,6 @@ def __init__(self, dataset): self.dataset = dataset def calculate_dataframe(self, variables, map_to=None, use_weights=True): - assert use_weights is False if self.dataset is acs_module.ACS_2022: return pd.DataFrame( {