diff --git a/changelog_entry.yaml b/changelog_entry.yaml
index e69de29b..a48a9413 100644
--- a/changelog_entry.yaml
+++ b/changelog_entry.yaml
@@ -0,0 +1,5 @@
+- bump: minor
+  changes:
+    added:
+      - weeks_unemployed variable from CPS ASEC LKWEEKS
+      - QRF-based imputation of weeks_unemployed for Extended CPS PUF copy
diff --git a/policyengine_us_data/datasets/cps/census_cps.py b/policyengine_us_data/datasets/cps/census_cps.py
index c5941c37..00ca020e 100644
--- a/policyengine_us_data/datasets/cps/census_cps.py
+++ b/policyengine_us_data/datasets/cps/census_cps.py
@@ -261,6 +261,7 @@ class CensusCPS_2018(CensusCPS):
     "RNT_VAL",
     "SS_VAL",
     "UC_VAL",
+    "LKWEEKS",  # Weeks looking for work during the year (Census variable)
     "ANN_VAL",
     "PNSN_VAL",
     "OI_OFF",
diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py
index b1a060e9..15e8733b 100644
--- a/policyengine_us_data/datasets/cps/cps.py
+++ b/policyengine_us_data/datasets/cps/cps.py
@@ -423,6 +423,10 @@ def add_personal_income_variables(
         cps["social_security_retirement"]
     )
     cps["unemployment_compensation"] = person.UC_VAL
+    # Weeks looking for work during the year (Census variable LKWEEKS)
+    # LKWEEKS: -1 = NIU (Not In Universe), 0 = not looking, 1-52 = weeks
+    weeks_raw = person.LKWEEKS
+    cps["weeks_unemployed"] = np.where(weeks_raw == -1, 0, weeks_raw)
     # Add pensions and annuities.
     cps_pensions = person.PNSN_VAL + person.ANN_VAL
     # Assume a constant fraction of pension income is taxable.
diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py
index b9f2c81a..f96384a6 100644
--- a/policyengine_us_data/datasets/cps/extended_cps.py
+++ b/policyengine_us_data/datasets/cps/extended_cps.py
@@ -176,6 +176,12 @@ def generate(self):
         data = cps_sim.dataset.load_dataset()
         new_data = {}
 
