diff --git a/changelog.d/codex-id-primitives.fixed.md b/changelog.d/codex-id-primitives.fixed.md new file mode 100644 index 000000000..ab2a0fe04 --- /dev/null +++ b/changelog.d/codex-id-primitives.fixed.md @@ -0,0 +1 @@ +Added `taxpayer_id_type` and `has_valid_ssn` to Enhanced CPS outputs, with `has_tin` and temporary `has_itin` compatibility fields from a conservative TIN imputation instead of direct legal-status proxy mapping. diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index b6a7f40a6..4e664ba7f 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -1706,7 +1706,7 @@ def get_arrival_year_midpoint(peinusyr): # CONVERT TO STRING LABELS AND STORE # ============================================================================ - _store_identification_variables(cps, ssn_card_type) + _store_identification_variables(cps, person, ssn_card_type, time_period) # Final population summary print(f"\nFinal populations:") diff --git a/policyengine_us_data/utils/identification.py b/policyengine_us_data/utils/identification.py index 69558f068..fecb5e891 100644 --- a/policyengine_us_data/utils/identification.py +++ b/policyengine_us_data/utils/identification.py @@ -2,6 +2,12 @@ import pandas as pd +NON_SSN_FILER_TIN_TARGET_BY_YEAR = { + # Latest available public IRS/TAS figure: about 3.8M TY 2023 returns + # included an ITIN. Use it as a recent proxy for non-SSN filer TINs. + 2024: 3.8e6, +} + SSN_CARD_TYPE_CODE_TO_STR = { 0: "NONE", 1: "CITIZEN", @@ -10,17 +16,241 @@ } -def _derive_has_tin_from_ssn_card_type_codes(ssn_card_type: np.ndarray) -> np.ndarray: - """Return whether a person has any taxpayer ID from CPS ID status codes.""" - return np.asarray(ssn_card_type) != 0 +def _derive_has_valid_ssn_from_ssn_card_type_codes( + ssn_card_type: np.ndarray, +) -> np.ndarray: + """Return direct valid-SSN evidence from CPS ID status codes.""" + ssn_card_type = np.asarray(ssn_card_type) + return ssn_card_type == 1 + + +def _impute_has_valid_ssn(ssn_card_type: np.ndarray) -> np.ndarray: + """Impute valid SSNs without treating EAD or documented-status proxies as IDs.""" + return _derive_has_valid_ssn_from_ssn_card_type_codes(ssn_card_type) + + +def _derive_taxpayer_id_type_from_identification_flags( + has_valid_ssn: np.ndarray, + has_tin: np.ndarray, +) -> np.ndarray: + """Return statute-facing taxpayer ID classes from imputed ID flags.""" + return np.where( + has_valid_ssn, + "VALID_SSN", + np.where(has_tin, "OTHER_TIN", "NONE"), + ) + + +def _person_weights(cps: dict) -> np.ndarray: + """Return person weights from household IDs and weights.""" + household_to_weight = dict(zip(cps["household_id"], cps["household_weight"])) + return np.array( + [ + household_to_weight.get(household_id, 0) + for household_id in cps["person_household_id"] + ], + dtype=float, + ) + + +def _proxy_tax_unit_filers( + person_tax_unit_ids: np.ndarray, + age: np.ndarray, +) -> np.ndarray: + """Proxy tax-unit head/spouse as the two oldest adults in each tax unit.""" + person_tax_unit_ids = np.asarray(person_tax_unit_ids) + age = np.asarray(age) + adult = age >= 18 + ranks = pd.Series(np.inf, index=np.arange(len(age)), dtype=float) + if adult.any(): + adults = pd.DataFrame( + { + "tax_unit_id": person_tax_unit_ids[adult], + "age": age[adult], + }, + index=np.flatnonzero(adult), + ) + ranks.loc[adults.index] = adults.groupby("tax_unit_id")["age"].rank( + method="first", + ascending=False, + ) + return adult & (ranks.to_numpy() <= 2) + + +def _high_confidence_tin_evidence(person: pd.DataFrame) -> np.ndarray: + """Return strong non-tax ID evidence from CPS admin/payroll-linked signals.""" + social_security = ( + (person.SS_YN == 1) + | np.isin(person.RESNSS1, [1, 2, 3, 4, 5, 6, 7]) + | np.isin(person.RESNSS2, [1, 2, 3, 4, 5, 6, 7]) + ) + medicare = person.MCARE == 1 + federal_pension = np.isin(person.PEN_SC1, [3]) | np.isin(person.PEN_SC2, [3]) + government_worker = np.isin(person.PEIO1COW, [1, 2, 3]) | (person.A_MJOCC == 11) + military_link = (person.MIL == 1) | (person.PEAFEVER == 1) | (person.CHAMPVA == 1) + ssi = person.SSI_YN == 1 + return ( + social_security + | medicare + | federal_pension + | government_worker + | military_link + | ssi + ).to_numpy(dtype=bool) + + +def _aggregate_by_tax_unit( + values: np.ndarray, + tax_unit_index: np.ndarray, + n_tax_units: int, +) -> np.ndarray: + total = np.zeros(n_tax_units, dtype=float) + np.add.at(total, tax_unit_index, values) + return total -def _store_identification_variables(cps: dict, ssn_card_type: np.ndarray) -> None: +def _impute_has_tin( + cps: dict, + person: pd.DataFrame, + ssn_card_type: np.ndarray, + time_period: int, + non_ssn_filer_tin_target: float | None = None, + has_valid_ssn: np.ndarray | None = None, +) -> np.ndarray: + """Impute broad TIN possession without treating legal-status proxies as IDs.""" + ssn_card_type = np.asarray(ssn_card_type) + if has_valid_ssn is None: + has_valid_ssn = _impute_has_valid_ssn(ssn_card_type) + has_tin = has_valid_ssn.copy() + + high_confidence_tin = ~has_valid_ssn & _high_confidence_tin_evidence(person) + has_tin |= high_confidence_tin + + target = non_ssn_filer_tin_target + if target is None: + target = NON_SSN_FILER_TIN_TARGET_BY_YEAR.get(time_period) + if target is None or target <= 0: + return has_tin + + age = np.asarray(cps["age"]) + person_tax_unit_ids = np.asarray(cps["person_tax_unit_id"]) + tax_unit_ids, person_tax_unit_index = np.unique( + person_tax_unit_ids, + return_inverse=True, + ) + n_tax_units = len(tax_unit_ids) + person_weights = _person_weights(cps) + tax_unit_weights = np.zeros(n_tax_units, dtype=float) + np.maximum.at(tax_unit_weights, person_tax_unit_index, person_weights) + + proxy_filer = _proxy_tax_unit_filers(person_tax_unit_ids, age) + non_ssn_proxy_filer = proxy_filer & ~has_valid_ssn + + current_non_ssn_tin_units = np.zeros(n_tax_units, dtype=bool) + np.logical_or.at( + current_non_ssn_tin_units, + person_tax_unit_index, + non_ssn_proxy_filer & has_tin, + ) + current_weighted_units = tax_unit_weights[current_non_ssn_tin_units].sum() + additional_target = target - current_weighted_units + if additional_target <= 0: + return has_tin + + employment_income = np.asarray(cps.get("employment_income", np.zeros(len(age)))) + self_employment_income = np.asarray( + cps.get("self_employment_income", np.zeros(len(age))) + ) + prior_year_income = np.asarray( + cps.get("employment_income_last_year", np.zeros(len(age))) + ) + np.asarray(cps.get("self_employment_income_last_year", np.zeros(len(age)))) + + has_filing_income = ( + (employment_income > 0) | (self_employment_income > 0) | (prior_year_income > 0) + ) + candidate_person = ( + non_ssn_proxy_filer & ~has_tin & (ssn_card_type == 0) & has_filing_income + ) + candidate_units = np.zeros(n_tax_units, dtype=bool) + np.logical_or.at(candidate_units, person_tax_unit_index, candidate_person) + if not candidate_units.any(): + return has_tin + + unit_employment_income = _aggregate_by_tax_unit( + np.maximum(employment_income, 0), + person_tax_unit_index, + n_tax_units, + ) + unit_self_employment_income = _aggregate_by_tax_unit( + np.maximum(self_employment_income, 0), + person_tax_unit_index, + n_tax_units, + ) + unit_prior_year_income = _aggregate_by_tax_unit( + np.maximum(prior_year_income, 0), + person_tax_unit_index, + n_tax_units, + ) + unit_non_ssn_filer_count = _aggregate_by_tax_unit( + candidate_person.astype(float), + person_tax_unit_index, + n_tax_units, + ) + unit_has_minor = np.zeros(n_tax_units, dtype=bool) + np.logical_or.at(unit_has_minor, person_tax_unit_index, age < 18) + + score = ( + 4.0 * (unit_self_employment_income > 0) + + 2.0 * (unit_employment_income > 0) + + 1.0 * (unit_prior_year_income > 0) + + 1.0 * unit_has_minor + + 0.5 * (unit_non_ssn_filer_count > 1) + ) + + candidate_idx = np.flatnonzero(candidate_units) + rng = np.random.default_rng(seed=17_000 + int(time_period)) + priority = score[candidate_idx] + rng.random(len(candidate_idx)) * 0.01 + ordered_idx = candidate_idx[np.argsort(-priority)] + + selected_units = np.zeros(n_tax_units, dtype=bool) + cumulative_weight = 0.0 + for tax_unit_index in ordered_idx: + if cumulative_weight >= additional_target: + break + selected_units[tax_unit_index] = True + cumulative_weight += tax_unit_weights[tax_unit_index] + + selected_person_unit = selected_units[person_tax_unit_index] + selected_non_ssn_filers = selected_person_unit & non_ssn_proxy_filer + selected_minor_dependents = selected_person_unit & ~proxy_filer & (age < 18) + has_tin |= selected_non_ssn_filers | (selected_minor_dependents & ~has_valid_ssn) + return has_tin + + +def _store_identification_variables( + cps: dict, + person: pd.DataFrame, + ssn_card_type: np.ndarray, + time_period: int, +) -> None: """Persist identification inputs used by PolicyEngine US.""" - has_tin = _derive_has_tin_from_ssn_card_type_codes(ssn_card_type) + has_valid_ssn = _impute_has_valid_ssn(ssn_card_type) + has_tin = _impute_has_tin( + cps, + person, + ssn_card_type, + time_period, + has_valid_ssn=has_valid_ssn, + ) + taxpayer_id_type = _derive_taxpayer_id_type_from_identification_flags( + has_valid_ssn, + has_tin, + ) cps["ssn_card_type"] = ( pd.Series(ssn_card_type).map(SSN_CARD_TYPE_CODE_TO_STR).astype("S").values ) + cps["taxpayer_id_type"] = pd.Series(taxpayer_id_type).astype("S").values cps["has_tin"] = has_tin + cps["has_valid_ssn"] = has_valid_ssn # Temporary compatibility alias while policyengine-us users migrate. cps["has_itin"] = has_tin diff --git a/tests/integration/test_enhanced_cps.py b/tests/integration/test_enhanced_cps.py index 74c35def5..8faa87502 100644 --- a/tests/integration/test_enhanced_cps.py +++ b/tests/integration/test_enhanced_cps.py @@ -8,6 +8,16 @@ def _period_array(period_values, period): return period_values.get(period, period_values[str(period)]) +def _require_identification_fields(data): + required_fields = ("has_tin", "has_itin", "has_valid_ssn", "taxpayer_id_type") + missing = [field for field in required_fields if field not in data] + if missing: + pytest.skip( + "enhanced_cps_2024.h5 fixture predates raw identification fields: " + + ", ".join(missing) + ) + + @pytest.fixture(scope="module") def ecps_sim(): from policyengine_us_data.datasets.cps import EnhancedCPS_2024 @@ -226,12 +236,26 @@ def test_undocumented_matches_ssn_none(): def test_has_tin_matches_identification_inputs(ecps_sim): data = ecps_sim.dataset.load_dataset() + _require_identification_fields(data) has_tin = _period_array(data["has_tin"], 2024) has_itin = _period_array(data["has_itin"], 2024) + has_valid_ssn = _period_array(data["has_valid_ssn"], 2024) ssn_card_type = _period_array(data["ssn_card_type"], 2024).astype(str) + taxpayer_id_type = _period_array(data["taxpayer_id_type"], 2024).astype(str) np.testing.assert_array_equal(has_itin, has_tin) - np.testing.assert_array_equal(has_tin, ssn_card_type != "NONE") + np.testing.assert_array_equal(has_valid_ssn, taxpayer_id_type == "VALID_SSN") + np.testing.assert_array_equal(has_tin, taxpayer_id_type != "NONE") + assert np.all(has_tin[has_valid_ssn]) + np.testing.assert_array_equal(has_valid_ssn[ssn_card_type == "NONE"], False) + np.testing.assert_array_equal( + taxpayer_id_type, + np.where( + has_valid_ssn, + "VALID_SSN", + np.where(has_tin, "OTHER_TIN", "NONE"), + ), + ) def test_aca_calibration(): diff --git a/tests/integration/test_sparse_enhanced_cps.py b/tests/integration/test_sparse_enhanced_cps.py index 2d53c73b8..488dda666 100644 --- a/tests/integration/test_sparse_enhanced_cps.py +++ b/tests/integration/test_sparse_enhanced_cps.py @@ -21,6 +21,16 @@ def _period_array(period_values, period): return period_values.get(period, period_values[str(period)]) +def _require_identification_fields(data): + required_fields = ("has_tin", "has_itin", "has_valid_ssn", "taxpayer_id_type") + missing = [field for field in required_fields if field not in data] + if missing: + pytest.skip( + "enhanced_cps_2024.h5 fixture predates raw identification fields: " + + ", ".join(missing) + ) + + @pytest.fixture(scope="session") def data(): return Dataset.from_file(STORAGE_FOLDER / "enhanced_cps_2024.h5") @@ -210,12 +220,26 @@ def test_sparse_ssn_card_type_none_target(sim): def test_sparse_has_tin_matches_identification_inputs(sim): data = sim.dataset.load_dataset() + _require_identification_fields(data) has_tin = _period_array(data["has_tin"], 2024) has_itin = _period_array(data["has_itin"], 2024) + has_valid_ssn = _period_array(data["has_valid_ssn"], 2024) ssn_card_type = _period_array(data["ssn_card_type"], 2024).astype(str) + taxpayer_id_type = _period_array(data["taxpayer_id_type"], 2024).astype(str) np.testing.assert_array_equal(has_itin, has_tin) - np.testing.assert_array_equal(has_tin, ssn_card_type != "NONE") + np.testing.assert_array_equal(has_valid_ssn, taxpayer_id_type == "VALID_SSN") + np.testing.assert_array_equal(has_tin, taxpayer_id_type != "NONE") + assert np.all(has_tin[has_valid_ssn]) + np.testing.assert_array_equal(has_valid_ssn[ssn_card_type == "NONE"], False) + np.testing.assert_array_equal( + taxpayer_id_type, + np.where( + has_valid_ssn, + "VALID_SSN", + np.where(has_tin, "OTHER_TIN", "NONE"), + ), + ) def test_sparse_aca_calibration(sim): diff --git a/tests/unit/datasets/test_cps_identification.py b/tests/unit/datasets/test_cps_identification.py index 690aeeaa9..4777f6016 100644 --- a/tests/unit/datasets/test_cps_identification.py +++ b/tests/unit/datasets/test_cps_identification.py @@ -1,24 +1,200 @@ import numpy as np +import pandas as pd from policyengine_us_data.utils.identification import ( - _derive_has_tin_from_ssn_card_type_codes, + _derive_has_valid_ssn_from_ssn_card_type_codes, + _derive_taxpayer_id_type_from_identification_flags, + _high_confidence_tin_evidence, + _impute_has_tin, + _impute_has_valid_ssn, + _proxy_tax_unit_filers, _store_identification_variables, ) -def test_derive_has_tin_from_ssn_card_type_codes(): - result = _derive_has_tin_from_ssn_card_type_codes(np.array([0, 1, 2, 3])) +def _person_fixture(**overrides): + n = max((len(value) for value in overrides.values()), default=4) + defaults = { + "SS_YN": np.zeros(n, dtype=int), + "RESNSS1": np.zeros(n, dtype=int), + "RESNSS2": np.zeros(n, dtype=int), + "MCARE": np.zeros(n, dtype=int), + "PEN_SC1": np.zeros(n, dtype=int), + "PEN_SC2": np.zeros(n, dtype=int), + "PEIO1COW": np.zeros(n, dtype=int), + "A_MJOCC": np.zeros(n, dtype=int), + "MIL": np.zeros(n, dtype=int), + "PEAFEVER": np.zeros(n, dtype=int), + "CHAMPVA": np.zeros(n, dtype=int), + "SSI_YN": np.zeros(n, dtype=int), + "WSAL_VAL": np.zeros(n, dtype=int), + "SEMP_VAL": np.zeros(n, dtype=int), + } + defaults.update(overrides) + return pd.DataFrame(defaults) + + +def _cps_fixture( + *, + age, + tax_unit_ids, + weights=None, + employment_income=None, + self_employment_income=None, + prior_employment_income=None, + prior_self_employment_income=None, +): + n = len(age) + weights = np.ones(n) if weights is None else np.asarray(weights) + household_ids = np.arange(n) + return { + "age": np.asarray(age), + "person_tax_unit_id": np.asarray(tax_unit_ids), + "person_household_id": household_ids, + "household_id": household_ids, + "household_weight": weights, + "employment_income": ( + np.zeros(n) if employment_income is None else np.asarray(employment_income) + ), + "self_employment_income": ( + np.zeros(n) + if self_employment_income is None + else np.asarray(self_employment_income) + ), + "employment_income_last_year": ( + np.zeros(n) + if prior_employment_income is None + else np.asarray(prior_employment_income) + ), + "self_employment_income_last_year": ( + np.zeros(n) + if prior_self_employment_income is None + else np.asarray(prior_self_employment_income) + ), + } + + +def test_derive_has_valid_ssn_from_ssn_card_type_codes(): + result = _derive_has_valid_ssn_from_ssn_card_type_codes(np.array([0, 1, 2, 3])) np.testing.assert_array_equal( result, - np.array([False, True, True, True], dtype=bool), + np.array([False, True, False, False], dtype=bool), + ) + + +def test_impute_has_valid_ssn_does_not_treat_ead_proxy_as_direct_evidence(): + result = _impute_has_valid_ssn( + ssn_card_type=np.array([0, 1, 2, 3]), + ) + + np.testing.assert_array_equal(result, np.array([False, True, False, False])) + + +def test_derive_taxpayer_id_type_from_identification_flags(): + result = _derive_taxpayer_id_type_from_identification_flags( + has_valid_ssn=np.array([False, True, False]), + has_tin=np.array([False, True, True]), + ) + + assert result.tolist() == ["NONE", "VALID_SSN", "OTHER_TIN"] + + +def test_other_non_citizen_without_evidence_does_not_get_tin(): + person = _person_fixture() + cps = _cps_fixture(age=[40], tax_unit_ids=[1]) + + result = _impute_has_tin( + cps, + person.iloc[:1], + ssn_card_type=np.array([3]), + time_period=2024, + non_ssn_filer_tin_target=0, + ) + + np.testing.assert_array_equal(result, np.array([False])) + + +def test_tin_target_does_not_select_other_non_citizen_without_evidence(): + person = _person_fixture() + cps = _cps_fixture( + age=[40], + tax_unit_ids=[1], + self_employment_income=[5_000], + ) + + result = _impute_has_tin( + cps, + person.iloc[:1], + ssn_card_type=np.array([3]), + time_period=2024, + non_ssn_filer_tin_target=1, ) + np.testing.assert_array_equal(result, np.array([False])) + + +def test_high_confidence_admin_signal_gets_tin(): + person = _person_fixture(SS_YN=np.array([1, 0]), MCARE=np.array([0, 1])) + + result = _high_confidence_tin_evidence(person) + + np.testing.assert_array_equal(result, np.array([True, True])) + + +def test_medicaid_only_is_not_high_confidence_tin_evidence(): + person = _person_fixture() + person["CAID"] = np.array([1, 0, 0, 0]) -def test_store_identification_variables_writes_has_tin_and_alias(): - cps = {} + result = _high_confidence_tin_evidence(person) - _store_identification_variables(cps, np.array([0, 1, 2, 3])) + np.testing.assert_array_equal(result, np.zeros(4, dtype=bool)) + + +def test_proxy_tax_unit_filers_selects_two_oldest_adults(): + result = _proxy_tax_unit_filers( + person_tax_unit_ids=np.array([1, 1, 1, 2, 2]), + age=np.array([16, 40, 38, 12, 50]), + ) + + np.testing.assert_array_equal(result, np.array([False, True, True, False, True])) + + +def test_impute_has_tin_targets_likely_itin_filer_unit_and_minor_children(): + person = _person_fixture( + SS_YN=np.zeros(4, dtype=int), + MCARE=np.zeros(4, dtype=int), + ) + cps = _cps_fixture( + age=[40, 8, 40, 8], + tax_unit_ids=[1, 1, 2, 2], + self_employment_income=[5_000, 0, 0, 0], + ) + + result = _impute_has_tin( + cps, + person, + ssn_card_type=np.array([0, 0, 0, 0]), + time_period=2024, + non_ssn_filer_tin_target=1, + ) + + np.testing.assert_array_equal(result, np.array([True, True, False, False])) + + +def test_store_identification_variables_writes_id_primitives(): + person = _person_fixture() + cps = _cps_fixture( + age=[40, 40, 40, 40], + tax_unit_ids=[1, 2, 3, 4], + ) + + _store_identification_variables( + cps, + person, + np.array([0, 1, 2, 3]), + time_period=2023, + ) assert cps["ssn_card_type"].tolist() == [ b"NONE", @@ -26,8 +202,33 @@ def test_store_identification_variables_writes_has_tin_and_alias(): b"NON_CITIZEN_VALID_EAD", b"OTHER_NON_CITIZEN", ] + assert cps["taxpayer_id_type"].tolist() == [ + b"NONE", + b"VALID_SSN", + b"NONE", + b"NONE", + ] np.testing.assert_array_equal( cps["has_tin"], - np.array([False, True, True, True], dtype=bool), + np.array([False, True, False, False], dtype=bool), + ) + np.testing.assert_array_equal( + cps["has_valid_ssn"], + np.array([False, True, False, False], dtype=bool), ) np.testing.assert_array_equal(cps["has_itin"], cps["has_tin"]) + + +def test_store_does_not_treat_ead_with_earnings_as_valid_ssn(): + person = _person_fixture(WSAL_VAL=np.array([5_000])) + cps = _cps_fixture(age=[40], tax_unit_ids=[1], employment_income=[5_000]) + + _store_identification_variables( + cps, + person, + np.array([2]), + time_period=2023, + ) + + assert cps["taxpayer_id_type"].tolist() == [b"NONE"] + np.testing.assert_array_equal(cps["has_valid_ssn"], np.array([False]))