diff --git a/changelog.d/701.changed.md b/changelog.d/701.changed.md new file mode 100644 index 00000000..46015753 --- /dev/null +++ b/changelog.d/701.changed.md @@ -0,0 +1,6 @@ +Add SSTB QBI split inputs to `policyengine-us-data` by exposing +`sstb_self_employment_income`, `sstb_w2_wages_from_qualified_business`, and +`sstb_unadjusted_basis_qualified_property` from the existing PUF/calibration +pipeline. The current split follows the legacy all-or-nothing +`business_is_sstb` flag, so mixed SSTB/non-SSTB allocations remain approximate +until more granular source data or imputation is added. diff --git a/docs/appendix.md b/docs/appendix.md index 41a5b0c7..3e2c8691 100644 --- a/docs/appendix.md +++ b/docs/appendix.md @@ -112,12 +112,19 @@ for iteration in range(5000): - w2_wages_from_qualified_business - unadjusted_basis_qualified_property - business_is_sstb +- sstb_self_employment_income +- sstb_w2_wages_from_qualified_business +- sstb_unadjusted_basis_qualified_property - qualified_reit_and_ptp_income - qualified_bdc_income - farm_operations_income - estate_income_would_be_qualified - farm_operations_income_would_be_qualified - farm_rent_income_would_be_qualified + +The current PUF/calibration pipeline uses the legacy `business_is_sstb` flag to +split these SSTB variables on an all-or-nothing basis. It does not yet infer +mixed SSTB and non-SSTB allocations within the same record. - partnership_s_corp_income_would_be_qualified - rental_income_would_be_qualified - self_employment_income_would_be_qualified diff --git a/policyengine_us_data/__init__.py b/policyengine_us_data/__init__.py index 799e4b91..77ce8693 100644 --- a/policyengine_us_data/__init__.py +++ b/policyengine_us_data/__init__.py @@ -1,6 +1,9 @@ from importlib import import_module from .geography import ZIP_CODE_DATASET +from .utils.policyengine import ensure_policyengine_us_compat_variables + +ensure_policyengine_us_compat_variables() _LAZY_EXPORTS = { "CPS_2024": ( @@ -26,7 +29,16 @@ def __getattr__(name: str): if name not in _LAZY_EXPORTS: - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + try: + value = import_module(f"{__name__}.{name}") + except ModuleNotFoundError as exc: + if exc.name == f"{__name__}.{name}": + raise AttributeError( + f"module {__name__!r} has no attribute {name!r}" + ) from exc + raise + globals()[name] = value + return value module_name, attribute_name = _LAZY_EXPORTS[name] value = getattr(import_module(module_name), attribute_name) diff --git a/policyengine_us_data/calibration/check_staging_sums.py b/policyengine_us_data/calibration/check_staging_sums.py index 9c8172b3..a371dbe3 100644 --- a/policyengine_us_data/calibration/check_staging_sums.py +++ b/policyengine_us_data/calibration/check_staging_sums.py @@ -23,7 +23,7 @@ VARIABLES = [ "adjusted_gross_income", "employment_income", - "self_employment_income", + "total_self_employment_income", "tax_unit_partnership_s_corp_income", "taxable_pension_income", "dividend_income", diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index f6bd0eed..aba3dc1e 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -50,9 +50,12 @@ "pre_tax_contributions", "taxable_ira_distributions", "self_employment_income", + "sstb_self_employment_income", "w2_wages_from_qualified_business", "unadjusted_basis_qualified_property", "business_is_sstb", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", "short_term_capital_gains", "qualified_dividend_income", "charitable_cash_donations", @@ -122,6 +125,8 @@ "w2_wages_from_qualified_business", "unadjusted_basis_qualified_property", "business_is_sstb", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", "charitable_cash_donations", "self_employed_pension_contribution_ald", "unrecaptured_section_1250_gain", @@ -693,6 +698,11 @@ def _impute_retirement_contributions( X_test[income_var] = puf_imputations[income_var] else: X_test[income_var] = cps_sim.calculate(income_var).values + if "sstb_self_employment_income" in puf_imputations: + X_test["self_employment_income"] = ( + X_test["self_employment_income"] + + puf_imputations["sstb_self_employment_income"] + ) del cps_sim @@ -723,13 +733,13 @@ def _impute_retirement_contributions( catch_up_eligible = age >= 50 limit_401k = limits["401k"] + catch_up_eligible * limits["401k_catch_up"] limit_ira = limits["ira"] + catch_up_eligible * limits["ira_catch_up"] + se_income = X_test["self_employment_income"].values se_pension_cap = np.minimum( - X_test["self_employment_income"].values * limits["se_pension_rate"], + se_income * limits["se_pension_rate"], limits["se_pension_dollar_limit"], ) emp_income = X_test["employment_income"].values - se_income = X_test["self_employment_income"].values result = {} for var in CPS_RETIREMENT_VARIABLES: diff --git a/policyengine_us_data/calibration/target_config.yaml b/policyengine_us_data/calibration/target_config.yaml index 41c7474d..926306d1 100644 --- a/policyengine_us_data/calibration/target_config.yaml +++ b/policyengine_us_data/calibration/target_config.yaml @@ -22,7 +22,7 @@ include: geo_level: district - variable: real_estate_taxes geo_level: district - - variable: self_employment_income + - variable: total_self_employment_income geo_level: district - variable: taxable_pension_income geo_level: district @@ -163,9 +163,9 @@ include: - variable: non_refundable_ctc geo_level: national domain_variable: adjusted_gross_income,non_refundable_ctc - - variable: self_employment_income + - variable: total_self_employment_income geo_level: national - domain_variable: self_employment_income + domain_variable: total_self_employment_income - variable: tax_unit_partnership_s_corp_income geo_level: national domain_variable: tax_unit_partnership_s_corp_income @@ -199,7 +199,7 @@ include: # Restore old loss.py's self-employment return-count target. - variable: tax_unit_count geo_level: national - domain_variable: self_employment_income + domain_variable: total_self_employment_income # === NATIONAL — identity / population count targets from old loss.py === - variable: person_count diff --git a/policyengine_us_data/calibration/validate_national_h5.py b/policyengine_us_data/calibration/validate_national_h5.py index c2146a52..2ef8165f 100644 --- a/policyengine_us_data/calibration/validate_national_h5.py +++ b/policyengine_us_data/calibration/validate_national_h5.py @@ -25,7 +25,7 @@ VARIABLES = [ "adjusted_gross_income", "employment_income", - "self_employment_income", + "total_self_employment_income", "tax_unit_partnership_s_corp_income", "taxable_pension_income", "dividend_income", diff --git a/policyengine_us_data/datasets/puf/puf.py b/policyengine_us_data/datasets/puf/puf.py index 2eba0091..dc89c4a9 100644 --- a/policyengine_us_data/datasets/puf/puf.py +++ b/policyengine_us_data/datasets/puf/puf.py @@ -1,3 +1,4 @@ +import h5py import yaml from importlib.resources import files @@ -432,6 +433,20 @@ def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame: 0.0 ) puf["business_is_sstb"] = rng.binomial(n=1, p=pr_sstb) + is_sstb = puf["business_is_sstb"].astype(bool) + + # The current PUF pipeline only imputes an all-or-nothing SSTB flag. + # Use that to split Schedule C self-employment and allocable W-2/UBIA + # inputs for policyengine-us without pretending to observe mixed cases. + legacy_self_employment_income = puf["self_employment_income"].fillna(0) + puf["sstb_self_employment_income"] = np.where( + is_sstb, legacy_self_employment_income, 0.0 + ) + puf["self_employment_income"] = np.where( + is_sstb, 0.0, legacy_self_employment_income + ) + puf["sstb_w2_wages_from_qualified_business"] = np.where(is_sstb, w2, 0.0) + puf["sstb_unadjusted_basis_qualified_property"] = np.where(is_sstb, ubia, 0.0) reit_params = QBI_PARAMS["reit_ptp_income_distribution"] p_reit_ptp = reit_params["probability_of_receiving"] @@ -526,6 +541,9 @@ def preprocess_puf(puf: pd.DataFrame) -> pd.DataFrame: "w2_wages_from_qualified_business", "unadjusted_basis_qualified_property", "business_is_sstb", + "sstb_self_employment_income", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", "deductible_mortgage_interest", "partnership_s_corp_income", "partnership_se_income", @@ -538,6 +556,164 @@ class PUF(Dataset): time_period = None data_format = Dataset.ARRAYS + @staticmethod + def _replace_array(file_handle, key: str, values: np.ndarray) -> None: + if key in file_handle: + del file_handle[key] + file_handle.create_dataset(key, data=values) + + def _sstb_split_overrides(self) -> dict[str, np.ndarray]: + if not self.file_path.exists(): + return {} + + with h5py.File(self.file_path, "r") as file_handle: + if "business_is_sstb" not in file_handle: + return {} + keys = set(file_handle.keys()) + is_sstb = np.asarray(file_handle["business_is_sstb"]).astype(bool) + overrides = {} + if "self_employment_income" in keys: + self_employment_income = np.asarray( + file_handle["self_employment_income"] + ) + existing_sstb_self_employment_income = ( + np.asarray(file_handle["sstb_self_employment_income"]) + if "sstb_self_employment_income" in keys + else np.zeros_like(self_employment_income) + ) + corrected_sstb_self_employment_income = np.where( + is_sstb, + np.where( + existing_sstb_self_employment_income != 0, + existing_sstb_self_employment_income, + self_employment_income, + ), + 0.0, + ) + corrected_self_employment_income = np.where( + is_sstb, 0.0, self_employment_income + ) + if ( + "sstb_self_employment_income" not in keys + or not np.array_equal( + existing_sstb_self_employment_income, + corrected_sstb_self_employment_income, + ) + or not np.array_equal( + self_employment_income, + corrected_self_employment_income, + ) + ): + overrides["sstb_self_employment_income"] = ( + corrected_sstb_self_employment_income + ) + overrides["self_employment_income"] = ( + corrected_self_employment_income + ) + + for source_key, target_key in ( + ( + "w2_wages_from_qualified_business", + "sstb_w2_wages_from_qualified_business", + ), + ( + "unadjusted_basis_qualified_property", + "sstb_unadjusted_basis_qualified_property", + ), + ): + if source_key not in keys: + continue + corrected_target = np.where( + is_sstb, np.asarray(file_handle[source_key]), 0.0 + ) + if target_key not in keys or not np.array_equal( + np.asarray(file_handle[target_key]), + corrected_target, + ): + overrides[target_key] = corrected_target + + return overrides + + def _ensure_sstb_split_inputs(self) -> dict[str, np.ndarray]: + overrides = self._sstb_split_overrides() + if not overrides: + return {} + + try: + with h5py.File(self.file_path, "r+") as file_handle: + for key, values in overrides.items(): + self._replace_array(file_handle, key, values) + except OSError: + pass + + return overrides + + class _OverrideView: + def __init__(self, backing, overrides: dict[str, np.ndarray]): + self._backing = backing + self._overrides = overrides + + def __getitem__(self, key): + if key in self._overrides: + return self._overrides[key] + return self._backing[key] + + def __contains__(self, key): + return key in self._overrides or key in self._backing + + def keys(self): + if hasattr(self._backing, "keys"): + return tuple(dict.fromkeys((*self._backing.keys(), *self._overrides))) + return tuple(self._overrides) + + def get(self, key, default=None): + if key in self: + return self[key] + return default + + def items(self): + for key in self.keys(): + yield key, self[key] + + def values(self): + for key in self.keys(): + yield self[key] + + def __iter__(self): + return iter(self.keys()) + + def close(self): + if hasattr(self._backing, "close"): + self._backing.close() + + def __enter__(self): + if hasattr(self._backing, "__enter__"): + self._backing.__enter__() + return self + + def __exit__(self, exc_type, exc, traceback): + if hasattr(self._backing, "__exit__"): + return self._backing.__exit__(exc_type, exc, traceback) + return None + + def __getattr__(self, name): + return getattr(self._backing, name) + + def load(self, key=None, mode="r"): + if mode == "r": + overrides = self._ensure_sstb_split_inputs() + if key in overrides: + return overrides[key] + if key is None and overrides: + return self._OverrideView(super().load(key=key, mode=mode), overrides) + return super().load(key=key, mode=mode) + + def load_dataset(self): + overrides = self._ensure_sstb_split_inputs() + arrays = super().load_dataset() + arrays.update(overrides) + return arrays + def generate(self): from policyengine_us.system import system diff --git a/policyengine_us_data/db/etl_irs_soi.py b/policyengine_us_data/db/etl_irs_soi.py index aeed698e..b75cded1 100644 --- a/policyengine_us_data/db/etl_irs_soi.py +++ b/policyengine_us_data/db/etl_irs_soi.py @@ -65,7 +65,7 @@ name="qualified_business_income_deduction", breakdown=None, ), - dict(code="00900", name="self_employment_income", breakdown=None), + dict(code="00900", name="total_self_employment_income", breakdown=None), dict( code="01000", name="net_capital_gains", @@ -147,7 +147,7 @@ def _skip_coarse_state_agi_person_count_target(geo_type: str, agi_stub: int) -> "net_capital_gains": "capital_gains_gross", "qualified_dividend_income": "qualified_dividends", "rental_income": "rent_and_royalty_net_income", - "self_employment_income": "business_net_profits", + "total_self_employment_income": "business_net_profits", "tax_exempt_interest_income": "exempt_interest", "tax_unit_partnership_s_corp_income": "partnership_and_s_corp_income", "taxable_interest_income": "taxable_interest_income", diff --git a/policyengine_us_data/db/validate_database.py b/policyengine_us_data/db/validate_database.py index 8f769d76..bfc96d28 100644 --- a/policyengine_us_data/db/validate_database.py +++ b/policyengine_us_data/db/validate_database.py @@ -4,23 +4,21 @@ the overall correctness of data after a full pipeline run with production data. """ +from __future__ import annotations + import sqlite3 +from pathlib import Path import pandas as pd -from policyengine_us.system import system - -conn = sqlite3.connect("policyengine_us_data/storage/calibration/policy_data.db") -stratum_constraints_df = pd.read_sql("SELECT * FROM stratum_constraints", conn) -targets_df = pd.read_sql("SELECT * FROM targets", conn) +from policyengine_us_data.utils.policyengine import ( + ensure_policyengine_us_compat_variables, +) -for var_name in set(targets_df["variable"]): - if not var_name in system.variables.keys(): - raise ValueError(f"{var_name} not a policyengine-us variable") -for var_name in set(stratum_constraints_df["constraint_variable"]): - if not var_name in system.variables.keys(): - raise ValueError(f"{var_name} not a policyengine-us variable") +DEFAULT_DB_PATH = ( + Path("policyengine_us_data") / "storage" / "calibration" / "policy_data.db" +) TAX_EXPENDITURE_VARS = [ "salt_deduction", @@ -30,20 +28,49 @@ "qualified_business_income_deduction", ] -root_stratum_ids = pd.read_sql( - "SELECT stratum_id FROM strata WHERE parent_stratum_id IS NULL", conn -)["stratum_id"].tolist() - -for var in TAX_EXPENDITURE_VARS: - matches = targets_df[ - (targets_df["variable"] == var) - & (targets_df["active"] == 1) - & (targets_df["stratum_id"].isin(root_stratum_ids)) - & (targets_df["reform_id"] > 0) - ] - if matches.empty: - raise ValueError( - f"Validation failed: {var} has no active target with " - f"reform_id > 0 in the root stratum. Tax expenditure targets " - f"must have a non-zero reform_id for correct calibration." - ) + +def validate_database(db_path: str | Path = DEFAULT_DB_PATH) -> None: + ensure_policyengine_us_compat_variables() + + from policyengine_us.system import system + + conn = sqlite3.connect(str(db_path)) + try: + stratum_constraints_df = pd.read_sql("SELECT * FROM stratum_constraints", conn) + targets_df = pd.read_sql("SELECT * FROM targets", conn) + + for var_name in set(targets_df["variable"]): + if var_name not in system.variables: + raise ValueError(f"{var_name} not a policyengine-us variable") + + for var_name in set(stratum_constraints_df["constraint_variable"]): + if var_name not in system.variables: + raise ValueError(f"{var_name} not a policyengine-us variable") + + root_stratum_ids = pd.read_sql( + "SELECT stratum_id FROM strata WHERE parent_stratum_id IS NULL", conn + )["stratum_id"].tolist() + + for var in TAX_EXPENDITURE_VARS: + matches = targets_df[ + (targets_df["variable"] == var) + & (targets_df["active"] == 1) + & (targets_df["stratum_id"].isin(root_stratum_ids)) + & (targets_df["reform_id"] > 0) + ] + if matches.empty: + raise ValueError( + f"Validation failed: {var} has no active target with " + f"reform_id > 0 in the root stratum. Tax expenditure targets " + f"must have a non-zero reform_id for correct calibration." + ) + finally: + conn.close() + + +def main() -> None: + validate_database() + + +if __name__ == "__main__": + main() diff --git a/policyengine_us_data/utils/policyengine.py b/policyengine_us_data/utils/policyengine.py index 869b95d9..ae9c5820 100644 --- a/policyengine_us_data/utils/policyengine.py +++ b/policyengine_us_data/utils/policyengine.py @@ -67,6 +67,364 @@ def _get_git_commit(path: Path | None) -> str | None: return None +@lru_cache(maxsize=1) +def ensure_policyengine_us_compat_variables() -> None: + """Backfill SSTB/QBI variables when running against older policyengine-us. + + The SSTB split landed across `policyengine-us` and `policyengine-us-data` + in separate PRs. Until the model package release catches up, keep the data + package usable by registering the missing inputs/formulas on import. + """ + + try: + from policyengine_us.model_api import ( + Person, + TaxUnit, + USD, + Variable, + YEAR, + add, + max_, + min_, + np, + where, + ) + from policyengine_us.system import CountryTaxBenefitSystem, system + except Exception: + return + + class sstb_self_employment_income(Variable): + value_type = float + entity = Person + label = "SSTB self-employment income" + unit = USD + documentation = ( + "Self-employment non-farm income from a specified service trade or " + "business (SSTB) under IRC Section 199A(d)(2)." + ) + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/1402#a", + "https://www.law.cornell.edu/uscode/text/26/199A#d_2", + ) + uprating = "calibration.gov.irs.soi.self_employment_income" + default_value = 0 + + class sstb_w2_wages_from_qualified_business(Variable): + value_type = float + entity = Person + label = "SSTB allocable W-2 wages" + unit = USD + documentation = ( + "Portion of w2_wages_from_qualified_business allocable to " + "specified service trades or businesses for section 199A." + ) + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/199A#b_2", + "https://www.law.cornell.edu/uscode/text/26/199A#d_3", + ) + uprating = "calibration.gov.cbo.income_by_source.adjusted_gross_income" + default_value = 0 + + class sstb_unadjusted_basis_qualified_property(Variable): + value_type = float + entity = Person + label = "SSTB allocable UBIA of qualified property" + unit = USD + documentation = ( + "Portion of unadjusted_basis_qualified_property allocable to " + "specified service trades or businesses for section 199A." + ) + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/199A#b_2", + "https://www.law.cornell.edu/uscode/text/26/199A#d_3", + ) + default_value = 0 + + class sstb_self_employment_income_would_be_qualified(Variable): + value_type = bool + entity = Person + label = "SSTB self-employment income would be qualified" + documentation = ( + "Whether SSTB self-employment income would count toward qualified " + "business income before the section 199A(d)(3) phaseout." + ) + definition_period = YEAR + reference = "https://www.law.cornell.edu/uscode/text/26/199A#c_3_A" + default_value = True + + def _split_qbi_components(person, period, parameters): + p = parameters(period).gov.irs.deductions.qbi + non_sstb_gross = 0 + for var in p.income_definition: + non_sstb_gross += person(var, period) * person( + var + "_would_be_qualified", period + ) + sstb_gross = person("sstb_self_employment_income", period) * person( + "sstb_self_employment_income_would_be_qualified", period + ) + positive_non_sstb_gross = max_(0, non_sstb_gross) + positive_sstb_gross = max_(0, sstb_gross) + positive_gross_total = positive_non_sstb_gross + positive_sstb_gross + qbi_deductions = add(person, period, p.deduction_definition) + non_sstb_share = where( + positive_gross_total > 0, + positive_non_sstb_gross / positive_gross_total, + 0, + ) + sstb_share = where( + positive_gross_total > 0, + positive_sstb_gross / positive_gross_total, + 0, + ) + return ( + max_(0, non_sstb_gross - qbi_deductions * non_sstb_share), + max_(0, sstb_gross - qbi_deductions * sstb_share), + ) + + class sstb_qualified_business_income(Variable): + value_type = float + entity = Person + label = "SSTB qualified business income" + documentation = ( + "Qualified business income from a specified service trade or " + "business under section 199A(d)(2)." + ) + unit = USD + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/199A#c", + "https://www.law.cornell.edu/uscode/text/26/199A#d_2", + ) + + def formula(person, period, parameters): + return _split_qbi_components(person, period, parameters)[1] + + class total_self_employment_income(Variable): + value_type = float + entity = Person + label = "total self-employment income" + unit = USD + documentation = ( + "Total non-farm self-employment income, including both SSTB and " + "non-SSTB Schedule C income." + ) + definition_period = YEAR + adds = ["self_employment_income", "sstb_self_employment_income"] + reference = "https://www.law.cornell.edu/uscode/text/26/1402#a" + uprating = "calibration.gov.irs.soi.self_employment_income" + + class qualified_business_income(Variable): + value_type = float + entity = Person + label = "Qualified business income" + documentation = ( + "Business income that qualifies for the qualified business income " + "deduction." + ) + unit = USD + definition_period = YEAR + reference = "https://www.law.cornell.edu/uscode/text/26/199A#c" + defined_for = "business_is_qualified" + + def formula(person, period, parameters): + p = parameters(period).gov.irs.deductions.qbi + gross_qbi = 0 + for var in p.income_definition: + gross_qbi += person(var, period) * person( + var + "_would_be_qualified", period + ) + gross_qbi += person("sstb_self_employment_income", period) * person( + "sstb_self_employment_income_would_be_qualified", period + ) + qbi_deductions = add(person, period, p.deduction_definition) + return max_(0, gross_qbi - qbi_deductions) + + class qbid_amount(Variable): + value_type = float + entity = Person + label = "Per-person qualified business income deduction amount" + unit = USD + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/199A#b_1", + "https://www.law.cornell.edu/uscode/text/26/199A#d_3", + "https://www.irs.gov/pub/irs-prior/p535--2018.pdf", + "https://www.irs.gov/pub/irs-pdf/f8995.pdf", + "https://www.irs.gov/pub/irs-pdf/f8995a.pdf", + ) + + def formula(person, period, parameters): + p = parameters(period).gov.irs.deductions.qbi + taxinc_less_qbid = person.tax_unit("taxable_income_less_qbid", period) + filing_status = person.tax_unit("filing_status", period) + po_start = p.phase_out.start[filing_status] + po_length = p.phase_out.length[filing_status] + reduction_rate = min_(1, (max_(0, taxinc_less_qbid - po_start)) / po_length) + applicable_rate = 1 - reduction_rate + total_w2_wages = person("w2_wages_from_qualified_business", period) + total_b_property = person("unadjusted_basis_qualified_property", period) + + def qbi_component(qbi, full_cap, sstb_multiplier): + qbid_max = p.max.rate * qbi + adj_qbid_max = qbid_max * sstb_multiplier + adj_cap = full_cap * sstb_multiplier + line11 = min_(adj_qbid_max, adj_cap) + reduction = reduction_rate * max_(0, adj_qbid_max - adj_cap) + line26 = max_(0, adj_qbid_max - reduction) + line12 = where(adj_cap < adj_qbid_max, line26, 0) + return max_(line11, line12) + + split_non_sstb_qbi = _split_qbi_components(person, period, parameters)[0] + legacy_total_qbi = person("qualified_business_income", period) + sstb_qbi_from_se = person("sstb_qualified_business_income", period) + is_sstb_legacy = person("business_is_sstb", period) + sstb_qbi = where(is_sstb_legacy, legacy_total_qbi, sstb_qbi_from_se) + non_sstb_qbi_final = where( + is_sstb_legacy, + 0, + split_non_sstb_qbi, + ) + + has_non_sstb = non_sstb_qbi_final > 0 + has_sstb = sstb_qbi > 0 + has_mixed_categories = has_non_sstb & has_sstb + + sstb_w2_wages = where( + is_sstb_legacy, + total_w2_wages, + where( + has_mixed_categories, + person("sstb_w2_wages_from_qualified_business", period), + where(has_sstb, total_w2_wages, 0), + ), + ) + non_sstb_w2_wages = where( + is_sstb_legacy, + 0, + where( + has_mixed_categories, + max_(0, total_w2_wages - sstb_w2_wages), + where(has_non_sstb, total_w2_wages, 0), + ), + ) + + sstb_b_property = where( + is_sstb_legacy, + total_b_property, + where( + has_mixed_categories, + person("sstb_unadjusted_basis_qualified_property", period), + where(has_sstb, total_b_property, 0), + ), + ) + non_sstb_b_property = where( + is_sstb_legacy, + 0, + where( + has_mixed_categories, + max_(0, total_b_property - sstb_b_property), + where(has_non_sstb, total_b_property, 0), + ), + ) + + def full_cap(w2_wages, b_property): + wage_cap = w2_wages * p.max.w2_wages.rate + alt_cap = ( + w2_wages * p.max.w2_wages.alt_rate + + b_property * p.max.business_property.rate + ) + return max_(wage_cap, alt_cap) + + non_sstb_component = qbi_component( + non_sstb_qbi_final, + full_cap(non_sstb_w2_wages, non_sstb_b_property), + 1, + ) + sstb_component = qbi_component( + sstb_qbi, + full_cap(sstb_w2_wages, sstb_b_property), + applicable_rate, + ) + + reit_ptp_income = person("qualified_reit_and_ptp_income", period) + reit_ptp_component = p.max.reit_ptp_rate * max_(0, reit_ptp_income) + return non_sstb_component + sstb_component + reit_ptp_component + + class qualified_business_income_deduction(Variable): + value_type = float + entity = TaxUnit + label = "Qualified business income deduction for tax unit" + unit = USD + definition_period = YEAR + reference = ( + "https://www.law.cornell.edu/uscode/text/26/199A#b_1" + "https://www.irs.gov/pub/irs-prior/p535--2018.pdf" + ) + + def formula(tax_unit, period, parameters): + person = tax_unit.members + qbid_amt = person("qbid_amount", period) + split_non_sstb_qbi = _split_qbi_components(person, period, parameters)[0] + legacy_total_qbi = person("qualified_business_income", period) + sstb_qbi = person("sstb_qualified_business_income", period) + is_sstb_legacy = person("business_is_sstb", period) + total_qbi = tax_unit.sum( + where( + is_sstb_legacy, + legacy_total_qbi, + split_non_sstb_qbi + sstb_qbi, + ) + ) + uncapped_qbid = tax_unit.sum(qbid_amt) + taxinc_less_qbid = tax_unit("taxable_income_less_qbid", period) + netcg_qdiv = tax_unit("adjusted_net_capital_gain", period) + p = parameters(period).gov.irs.deductions.qbi + taxinc_cap = p.max.rate * max_(0, taxinc_less_qbid - netcg_qdiv) + pre_floor_qbid = min_(uncapped_qbid, taxinc_cap) + if p.deduction_floor.in_effect: + floor = p.deduction_floor.amount.calc(total_qbi) + return max_(pre_floor_qbid, floor) + return pre_floor_qbid + + compat_variables = [ + sstb_self_employment_income, + sstb_w2_wages_from_qualified_business, + sstb_unadjusted_basis_qualified_property, + sstb_self_employment_income_would_be_qualified, + sstb_qualified_business_income, + total_self_employment_income, + ] + compat_replacements = [ + qualified_business_income, + qbid_amount, + qualified_business_income_deduction, + ] + + def install_compat_variables(tbs) -> None: + needs_sstb_qbi_compat = "sstb_qualified_business_income" not in tbs.variables + for variable in compat_variables: + if variable.__name__ not in tbs.variables: + tbs.add_variable(variable) + if needs_sstb_qbi_compat: + for variable in compat_replacements: + tbs.replace_variable(variable) + + if not getattr(CountryTaxBenefitSystem, "_policyengine_us_data_compat", False): + original_init = CountryTaxBenefitSystem.__init__ + + def patched_init(self, *args, **kwargs): + original_init(self, *args, **kwargs) + install_compat_variables(self) + + CountryTaxBenefitSystem.__init__ = patched_init + CountryTaxBenefitSystem._policyengine_us_data_compat = True + + install_compat_variables(system) + + @lru_cache(maxsize=None) def get_locked_dependency_version(package_name: str) -> str | None: if not UV_LOCK_PATH.exists(): @@ -124,6 +482,7 @@ def assert_locked_policyengine_us_version() -> PolicyEngineUSBuildInfo: def _policyengine_us_variable_names() -> frozenset[str]: from policyengine_us import CountryTaxBenefitSystem + ensure_policyengine_us_compat_variables() return frozenset(CountryTaxBenefitSystem().variables) @@ -144,3 +503,6 @@ def supports_modeled_medicare_part_b_inputs() -> bool: return has_policyengine_us_variables( "medicare_part_b_premiums_reported", ) + + +ensure_policyengine_us_compat_variables() diff --git a/policyengine_us_data/utils/soi.py b/policyengine_us_data/utils/soi.py index 0d45d1a5..d7ed2e84 100644 --- a/policyengine_us_data/utils/soi.py +++ b/policyengine_us_data/utils/soi.py @@ -7,7 +7,7 @@ "adjusted_gross_income": "adjusted_gross_income", "count": "population", "employment_income": "employment_income", - "business_net_profits": "self_employment_income", + "business_net_profits": "total_self_employment_income", "capital_gains_gross": "long_term_capital_gains", "ordinary_dividends": "non_qualified_dividend_income", "partnership_and_s_corp_income": "partnership_s_corp_income", @@ -19,7 +19,7 @@ "mortgage_interest_deductions": "interest_deduction", "total_pension_income": "pension_income", "total_social_security": "social_security", - "business_net_losses": "self_employment_income", + "business_net_losses": "total_self_employment_income", "capital_gains_distributions": "long_term_capital_gains", "capital_gains_losses": "long_term_capital_gains", "estate_income": "estate_income", @@ -59,12 +59,9 @@ def pe(variable): df["income_tax_after_credits"] = pe("income_tax") df["total_income_tax"] = pe("income_tax_before_credits") df["taxable_income"] = pe("taxable_income") - df["business_net_profits"] = pe("self_employment_income") * ( - pe("self_employment_income") > 0 - ) - df["business_net_losses"] = -pe("self_employment_income") * ( - pe("self_employment_income") < 0 - ) + schedule_c_income = pe("self_employment_income") + pe("sstb_self_employment_income") + df["business_net_profits"] = schedule_c_income * (schedule_c_income > 0) + df["business_net_losses"] = -schedule_c_income * (schedule_c_income < 0) df["capital_gains_distributions"] = pe("non_sch_d_capital_gains") df["capital_gains_gross"] = pe("loss_limited_net_capital_gains") * ( pe("loss_limited_net_capital_gains") > 0 diff --git a/tests/unit/calibration/test_calibration_puf_impute.py b/tests/unit/calibration/test_calibration_puf_impute.py index 8c55f731..363e3b19 100644 --- a/tests/unit/calibration/test_calibration_puf_impute.py +++ b/tests/unit/calibration/test_calibration_puf_impute.py @@ -5,11 +5,13 @@ """ import numpy as np +import pandas as pd from policyengine_us_data.calibration.puf_impute import ( DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES, OVERRIDDEN_IMPUTED_VARIABLES, + _impute_retirement_contributions, _log_stratified_subsample, _stratified_subsample_index, puf_clone_dataset, @@ -154,6 +156,23 @@ def test_overridden_subset_of_imputed(self): for var in OVERRIDDEN_IMPUTED_VARIABLES: assert var in IMPUTED_VARIABLES + def test_sstb_qbi_split_variables_imputed(self): + expected = { + "sstb_self_employment_income", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", + } + for var in expected: + assert var in IMPUTED_VARIABLES + + def test_sstb_allocable_wage_and_ubia_are_overridden(self): + expected = { + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", + } + for var in expected: + assert var in OVERRIDDEN_IMPUTED_VARIABLES + class TestStratifiedSubsample: def test_noop_when_small(self): @@ -193,14 +212,109 @@ def test_indices_sorted(self): idx = _stratified_subsample_index(income, target_n=10_000) assert np.all(idx[1:] >= idx[:-1]) - def test_log_handles_grouped_currency_threshold(self, caplog): - threshold = np.float32(8.934329e7) - caplog.set_level( - "INFO", - logger="policyengine_us_data.calibration.puf_impute", - ) - _log_stratified_subsample(484_015, 20_000, 0.5, threshold) +def test_retirement_imputation_caps_se_pension_using_sstb_income(monkeypatch): + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe(self, columns): + if "self_employed_pension_contributions" in columns: + return pd.DataFrame( + { + "age": [40, 55], + "is_male": [0, 1], + "tax_unit_is_joint": [0, 1], + "tax_unit_count_dependents": [0, 1], + "is_tax_unit_head": [1, 1], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 0], + "employment_income": [0.0, 0.0], + "self_employment_income": [0.0, 100.0], + "taxable_interest_income": [0.0, 0.0], + "qualified_dividend_income": [0.0, 0.0], + "taxable_pension_income": [0.0, 0.0], + "social_security": [0.0, 0.0], + "traditional_401k_contributions": [0.0, 0.0], + "roth_401k_contributions": [0.0, 0.0], + "traditional_ira_contributions": [0.0, 0.0], + "roth_ira_contributions": [0.0, 0.0], + "self_employed_pension_contributions": [0.0, 0.0], + } + ) + return pd.DataFrame( + { + "age": [40, 55], + "is_male": [0, 1], + "tax_unit_is_joint": [0, 1], + "tax_unit_count_dependents": [0, 1], + "is_tax_unit_head": [1, 1], + "is_tax_unit_spouse": [0, 0], + "is_tax_unit_dependent": [0, 0], + } + ) + + def calculate(self, variable): + return pd.Series(np.zeros(2)) + + class FakeQRF: + def __init__(self, **kwargs): + pass + + def fit_predict( + self, + X_train, + X_test, + predictors, + imputed_variables, + n_jobs, + ): + np.testing.assert_array_equal( + X_test["self_employment_income"].to_numpy(), + np.array([100.0, 100.0]), + ) + return pd.DataFrame( + { + "traditional_401k_contributions": [0.0, 0.0], + "roth_401k_contributions": [0.0, 0.0], + "traditional_ira_contributions": [0.0, 0.0], + "roth_ira_contributions": [0.0, 0.0], + "self_employed_pension_contributions": [50_000.0, 50_000.0], + } + ) + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr("microimpute.models.qrf.QRF", FakeQRF) + + result = _impute_retirement_contributions( + data={"person_id": {2024: np.array([1, 2])}}, + puf_imputations={ + "employment_income": np.array([0.0, 0.0]), + "self_employment_income": np.array([0.0, 100.0]), + "sstb_self_employment_income": np.array([100.0, 0.0]), + "taxable_interest_income": np.array([0.0, 0.0]), + "qualified_dividend_income": np.array([0.0, 0.0]), + "taxable_pension_income": np.array([0.0, 0.0]), + "social_security": np.array([0.0, 0.0]), + }, + time_period=2024, + dataset_path="ignored.h5", + ) + + np.testing.assert_array_equal( + result["self_employed_pension_contributions"], + np.array([25.0, 25.0]), + ) + + +def test_log_handles_grouped_currency_threshold(caplog): + threshold = np.float32(8.934329e7) + caplog.set_level( + "INFO", + logger="policyengine_us_data.calibration.puf_impute", + ) + + _log_stratified_subsample(484_015, 20_000, 0.5, threshold) - assert "Stratified PUF subsample: 484015 -> 20000 records" in caplog.text - assert f"${threshold:,.0f}" in caplog.text + assert "Stratified PUF subsample: 484015 -> 20000 records" in caplog.text + assert f"${threshold:,.0f}" in caplog.text diff --git a/tests/unit/calibration/test_check_staging_sums.py b/tests/unit/calibration/test_check_staging_sums.py index 02e39cdd..0daf2755 100644 --- a/tests/unit/calibration/test_check_staging_sums.py +++ b/tests/unit/calibration/test_check_staging_sums.py @@ -1,4 +1,5 @@ from policyengine_us_data.calibration.check_staging_sums import ( + VARIABLES, get_reference_summary, ) @@ -23,3 +24,8 @@ def test_reference_summary_uses_irs_ctc_component_targets(monkeypatch): assert "refundable CTC ~$33.0B" in summary assert "non-refundable CTC ~$81.6B" in summary assert "IRS SOI 2022" in summary + + +def test_staging_sums_use_total_self_employment_income(): + assert "total_self_employment_income" in VARIABLES + assert "self_employment_income" not in VARIABLES diff --git a/tests/unit/calibration/test_validate_national_h5.py b/tests/unit/calibration/test_validate_national_h5.py index 6177ff36..7bee9a94 100644 --- a/tests/unit/calibration/test_validate_national_h5.py +++ b/tests/unit/calibration/test_validate_national_h5.py @@ -3,6 +3,7 @@ import pandas as pd from policyengine_us_data.calibration.validate_national_h5 import ( + VARIABLES, build_artifact_ctc_summary, build_canonical_ctc_reform_summary, get_artifact_ctc_comparison_outputs, @@ -98,6 +99,11 @@ def fake_download(**kwargs): ] +def test_validation_uses_total_self_employment_income(): + assert "total_self_employment_income" in VARIABLES + assert "self_employment_income" not in VARIABLES + + class _FakeArrayResult: def __init__(self, values): self._values = values diff --git a/tests/unit/datasets/test_irs_puf.py b/tests/unit/datasets/test_irs_puf.py index f6e84728..ad74b175 100644 --- a/tests/unit/datasets/test_irs_puf.py +++ b/tests/unit/datasets/test_irs_puf.py @@ -1,5 +1,9 @@ +import h5py +import numpy as np import pytest +from policyengine_us_data.datasets.puf.puf import PUF + @pytest.mark.skip(reason="This test requires private data.") @pytest.mark.parametrize("year", [2015]) @@ -11,3 +15,183 @@ def test_irs_puf_generates(year: int): } dataset_by_year[year](require=True) + + +def test_puf_load_dataset_backfills_sstb_split_inputs(tmp_path): + class DummyPUF(PUF): + label = "Dummy PUF" + name = "dummy_puf" + time_period = 2024 + file_path = tmp_path / "dummy_puf.h5" + + with h5py.File(DummyPUF.file_path, "w") as file_handle: + file_handle.create_dataset( + "self_employment_income", data=np.array([100.0, 200.0]) + ) + file_handle.create_dataset( + "w2_wages_from_qualified_business", data=np.array([10.0, 20.0]) + ) + file_handle.create_dataset( + "unadjusted_basis_qualified_property", data=np.array([5.0, 6.0]) + ) + file_handle.create_dataset("business_is_sstb", data=np.array([1, 0])) + + dataset = DummyPUF() + arrays = dataset.load_dataset() + + np.testing.assert_array_equal( + arrays["self_employment_income"], np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + arrays["sstb_self_employment_income"], np.array([100.0, 0.0]) + ) + np.testing.assert_array_equal( + arrays["sstb_w2_wages_from_qualified_business"], np.array([10.0, 0.0]) + ) + np.testing.assert_array_equal( + arrays["sstb_unadjusted_basis_qualified_property"], np.array([5.0, 0.0]) + ) + + +def test_puf_load_key_backfills_sstb_split_inputs(tmp_path): + class DummyPUF(PUF): + label = "Dummy PUF" + name = "dummy_puf" + time_period = 2024 + file_path = tmp_path / "dummy_puf.h5" + + with h5py.File(DummyPUF.file_path, "w") as file_handle: + file_handle.create_dataset( + "self_employment_income", data=np.array([100.0, 200.0]) + ) + file_handle.create_dataset("business_is_sstb", data=np.array([1, 0])) + + dataset = DummyPUF() + + np.testing.assert_array_equal( + dataset.load("self_employment_income"), np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + dataset.load("sstb_self_employment_income"), np.array([100.0, 0.0]) + ) + + +def test_puf_load_key_repairs_partially_migrated_sstb_split_inputs(tmp_path): + class DummyPUF(PUF): + label = "Dummy PUF" + name = "dummy_puf" + time_period = 2024 + file_path = tmp_path / "dummy_puf.h5" + + with h5py.File(DummyPUF.file_path, "w") as file_handle: + file_handle.create_dataset( + "self_employment_income", data=np.array([100.0, 200.0]) + ) + file_handle.create_dataset( + "sstb_self_employment_income", data=np.array([100.0, 0.0]) + ) + file_handle.create_dataset("business_is_sstb", data=np.array([1, 0])) + + dataset = DummyPUF() + + np.testing.assert_array_equal( + dataset.load("self_employment_income"), np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + dataset.load("sstb_self_employment_income"), np.array([100.0, 0.0]) + ) + + +def test_puf_load_read_only_backfilled_file_does_not_reopen_for_writes(tmp_path): + class DummyPUF(PUF): + label = "Dummy PUF" + name = "dummy_puf" + time_period = 2024 + file_path = tmp_path / "dummy_puf.h5" + + with h5py.File(DummyPUF.file_path, "w") as file_handle: + file_handle.create_dataset( + "self_employment_income", data=np.array([0.0, 200.0]) + ) + file_handle.create_dataset( + "sstb_self_employment_income", data=np.array([100.0, 0.0]) + ) + file_handle.create_dataset( + "w2_wages_from_qualified_business", data=np.array([10.0, 20.0]) + ) + file_handle.create_dataset( + "sstb_w2_wages_from_qualified_business", data=np.array([10.0, 0.0]) + ) + file_handle.create_dataset( + "unadjusted_basis_qualified_property", data=np.array([5.0, 6.0]) + ) + file_handle.create_dataset( + "sstb_unadjusted_basis_qualified_property", + data=np.array([5.0, 0.0]), + ) + file_handle.create_dataset("business_is_sstb", data=np.array([1, 0])) + + DummyPUF.file_path.chmod(0o444) + dataset = DummyPUF() + + try: + np.testing.assert_array_equal( + dataset.load("sstb_self_employment_income"), np.array([100.0, 0.0]) + ) + arrays = dataset.load_dataset() + finally: + DummyPUF.file_path.chmod(0o644) + + np.testing.assert_array_equal( + arrays["sstb_self_employment_income"], np.array([100.0, 0.0]) + ) + + +def test_puf_load_read_only_partially_migrated_file_uses_overrides(tmp_path): + class DummyPUF(PUF): + label = "Dummy PUF" + name = "dummy_puf" + time_period = 2024 + file_path = tmp_path / "dummy_puf.h5" + + with h5py.File(DummyPUF.file_path, "w") as file_handle: + file_handle.create_dataset( + "self_employment_income", data=np.array([100.0, 200.0]) + ) + file_handle.create_dataset( + "sstb_self_employment_income", data=np.array([100.0, 0.0]) + ) + file_handle.create_dataset("business_is_sstb", data=np.array([1, 0])) + + DummyPUF.file_path.chmod(0o444) + dataset = DummyPUF() + + try: + np.testing.assert_array_equal( + dataset.load("self_employment_income"), np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + dataset.load("sstb_self_employment_income"), np.array([100.0, 0.0]) + ) + reader = dataset.load() + np.testing.assert_array_equal( + reader["self_employment_income"], np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + reader.get("self_employment_income"), np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + dict(reader.items())["self_employment_income"], + np.array([0.0, 200.0]), + ) + reader.close() + arrays = dataset.load_dataset() + finally: + DummyPUF.file_path.chmod(0o644) + + np.testing.assert_array_equal( + arrays["self_employment_income"], np.array([0.0, 200.0]) + ) + np.testing.assert_array_equal( + arrays["sstb_self_employment_income"], np.array([100.0, 0.0]) + ) diff --git a/tests/unit/db/test_validate_database.py b/tests/unit/db/test_validate_database.py new file mode 100644 index 00000000..6d1cdb97 --- /dev/null +++ b/tests/unit/db/test_validate_database.py @@ -0,0 +1,61 @@ +import sqlite3 + +from policyengine_us_data.db.validate_database import validate_database + + +def test_validate_database_accepts_compat_variables(tmp_path): + db_path = tmp_path / "policy_data.db" + conn = sqlite3.connect(db_path) + try: + conn.executescript(""" + CREATE TABLE strata ( + stratum_id INTEGER PRIMARY KEY, + parent_stratum_id INTEGER + ); + CREATE TABLE stratum_constraints ( + stratum_id INTEGER, + constraint_variable TEXT + ); + CREATE TABLE targets ( + stratum_id INTEGER, + variable TEXT, + active INTEGER, + reform_id INTEGER + ); + """) + conn.execute( + "INSERT INTO strata (stratum_id, parent_stratum_id) VALUES (?, ?)", + (1, None), + ) + conn.execute( + "INSERT INTO stratum_constraints (stratum_id, constraint_variable) " + "VALUES (?, ?)", + (1, "total_self_employment_income"), + ) + + for reform_id, variable in enumerate( + [ + "salt_deduction", + "charitable_deduction", + "deductible_mortgage_interest", + "medical_expense_deduction", + "qualified_business_income_deduction", + ], + start=1, + ): + conn.execute( + "INSERT INTO targets (stratum_id, variable, active, reform_id) " + "VALUES (?, ?, ?, ?)", + (1, variable, 1, reform_id), + ) + + conn.execute( + "INSERT INTO targets (stratum_id, variable, active, reform_id) " + "VALUES (?, ?, ?, ?)", + (1, "total_self_employment_income", 1, 0), + ) + conn.commit() + finally: + conn.close() + + validate_database(db_path) diff --git a/tests/unit/test_package_imports.py b/tests/unit/test_package_imports.py index 37c7143e..f65e1947 100644 --- a/tests/unit/test_package_imports.py +++ b/tests/unit/test_package_imports.py @@ -2,6 +2,8 @@ import sys from pathlib import Path +import numpy as np + import policyengine_us_data @@ -43,3 +45,202 @@ def test_package_root_lazily_exports_dataset_classes(): assert policyengine_us_data.ExtendedCPS_2024.__name__ == "ExtendedCPS_2024" assert policyengine_us_data.CPS_2024.__name__ == "CPS_2024" assert policyengine_us_data.PUF_2024.__name__ == "PUF_2024" + + +def test_policyengine_us_compat_variables_are_registered(): + from policyengine_us import CountryTaxBenefitSystem + + tbs = CountryTaxBenefitSystem() + + for variable in [ + "sstb_self_employment_income", + "sstb_w2_wages_from_qualified_business", + "sstb_unadjusted_basis_qualified_property", + "sstb_self_employment_income_would_be_qualified", + "sstb_qualified_business_income", + "total_self_employment_income", + ]: + assert variable in tbs.variables + + assert ( + tbs.variables["sstb_w2_wages_from_qualified_business"].uprating + == tbs.variables["w2_wages_from_qualified_business"].uprating + ) + + +def test_policyengine_us_compat_qbid_supports_sstb_only_schedule_c(): + from policyengine_us import CountryTaxBenefitSystem + + tbs = CountryTaxBenefitSystem() + params = tbs.parameters + qbi_formula = type(tbs.variables["qualified_business_income"]).formula + sstb_qbi_formula = type(tbs.variables["sstb_qualified_business_income"]).formula + qbid_formula = type(tbs.variables["qbid_amount"]).formula + deduction_formula = type( + tbs.variables["qualified_business_income_deduction"] + ).formula + person_entity = tbs.variables["qualified_business_income"].entity + + class FakeTaxUnit: + def __init__(self): + self.members = None + + def __call__(self, variable, period): + values = { + "taxable_income_less_qbid": np.array([100_000.0]), + "filing_status": np.array(["SINGLE"], dtype=object), + "adjusted_net_capital_gain": np.array([0.0]), + } + return values[variable] + + def sum(self, values): + return np.asarray(values) + + class FakePerson: + def __init__(self): + self.entity = type( + "FakeEntity", + (), + { + "is_person": True, + "key": person_entity.key, + "plural": person_entity.plural, + "get_variable": staticmethod(tbs.get_variable), + }, + )() + self.tax_unit = FakeTaxUnit() + self.values = { + "self_employment_income": np.array([0.0]), + "self_employment_income_would_be_qualified": np.array([True]), + "partnership_s_corp_income": np.array([0.0]), + "partnership_s_corp_income_would_be_qualified": np.array([True]), + "farm_rent_income": np.array([0.0]), + "farm_rent_income_would_be_qualified": np.array([True]), + "farm_operations_income": np.array([0.0]), + "farm_operations_income_would_be_qualified": np.array([True]), + "rental_income": np.array([0.0]), + "rental_income_would_be_qualified": np.array([True]), + "estate_income": np.array([0.0]), + "estate_income_would_be_qualified": np.array([True]), + "sstb_self_employment_income": np.array([100_000.0]), + "sstb_self_employment_income_would_be_qualified": np.array([True]), + "self_employment_tax_ald_person": np.array([0.0]), + "self_employed_health_insurance_ald_person": np.array([0.0]), + "self_employed_pension_contribution_ald_person": np.array([0.0]), + "business_is_sstb": np.array([True]), + "w2_wages_from_qualified_business": np.array([0.0]), + "sstb_w2_wages_from_qualified_business": np.array([0.0]), + "unadjusted_basis_qualified_property": np.array([0.0]), + "sstb_unadjusted_basis_qualified_property": np.array([0.0]), + "qualified_reit_and_ptp_income": np.array([0.0]), + } + + def __call__(self, variable, period, *args, **kwargs): + return self.values[variable] + + person = FakePerson() + qualified_business_income = qbi_formula(person, 2024, params) + sstb_qualified_business_income = sstb_qbi_formula(person, 2024, params) + person.values["qualified_business_income"] = qualified_business_income + person.values["sstb_qualified_business_income"] = sstb_qualified_business_income + qbid_amount = qbid_formula(person, 2024, params) + person.values["qbid_amount"] = qbid_amount + person.tax_unit.members = person + qualified_business_income_deduction = deduction_formula( + person.tax_unit, 2024, params + ) + + np.testing.assert_allclose(qualified_business_income, np.array([100_000.0])) + np.testing.assert_allclose(sstb_qualified_business_income, np.array([100_000.0])) + np.testing.assert_allclose(qbid_amount, np.array([20_000.0])) + np.testing.assert_allclose( + qualified_business_income_deduction, np.array([20_000.0]) + ) + + +def test_policyengine_us_compat_qbid_keeps_non_sstb_qbi_when_sstb_is_negative(): + from policyengine_us import CountryTaxBenefitSystem + + tbs = CountryTaxBenefitSystem() + params = tbs.parameters + qbi_formula = type(tbs.variables["qualified_business_income"]).formula + sstb_qbi_formula = type(tbs.variables["sstb_qualified_business_income"]).formula + qbid_formula = type(tbs.variables["qbid_amount"]).formula + deduction_formula = type( + tbs.variables["qualified_business_income_deduction"] + ).formula + person_entity = tbs.variables["qualified_business_income"].entity + + class FakeTaxUnit: + def __init__(self): + self.members = None + + def __call__(self, variable, period): + values = { + "taxable_income_less_qbid": np.array([100_000.0]), + "filing_status": np.array(["SINGLE"], dtype=object), + "adjusted_net_capital_gain": np.array([0.0]), + } + return values[variable] + + def sum(self, values): + return np.asarray(values) + + class FakePerson: + def __init__(self): + self.entity = type( + "FakeEntity", + (), + { + "is_person": True, + "key": person_entity.key, + "plural": person_entity.plural, + "get_variable": staticmethod(tbs.get_variable), + }, + )() + self.tax_unit = FakeTaxUnit() + self.values = { + "self_employment_income": np.array([100.0]), + "self_employment_income_would_be_qualified": np.array([True]), + "partnership_s_corp_income": np.array([0.0]), + "partnership_s_corp_income_would_be_qualified": np.array([True]), + "farm_rent_income": np.array([0.0]), + "farm_rent_income_would_be_qualified": np.array([True]), + "farm_operations_income": np.array([0.0]), + "farm_operations_income_would_be_qualified": np.array([True]), + "rental_income": np.array([0.0]), + "rental_income_would_be_qualified": np.array([True]), + "estate_income": np.array([0.0]), + "estate_income_would_be_qualified": np.array([True]), + "sstb_self_employment_income": np.array([-50.0]), + "sstb_self_employment_income_would_be_qualified": np.array([True]), + "self_employment_tax_ald_person": np.array([0.0]), + "self_employed_health_insurance_ald_person": np.array([0.0]), + "self_employed_pension_contribution_ald_person": np.array([0.0]), + "business_is_sstb": np.array([False]), + "w2_wages_from_qualified_business": np.array([0.0]), + "sstb_w2_wages_from_qualified_business": np.array([0.0]), + "unadjusted_basis_qualified_property": np.array([0.0]), + "sstb_unadjusted_basis_qualified_property": np.array([0.0]), + "qualified_reit_and_ptp_income": np.array([0.0]), + } + + def __call__(self, variable, period, *args, **kwargs): + return self.values[variable] + + person = FakePerson() + qualified_business_income = qbi_formula(person, 2024, params) + sstb_qualified_business_income = sstb_qbi_formula(person, 2024, params) + person.values["qualified_business_income"] = qualified_business_income + person.values["sstb_qualified_business_income"] = sstb_qualified_business_income + qbid_amount = qbid_formula(person, 2024, params) + person.values["qbid_amount"] = qbid_amount + person.tax_unit.members = person + qualified_business_income_deduction = deduction_formula( + person.tax_unit, 2024, params + ) + + np.testing.assert_allclose(qualified_business_income, np.array([50.0])) + np.testing.assert_allclose(sstb_qualified_business_income, np.array([0.0])) + np.testing.assert_allclose(qbid_amount, np.array([20.0])) + np.testing.assert_allclose(qualified_business_income_deduction, np.array([20.0])) diff --git a/tests/unit/test_soi_utils.py b/tests/unit/test_soi_utils.py index da9a1f2d..28dd7166 100644 --- a/tests/unit/test_soi_utils.py +++ b/tests/unit/test_soi_utils.py @@ -5,12 +5,33 @@ import numpy as np import pandas as pd +import pytest REPO_ROOT = Path(__file__).resolve().parent.parent.parent PACKAGE_ROOT = REPO_ROOT / "policyengine_us_data" +@pytest.fixture(autouse=True) +def restore_policyengine_us_data_modules(): + module_names = [ + "policyengine_us_data", + "policyengine_us_data.utils", + "policyengine_us_data.storage", + "policyengine_us_data.utils.uprating", + "policyengine_us_data.utils.soi", + ] + original_modules = {name: sys.modules.get(name) for name in module_names} + + yield + + for name, module in original_modules.items(): + if module is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = module + + def load_soi_module(): for name in [ "policyengine_us_data.utils.soi", @@ -68,6 +89,39 @@ def test_get_soi_includes_mortgage_interest_deduction_targets(): assert mortgage_interest["Value"].gt(0).all() +def test_pe_to_soi_combines_sstb_and_non_sstb_schedule_c(monkeypatch): + soi_module = load_soi_module() + n = 2 + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = dataset + self.default_calculation_period = None + + def calculate(self, variable, map_to=None): + values = { + "self_employment_income": np.array([100.0, -10.0]), + "sstb_self_employment_income": np.array([50.0, -25.0]), + "filing_status": np.array(["SINGLE", "SINGLE"]), + "tax_unit_weight": np.ones(n), + "household_id": np.arange(1, n + 1), + } + return values.get(variable, np.zeros(n)) + + fake_policyengine_us = types.ModuleType("policyengine_us") + fake_policyengine_us.Microsimulation = FakeMicrosimulation + monkeypatch.setitem(sys.modules, "policyengine_us", fake_policyengine_us) + + soi = soi_module.pe_to_soi(object(), 2024) + + np.testing.assert_array_equal( + soi["business_net_profits"].to_numpy(), np.array([150.0, 0.0]) + ) + np.testing.assert_array_equal( + soi["business_net_losses"].to_numpy(), np.array([0.0, 35.0]) + ) + + def test_get_soi_uses_best_available_year_per_variable(monkeypatch): soi_module = load_soi_module() fake_soi = pd.DataFrame(