+        # Pre-compute weeks_unemployed imputation for PUF copy
+        # Preserve relationship between UC and weeks from CPS
+        puf_weeks_unemployed = impute_weeks_unemployed_for_puf(
+            cps_sim, y_full_imputations
+        )
+
         for variable in list(data) + IMPUTED_VARIABLES:
             variable_metadata = cps_sim.tax_benefit_system.variables.get(
                 variable
@@ -206,6 +212,9 @@ def generate(self):
                 values = np.concatenate([values, values + values.max()])
             elif "_weight" in variable:
                 values = np.concatenate([values, values * 0])
+            elif variable == "weeks_unemployed":
+                # Use imputed weeks for PUF copy to preserve UC relationship
+                values = np.concatenate([values, puf_weeks_unemployed])
             else:
                 values = np.concatenate([values, values])
             new_data[variable] = {
@@ -320,6 +329,106 @@ def impute_income_variables(
     return result
 
 
+def impute_weeks_unemployed_for_puf(cps_sim, puf_imputations):
+    """
+    Impute weeks_unemployed for the PUF copy using QRF from CPS data.
+
+    Uses microimpute's Quantile Random Forest to impute weeks_unemployed
+    for PUF records based on CPS data, preserving the joint distribution
+    of weeks with UC, age, and other predictors.
+
+    This is the reverse of the income imputation (CPS → PUF instead of
+    PUF → CPS) because weeks_unemployed exists in CPS but not in PUF.
+    """
+    # Get CPS weeks
+    try:
+        cps_weeks = cps_sim.calculate("weeks_unemployed").values
+    except (ValueError, KeyError):
+        logging.warning(
+            "weeks_unemployed not available in CPS, "
+            "returning zeros for PUF copy"
+        )
+        n_persons = len(puf_imputations.index)
+        return np.zeros(n_persons)
+
+    # Predictors available in both CPS and imputed PUF data
+    WEEKS_PREDICTORS = [
+        "age",
+        "is_male",
+        "tax_unit_is_joint",
+        "is_tax_unit_head",
+        "is_tax_unit_spouse",
+        "is_tax_unit_dependent",
+    ]
+
+    # Build training data from CPS
+    X_train = cps_sim.calculate_dataframe(WEEKS_PREDICTORS)
+    X_train["weeks_unemployed"] = cps_weeks
+
+    # Add UC as predictor if available in imputations (strong predictor)
+    if "taxable_unemployment_compensation" in puf_imputations.columns:
+        cps_uc = cps_sim.calculate("unemployment_compensation").values
+        X_train["unemployment_compensation"] = cps_uc
+        WEEKS_PREDICTORS = WEEKS_PREDICTORS + ["unemployment_compensation"]
+
+    # Build test data for PUF copy
+    # Use CPS sim to get demographics (same as CPS portion)
+    X_test = cps_sim.calculate_dataframe(
+        [p for p in WEEKS_PREDICTORS if p != "unemployment_compensation"]
+    )
+
+    # Add imputed UC if available
+    if "taxable_unemployment_compensation" in puf_imputations.columns:
+        X_test["unemployment_compensation"] = puf_imputations[
+            "taxable_unemployment_compensation"
+        ].values
+
+    logging.info(
+        f"Imputing weeks_unemployed using QRF with "
+        f"predictors: {WEEKS_PREDICTORS}"
+    )
+
+    # Use QRF to impute weeks
+    qrf = QRF(
+        log_level="INFO",
+        memory_efficient=True,
+    )
+
+    # Sample training data for efficiency
+    sample_size = min(5000, len(X_train))
+    if len(X_train) > sample_size:
+        X_train_sampled = X_train.sample(n=sample_size, random_state=42)
+    else:
+        X_train_sampled = X_train
+
+    fitted_model = qrf.fit(
+        X_train=X_train_sampled,
+        predictors=WEEKS_PREDICTORS,
+        imputed_variables=["weeks_unemployed"],
+        n_jobs=1,
+    )
+
+    predictions = fitted_model.predict(X_test=X_test)
+    imputed_weeks = predictions["weeks_unemployed"].values
+
+    # Enforce constraints: 0-52 weeks, 0 if no UC
+    imputed_weeks = np.clip(imputed_weeks, 0, 52)
+    if "unemployment_compensation" in X_test.columns:
+        imputed_weeks = np.where(
+            X_test["unemployment_compensation"].values > 0,
+            imputed_weeks,
+            0,
+        )
+
+    logging.info(
+        f"Imputed weeks_unemployed for PUF: "
+        f"{(imputed_weeks > 0).sum()} with weeks > 0, "
+        f"mean = {imputed_weeks[imputed_weeks > 0].mean():.1f} weeks"
+    )
+
+    return imputed_weeks
+
+
 class ExtendedCPS_2024(ExtendedCPS):
     cps = CPS_2024_Full
     puf = PUF_2024
diff --git a/tests/test_weeks_unemployed.py b/tests/test_weeks_unemployed.py
new file mode 100644
index 00000000..2a230de2
--- /dev/null
+++ b/tests/test_weeks_unemployed.py
@@ -0,0 +1,86 @@
+"""
+Tests for weeks_unemployed variable extraction from CPS ASEC.
+
+The Census CPS ASEC uses LKWEEKS (not IPUMS's WKSUNEM1) for weeks looking for work.
+"""
+
+import numpy as np
+from pathlib import Path
+
+
+class TestWeeksUnemployed:
+    """Test suite for weeks_unemployed variable."""
+
+    def test_lkweeks_in_person_columns(self):
+        """Test that LKWEEKS is in PERSON_COLUMNS, not WKSUNEM."""
+        # Read the source file directly to check column names
+        census_cps_path = Path(__file__).parent.parent / (
+            "policyengine_us_data/datasets/cps/census_cps.py"
+        )
+        content = census_cps_path.read_text()
+
+        # Check for correct variable
+        assert '"LKWEEKS"' in content, "LKWEEKS should be in PERSON_COLUMNS"
+        assert (
+            '"WKSUNEM"' not in content
+        ), "WKSUNEM should not be in PERSON_COLUMNS (Census uses LKWEEKS)"
+
+    def test_cps_uses_lkweeks(self):
+        """Test that cps.py uses LKWEEKS, not WKSUNEM."""
+        cps_path = Path(__file__).parent.parent / (
+            "policyengine_us_data/datasets/cps/cps.py"
+        )
+        content = cps_path.read_text()
+
+        # Check for correct variable reference
+        assert "LKWEEKS" in content, "cps.py should reference LKWEEKS"
+        assert "WKSUNEM" not in content, "cps.py should not reference WKSUNEM"
+
+    def test_weeks_unemployed_value_range(self):
+        """Test that weeks_unemployed values are in valid range (0-52)."""
+        # LKWEEKS values: 0 = not unemployed, 1-52 = weeks, -1 = NIU
+        # After processing, should be 0-52 (NIU mapped to 0)
+
+        raw_values = np.array([-1, 0, 1, 26, 52, -1])
+        processed = np.where(raw_values == -1, 0, raw_values)
+
+        assert processed.min() >= 0, "Minimum should be >= 0"
+        assert processed.max() <= 52, "Maximum should be <= 52"
+        assert processed[0] == 0, "NIU (-1) should map to 0"
+        assert processed[1] == 0, "Not unemployed (0) should stay 0"
+        assert processed[3] == 26, "26 weeks should stay 26"
+
+    def test_puf_weeks_imputation_constraints(self):
+        """Test the weeks imputation constraints for PUF copy."""
+        # The QRF-based imputation should respect these constraints:
+        # 1. weeks should be in [0, 52]
+        # 2. weeks should be 0 when UC is 0
+
+        # Test constraint enforcement
+        raw_imputed = np.array([-5, 0, 25, 60, 100])
+        uc_values = np.array([100, 0, 5000, 10000, 0])
+
+        # Apply constraints like the function does
+        constrained = np.clip(raw_imputed, 0, 52)
+        constrained = np.where(uc_values > 0, constrained, 0)
+
+        assert constrained.min() >= 0, "Should be non-negative"
+        assert constrained.max() <= 52, "Should be capped at 52 weeks"
+        assert constrained[1] == 0, "No UC should mean 0 weeks"
+        assert constrained[4] == 0, "No UC should mean 0 weeks"
+        assert constrained[2] == 25, "Valid weeks with UC should be preserved"
+
+    def test_extended_cps_handles_weeks_unemployed(self):
+        """Test that extended_cps.py has special handling for weeks_unemployed."""
+        ecps_path = Path(__file__).parent.parent / (
+            "policyengine_us_data/datasets/cps/extended_cps.py"
+        )
+        content = ecps_path.read_text()
+
+        # Check for weeks_unemployed handling
+        assert (
+            "weeks_unemployed" in content
+        ), "extended_cps.py should handle weeks_unemployed"
+        assert (
+            "impute_weeks_unemployed_for_puf" in content
+        ), "Should have imputation function for PUF weeks"