Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/fix-cps-hdf5-file-handles.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Close raw CPS HDF stores after previous-year income and auto-loan preprocessing so CPS builds do not leave `census_cps_2021.h5` and `census_cps_2022.h5` open at process shutdown.
178 changes: 96 additions & 82 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from contextlib import closing, contextmanager
from importlib.resources import files
from policyengine_core.data import Dataset
from policyengine_us_data.storage import STORAGE_FOLDER, DOCS_FOLDER
Expand Down Expand Up @@ -91,6 +92,20 @@
}


@contextmanager
def _open_dataset_read_only(dataset_source):
dataset = dataset_source(require=True)
file_path = getattr(dataset, "file_path", None)

if file_path is not None:
with pd.HDFStore(file_path, mode="r") as store:
yield store
return

with closing(dataset.load()) as store:
yield store


class CPS(Dataset):
name = "cps"
label = "CPS"
Expand All @@ -113,13 +128,13 @@ def generate(self):
"For future years, use PolicyEngine's uprating at simulation time."
)

raw_data = self.raw_cps(require=True).load()
cps = {}

ENTITIES = ("person", "tax_unit", "family", "spm_unit", "household")
person, tax_unit, family, spm_unit, household = [
raw_data[entity] for entity in ENTITIES
]
with _open_dataset_read_only(self.raw_cps) as raw_data:
person, tax_unit, family, spm_unit, household = [
raw_data[entity] for entity in ENTITIES
]

logging.info("Adding ID variables")
add_id_variables(cps, person, tax_unit, family, spm_unit, household)
Expand Down Expand Up @@ -160,7 +175,6 @@ def generate(self):
add_auto_loan_interest_and_net_worth(self, cps)
logging.info("Added all variables")

raw_data.close()
self.save_dataset(cps)
logging.info("Adding takeup")
add_takeup(self)
Expand Down Expand Up @@ -930,39 +944,41 @@ def add_previous_year_income(self, cps: h5py.File) -> None:
)
return

cps_current_year_data = self.raw_cps(require=True).load()
cps_previous_year_data = self.previous_year_raw_cps(require=True).load()
cps_previous_year = cps_previous_year_data.person.set_index(
cps_previous_year_data.person.PERIDNUM
)
cps_current_year = cps_current_year_data.person.set_index(
cps_current_year_data.person.PERIDNUM
)
with (
_open_dataset_read_only(self.raw_cps) as cps_current_year_data,
_open_dataset_read_only(self.previous_year_raw_cps) as cps_previous_year_data,
):
cps_previous_year = cps_previous_year_data.person.set_index(
cps_previous_year_data.person.PERIDNUM
)
cps_current_year = cps_current_year_data.person.set_index(
cps_current_year_data.person.PERIDNUM
)

previous_year_data = cps_previous_year[
["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"]
].rename(
{
"WSAL_VAL": "employment_income_last_year",
"SEMP_VAL": "self_employment_income_last_year",
},
axis=1,
)
previous_year_data = cps_previous_year[
["WSAL_VAL", "SEMP_VAL", "I_ERNVAL", "I_SEVAL"]
].rename(
{
"WSAL_VAL": "employment_income_last_year",
"SEMP_VAL": "self_employment_income_last_year",
},
axis=1,
)

previous_year_data = previous_year_data[
(previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0)
]
previous_year_data = previous_year_data[
(previous_year_data.I_ERNVAL == 0) & (previous_year_data.I_SEVAL == 0)
]

previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True)
previous_year_data.drop(["I_ERNVAL", "I_SEVAL"], axis=1, inplace=True)

