Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- date: 2026-02-02
type: fixed
description: Fix stale calibration targets by deriving time_period from the dataset in all ETL scripts, use income_tax_positive for CBO calibration, and add 119th Congress district codes for consistent redistricting alignment
13 changes: 11 additions & 2 deletions policyengine_us_data/datasets/cps/cps.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,20 @@ def add_takeup(self):
data["takes_up_dc_ptc"] = (
generator.random(len(data["tax_unit_id"])) < dc_ptc_takeup_rate
)
# Deterministic seed for medicaid: hash person_id to [0, 1) range
# Uses Knuth multiplicative hash for good distribution
# This ensures same person_id always yields same seed, making
# enrollment determination reproducible across dataset rebuilds
HASH_MULTIPLIER = 2654435761 # Knuth's constant
HASH_MODULUS = 2**32
data["medicaid_take_up_seed"] = (
(data["person_id"].astype(np.uint64) * HASH_MULTIPLIER) % HASH_MODULUS
) / HASH_MODULUS

# SNAP and ACA seeds remain random for now (template above for conversion)
generator = np.random.default_rng(seed=100)

data["snap_take_up_seed"] = generator.random(len(data["spm_unit_id"]))
data["aca_take_up_seed"] = generator.random(len(data["tax_unit_id"]))
data["medicaid_take_up_seed"] = generator.random(len(data["person_id"]))

self.save_dataset(data)

Expand Down
28 changes: 26 additions & 2 deletions policyengine_us_data/db/create_initial_strata.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import logging
from typing import Dict

Expand All @@ -6,6 +7,8 @@
from sqlmodel import Session, create_engine

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"
from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Expand Down Expand Up @@ -68,6 +71,28 @@ def fetch_congressional_districts(year):


def main():
parser = argparse.ArgumentParser(
description="Create initial geographic strata for calibration"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for Census API calls is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")

# State FIPS to name/abbreviation mapping
STATE_NAMES = {
1: "Alabama (AL)",
Expand Down Expand Up @@ -123,8 +148,7 @@ def main():
56: "Wyoming (WY)",
}

# Fetch congressional district data for year 2023
year = 2023
# Fetch congressional district data
cd_df = fetch_congressional_districts(year)

DATABASE_URL = (
Expand Down
39 changes: 35 additions & 4 deletions policyengine_us_data/db/etl_age.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import argparse

import pandas as pd
import numpy as np
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Expand Down Expand Up @@ -66,9 +70,12 @@ def transform_age_data(age_data, docs):
df = df.rename({"GEO_ID": "ucgid_str"}, axis=1)
df_data = df.rename(columns=rename_mapping)[["ucgid_str"] + list(AGE_COLS)]

# Filter out Puerto Rico's district and state records, if needed
# Filter out Puerto Rico's district and state records
# 5001800US7298 = 118th Congress, 5001900US7298 = 119th Congress
df_geos = df_data[
~df_data["ucgid_str"].isin(["5001800US7298", "0400000US72"])
~df_data["ucgid_str"].isin(
["5001800US7298", "5001900US7298", "0400000US72"]
)
].copy()

df = df_geos[["ucgid_str"] + AGE_COLS]
Expand Down Expand Up @@ -279,10 +286,30 @@ def load_age_data(df_long, geo, year):
session.commit()


if __name__ == "__main__":
def main():
parser = argparse.ArgumentParser(
description="ETL for age calibration targets"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for Census API calls is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")

# --- ETL: Extract, Transform, Load ----
year = 2023

# ---- Extract ----------
docs = get_census_docs(year)
Expand All @@ -301,3 +328,7 @@ def load_age_data(df_long, geo, year):
load_age_data(long_national_df, "National", year)
load_age_data(long_state_df, "State", year)
load_age_data(long_district_df, "District", year)


if __name__ == "__main__":
main()
43 changes: 40 additions & 3 deletions policyengine_us_data/db/etl_irs_soi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import logging
from typing import Optional

Expand All @@ -7,6 +8,11 @@
from sqlmodel import Session, create_engine, select

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

# IRS SOI data is typically available ~2 years after the tax year
IRS_SOI_LAG_YEARS = 2
from policyengine_us_data.utils.raw_cache import (
is_cached,
cache_path,
Expand Down Expand Up @@ -1207,9 +1213,40 @@ def load_soi_data(long_dfs, year):


def main():
# NOTE: predates the finalization of the 2020 Census redistricting
# and there is district mapping in the Transform step
year = 2022
parser = argparse.ArgumentParser(
description="ETL for IRS SOI calibration targets"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for IRS SOI data is derived from the dataset's "
"default_calculation_period minus IRS_SOI_LAG_YEARS. "
"Default: %(default)s"
),
)
parser.add_argument(
"--lag",
type=int,
default=IRS_SOI_LAG_YEARS,
help=(
"Years to subtract from dataset year for IRS SOI data "
"(default: %(default)s, since IRS data is ~2 years behind)"
),
)
args = parser.parse_args()

# Derive year from dataset with lag applied
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
dataset_year = int(sim.default_calculation_period)
year = dataset_year - args.lag
print(
f"Dataset year: {dataset_year}, IRS SOI year: {year} (lag={args.lag})"
)

# Extract -----------------------
raw_df = extract_soi_data()
Expand Down
25 changes: 24 additions & 1 deletion policyengine_us_data/db/etl_medicaid.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import logging

import requests
Expand All @@ -7,6 +8,8 @@

from policyengine_us_data.storage import STORAGE_FOLDER

DEFAULT_DATASET = "hf://policyengine/policyengine-us-data/calibration/stratified_extended_cps.h5"

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Expand Down Expand Up @@ -325,7 +328,27 @@ def load_medicaid_data(long_state, long_cd, year):


def main():
year = 2024
parser = argparse.ArgumentParser(
description="ETL for Medicaid calibration targets"
)
parser.add_argument(
"--dataset",
default=DEFAULT_DATASET,
help=(
"Source dataset (local path or HuggingFace URL). "
"The year for targets is derived from the dataset's "
"default_calculation_period. Default: %(default)s"
),
)
args = parser.parse_args()

# Derive year from dataset
from policyengine_us import Microsimulation

print(f"Loading dataset: {args.dataset}")
sim = Microsimulation(dataset=args.dataset)
year = int(sim.default_calculation_period)
print(f"Derived year from dataset: {year}")

# Extract ------------------------------
state_admin_df = extract_administrative_medicaid_data(year)
Expand Down
Loading
Loading