Skip to content
6 changes: 6 additions & 0 deletions changelog.d/701.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Add SSTB QBI split inputs to `policyengine-us-data` by exposing
`sstb_self_employment_income`, `sstb_w2_wages_from_qualified_business`, and
`sstb_unadjusted_basis_qualified_property` from the existing PUF/calibration
pipeline. The current split follows the legacy all-or-nothing
`business_is_sstb` flag, so mixed SSTB/non-SSTB allocations remain approximate
until more granular source data or imputation is added.
7 changes: 7 additions & 0 deletions docs/appendix.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,19 @@ for iteration in range(5000):
- w2_wages_from_qualified_business
- unadjusted_basis_qualified_property
- business_is_sstb
- sstb_self_employment_income
- sstb_w2_wages_from_qualified_business
- sstb_unadjusted_basis_qualified_property
- qualified_reit_and_ptp_income
- qualified_bdc_income
- farm_operations_income
- estate_income_would_be_qualified
- farm_operations_income_would_be_qualified
- farm_rent_income_would_be_qualified

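The current PUF/calibration pipeline uses the legacy `business_is_sstb` flag to
split the three `sstb_*` variables listed above on an all-or-nothing basis: for
each record, every `sstb_*` variable is either the record's full amount (flag
set) or zero (flag unset). It does not yet infer mixed SSTB and non-SSTB
allocations within the same record.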
- partnership_s_corp_income_would_be_qualified
- rental_income_would_be_qualified
- self_employment_income_would_be_qualified
Expand Down
14 changes: 13 additions & 1 deletion policyengine_us_data/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from importlib import import_module

from .geography import ZIP_CODE_DATASET
from .utils.policyengine import ensure_policyengine_us_compat_variables

ensure_policyengine_us_compat_variables()

_LAZY_EXPORTS = {
"CPS_2024": (
Expand All @@ -26,7 +29,16 @@

def __getattr__(name: str):
if name not in _LAZY_EXPORTS:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
try:
value = import_module(f"{__name__}.{name}")
except ModuleNotFoundError as exc:
if exc.name == f"{__name__}.{name}":
raise AttributeError(
f"module {__name__!r} has no attribute {name!r}"
) from exc
raise
globals()[name] = value
return value

module_name, attribute_name = _LAZY_EXPORTS[name]
value = getattr(import_module(module_name), attribute_name)
Expand Down
2 changes: 1 addition & 1 deletion policyengine_us_data/calibration/check_staging_sums.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
VARIABLES = [
"adjusted_gross_income",
"employment_income",
"self_employment_income",
"total_self_employment_income",
"tax_unit_partnership_s_corp_income",
"taxable_pension_income",
"dividend_income",
Expand Down
14 changes: 12 additions & 2 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@
"pre_tax_contributions",
"taxable_ira_distributions",
"self_employment_income",
"sstb_self_employment_income",
"w2_wages_from_qualified_business",
"unadjusted_basis_qualified_property",
"business_is_sstb",
"sstb_w2_wages_from_qualified_business",
"sstb_unadjusted_basis_qualified_property",
"short_term_capital_gains",
"qualified_dividend_income",
"charitable_cash_donations",
Expand Down Expand Up @@ -122,6 +125,8 @@
"w2_wages_from_qualified_business",
"unadjusted_basis_qualified_property",
"business_is_sstb",
"sstb_w2_wages_from_qualified_business",
"sstb_unadjusted_basis_qualified_property",
"charitable_cash_donations",
"self_employed_pension_contribution_ald",
"unrecaptured_section_1250_gain",
Expand Down Expand Up @@ -693,6 +698,11 @@ def _impute_retirement_contributions(
X_test[income_var] = puf_imputations[income_var]
else:
X_test[income_var] = cps_sim.calculate(income_var).values
if "sstb_self_employment_income" in puf_imputations:
X_test["self_employment_income"] = (
X_test["self_employment_income"]
+ puf_imputations["sstb_self_employment_income"]
)

del cps_sim

Expand Down Expand Up @@ -723,13 +733,13 @@ def _impute_retirement_contributions(
catch_up_eligible = age >= 50
limit_401k = limits["401k"] + catch_up_eligible * limits["401k_catch_up"]
limit_ira = limits["ira"] + catch_up_eligible * limits["ira_catch_up"]
se_income = X_test["self_employment_income"].values
se_pension_cap = np.minimum(
X_test["self_employment_income"].values * limits["se_pension_rate"],
se_income * limits["se_pension_rate"],
limits["se_pension_dollar_limit"],
)

emp_income = X_test["employment_income"].values
se_income = X_test["self_employment_income"].values

result = {}
for var in CPS_RETIREMENT_VARIABLES:
Expand Down
8 changes: 4 additions & 4 deletions policyengine_us_data/calibration/target_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ include:
geo_level: district
- variable: real_estate_taxes
geo_level: district
- variable: self_employment_income
- variable: total_self_employment_income
geo_level: district
- variable: taxable_pension_income
geo_level: district
Expand Down Expand Up @@ -163,9 +163,9 @@ include:
- variable: non_refundable_ctc
geo_level: national
domain_variable: adjusted_gross_income,non_refundable_ctc
- variable: self_employment_income
- variable: total_self_employment_income
geo_level: national
domain_variable: self_employment_income
domain_variable: total_self_employment_income
- variable: tax_unit_partnership_s_corp_income
geo_level: national
domain_variable: tax_unit_partnership_s_corp_income
Expand Down Expand Up @@ -199,7 +199,7 @@ include:
# Restore old loss.py's self-employment return-count target.
- variable: tax_unit_count
geo_level: national
domain_variable: self_employment_income
domain_variable: total_self_employment_income

# === NATIONAL — identity / population count targets from old loss.py ===
- variable: person_count
Expand Down
2 changes: 1 addition & 1 deletion policyengine_us_data/calibration/validate_national_h5.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
VARIABLES = [
"adjusted_gross_income",
"employment_income",
"self_employment_income",
"total_self_employment_income",
"tax_unit_partnership_s_corp_income",
"taxable_pension_income",
"dividend_income",
Expand Down
176 changes: 176 additions & 0 deletions policyengine_us_data/datasets/puf/puf.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import h5py
import yaml
from importlib.resources import files

Expand Down Expand Up @@ -432,6 +433,20 @@ def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame:
0.0
)
puf["business_is_sstb"] = rng.binomial(n=1, p=pr_sstb)
is_sstb = puf["business_is_sstb"].astype(bool)

# The current PUF pipeline only imputes an all-or-nothing SSTB flag.
# Use that to split Schedule C self-employment and allocable W-2/UBIA
# inputs for policyengine-us without pretending to observe mixed cases.
legacy_self_employment_income = puf["self_employment_income"].fillna(0)
puf["sstb_self_employment_income"] = np.where(
is_sstb, legacy_self_employment_income, 0.0
)
puf["self_employment_income"] = np.where(
is_sstb, 0.0, legacy_self_employment_income
)
puf["sstb_w2_wages_from_qualified_business"] = np.where(is_sstb, w2, 0.0)
puf["sstb_unadjusted_basis_qualified_property"] = np.where(is_sstb, ubia, 0.0)

reit_params = QBI_PARAMS["reit_ptp_income_distribution"]
p_reit_ptp = reit_params["probability_of_receiving"]
Expand Down Expand Up @@ -526,6 +541,9 @@ def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame:
"w2_wages_from_qualified_business",
"unadjusted_basis_qualified_property",
"business_is_sstb",
"sstb_self_employment_income",
"sstb_w2_wages_from_qualified_business",
"sstb_unadjusted_basis_qualified_property",
"deductible_mortgage_interest",
"partnership_s_corp_income",
"partnership_se_income",
Expand All @@ -538,6 +556,164 @@ class PUF(Dataset):
time_period = None
data_format = Dataset.ARRAYS

@staticmethod
def _replace_array(file_handle, key: str, values: np.ndarray) -> None:
    """Store ``values`` under ``key``, discarding any dataset already there.

    Args:
        file_handle: Open, writable container that supports ``in``, ``del``
            and ``create_dataset`` (an ``h5py.File`` when called from this
            class).
        key: Name of the dataset to create or recreate.
        values: Array written as the dataset's contents.
    """
    # The old dataset is removed rather than assigned in place, presumably so
    # the replacement may differ in shape or dtype -- TODO confirm.
    stale_entry_present = key in file_handle
    if stale_entry_present:
        del file_handle[key]
    file_handle.create_dataset(key, data=values)

def _sstb_split_overrides(self) -> dict[str, np.ndarray]:
    """Work out which stored arrays disagree with the all-or-nothing SSTB split.

    The file at ``self.file_path`` is opened read-only and never modified
    here. Using the stored ``business_is_sstb`` flag as a boolean mask:

    * ``sstb_self_employment_income`` should hold, on flagged records, its
      stored non-zero value if there is one and otherwise the stored
      ``self_employment_income``; it should be zero on unflagged records.
    * ``self_employment_income`` should be zero on flagged records and
      unchanged elsewhere.
    * ``sstb_w2_wages_from_qualified_business`` and
      ``sstb_unadjusted_basis_qualified_property`` should equal their
      non-SSTB source arrays on flagged records and zero elsewhere.

    Returns:
        Mapping of dataset name to corrected array for every dataset that is
        missing or differs from the expected values. Empty when the file does
        not exist, has no ``business_is_sstb`` dataset, or is already
        consistent.
    """
    if not self.file_path.exists():
        return {}

    se_key = "self_employment_income"
    sstb_se_key = "sstb_self_employment_income"
    # (non-SSTB source, SSTB target) pairs that are simple masked copies.
    mirrored_pairs = (
        (
            "w2_wages_from_qualified_business",
            "sstb_w2_wages_from_qualified_business",
        ),
        (
            "unadjusted_basis_qualified_property",
            "sstb_unadjusted_basis_qualified_property",
        ),
    )

    with h5py.File(self.file_path, "r") as store:
        if "business_is_sstb" not in store:
            return {}
        present = set(store.keys())
        sstb_mask = np.asarray(store["business_is_sstb"]).astype(bool)
        fixes = {}

        if se_key in present:
            stored_se = np.asarray(store[se_key])
            has_sstb_se = sstb_se_key in present
            if has_sstb_se:
                stored_sstb_se = np.asarray(store[sstb_se_key])
            else:
                stored_sstb_se = np.zeros_like(stored_se)

            # Prefer an already-split SSTB amount; fall back to the unsplit
            # value only where no SSTB amount has been recorded yet.
            sstb_amount = np.where(stored_sstb_se != 0, stored_sstb_se, stored_se)
            wanted_sstb_se = np.where(sstb_mask, sstb_amount, 0.0)
            wanted_se = np.where(sstb_mask, 0.0, stored_se)

            already_split = (
                has_sstb_se
                and np.array_equal(stored_sstb_se, wanted_sstb_se)
                and np.array_equal(stored_se, wanted_se)
            )
            if not already_split:
                # Both halves are emitted together so they stay in step.
                fixes[sstb_se_key] = wanted_sstb_se
                fixes[se_key] = wanted_se

        for source_key, target_key in mirrored_pairs:
            if source_key in present:
                wanted = np.where(sstb_mask, np.asarray(store[source_key]), 0.0)
                target_current = target_key in present and np.array_equal(
                    np.asarray(store[target_key]), wanted
                )
                if not target_current:
                    fixes[target_key] = wanted

    return fixes

def _ensure_sstb_split_inputs(self) -> dict[str, np.ndarray]:
    """Bring the stored SSTB split arrays up to date and return the fixes.

    Asks ``_sstb_split_overrides`` which arrays are missing or inconsistent
    and, when there are any, tries to write them back to ``self.file_path``.

    Returns:
        The corrected arrays keyed by dataset name (empty when nothing needed
        fixing). They are returned even if writing them to disk failed, so
        callers can still use the in-memory values.
    """
    import logging

    overrides = self._sstb_split_overrides()
    if not overrides:
        return {}

    try:
        with h5py.File(self.file_path, "r+") as file_handle:
            for key, values in overrides.items():
                self._replace_array(file_handle, key, values)
    except OSError as exc:
        # Persisting is best-effort: the file may not be writable. Record the
        # failure instead of discarding it silently; the corrected arrays are
        # still handed back below.
        logging.getLogger(__name__).info(
            "Could not persist SSTB split inputs to %s (%s); "
            "using in-memory values.",
            self.file_path,
            exc,
        )

    return overrides

class _OverrideView:
    """Mapping-style view that serves override arrays ahead of a backing store.

    Lookups consult ``overrides`` first and fall back to ``backing``. Any
    attribute this class does not define is forwarded to ``backing``, and the
    context-manager and ``close`` calls are passed through when the backing
    object supports them.
    """

    def __init__(self, backing, overrides: dict[str, np.ndarray]):
        """Wrap ``backing`` so that entries in ``overrides`` take precedence."""
        self._backing = backing
        self._overrides = overrides

    def __getitem__(self, key):
        """Return the override for ``key`` if present, else the backing value."""
        if key in self._overrides:
            return self._overrides[key]
        return self._backing[key]

    def __contains__(self, key):
        """Report whether ``key`` exists in either the overrides or the backing."""
        return key in self._overrides or key in self._backing

    def keys(self):
        """Return a tuple of unique keys: backing keys first, then new overrides.

        If the backing object has no ``keys`` method, only the override keys
        are returned.
        """
        if hasattr(self._backing, "keys"):
            return tuple(dict.fromkeys((*self._backing.keys(), *self._overrides)))
        return tuple(self._overrides)

    def get(self, key, default=None):
        """Return ``self[key]`` when ``key`` is present, otherwise ``default``."""
        if key in self:
            return self[key]
        return default

    def items(self):
        """Yield ``(key, value)`` pairs in ``keys()`` order."""
        for key in self.keys():
            yield key, self[key]

    def values(self):
        """Yield values in ``keys()`` order."""
        for key in self.keys():
            yield self[key]

    def __iter__(self):
        """Iterate over ``keys()``."""
        return iter(self.keys())

    def close(self):
        """Close the backing object if it exposes ``close``."""
        if hasattr(self._backing, "close"):
            self._backing.close()

    def __enter__(self):
        """Enter the backing context (when it has one) and return this view."""
        if hasattr(self._backing, "__enter__"):
            self._backing.__enter__()
        return self

    def __exit__(self, exc_type, exc, traceback):
        """Delegate to the backing ``__exit__``; return ``None`` if it has none."""
        if hasattr(self._backing, "__exit__"):
            return self._backing.__exit__(exc_type, exc, traceback)
        return None

    def __getattr__(self, name):
        """Forward attributes not defined on the view to the backing object.

        Raises:
            AttributeError: If the view's own storage attributes are missing
                (``__init__`` has not run), or the backing lacks ``name``.
        """
        # __getattr__ only runs after normal lookup fails. If that happens for
        # our own storage attributes, the instance was created without
        # __init__ (copy.copy, unpickling, __new__); reading self._backing
        # here would re-enter __getattr__ and recurse without bound.
        if name in ("_backing", "_overrides"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(self._backing, name)

def load(self, key=None, mode="r"):
    """Load from the dataset, applying the SSTB split corrections on reads.

    Args:
        key: Name of a single array to load, or ``None`` for the whole store.
        mode: File mode. Only ``"r"`` triggers the SSTB correction; any other
            mode is passed straight to the parent implementation.

    Returns:
        For ``mode == "r"``: the corrected array when ``key`` is one of the
        corrected datasets; an ``_OverrideView`` over the parent's result when
        ``key`` is ``None`` and corrections exist. Otherwise whatever the
        parent ``load`` returns.
    """
    if mode != "r":
        return super().load(key=key, mode=mode)

    split_arrays = self._ensure_sstb_split_inputs()
    if key in split_arrays:
        return split_arrays[key]

    backing = super().load(key=key, mode=mode)
    if key is None and split_arrays:
        # Whole-store reads must still see the corrected arrays, so wrap the
        # parent's result rather than returning it directly.
        return self._OverrideView(backing, split_arrays)
    return backing

def load_dataset(self):
    """Return the parent's loaded arrays with the SSTB split corrections applied.

    The corrections are computed (and, where possible, persisted) before the
    parent loader runs; they are then merged over its result, replacing any
    array of the same name.
    """
    split_arrays = self._ensure_sstb_split_inputs()
    dataset = super().load_dataset()
    dataset.update(split_arrays)
    return dataset

def generate(self):
from policyengine_us.system import system

Expand Down
4 changes: 2 additions & 2 deletions policyengine_us_data/db/etl_irs_soi.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
name="qualified_business_income_deduction",
breakdown=None,
),
dict(code="00900", name="self_employment_income", breakdown=None),
dict(code="00900", name="total_self_employment_income", breakdown=None),
dict(
code="01000",
name="net_capital_gains",
Expand Down Expand Up @@ -147,7 +147,7 @@ def _skip_coarse_state_agi_person_count_target(geo_type: str, agi_stub: int) ->
"net_capital_gains": "capital_gains_gross",
"qualified_dividend_income": "qualified_dividends",
"rental_income": "rent_and_royalty_net_income",
"self_employment_income": "business_net_profits",
"total_self_employment_income": "business_net_profits",
"tax_exempt_interest_income": "exempt_interest",
"tax_unit_partnership_s_corp_income": "partnership_and_s_corp_income",
"taxable_interest_income": "taxable_interest_income",
Expand Down
Loading
Loading