Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1128.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added deterministic collapsed long-term capital gains basis and holding-period imputation.
109 changes: 98 additions & 11 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
)
from policyengine_us_data.pipeline_metadata import pipeline_node
from policyengine_us_data.pipeline_schema import PipelineNode
from policyengine_us_data.utils.capital_gains_basis import (
CAPITAL_GAINS_BASIS_VARIABLES,
LONG_TERM_CAPITAL_GAINS_BASIS,
LONG_TERM_CAPITAL_GAINS_YEARS_HELD,
impute_person_level_long_term_capital_gains_basis,
)
from policyengine_us_data.utils.policyengine import (
has_policyengine_us_variables,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -59,6 +68,8 @@
"interest_deduction",
"tax_exempt_pension_income",
"long_term_capital_gains",
"long_term_capital_gains_basis",
"long_term_capital_gains_years_held",
"unreimbursed_business_employee_expenses",
"pre_tax_contributions",
"taxable_ira_distributions",
Expand Down Expand Up @@ -111,6 +122,8 @@
"self_employment_income_would_be_qualified",
]

# Capital gains basis variables, copied into a list. _run_qrf_imputation
# filters these names out of IMPUTED_VARIABLES before building the QRF
# training frame, so they are not predicted by the QRF.
DETERMINISTIC_IMPUTED_VARIABLES = list(CAPITAL_GAINS_BASIS_VARIABLES)

