diff --git a/changelog.d/fix-cps-hdf5-file-handles.fixed.md b/changelog.d/fix-cps-hdf5-file-handles.fixed.md new file mode 100644 index 00000000..b508c49a --- /dev/null +++ b/changelog.d/fix-cps-hdf5-file-handles.fixed.md @@ -0,0 +1 @@ +Close raw CPS HDF stores after previous-year income and auto-loan preprocessing so CPS builds do not leave `census_cps_2021.h5` and `census_cps_2022.h5` open at process shutdown. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index df845a41..8ab5dc7d 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1,3 +1,4 @@ +from contextlib import closing, contextmanager from importlib.resources import files from policyengine_core.data import Dataset from policyengine_us_data.storage import STORAGE_FOLDER, DOCS_FOLDER @@ -91,6 +92,20 @@ } +@contextmanager +def _open_dataset_read_only(dataset_source): + dataset = dataset_source(require=True) + file_path = getattr(dataset, "file_path", None) + + if file_path is not None: + with pd.HDFStore(file_path, mode="r") as store: + yield store + return + + with closing(dataset.load()) as store: + yield store + + class CPS(Dataset): name = "cps" label = "CPS" @@ -113,13 +128,13 @@ def generate(self): "For future years, use PolicyEngine's uprating at simulation time." ) - raw_data = self.raw_cps(require=True).load() cps = {} ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household") - person, tax_unit, family, spm_unit, household = [ - raw_data[entity] for entity in ENTITIES - ] + with _open_dataset_read_only(self.raw_cps) as raw_data: + person, tax_unit, family, spm_unit, household = [ + raw_data[entity] for entity in ENTITIES + ] logging.info("Adding ID variables") add_id_variables(cps, person, tax_unit, family, spm_unit, household) @@ -160,7 +175,6 @@ def generate(self): add_auto_loan_interest_and_net_worth(self, cps) logging.info("Added all variables") - raw_data.close() self.save_dataset(cps) logging.info("Adding takeup") add_takeup(self) @@ -930,39 +944,41 @@ def add_previous_year_income(self, cps: h5py.File) -> None: ) return - cps_current_year_data = self.raw_cps(require=True).load() - cps_previous_year_data = self.previous_year_raw_cps(require=True).load() - cps_previous_year = cps_previous_year_data.person.set_index( - cps_previous_year_data.person.PERIDNUM - ) - cps_current_year = cps_current_year_data.person.set_index( - cps_current_year_data.person.PERIDNUM - ) + with ( + _open_dataset_read_only(self.raw_cps) as cps_current_year_data, + _open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data, + ): + cps_previous_year = cps_previous_year_data.person.set_index( + cps_previous_year_data.person.PERIDNUM + ) + cps_current_year = cps_current_year_data.person.set_index( + cps_current_year_data.person.PERIDNUM + ) - previous_year_data = cps_previous_year[ - ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] - ].rename( - { - "WSAL_VAL": "employment_income_last_year", - "SEMP_VAL": "self_employment_income_last_year", - }, - axis=1, - ) + previous_year_data = cps_previous_year[ + ["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"] + ].rename( + { + "WSAL_VAL": "employment_income_last_year", + "SEMP_VAL": "self_employment_income_last_year", + }, + axis=1, + ) - previous_year_data = previous_year_data[ - (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) - ] + previous_year_data = previous_year_data[ + (previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0) + ] - previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) + previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True) - joined_data = cps_current_year.join(previous_year_data)[ - [ - "employment_income_last_year", - "self_employment_income_last_year", - "I_ERNVAL", - "I_SEVAL", + joined_data = cps_current_year.join(previous_year_data)[ + [ + "employment_income_last_year", + "self_employment_income_last_year", + "I_ERNVAL", + "I_SEVAL", + ] ] - ] joined_data["previous_year_income_available"] = ( ~joined_data.employment_income_last_year.isna() & ~joined_data.self_employment_income_last_year.isna() @@ -1884,13 +1900,12 @@ def add_tips(self, cps: h5py.File): # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) # Note: is_married in policyengine-us is Family-level, but we need # person-level for imputation models - raw_data = self.raw_cps(require=True).load() - raw_person = raw_data["person"] - cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values - cps["is_tipped_occupation"] = derive_is_tipped_occupation( - derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) - ) - raw_data.close() + with _open_dataset_read_only(self.raw_cps) as raw_data: + raw_person = raw_data["person"] + cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values + cps["is_tipped_occupation"] = derive_is_tipped_occupation( + derive_treasury_tipped_occupation_code(raw_person.PEIOOCC) + ) cps["is_under_18"] = cps.age < 18 cps["is_under_6"] = cps.age < 6 @@ -2050,51 +2065,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None: cps_data = self.load_dataset() # Access raw CPS for additional variables - raw_data_instance = self.raw_cps(require=True) - raw_data = raw_data_instance.load() - person_data = raw_data.person - - # Preprocess the CPS for imputation - lengths = {k: len(v) for k, v in cps_data.items()} - var_len = cps_data["person_household_id"].shape[0] - vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] - agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) - agg_data["interest_dividend_income"] = np.sum( - [ - agg_data["taxable_interest_income"], - agg_data["tax_exempt_interest_income"], - agg_data["qualified_dividend_income"], - agg_data["non_qualified_dividend_income"], - ], - axis=0, - ) - agg_data["social_security_pension_income"] = np.sum( - [ - agg_data["tax_exempt_private_pension_income"], - agg_data["taxable_private_pension_income"], - agg_data["social_security_retirement"], - ], - axis=0, - ) - - agg = ( - agg_data.groupby("person_household_id")[ + with _open_dataset_read_only(self.raw_cps) as raw_data: + person_data = raw_data.person + + # Preprocess the CPS for imputation + lengths = {k: len(v) for k, v in cps_data.items()} + var_len = cps_data["person_household_id"].shape[0] + vars_of_interest = [name for name, ln in lengths.items() if ln == var_len] + agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest}) + agg_data["interest_dividend_income"] = np.sum( + [ + agg_data["taxable_interest_income"], + agg_data["tax_exempt_interest_income"], + agg_data["qualified_dividend_income"], + agg_data["non_qualified_dividend_income"], + ], + axis=0, + ) + agg_data["social_security_pension_income"] = np.sum( [ - "employment_income", - "interest_dividend_income", - "social_security_pension_income", + agg_data["tax_exempt_private_pension_income"], + agg_data["taxable_private_pension_income"], + agg_data["social_security_retirement"], + ], + axis=0, + ) + + agg = ( + agg_data.groupby("person_household_id")[ + [ + "employment_income", + "interest_dividend_income", + "social_security_pension_income", + ] ] - ] - .sum() - .rename( - columns={ - "employment_income": "household_employment_income", - "interest_dividend_income": "household_interest_dividend_income", - "social_security_pension_income": "household_social_security_pension_income", - } + .sum() + .rename( + columns={ + "employment_income": "household_employment_income", + "interest_dividend_income": "household_interest_dividend_income", + "social_security_pension_income": "household_social_security_pension_income", + } + ) + .reset_index() ) - .reset_index() - ) def create_scf_reference_person_mask(cps_data, raw_person_data): """ diff --git a/tests/unit/datasets/test_cps_file_handles.py b/tests/unit/datasets/test_cps_file_handles.py new file mode 100644 index 00000000..ca3e39c1 --- /dev/null +++ b/tests/unit/datasets/test_cps_file_handles.py @@ -0,0 +1,236 @@ +from types import SimpleNamespace + +import numpy as np +import pandas as pd + +import policyengine_us_data.datasets.cps.cps as cps_module +from policyengine_us_data.datasets.cps.cps import ( + add_auto_loan_interest_and_net_worth, + add_previous_year_income, +) + + +class _FakeStore: + def __init__(self, person: pd.DataFrame): + self.person = person + self.closed = False + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + self.close() + return False + + def close(self): + self.closed = True + + +class _FakeDataset: + store: _FakeStore | None = None + + def __init__(self, require: bool = False): + assert require is True + + def load(self): + assert self.store is not None + return self.store + + +def test_add_previous_year_income_closes_raw_cps_handles(): + current_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + previous_person = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "WSAL_VAL": [1_000, 2_000], + "SEMP_VAL": [100, 200], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + current_store = _FakeStore(current_person) + previous_store = _FakeStore(previous_person) + + current_dataset = type("CurrentDataset", (_FakeDataset,), {"store": current_store}) + previous_dataset = type( + "PreviousDataset", (_FakeDataset,), {"store": previous_store} + ) + + holder = SimpleNamespace( + raw_cps=current_dataset, + previous_year_raw_cps=previous_dataset, + ) + cps = {} + + add_previous_year_income(holder, cps) + + np.testing.assert_array_equal(cps["employment_income_last_year"], [1000, 2000]) + np.testing.assert_array_equal(cps["self_employment_income_last_year"], [100, 200]) + np.testing.assert_array_equal(cps["previous_year_income_available"], [True, True]) + assert current_store.closed is True + assert previous_store.closed is True + + +def test_add_previous_year_income_opens_hdfstores_read_only(tmp_path, monkeypatch): + current_path = tmp_path / "current.h5" + previous_path = tmp_path / "previous.h5" + + with pd.HDFStore(current_path, mode="w") as store: + store["person"] = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + with pd.HDFStore(previous_path, mode="w") as store: + store["person"] = pd.DataFrame( + { + "PERIDNUM": [10, 20], + "WSAL_VAL": [1_000, 2_000], + "SEMP_VAL": [100, 200], + "I_ERNVAL": [0, 0], + "I_SEVAL": [0, 0], + } + ) + + real_hdfstore = pd.HDFStore + opened_modes = [] + + def recording_hdfstore(path, mode="a", *args, **kwargs): + opened_modes.append(mode) + return real_hdfstore(path, mode=mode, *args, **kwargs) + + monkeypatch.setattr(cps_module.pd, "HDFStore", recording_hdfstore) + + class CurrentDataset: + file_path = current_path + + def __init__(self, require: bool = False): + assert require is True + + class PreviousDataset: + file_path = previous_path + + def __init__(self, require: bool = False): + assert require is True + + holder = SimpleNamespace( + raw_cps=CurrentDataset, + previous_year_raw_cps=PreviousDataset, + ) + + cps = {} + add_previous_year_income(holder, cps) + + assert opened_modes == ["r", "r"] + np.testing.assert_array_equal(cps["employment_income_last_year"], [1000, 2000]) + + +def test_add_auto_loan_interest_and_net_worth_uses_outer_receiver_data(monkeypatch): + raw_person = pd.DataFrame( + { + "A_SEX": [1, 2], + "A_MARITL": [1, 3], + } + ) + raw_store = _FakeStore(raw_person) + + class FakeRawCPS: + def __call__(self, require: bool = False): + assert require is True + return self + + def load(self): + return raw_store + + class FakeDataset: + def __init__(self): + self.raw_cps = FakeRawCPS() + self.saved_dataset = None + + def save_dataset(self, data): + self.saved_dataset = data + + def load_dataset(self): + return { + "person_household_id": np.array([10, 20]), + "age": np.array([35, 40]), + "is_female": np.array([False, True]), + "cps_race": np.array([1, 2]), + "own_children_in_household": np.array([0, 1]), + "employment_income": np.array([40_000.0, 25_000.0]), + "taxable_interest_income": np.array([100.0, 0.0]), + "tax_exempt_interest_income": np.array([0.0, 0.0]), + "qualified_dividend_income": np.array([0.0, 0.0]), + "non_qualified_dividend_income": np.array([0.0, 0.0]), + "tax_exempt_private_pension_income": np.array([0.0, 0.0]), + "taxable_private_pension_income": np.array([0.0, 0.0]), + "social_security_retirement": np.array([0.0, 0.0]), + } + + class FakeSCF: + def load_dataset(self): + return { + "age": np.array([30, 50]), + "is_female": np.array([False, True]), + "cps_race": np.array([1, 2]), + "is_married": np.array([True, False]), + "own_children_in_household": np.array([0, 1]), + "employment_income": np.array([35_000.0, 20_000.0]), + "interest_dividend_income": np.array([100.0, 50.0]), + "social_security_pension_income": np.array([0.0, 0.0]), + "networth": np.array([10_000.0, 5_000.0]), + "auto_loan_balance": np.array([2_000.0, 1_000.0]), + "auto_loan_interest": np.array([200.0, 100.0]), + "wgt": np.array([1.0, 1.0]), + } + + class FakeQRF: + def fit( + self, + X_train, + predictors, + imputed_variables, + weight_col, + tune_hyperparameters, + ): + assert predictors[0] == "age" + assert weight_col == "wgt" + self.imputed_variables = imputed_variables + return self + + def predict(self, X_test): + assert X_test["is_married"].tolist() == [True, False] + return pd.DataFrame( + { + "networth": [10_000.0, 5_000.0], + "auto_loan_balance": [2_000.0, 1_000.0], + "auto_loan_interest": [200.0, 100.0], + } + ) + + import policyengine_us_data.datasets.scf.scf as scf_module + import microimpute.models.qrf as qrf_module + + monkeypatch.setattr(scf_module, "SCF_2022", FakeSCF) + monkeypatch.setattr(qrf_module, "QRF", FakeQRF) + + dataset = FakeDataset() + add_auto_loan_interest_and_net_worth(dataset, {}) + + assert raw_store.closed is True + np.testing.assert_array_equal( + dataset.saved_dataset["net_worth"], [10_000.0, 5_000.0] + ) + np.testing.assert_array_equal( + dataset.saved_dataset["auto_loan_interest"], [200.0, 100.0] + )