From 95ae45c2707d7a8d5a3025d9d1004f4ea46377c8 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 6 Apr 2026 00:13:01 -0400 Subject: [PATCH 1/4] Close leaked CPS HDF stores --- .../changed/fix-cps-hdf5-file-handles.md | 1 + policyengine_us_data/datasets/cps/cps.py | 80 +++++++++---------- tests/unit/datasets/test_cps_file_handles.py | 67 ++++++++++++++++ 3 files changed, 108 insertions(+), 40 deletions(-) create mode 100644 changelog.d/changed/fix-cps-hdf5-file-handles.md create mode 100644 tests/unit/datasets/test_cps_file_handles.py diff --git a/changelog.d/changed/fix-cps-hdf5-file-handles.md b/changelog.d/changed/fix-cps-hdf5-file-handles.md new file mode 100644 index 000000000..b508c49ac --- /dev/null +++ b/changelog.d/changed/fix-cps-hdf5-file-handles.md @@ -0,0 +1 @@ +Close raw CPS HDF stores after previous-year income and auto-loan preprocessing so CPS builds do not leave `census_cps_2021.h5` and `census_cps_2022.h5` open at process shutdown. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index e227760ae..72ef1f903 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -106,13 +106,13 @@ def generate(self): "For future years, use PolicyEngine's uprating at simulation time." ) - raw_data = self.raw_cps(require=True).load() cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") - person, tax_unit, family, spm_unit, household = [ - raw_data[entity] for entity in ENTITIES - ] + with self.raw_cps(require=True).load() as raw_data: + person, tax_unit, family, spm_unit, household = [ + raw_data[entity] for entity in ENTITIES + ] logging.info("Adding ID variables") add_id_variables(cps, person, tax_unit, family, spm_unit, household) @@ -146,7 +146,6 @@ def generate(self): add_auto_loan_interest_and_net_worth(self, cps) logging.info("Added all variables") - raw_data.close() self.save_dataset(cps) logging.info("Adding takeup") add_takeup(self) @@ -911,39 +910,40 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return - cps_current_year_data = self.raw_cps(require=True).load() - cps_previous_year_data = self.previous_year_raw_cps(require=True).load() - cps_previous_year = cps_previous_year_data.person.set_index( - cps_previous_year_data.person.PERIDNUM - ) - cps_current_year = cps_current_year_data.person.set_index( - cps_current_year_data.person.PERIDNUM - ) + with self.raw_cps(require=True).load() as cps_current_year_data, self.previous_year_raw_cps( + require=True + ).load() as cps_previous_year_data: + cps_previous_year = cps_previous_year_data.person.set_index( + cps_previous_year_data.person.PERIDNUM + ) + cps_current_year = cps_current_year_data.person.set_index( + cps_current_year_data.person.PERIDNUM + ) - previous_year_data = cps_previous_year[ - ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] - ].rename( - { - "WSAL_VAL": "employment_income_last_year", - "SEMP_VAL": "self_employment_income_last_year", - }, - axis=1, - ) + previous_year_data = cps_previous_year[ + ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] + ].rename( + { + "WSAL_VAL": "employment_income_last_year", + "SEMP_VAL": "self_employment_income_last_year", + }, + axis=1, + ) - previous_year_data = previous_year_data[ - (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) - ] + previous_year_data = previous_year_data[ + (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) + ] - previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) + previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) - joined_data = cps_current_year.join(previous_year_data)[ - [ - "employment_income_last_year", - "self_employment_income_last_year", - "I_ERNVAL", - "I_SEVAL", + joined_data = cps_current_year.join(previous_year_data)[ + [ + "employment_income_last_year", + "self_employment_income_last_year", + "I_ERNVAL", + "I_SEVAL", + ] ] - ] joined_data["previous_year_income_available"] = ( ~joined_data.employment_income_last_year.isna() & ~joined_data.self_employment_income_last_year.isna() @@ -1870,13 +1870,12 @@ def add_tips(self, cps: h5py.File): # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) # Note: is_married in policyengine-us is Family-level, but we need # person-level for imputation models - raw_data = self.raw_cps(require=True).load() - raw_person = raw_data["person"] - cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values - cps["is_tipped_occupation"] = derive_is_tipped_occupation( - derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) - ) - raw_data.close() + with self.raw_cps(require=True).load() as raw_data: + raw_person = raw_data["person"] + cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values + cps["is_tipped_occupation"] = derive_is_tipped_occupation( + derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) + ) cps["is_under_18"] = cps.age < 18 cps["is_under_6"] = cps.age < 6 @@ -2261,6 +2260,7 @@ def determine_reference_person(group): # Add is_married variable for household heads based on raw person data reference_persons = person_data[mask] receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values + raw_data.close() # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py new file mode 100644 index 000000000..e59516bb3 --- /dev/null +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -0,0 +1,67 @@ +from types import SimpleNamespace + +import numpy as np +import pandas as pd + +from policyengine_us_data.datasets.cps.cps import add_previous_year_income + + +class _FakeStore: + def __init__(self, person: pd.DataFrame): + self.person = person + self.closed = False + + def close(self): + self.closed = True + + +class _FakeDataset: + store: _FakeStore | None = None + + def __init__(self, require: bool = False): + assert require is True + + def load(self): + assert self.store is not None + return self.store + + +def test_add_previous_year_income_closes_raw_cps_handles(): + current_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + previous_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "WSAL_VAL": [1_000, 2_000], + "SEMP_VAL": [100, 200], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + current_store = _FakeStore(current_person) + previous_store = _FakeStore(previous_person) + + current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store}) + previous_dataset = type( + "PreviousDataset", (_FakeDataset,), {"store": previous_store} + ) + + holder = SimpleNamespace( + raw_cps=current_dataset, + previous_year_raw_cps=previous_dataset, + ) + cps = {} + + add_previous_year_income(holder, cps) + + np.testing.assert_array_equal(cps["employment_income_last_year"], [1000, 2000]) + np.testing.assert_array_equal(cps["self_employment_income_last_year"], [100, 200]) + np.testing.assert_array_equal(cps["previous_year_income_available"], [True, True]) + assert current_store.closed is True + assert previous_store.closed is True From 84ba5c03c2c81ca4526c334e6ec670b1371393f9 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Mon, 6 Apr 2026 22:36:58 -0400 Subject: [PATCH 2/4] Use context managers for CPS raw stores --- policyengine_us_data/datasets/cps/cps.py | 223 +++++++++---------- tests/unit/datasets/test_cps_file_handles.py | 7 + 2 files changed, 118 insertions(+), 112 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 72ef1f903..c421f0795 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -910,9 +910,10 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return - with self.raw_cps(require=True).load() as cps_current_year_data, self.previous_year_raw_cps( - require=True - ).load() as cps_previous_year_data: + with ( + self.raw_cps(require=True).load() as cps_current_year_data, + self.previous_year_raw_cps(require=True).load() as cps_previous_year_data, + ): cps_previous_year = cps_previous_year_data.person.set_index( cps_previous_year_data.person.PERIDNUM ) @@ -2035,51 +2036,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: cps_data = self.load_dataset() # Access raw CPS for additional variables - raw_data_instance = self.raw_cps(require=True) - raw_data = raw_data_instance.load() - person_data = raw_data.person - - # Preprocess the CPS for imputation - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - agg_data["interest_dividend_income"] = np.sum( - [ - agg_data["taxable_interest_income"], - agg_data["tax_exempt_interest_income"], - agg_data["qualified_dividend_income"], - agg_data["non_qualified_dividend_income"], - ], - axis=0, - ) - agg_data["social_security_pension_income"] = np.sum( - [ - agg_data["tax_exempt_private_pension_income"], - agg_data["taxable_private_pension_income"], - agg_data["social_security_retirement"], - ], - axis=0, - ) - - agg = ( - agg_data.groupby("person_household_id")[ + with self.raw_cps(require=True).load() as raw_data: + person_data = raw_data.person + + # Preprocess the CPS for imputation + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + agg_data["interest_dividend_income"] = np.sum( + [ + agg_data["taxable_interest_income"], + agg_data["tax_exempt_interest_income"], + agg_data["qualified_dividend_income"], + agg_data["non_qualified_dividend_income"], + ], + axis=0, + ) + agg_data["social_security_pension_income"] = np.sum( [ - "employment_income", - "interest_dividend_income", - "social_security_pension_income", + agg_data["tax_exempt_private_pension_income"], + agg_data["taxable_private_pension_income"], + agg_data["social_security_retirement"], + ], + axis=0, + ) + + agg = ( + agg_data.groupby("person_household_id")[ + [ + "employment_income", + "interest_dividend_income", + "social_security_pension_income", + ] ] - ] - .sum() - .rename( - columns={ - "employment_income": "household_employment_income", - "interest_dividend_income": "household_interest_dividend_income", - "social_security_pension_income": "household_social_security_pension_income", - } + .sum() + .rename( + columns={ + "employment_income": "household_employment_income", + "interest_dividend_income": "household_interest_dividend_income", + "social_security_pension_income": "household_social_security_pension_income", + } + ) + .reset_index() ) - .reset_index() - ) def create_scf_reference_person_mask(cps_data, raw_person_data): """ @@ -2189,78 +2189,77 @@ def determine_reference_person(group): return all_persons_data["is_scf_reference_person"].values - mask = create_scf_reference_person_mask(cps_data, person_data) - mask_len = mask.shape[0] - - cps_data = { - var: data[mask] if data.shape[0] == mask_len else data - for var, data in cps_data.items() - } - - CPS_RACE_MAPPING = { - 1: 1, # White only -> WHITE - 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN - 3: 5, # American Indian, Alaskan Native only -> OTHER - 4: 4, # Asian only -> ASIAN - 5: 5, # Hawaiian/Pacific Islander only -> OTHER - 6: 5, # White-Black -> OTHER - 7: 5, # White-AI -> OTHER - 8: 5, # White-Asian -> OTHER - 9: 3, # White-HP -> HISPANIC - 10: 5, # Black-AI -> OTHER - 11: 5, # Black-Asian -> OTHER - 12: 3, # Black-HP -> HISPANIC - 13: 5, # AI-Asian -> OTHER - 14: 5, # AI-HP -> OTHER - 15: 3, # Asian-HP -> HISPANIC - 16: 5, # White-Black-AI -> OTHER - 17: 5, # White-Black-Asian -> OTHER - 18: 5, # White-Black-HP -> OTHER - 19: 5, # White-AI-Asian -> OTHER - 20: 5, # White-AI-HP -> OTHER - 21: 5, # White-Asian-HP -> OTHER - 22: 5, # Black-AI-Asian -> OTHER - 23: 5, # White-Black-AI-Asian -> OTHER - 24: 5, # White-AI-Asian-HP -> OTHER - 25: 5, # Other 3 race comb. -> OTHER - 26: 5, # Other 4 or 5 race comb. -> OTHER - } + mask = create_scf_reference_person_mask(cps_data, person_data) + mask_len = mask.shape[0] - # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + cps_data = { + var: data[mask] if data.shape[0] == mask_len else data + for var, data in cps_data.items() + } - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + CPS_RACE_MAPPING = { + 1: 1, # White only -> WHITE + 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN + 3: 5, # American Indian, Alaskan Native only -> OTHER + 4: 4, # Asian only -> ASIAN + 5: 5, # Hawaiian/Pacific Islander only -> OTHER + 6: 5, # White-Black -> OTHER + 7: 5, # White-AI -> OTHER + 8: 5, # White-Asian -> OTHER + 9: 3, # White-HP -> HISPANIC + 10: 5, # Black-AI -> OTHER + 11: 5, # Black-Asian -> OTHER + 12: 3, # Black-HP -> HISPANIC + 13: 5, # AI-Asian -> OTHER + 14: 5, # AI-HP -> OTHER + 15: 3, # Asian-HP -> HISPANIC + 16: 5, # White-Black-AI -> OTHER + 17: 5, # White-Black-Asian -> OTHER + 18: 5, # White-Black-HP -> OTHER + 19: 5, # White-AI-Asian -> OTHER + 20: 5, # White-AI-HP -> OTHER + 21: 5, # White-Asian-HP -> OTHER + 22: 5, # Black-AI-Asian -> OTHER + 23: 5, # White-Black-AI-Asian -> OTHER + 24: 5, # White-AI-Asian-HP -> OTHER + 25: 5, # Other 3 race comb. -> OTHER + 26: 5, # Other 4 or 5 race comb. -> OTHER + } - receiver_data = receiver_data.merge( - agg[ - [ - "person_household_id", - "household_employment_income", - "household_interest_dividend_income", - "household_social_security_pension_income", - ] - ], - on="person_household_id", - how="left", - ) - receiver_data.drop("employment_income", axis=1, inplace=True) + # Apply the mapping to recode the race values + cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + + receiver_data = receiver_data.merge( + agg[ + [ + "person_household_id", + "household_employment_income", + "household_interest_dividend_income", + "household_social_security_pension_income", + ] + ], + on="person_household_id", + how="left", + ) + receiver_data.drop("employment_income", axis=1, inplace=True) - receiver_data.rename( - columns={ - "household_employment_income": "employment_income", - "household_interest_dividend_income": "interest_dividend_income", - "household_social_security_pension_income": "social_security_pension_income", - }, - inplace=True, - ) + receiver_data.rename( + columns={ + "household_employment_income": "employment_income", + "household_interest_dividend_income": "interest_dividend_income", + "household_social_security_pension_income": "social_security_pension_income", + }, + inplace=True, + ) - # Add is_married variable for household heads based on raw person data - reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values - raw_data.close() + # Add is_married variable for household heads based on raw person data + reference_persons = person_data[mask] + receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index e59516bb3..84074b366 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -11,6 +11,13 @@ def __init__(self, person: pd.DataFrame): self.person = person self.closed = False + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + return False + def close(self): self.closed = True From 8af99b7bf354fe64d24a143a31b6b2ea113a450a Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Thu, 9 Apr 2026 12:52:34 -0400 Subject: [PATCH 3/4] Use current changelog fragment naming --- ...ps-hdf5-file-handles.md => fix-cps-hdf5-file-handles.fixed.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{changed/fix-cps-hdf5-file-handles.md => fix-cps-hdf5-file-handles.fixed.md} (100%) diff --git a/changelog.d/changed/fix-cps-hdf5-file-handles.md b/changelog.d/fix-cps-hdf5-file-handles.fixed.md similarity index 100% rename from changelog.d/changed/fix-cps-hdf5-file-handles.md rename to changelog.d/fix-cps-hdf5-file-handles.fixed.md From ec9f9a1c5719a49b5570034fbffaf26e198292e6 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Fri, 10 Apr 2026 18:00:42 -0400 Subject: [PATCH 4/4] Fix CPS raw-store CI regressions --- policyengine_us_data/datasets/cps/cps.py | 153 +++++++++-------- tests/unit/datasets/test_cps_file_handles.py | 164 ++++++++++++++++++- 2 files changed, 247 insertions(+), 70 deletions(-) diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index c421f0795..fd4d39924 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1,3 +1,4 @@ +from contextlib import closing, contextmanager from importlib.resources import files from policyengine_core.data import Dataset from policyengine_us_data.storage import STORAGE_FOLDER, DOCS_FOLDER @@ -84,6 +85,20 @@ } +@contextmanager +def _open_dataset_read_only(dataset_source): + dataset = dataset_source(require=True) + file_path = getattr(dataset, "file_path", None) + + if file_path is not None: + with pd.HDFStore(file_path, mode="r") as store: + yield store + return + + with closing(dataset.load()) as store: + yield store + + class CPS(Dataset): name = "cps" label = "CPS" @@ -109,7 +124,7 @@ def generate(self): cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") - with self.raw_cps(require=True).load() as raw_data: + with _open_dataset_read_only(self.raw_cps) as raw_data: person, tax_unit, family, spm_unit, household = [ raw_data[entity] for entity in ENTITIES ] @@ -911,8 +926,8 @@ def add_previous_year_income(self, cps: h5py.File) -> None: return with ( - self.raw_cps(require=True).load() as cps_current_year_data, - self.previous_year_raw_cps(require=True).load() as cps_previous_year_data, + _open_dataset_read_only(self.raw_cps) as cps_current_year_data, + _open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data, ): cps_previous_year = cps_previous_year_data.person.set_index( cps_previous_year_data.person.PERIDNUM @@ -1871,7 +1886,7 @@ def add_tips(self, cps: h5py.File): # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) # Note: is_married in policyengine-us is Family-level, but we need # person-level for imputation models - with self.raw_cps(require=True).load() as raw_data: + with _open_dataset_read_only(self.raw_cps) as raw_data: raw_person = raw_data["person"] cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values cps["is_tipped_occupation"] = derive_is_tipped_occupation( @@ -2036,7 +2051,7 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: cps_data = self.load_dataset() # Access raw CPS for additional variables - with self.raw_cps(require=True).load() as raw_data: + with _open_dataset_read_only(self.raw_cps) as raw_data: person_data = raw_data.person # Preprocess the CPS for imputation @@ -2189,77 +2204,77 @@ def determine_reference_person(group): return all_persons_data["is_scf_reference_person"].values - mask = create_scf_reference_person_mask(cps_data, person_data) - mask_len = mask.shape[0] + mask = create_scf_reference_person_mask(cps_data, person_data) + mask_len = mask.shape[0] - cps_data = { - var: data[mask] if data.shape[0] == mask_len else data - for var, data in cps_data.items() - } + cps_data = { + var: data[mask] if data.shape[0] == mask_len else data + for var, data in cps_data.items() + } - CPS_RACE_MAPPING = { - 1: 1, # White only -> WHITE - 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN - 3: 5, # American Indian, Alaskan Native only -> OTHER - 4: 4, # Asian only -> ASIAN - 5: 5, # Hawaiian/Pacific Islander only -> OTHER - 6: 5, # White-Black -> OTHER - 7: 5, # White-AI -> OTHER - 8: 5, # White-Asian -> OTHER - 9: 3, # White-HP -> HISPANIC - 10: 5, # Black-AI -> OTHER - 11: 5, # Black-Asian -> OTHER - 12: 3, # Black-HP -> HISPANIC - 13: 5, # AI-Asian -> OTHER - 14: 5, # AI-HP -> OTHER - 15: 3, # Asian-HP -> HISPANIC - 16: 5, # White-Black-AI -> OTHER - 17: 5, # White-Black-Asian -> OTHER - 18: 5, # White-Black-HP -> OTHER - 19: 5, # White-AI-Asian -> OTHER - 20: 5, # White-AI-HP -> OTHER - 21: 5, # White-Asian-HP -> OTHER - 22: 5, # Black-AI-Asian -> OTHER - 23: 5, # White-Black-AI-Asian -> OTHER - 24: 5, # White-AI-Asian-HP -> OTHER - 25: 5, # Other 3 race comb. -> OTHER - 26: 5, # Other 4 or 5 race comb. -> OTHER - } + CPS_RACE_MAPPING = { + 1: 1, # White only -> WHITE + 2: 2, # Black only -> BLACK/AFRICAN-AMERICAN + 3: 5, # American Indian, Alaskan Native only -> OTHER + 4: 4, # Asian only -> ASIAN + 5: 5, # Hawaiian/Pacific Islander only -> OTHER + 6: 5, # White-Black -> OTHER + 7: 5, # White-AI -> OTHER + 8: 5, # White-Asian -> OTHER + 9: 3, # White-HP -> HISPANIC + 10: 5, # Black-AI -> OTHER + 11: 5, # Black-Asian -> OTHER + 12: 3, # Black-HP -> HISPANIC + 13: 5, # AI-Asian -> OTHER + 14: 5, # AI-HP -> OTHER + 15: 3, # Asian-HP -> HISPANIC + 16: 5, # White-Black-AI -> OTHER + 17: 5, # White-Black-Asian -> OTHER + 18: 5, # White-Black-HP -> OTHER + 19: 5, # White-AI-Asian -> OTHER + 20: 5, # White-AI-HP -> OTHER + 21: 5, # White-Asian-HP -> OTHER + 22: 5, # Black-AI-Asian -> OTHER + 23: 5, # White-Black-AI-Asian -> OTHER + 24: 5, # White-AI-Asian-HP -> OTHER + 25: 5, # Other 3 race comb. -> OTHER + 26: 5, # Other 4 or 5 race comb. -> OTHER + } - # Apply the mapping to recode the race values - cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) + # Apply the mapping to recode the race values + cps_data["cps_race"] = np.vectorize(CPS_RACE_MAPPING.get)(cps_data["cps_race"]) - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + receiver_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - receiver_data = receiver_data.merge( - agg[ - [ - "person_household_id", - "household_employment_income", - "household_interest_dividend_income", - "household_social_security_pension_income", - ] - ], - on="person_household_id", - how="left", - ) - receiver_data.drop("employment_income", axis=1, inplace=True) + receiver_data = receiver_data.merge( + agg[ + [ + "person_household_id", + "household_employment_income", + "household_interest_dividend_income", + "household_social_security_pension_income", + ] + ], + on="person_household_id", + how="left", + ) + receiver_data.drop("employment_income", axis=1, inplace=True) - receiver_data.rename( - columns={ - "household_employment_income": "employment_income", - "household_interest_dividend_income": "interest_dividend_income", - "household_social_security_pension_income": "social_security_pension_income", - }, - inplace=True, - ) + receiver_data.rename( + columns={ + "household_employment_income": "employment_income", + "household_interest_dividend_income": "interest_dividend_income", + "household_social_security_pension_income": "social_security_pension_income", + }, + inplace=True, + ) - # Add is_married variable for household heads based on raw person data - reference_persons = person_data[mask] - receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values + # Add is_married variable for household heads based on raw person data + reference_persons = person_data[mask] + receiver_data["is_married"] = reference_persons.A_MARITL.isin([1, 2]).values # Impute auto loan balance from the SCF from policyengine_us_data.datasets.scf.scf import SCF_2022 diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py index 84074b366..ca3e39c1f 100644 --- a/tests/unit/datasets/test_cps_file_handles.py +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -3,7 +3,11 @@ import numpy as np import pandas as pd -from policyengine_us_data.datasets.cps.cps import add_previous_year_income +import policyengine_us_data.datasets.cps.cps as cps_module +from policyengine_us_data.datasets.cps.cps import ( + add_auto_loan_interest_and_net_worth, + add_previous_year_income, +) class _FakeStore: @@ -72,3 +76,161 @@ def test_add_previous_year_income_closes_raw_cps_handles(): np.testing.assert_array_equal(cps["previous_year_income_available"], [True, True]) assert current_store.closed is True assert previous_store.closed is True + + +def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatch): + current_path = tmp_path / "current.h5" + previous_path = tmp_path / "previous.h5" + + with pd.HDFStore(current_path, mode="w") as store: + store["person"] = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + with pd.HDFStore(previous_path, mode="w") as store: + store["person"] = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "WSAL_VAL": [1_000, 2_000], + "SEMP_VAL": [100, 200], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + real_hdfstore = pd.HDFStore + opened_modes = [] + + def recording_hdfstore(path, mode="a", *args, **kwargs): + opened_modes.append(mode) + return real_hdfstore(path, mode=mode, *args, **kwargs) + + monkeypatch.setattr(cps_module.pd, "HDFStore", recording_hdfstore) + + class CurrentDataset: + file_path = current_path + + def __init__(self, require: bool = False): + assert require is True + + class PreviousDataset: + file_path = previous_path + + def __init__(self, require: bool = False): + assert require is True + + holder = SimpleNamespace( + raw_cps=CurrentDataset, + previous_year_raw_cps=PreviousDataset, + ) + + cps = {} + add_previous_year_income(holder, cps) + + assert opened_modes == ["r", "r"] + np.testing.assert_array_equal(cps["employment_income_last_year"], [1000, 2000]) + + +def test_add_auto_loan_interest_and_net_worth_uses_outer_receiver_data(monkeypatch): + raw_person = pd.DataFrame( + { + "A_SEX": [1, 2], + "A_MARITL": [1, 3], + } + ) + raw_store = _FakeStore(raw_person) + + class FakeRawCPS: + def __call__(self, require: bool = False): + assert require is True + return self + + def load(self): + return raw_store + + class FakeDataset: + def __init__(self): + self.raw_cps = FakeRawCPS() + self.saved_dataset = None + + def save_dataset(self, data): + self.saved_dataset = data + + def load_dataset(self): + return { + "person_household_id": np.array([10, 20]), + "age": np.array([35, 40]), + "is_female": np.array([False, True]), + "cps_race": np.array([1, 2]), + "own_children_in_household": np.array([0, 1]), + "employment_income": np.array([40_000.0, 25_000.0]), + "taxable_interest_income": np.array([100.0, 0.0]), + "tax_exempt_interest_income": np.array([0.0, 0.0]), + "qualified_dividend_income": np.array([0.0, 0.0]), + "non_qualified_dividend_income": np.array([0.0, 0.0]), + "tax_exempt_private_pension_income": np.array([0.0, 0.0]), + "taxable_private_pension_income": np.array([0.0, 0.0]), + "social_security_retirement": np.array([0.0, 0.0]), + } + + class FakeSCF: + def load_dataset(self): + return { + "age": np.array([30, 50]), + "is_female": np.array([False, True]), + "cps_race": np.array([1, 2]), + "is_married": np.array([True, False]), + "own_children_in_household": np.array([0, 1]), + "employment_income": np.array([35_000.0, 20_000.0]), + "interest_dividend_income": np.array([100.0, 50.0]), + "social_security_pension_income": np.array([0.0, 0.0]), + "networth": np.array([10_000.0, 5_000.0]), + "auto_loan_balance": np.array([2_000.0, 1_000.0]), + "auto_loan_interest": np.array([200.0, 100.0]), + "wgt": np.array([1.0, 1.0]), + } + + class FakeQRF: + def fit( + self, + X_train, + predictors, + imputed_variables, + weight_col, + tune_hyperparameters, + ): + assert predictors[0] == "age" + assert weight_col == "wgt" + self.imputed_variables = imputed_variables + return self + + def predict(self, X_test): + assert X_test["is_married"].tolist() == [True, False] + return pd.DataFrame( + { + "networth": [10_000.0, 5_000.0], + "auto_loan_balance": [2_000.0, 1_000.0], + "auto_loan_interest": [200.0, 100.0], + } + ) + + import policyengine_us_data.datasets.scf.scf as scf_module + import microimpute.models.qrf as qrf_module + + monkeypatch.setattr(scf_module, "SCF_2022", FakeSCF) + monkeypatch.setattr(qrf_module, "QRF", FakeQRF) + + dataset = FakeDataset() + add_auto_loan_interest_and_net_worth(dataset, {}) + + assert raw_store.closed is True + np.testing.assert_array_equal( + dataset.saved_dataset["net_worth"], [10_000.0, 5_000.0] + ) + np.testing.assert_array_equal( + dataset.saved_dataset["auto_loan_interest"], [200.0, 100.0] + )