SS_SUBCOMPONENTS = [
"social_security_retirement",
"social_security_disability",
Expand Down Expand Up @@ -190,6 +203,68 @@
RETIREMENT_PREDICTORS = RETIREMENT_DEMOGRAPHIC_PREDICTORS + RETIREMENT_INCOME_PREDICTORS


def _person_weights_from_household_weights(
    data: Dict[str, Dict[int, np.ndarray]],
    time_period: int,
) -> Optional[np.ndarray]:
    """Build a per-person weight array from ``household_weight``.

    Args:
        data: Mapping of variable name to ``{period: values}``.
        time_period: Period whose arrays are read.

    Returns:
        A float array of weights, or ``None`` when ``household_weight``,
        ``person_household_id`` or ``household_id`` is unavailable for the
        period, or when the weights and household IDs differ in length.
    """

    def _values_for(variable: str):
        # Missing variable or missing period both resolve to None.
        return data.get(variable, {}).get(time_period)

    weights = _values_for("household_weight")
    if weights is None:
        return None

    people = _values_for("person_id")
    if people is not None and len(people) == len(weights):
        # Weight array already has one entry per person; pass it through.
        return np.asarray(weights, dtype=float)

    membership = _values_for("person_household_id")
    households = _values_for("household_id")
    if membership is None or households is None:
        return None
    if len(households) != len(weights):
        return None

    weight_lookup = dict(zip(households.tolist(), weights))
    # People whose household ID has no matching weight fall back to 1.0.
    person_weights = [weight_lookup.get(owner, 1.0) for owner in membership]
    return np.asarray(person_weights, dtype=float)


def _impute_long_term_capital_gains_basis(
    data: Dict[str, Dict[int, np.ndarray]],
    time_period: int,
) -> None:
    """Write deterministic basis and holding-period arrays into cloned data.

    Returns without touching ``data`` when ``has_policyengine_us_variables``
    is falsy for the capital gains basis variables, or when any of
    ``long_term_capital_gains``, ``person_tax_unit_id`` or ``person_id`` is
    absent from ``data``.

    Args:
        data: Mapping of variable name to ``{period: values}``; mutated in
            place. The two output entries are replaced with single-period
            dicts keyed by ``time_period``.
        time_period: Period to read inputs from; also passed to the
            imputation helper as ``tax_year``.
    """

    if not has_policyengine_us_variables(*CAPITAL_GAINS_BASIS_VARIABLES):
        return

    required_inputs = (
        "long_term_capital_gains",
        "person_tax_unit_id",
        "person_id",
    )
    if any(name not in data for name in required_inputs):
        return

    # Inputs are read in the same order the imputation call consumes them.
    gains = data["long_term_capital_gains"][time_period]
    tax_unit_ids = data["person_tax_unit_id"][time_period]
    person_ids = data["person_id"][time_period]
    sample_weight = _person_weights_from_household_weights(data, time_period)

    result = impute_person_level_long_term_capital_gains_basis(
        gains,
        person_tax_unit_ids=tax_unit_ids,
        person_ids=person_ids,
        person_sample_weight=sample_weight,
        tax_year=time_period,
    )

    basis = result.basis.astype(np.float32)
    data[LONG_TERM_CAPITAL_GAINS_BASIS] = {time_period: basis}

    years_held = result.years_held.astype(np.float32)
    data[LONG_TERM_CAPITAL_GAINS_YEARS_HELD] = {time_period: years_held}


def _get_retirement_limits(year: int) -> dict:
"""Return contribution limits for the given tax year.

Expand Down Expand Up @@ -517,8 +592,8 @@ def puf_clone_dataset(
person_count,
)

y_full = None
y_override = None
y_full = {}
y_override = {}
if not skip_qrf and puf_dataset is not None:
y_full, y_override = _run_qrf_imputation(
data,
Expand All @@ -541,19 +616,21 @@ def _map_to_entity(pred_values, variable_name):
var_meta = tbs.variables.get(variable_name)
if var_meta is None:
return pred_values
entity = var_meta.entity.key
entity = getattr(getattr(var_meta, "entity", None), "key", None)
if not isinstance(entity, str):
return pred_values
if entity != "person":
return cps_sim.populations[entity].value_from_first_person(pred_values)
return pred_values

# Impute weeks_unemployed for PUF half
puf_weeks = None
if y_full is not None and dataset_path is not None:
if y_full and dataset_path is not None:
puf_weeks = _impute_weeks_unemployed(data, y_full, time_period, dataset_path)

# Impute retirement contributions for PUF half
puf_retirement = None
if y_full is not None and dataset_path is not None:
if y_full and dataset_path is not None:
puf_retirement = _impute_retirement_contributions(
data, y_full, time_period, dataset_path
)
Expand All @@ -566,10 +643,10 @@ def _map_to_entity(pred_values, variable_name):

values = time_dict[time_period]

if variable in OVERRIDDEN_IMPUTED_VARIABLES and y_override:
if variable in y_override:
pred = _map_to_entity(y_override[variable], variable)
new_data[variable] = {time_period: np.concatenate([pred, pred])}
elif variable in IMPUTED_VARIABLES and y_full:
elif variable in y_full:
pred = _map_to_entity(y_full[variable], variable)
new_data[variable] = {time_period: np.concatenate([values, pred])}
elif "_id" in variable and np.issubdtype(values.dtype, np.number):
Expand Down Expand Up @@ -624,14 +701,18 @@ def _map_to_entity(pred_values, variable_name):
}

if y_full:
for var in IMPUTED_VARIABLES:
for var in y_full:
if var in PUF_REPORTED_CALCULATED_TAX_OUTPUT_VARIABLES:
continue
if var not in data:
pred = _map_to_entity(y_full[var], var)
new_data[var] = {time_period: np.concatenate([pred, pred])}

if cps_sim is not None:
del cps_sim

_impute_long_term_capital_gains_basis(new_data, time_period)

# Ensure SS sub-components match the (possibly imputed) total.
reconcile_ss_subcomponents(new_data, person_count, time_period)

Expand Down Expand Up @@ -942,8 +1023,14 @@ def _run_qrf_imputation(

puf_agi = puf_sim.calculate("adjusted_gross_income", map_to="person").values

qrf_imputed_variables = [
variable
for variable in IMPUTED_VARIABLES
if variable not in DETERMINISTIC_IMPUTED_VARIABLES
]

X_train_full = puf_sim.calculate_dataframe(
DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES
DEMOGRAPHIC_PREDICTORS + qrf_imputed_variables
)

X_train_override = puf_sim.calculate_dataframe(
Expand Down Expand Up @@ -972,9 +1059,9 @@ def _run_qrf_imputation(
if pred in data:
X_test[pred] = data[pred][time_period].astype(np.float32)

logger.info("Imputing %d PUF variables (full)", len(IMPUTED_VARIABLES))
logger.info("Imputing %d PUF variables (full)", len(qrf_imputed_variables))
y_full = _sequential_qrf(
X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES
X_train_full, X_test, DEMOGRAPHIC_PREDICTORS, qrf_imputed_variables
)

logger.info(
Expand Down
Loading
Loading