joined_data = cps_current_year.join(previous_year_data)[
[
"employment_income_last_year",
"self_employment_income_last_year",
"I_ERNVAL",
"I_SEVAL",
joined_data = cps_current_year.join(previous_year_data)[
[
"employment_income_last_year",
"self_employment_income_last_year",
"I_ERNVAL",
"I_SEVAL",
]
]
]
joined_data["previous_year_income_available"] = (
~joined_data.employment_income_last_year.isna()
& ~joined_data.self_employment_income_last_year.isna()
Expand Down Expand Up @@ -1884,13 +1900,12 @@ def add_tips(self, cps: h5py.File):
# Get is_married from raw CPS data (A_MARITL codes: 1,2 = married)
# Note: is_married in policyengine-us is Family-level, but we need
# person-level for imputation models
raw_data = self.raw_cps(require=True).load()
raw_person = raw_data["person"]
cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values
cps["is_tipped_occupation"] = derive_is_tipped_occupation(
derive_treasury_tipped_occupation_code(raw_person.PEIOOCC)
)
raw_data.close()
with _open_dataset_read_only(self.raw_cps) as raw_data:
raw_person = raw_data["person"]
cps["is_married"] = raw_person.A_MARITL.isin([1, 2]).values
cps["is_tipped_occupation"] = derive_is_tipped_occupation(
derive_treasury_tipped_occupation_code(raw_person.PEIOOCC)
)

cps["is_under_18"] = cps.age < 18
cps["is_under_6"] = cps.age < 6
Expand Down Expand Up @@ -2050,51 +2065,50 @@ def add_auto_loan_interest_and_net_worth(self, cps: h5py.File) -> None:
cps_data = self.load_dataset()

# Access raw CPS for additional variables
raw_data_instance = self.raw_cps(require=True)
raw_data = raw_data_instance.load()
person_data = raw_data.person

# Preprocess the CPS for imputation
lengths = {k: len(v) for k, v in cps_data.items()}
var_len = cps_data["person_household_id"].shape[0]
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
agg_data["interest_dividend_income"] = np.sum(
[
agg_data["taxable_interest_income"],
agg_data["tax_exempt_interest_income"],
agg_data["qualified_dividend_income"],
agg_data["non_qualified_dividend_income"],
],
axis=0,
)
agg_data["social_security_pension_income"] = np.sum(
[
agg_data["tax_exempt_private_pension_income"],
agg_data["taxable_private_pension_income"],
agg_data["social_security_retirement"],
],
axis=0,
)

agg = (
agg_data.groupby("person_household_id")[
with _open_dataset_read_only(self.raw_cps) as raw_data:
person_data = raw_data.person

# Preprocess the CPS for imputation
lengths = {k: len(v) for k, v in cps_data.items()}
var_len = cps_data["person_household_id"].shape[0]
vars_of_interest = [name for name, ln in lengths.items() if ln == var_len]
agg_data = pd.DataFrame({n: cps_data[n] for n in vars_of_interest})
agg_data["interest_dividend_income"] = np.sum(
[
agg_data["taxable_interest_income"],
agg_data["tax_exempt_interest_income"],
agg_data["qualified_dividend_income"],
agg_data["non_qualified_dividend_income"],
],
axis=0,
)
agg_data["social_security_pension_income"] = np.sum(
[
"employment_income",
"interest_dividend_income",
"social_security_pension_income",
agg_data["tax_exempt_private_pension_income"],
agg_data["taxable_private_pension_income"],
agg_data["social_security_retirement"],
],
axis=0,
)

agg = (
agg_data.groupby("person_household_id")[
[
"employment_income",
"interest_dividend_income",
"social_security_pension_income",
]
]
]
.sum()
.rename(
columns={
"employment_income": "household_employment_income",
"interest_dividend_income": "household_interest_dividend_income",
"social_security_pension_income": "household_social_security_pension_income",
}
.sum()
.rename(
columns={
"employment_income": "household_employment_income",
"interest_dividend_income": "household_interest_dividend_income",
"social_security_pension_income": "household_social_security_pension_income",
}
)
.reset_index()
)
.reset_index()
)

def create_scf_reference_person_mask(cps_data, raw_person_data):
"""
Expand Down
Loading
Loading