2 changes: 1 addition & 1 deletion Makefile
@@ -87,7 +87,7 @@ data: download
python policyengine_us_data/datasets/cps/extended_cps.py
python policyengine_us_data/datasets/cps/enhanced_cps.py
python policyengine_us_data/datasets/cps/small_enhanced_cps.py
-	python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 10500
+	python policyengine_us_data/datasets/cps/local_area_calibration/create_stratified_cps.py 12000 --top=99.5 --seed=3526

publish-local-area:
python policyengine_us_data/datasets/cps/local_area_calibration/publish_local_area.py
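The new invocation raises the sample from 10500 to 12000 households and adds --top and --seed flags that the diff itself does not document. A minimal sketch of an argument surface consistent with this call, assuming --top is a top-income percentile cutoff and --seed an RNG seed for reproducible sampling; both semantics are assumptions, not taken from the script:

import argparse

# Sketch only: flag meanings are assumptions inferred from the Makefile call.
parser = argparse.ArgumentParser(description="Stratified CPS sample (sketch)")
parser.add_argument("n_households", type=int)  # e.g. 12000 target households
parser.add_argument("--top", type=float, default=99.5)  # assumed percentile cutoff
parser.add_argument("--seed", type=int, default=None)  # e.g. 3526, for reproducibility
args = parser.parse_args()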
3 changes: 3 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,3 @@
+- date: 2026-02-02
+  type: fixed
+  description: Fix stale calibration targets by deriving time_period from the dataset across all ETL scripts, using income_tax_positive for CBO calibration, and adding 119th Congress district codes for consistent redistricting alignment
1 change: 1 addition & 0 deletions policyengine_us_data/datasets/cps/cps.py
@@ -213,6 +213,7 @@ def add_takeup(self):
data["snap_take_up_seed"] = generator.random(len(data["spm_unit_id"]))
data["aca_take_up_seed"] = generator.random(len(data["tax_unit_id"]))
data["medicaid_take_up_seed"] = generator.random(len(data["person_id"]))
data["ssi_resource_test_seed"] = generator.random(len(data["person_id"]))

self.save_dataset(data)

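Each *_take_up_seed column stores one uniform draw per entity, so downstream take-up (or, for the new column, SSI resource-test) logic is reproducible across runs. A minimal sketch of the usual consumption pattern, assuming a simple rate comparison; the actual policyengine-us logic may differ:

import numpy as np

def takes_up(seed: np.ndarray, take_up_rate: float) -> np.ndarray:
    # An entity participates when its fixed uniform draw falls below the
    # rate, so participation is stable for a given dataset and seed.
    return seed < take_up_rate

generator = np.random.default_rng(0)
ssi_resource_test_seed = generator.random(5)
print(takes_up(ssi_resource_test_seed, 0.6))  # e.g. [ True False ...]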
@@ -252,39 +252,24 @@ def get_pseudo_input_variables(sim) -> set:
    """
    Identify pseudo-input variables that should NOT be saved to H5 files.

-    A pseudo-input is a variable that:
-    - Appears in sim.input_variables (has stored values)
-    - Has 'adds' or 'subtracts' attribute
-    - At least one component has a formula (is calculated)
-
-    These variables have stale pre-computed values that corrupt calculations
-    when reloaded, because the stored value overrides the formula.
+    NOTE: This function currently returns an empty set. The original logic
+    excluded variables with 'adds' or 'subtracts' attributes, but analysis
+    showed that in CPS data, these variables contain authoritative stored
+    data that does NOT match their component variables:
+
+    - pre_tax_contributions: components are all 0, aggregate has imputed values
+    - tax_exempt_pension_income: aggregate has 135M, components only 20M
+    - taxable_pension_income: aggregate has 82M, components only 29M
+    - interest_deduction: aggregate has 41M, components are 0
+
+    The 'adds' attribute defines how to CALCULATE these values, but in CPS
+    data the stored values are the authoritative source. Excluding them and
+    recalculating from components produces incorrect results.
+
+    For geo-stacking, entity ID reindexing preserves within-entity
+    relationships, so aggregation within a person or tax_unit remains valid.
    """
-    tbs = sim.tax_benefit_system
-    pseudo_inputs = set()
-
-    for var_name in sim.input_variables:
-        var = tbs.variables.get(var_name)
-        if not var:
-            continue
-
-        adds = getattr(var, "adds", None)
-        if adds and isinstance(adds, list):
-            for component in adds:
-                comp_var = tbs.variables.get(component)
-                if comp_var and len(getattr(comp_var, "formulas", {})) > 0:
-                    pseudo_inputs.add(var_name)
-                    break
-
-        subtracts = getattr(var, "subtracts", None)
-        if subtracts and isinstance(subtracts, list):
-            for component in subtracts:
-                comp_var = tbs.variables.get(component)
-                if comp_var and len(getattr(comp_var, "formulas", {})) > 0:
-                    pseudo_inputs.add(var_name)
-                    break
-
-    return pseudo_inputs
+    return set()


def apply_op(values: np.ndarray, op: str, val: str) -> np.ndarray:
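The docstring's aggregate-versus-component claim is straightforward to check. A hedged diagnostic sketch, assuming the standard Microsimulation calculate API and an illustrative dataset name; the variable names come from the docstring above:

from policyengine_us import Microsimulation

def compare_aggregate_to_components(sim, var_name, period):
    # Compare the stored aggregate against the sum of its 'adds' components
    # (handles only the list form of 'adds'; parameter references are skipped).
    var = sim.tax_benefit_system.variables[var_name]
    adds = getattr(var, "adds", None)
    components = adds if isinstance(adds, list) else []
    stored = sim.calculate(var_name, period).sum()
    component_total = sum(sim.calculate(c, period).sum() for c in components)
    print(f"{var_name}: stored={stored:,.0f} vs components={component_total:,.0f}")

sim = Microsimulation(dataset="cps_2023")  # dataset name is illustrative
for name in ["pre_tax_contributions", "tax_exempt_pension_income"]:
    compare_aggregate_to_components(sim, name, 2023)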
28 changes: 26 additions & 2 deletions policyengine_us_data/db/create_initial_strata.py
@@ -1,3 +1,4 @@
+import argparse
import logging
from typing import Dict

@@ -6,6 +7,8 @@
from sqlmodel import Session, create_engine

from policyengine_us_data.storage import STORAGE_FOLDER
+
+DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
@@ -68,6 +71,28 @@ def fetch_congressional_districts(year):


def main():
+    parser = argparse.ArgumentParser(
+        description="Create initial geographic strata for calibration"
+    )
+    parser.add_argument(
+        "--dataset",
+        default=DEFAULT_DATASET,
+        help=(
+            "Source dataset (local path or HuggingFace URL). "
+            "The year for Census API calls is derived from the dataset's "
+            "default_calculation_period. Default: %(default)s"
+        ),
+    )
+    args = parser.parse_args()
+
+    # Derive year from dataset
+    from policyengine_us import Microsimulation
+
+    print(f"Loading dataset: {args.dataset}")
+    sim = Microsimulation(dataset=args.dataset)
+    year = int(sim.default_calculation_period)
+    print(f"Derived year from dataset: {year}")
+
# State FIPS to name/abbreviation mapping
STATE_NAMES = {
1: "Alabama (AL)",
Expand Down Expand Up @@ -123,8 +148,7 @@ def main():
56: "Wyoming (WY)",
}

-    # Fetch congressional district data for year 2023
-    year = 2023
+    # Fetch congressional district data
    cd_df = fetch_congressional_districts(year)

    DATABASE_URL = (
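This is the first of four ETL entry points (strata creation, age, IRS SOI, Medicaid) that now share the same derive-the-year-from-the-dataset pattern in place of a hard-coded year; the same shape recurs in the files below. Condensed to its core, as a sketch (the HuggingFace default comes from the diff; error handling omitted):

from policyengine_us import Microsimulation

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

def derive_year(dataset: str = DEFAULT_DATASET) -> int:
    # The dataset's default_calculation_period is its stated time period,
    # so retargeting a new data vintage requires no code change here.
    sim = Microsimulation(dataset=dataset)
    return int(sim.default_calculation_period)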
39 changes: 35 additions & 4 deletions policyengine_us_data/db/etl_age.py
@@ -1,9 +1,13 @@
+import argparse
+
import pandas as pd
import numpy as np
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER

+DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
+
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
@@ -66,9 +70,12 @@ def transform_age_data(age_data, docs):
df = df.rename({"GEO_ID": "ucgid_str"}, axis=1)
df_data = df.rename(columns=rename_mapping)[["ucgid_str"] + list(AGE_COLS)]

# Filter out Puerto Rico's district and state records, if needed
# Filter out Puerto Rico's district and state records
# 5001800US7298 = 118th Congress, 5001900US7298 = 119th Congress
df_geos = df_data[
~df_data["ucgid_str"].isin(["5001800US7298", "0400000US72"])
~df_data["ucgid_str"].isin(
["5001800US7298", "5001900US7298", "0400000US72"]
)
].copy()

df = df_geos[["ucgid_str"] + AGE_COLS]
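The ucgid prefix encodes both the Census summary level and the Congress session, which is why Puerto Rico must now be screened under both the 118th and 119th codes. A small parsing sketch; the field layout is inferred from the codes in this diff, so treat the widths as assumptions:

CD_PREFIXES = {"5001800": 118, "5001900": 119}  # congressional-district ucgids

def parse_cd_ucgid(ucgid: str) -> dict:
    # e.g. "5001900US7298" -> 119th Congress, state FIPS 72 (PR), district 98
    prefix, geo = ucgid.split("US")
    return {
        "congress": CD_PREFIXES.get(prefix),
        "state_fips": int(geo[:2]),
        "district": geo[2:],
    }

assert parse_cd_ucgid("5001900US7298")["state_fips"] == 72  # Puerto Rico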
@@ -279,10 +286,30 @@ def load_age_data(df_long, geo, year):
session.commit()


-if __name__ == "__main__":
+def main():
+    parser = argparse.ArgumentParser(
+        description="ETL for age calibration targets"
+    )
+    parser.add_argument(
+        "--dataset",
+        default=DEFAULT_DATASET,
+        help=(
+            "Source dataset (local path or HuggingFace URL). "
+            "The year for Census API calls is derived from the dataset's "
+            "default_calculation_period. Default: %(default)s"
+        ),
+    )
+    args = parser.parse_args()
+
+    # Derive year from dataset
+    from policyengine_us import Microsimulation
+
+    print(f"Loading dataset: {args.dataset}")
+    sim = Microsimulation(dataset=args.dataset)
+    year = int(sim.default_calculation_period)
+    print(f"Derived year from dataset: {year}")
+
    # --- ETL: Extract, Transform, Load ----
-    year = 2023

    # ---- Extract ----------
    docs = get_census_docs(year)
@@ -301,3 +328,7 @@ def load_age_data(df_long, geo, year):
    load_age_data(long_national_df, "National", year)
    load_age_data(long_state_df, "State", year)
    load_age_data(long_district_df, "District", year)
+
+
+if __name__ == "__main__":
+    main()
43 changes: 40 additions & 3 deletions policyengine_us_data/db/etl_irs_soi.py
@@ -1,3 +1,4 @@
+import argparse
import logging
from typing import Optional

@@ -7,6 +8,11 @@
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER
+
+DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
+
+# IRS SOI data is typically available ~2 years after the tax year
+IRS_SOI_LAG_YEARS = 2
from policyengine_us_data.utils.raw_cache import (
is_cached,
cache_path,
@@ -1207,9 +1213,40 @@ def load_soi_data(long_dfs, year):


def main():
-    # NOTE: predates the finalization of the 2020 Census redistricting
-    # and there is district mapping in the Transform step
-    year = 2022
+    parser = argparse.ArgumentParser(
+        description="ETL for IRS SOI calibration targets"
+    )
+    parser.add_argument(
+        "--dataset",
+        default=DEFAULT_DATASET,
+        help=(
+            "Source dataset (local path or HuggingFace URL). "
+            "The year for IRS SOI data is derived from the dataset's "
+            "default_calculation_period minus IRS_SOI_LAG_YEARS. "
+            "Default: %(default)s"
+        ),
+    )
+    parser.add_argument(
+        "--lag",
+        type=int,
+        default=IRS_SOI_LAG_YEARS,
+        help=(
+            "Years to subtract from dataset year for IRS SOI data "
+            "(default: %(default)s, since IRS data is ~2 years behind)"
+        ),
+    )
+    args = parser.parse_args()
+
+    # Derive year from dataset with lag applied
+    from policyengine_us import Microsimulation
+
+    print(f"Loading dataset: {args.dataset}")
+    sim = Microsimulation(dataset=args.dataset)
+    dataset_year = int(sim.default_calculation_period)
+    year = dataset_year - args.lag
+    print(
+        f"Dataset year: {dataset_year}, IRS SOI year: {year} (lag={args.lag})"
+    )

    # Extract -----------------------
    raw_df = extract_soi_data()
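With the default lag, a dataset whose default_calculation_period is 2024 pulls the 2022 SOI vintage. A worked sketch of the arithmetic, with the --lag override shown (years illustrative):

IRS_SOI_LAG_YEARS = 2  # from the module constant above

def soi_year(dataset_year: int, lag: int = IRS_SOI_LAG_YEARS) -> int:
    # IRS SOI tables publish roughly two years behind the tax year.
    return dataset_year - lag

assert soi_year(2024) == 2022         # default lag
assert soi_year(2024, lag=3) == 2021  # equivalent to passing --lag=3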
25 changes: 24 additions & 1 deletion policyengine_us_data/db/etl_medicaid.py
@@ -1,3 +1,4 @@
+import argparse
import logging

import requests
@@ -7,6 +8,8 @@

from policyengine_us_data.storage import STORAGE_FOLDER
+
+DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
@@ -325,7 +328,27 @@ def load_medicaid_data(long_state, long_cd, year):


def main():
-    year = 2024
+    parser = argparse.ArgumentParser(
+        description="ETL for Medicaid calibration targets"
+    )
+    parser.add_argument(
+        "--dataset",
+        default=DEFAULT_DATASET,
+        help=(
+            "Source dataset (local path or HuggingFace URL). "
+            "The year for targets is derived from the dataset's "
+            "default_calculation_period. Default: %(default)s"
+        ),
+    )
+    args = parser.parse_args()
+
+    # Derive year from dataset
+    from policyengine_us import Microsimulation
+
+    print(f"Loading dataset: {args.dataset}")
+    sim = Microsimulation(dataset=args.dataset)
+    year = int(sim.default_calculation_period)
+    print(f"Derived year from dataset: {year}")

    # Extract ------------------------------
    state_admin_df = extract_administrative_medicaid_data(year)