6 changes: 4 additions & 2 deletions CLAUDE.md
@@ -19,7 +19,7 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release
6. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`)
7. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`)
8. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`)
-9. **Deduplicate** by master_id (`scripts/dedup_releases.py`)
+9. **Deduplicate** by master_id (`scripts/dedup_releases.py`) -- prefers US releases, then most tracks, then lowest ID
10. **Prune or Copy-to** -- one of:
- `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB)
- `--copy-to`/`--target-db-url`: copy matched releases to a separate database, preserving the full import
@@ -37,6 +37,8 @@ Step 1 (download) is always manual.

The `release` table includes a `master_id` column used during import and dedup. The dedup copy-swap strategy (`CREATE TABLE AS SELECT ...` without `master_id`) drops the column automatically. After dedup, `master_id` no longer exists in the schema.

+The `country` column, by contrast, is permanent -- it is included in the dedup copy-swap SELECT list and persists in the final schema for consumers.

### Database Schema (Shared Contract)

The SQL files in `schema/` define the contract between this ETL pipeline and all consumers:
@@ -64,7 +66,7 @@ docker compose up db -d # just the database (for tests)
- `scripts/enrich_library_artists.py` -- Enrich artist list with WXYC cross-references (pymysql)
- `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists
- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY)
-- `scripts/dedup_releases.py` -- Deduplicate releases by master_id (copy-swap with `DROP CASCADE`)
+- `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring US releases (copy-swap with `DROP CASCADE`)
- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB
- `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility
- `scripts/fix_csv_newlines.py` -- Fix multiline CSV fields
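Reviewer note: the preference order documented above (US release, then most tracks, then lowest ID) is implemented as a window ranking in `scripts/dedup_releases.py` (diff further down). A minimal standalone sketch of that ranking, assuming a populated `release_track_count` table; the real script assembles the track-count join dynamically via `{track_count_join}`:

```python
# Simplified sketch of the dedup ranking: rn = 1 marks the release kept for
# each master_id. Assumes release_track_count(release_id, track_count) exists;
# the actual script builds this join dynamically.
RANKING_SQL = """
    SELECT r.id, r.master_id,
           ROW_NUMBER() OVER (
               PARTITION BY r.master_id
               ORDER BY (r.country = 'US')::int DESC,  -- US pressings first
                        tc.track_count DESC,           -- then most tracks
                        r.id ASC                       -- then lowest id
           ) AS rn
    FROM release r
    JOIN release_track_count tc ON tc.release_id = r.id
"""
```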
4 changes: 2 additions & 2 deletions README.md
@@ -37,7 +37,7 @@ All 9 steps are automated by `run_pipeline.py` (or Docker Compose). The script s
| 4. Create schema | `schema/create_database.sql`, `schema/create_functions.sql` | Set up tables, extensions, and functions |
| 5. Import | `scripts/import_csv.py` | Bulk load CSVs via psycopg COPY |
| 6. Create indexes | `schema/create_indexes.sql` | Accent-insensitive trigram GIN indexes for fuzzy search |
-| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (most tracks) |
+| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (US first, then most tracks) |
| 8. Prune/Copy | `scripts/verify_cache.py` | Remove non-library releases or copy matches to target DB |
| 9. Vacuum | `VACUUM FULL` | Reclaim disk space |

@@ -193,7 +193,7 @@ The schema files in `schema/` define the shared contract between this ETL pipeli

| Table | Description |
|-------|-------------|
-| `release` | Release metadata: id, title, release_year, artwork_url |
+| `release` | Release metadata: id, title, release_year, country, artwork_url |
| `release_artist` | Artists on releases (main + extra credits) |
| `release_track` | Tracks on releases with position and duration |
| `release_track_artist` | Artists on specific tracks (for compilations) |
1 change: 1 addition & 0 deletions schema/create_database.sql
@@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS release (
id integer PRIMARY KEY,
title text NOT NULL,
release_year smallint,
+country text,
artwork_url text,
master_id integer -- used by dedup, dropped after dedup copy-swap
);
5 changes: 3 additions & 2 deletions scripts/dedup_releases.py
@@ -91,7 +91,8 @@ def ensure_dedup_ids(conn) -> int:
SELECT r.id, r.master_id,
ROW_NUMBER() OVER (
PARTITION BY r.master_id
-ORDER BY tc.track_count DESC, r.id ASC
+ORDER BY (r.country = 'US')::int DESC,
+tc.track_count DESC, r.id ASC
) as rn
FROM release r
{track_count_join}
@@ -265,7 +266,7 @@ def main():
# Step 2: Copy each table (keeping only non-duplicate rows)
# Only base tables + cache_metadata (tracks are imported after dedup)
tables = [
("release", "new_release", "id, title, release_year, artwork_url", "id"),
("release", "new_release", "id, title, release_year, country, artwork_url", "id"),
("release_artist", "new_release_artist", "release_id, artist_name, extra", "release_id"),
("release_label", "new_release_label", "release_id, label_name", "release_id"),
(
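Reviewer note: for context on what each `tables` entry drives, CLAUDE.md above describes the copy-swap as `CREATE TABLE AS SELECT ...` without `master_id`, followed by `DROP ... CASCADE` and a rename. A hedged sketch of that pattern for the `release` entry; the `dup_ids` staging table and the exact statements are illustrative, not the script's actual code:

```python
def copy_swap_release(conn) -> None:
    """Illustrative copy-swap for the ("release", "new_release", ...) entry.

    Omitting master_id from the SELECT list is what removes it from the final
    schema; country persists because it is listed. dup_ids is a hypothetical
    staging table holding the release ids to drop.
    """
    cols = "id, title, release_year, country, artwork_url"
    with conn.cursor() as cur:
        cur.execute(
            f"CREATE TABLE new_release AS SELECT {cols} FROM release "
            "WHERE id NOT IN (SELECT id FROM dup_ids)"
        )
        cur.execute("DROP TABLE release CASCADE")  # CASCADE per the script's summary
        cur.execute("ALTER TABLE new_release RENAME TO release")
```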
4 changes: 2 additions & 2 deletions scripts/import_csv.py
@@ -74,8 +74,8 @@ class TableConfig(TypedDict, total=False):
{
"csv_file": "release.csv",
"table": "release",
"csv_columns": ["id", "title", "released", "master_id"],
"db_columns": ["id", "title", "release_year", "master_id"],
"csv_columns": ["id", "title", "country", "released", "master_id"],
"db_columns": ["id", "title", "country", "release_year", "master_id"],
"required": ["id", "title"],
"transforms": {"released": extract_year},
},
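Reviewer note: the two new "country" entries work because `csv_columns` and `db_columns` are aligned by position; `released` still maps to `release_year` through the `extract_year` transform. A minimal sketch of how a config like this can drive a psycopg COPY load; `copy_csv` is a hypothetical helper, not the real `import_csv.py` internals:

```python
import csv

import psycopg  # psycopg 3


def copy_csv(conn: psycopg.Connection, config: dict, csv_path: str) -> None:
    """Stream selected CSV columns into Postgres via COPY.

    Fields are read in csv_columns order and written to the same-index
    db_columns; per-field transforms (e.g. released -> release_year) run first.
    """
    cols = ", ".join(config["db_columns"])
    transforms = config.get("transforms", {})
    with open(csv_path, newline="", encoding="utf-8") as f, conn.cursor() as cur:
        reader = csv.DictReader(f)
        with cur.copy(f"COPY {config['table']} ({cols}) FROM STDIN") as copy:
            for row in reader:
                values = []
                for col in config["csv_columns"]:
                    raw = row.get(col) or None  # empty CSV field -> NULL
                    fn = transforms.get(col)
                    values.append(fn(raw) if fn and raw is not None else raw)
                copy.write_row(values)
```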
2 changes: 1 addition & 1 deletion scripts/verify_cache.py
@@ -660,7 +660,7 @@ async def prune_releases(conn: asyncpg.Connection, release_ids: set[int]) -> dic
# Tables and their columns to copy (post-dedup: no master_id).
# Each entry: (table_name, filter_column, columns_list)
COPY_TABLE_SPEC = [
("release", "id", ["id", "title", "release_year", "artwork_url"]),
("release", "id", ["id", "title", "release_year", "country", "artwork_url"]),
("release_artist", "release_id", ["release_id", "artist_name", "extra"]),
("release_label", "release_id", ["release_id", "label_name"]),
("release_track", "release_id", ["release_id", "sequence", "position", "title", "duration"]),
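Reviewer note: with `country` in `COPY_TABLE_SPEC`, `--copy-to` now streams it to the target like any other column. A sketch of what the per-table copy plausibly looks like with asyncpg's helpers; `copy_matches` is hypothetical and the script's actual streaming mechanics may differ:

```python
import asyncpg


async def copy_matches(
    src: asyncpg.Connection, dst: asyncpg.Connection, release_ids: set[int]
) -> None:
    """Hypothetical per-table copy loop for --copy-to."""
    ids = list(release_ids)
    for table, filter_col, cols in COPY_TABLE_SPEC:
        rows = await src.fetch(
            f"SELECT {', '.join(cols)} FROM {table} "
            f"WHERE {filter_col} = ANY($1::int[])",
            ids,
        )
        # Each fetched record maps positionally onto cols in the target table.
        await dst.copy_records_to_table(table, records=rows, columns=cols)
```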
14 changes: 13 additions & 1 deletion tests/e2e/test_pipeline.py
@@ -127,7 +127,7 @@ def test_duplicates_removed(self) -> None:
"""Duplicate releases (same master_id) have been removed.

In the fixture data, releases 1001, 1002, 1003 share master_id 500.
-After dedup, only release 1002 (5 tracks, the most) should remain.
+After dedup, only release 1002 (US pressing) should remain.
"""
conn = self._connect()
with conn.cursor() as cur:
@@ -171,6 +171,18 @@ def test_master_id_column_absent(self) -> None:
conn.close()
assert result is None, "master_id column should not exist after dedup"

+def test_country_column_present(self) -> None:
+"""country column persists through the dedup copy-swap."""
+conn = self._connect()
+with conn.cursor() as cur:
+cur.execute(
+"SELECT column_name FROM information_schema.columns "
+"WHERE table_name = 'release' AND column_name = 'country'"
+)
+result = cur.fetchone()
+conn.close()
+assert result is not None, "country column should exist after dedup"

def test_indexes_exist(self) -> None:
"""Trigram indexes exist on the final database."""
conn = self._connect()
19 changes: 10 additions & 9 deletions tests/fixtures/create_fixtures.py
@@ -52,15 +52,16 @@ def create_release_csv() -> None:
]
rows = [
# Group 1: duplicate master_id 500 (Radiohead - OK Computer variants)
-# Release 1001 has 3 tracks, 1002 has 5 tracks, 1003 has 1 track
-# Dedup should keep 1002 (most tracks)
+# Release 1001 has 5 tracks (UK), 1002 has 3 tracks (US), 1003 has 1 track (JP)
+# Dedup should keep 1002 (US country preference beats higher track count)
[1001, "Accepted", "OK Computer", "UK", "1997-06-16", "", "Correct", 500, "CD"],
[1002, "Accepted", "OK Computer", "US", "1997-07-01", "", "Correct", 500, "Vinyl"],
[1003, "Accepted", "OK Computer", "JP", "1997", "", "Correct", 500, "Cassette"],
# Group 2: duplicate master_id 600 (Joy Division - Unknown Pleasures)
-# Release 2001 has 2 tracks, 2002 has 4 tracks
+# Release 2001 has 2 tracks (UK), 2002 has 4 tracks (DE) -- no US release
+# Dedup should keep 2002 (most tracks, fallback when no US release)
[2001, "Accepted", "Unknown Pleasures", "UK", "1979-06-15", "", "Correct", 600, "LP"],
-[2002, "Accepted", "Unknown Pleasures", "US", "1979", "", "Correct", 600, "CD"],
+[2002, "Accepted", "Unknown Pleasures", "DE", "1979", "", "Correct", 600, "CD"],
# No duplicate - unique master_id
[3001, "Accepted", "Kid A", "UK", "2000-10-02", "", "Correct", 700, "CD"],
# No master_id (should survive dedup)
@@ -125,22 +126,22 @@ def create_release_track_csv() -> None:
"""
headers = ["release_id", "sequence", "position", "title", "duration"]
rows = [
-# Release 1001 (OK Computer UK CD) - 3 tracks
+# Release 1001 (OK Computer UK CD) - 5 tracks (most tracks, but not US)
[1001, 1, "1", "Airbag", "4:44"],
[1001, 2, "2", "Paranoid Android", "6:23"],
[1001, 3, "3", "Subterranean Homesick Alien", "4:27"],
-# Release 1002 (OK Computer US Vinyl) - 5 tracks (should win dedup)
+[1001, 4, "4", "Exit Music (For a Film)", "4:24"],
+[1001, 5, "5", "Let Down", "4:59"],
+# Release 1002 (OK Computer US Vinyl) - 3 tracks (US wins despite fewer tracks)
[1002, 1, "A1", "Airbag", "4:44"],
[1002, 2, "A2", "Paranoid Android", "6:23"],
[1002, 3, "A3", "Subterranean Homesick Alien", "4:27"],
[1002, 4, "B1", "Exit Music (For a Film)", "4:24"],
[1002, 5, "B2", "Let Down", "4:59"],
# Release 1003 (OK Computer JP Cassette) - 1 track
[1003, 1, "1", "Airbag", "4:44"],
# Release 2001 (Unknown Pleasures UK LP) - 2 tracks
[2001, 1, "A1", "Disorder", "3:29"],
[2001, 2, "A2", "Day of the Lords", "4:48"],
-# Release 2002 (Unknown Pleasures US CD) - 4 tracks (should win dedup)
+# Release 2002 (Unknown Pleasures DE CD) - 4 tracks (wins by track count, no US)
[2002, 1, "1", "Disorder", "3:29"],
[2002, 2, "2", "Day of the Lords", "4:48"],
[2002, 3, "3", "Candidate", "3:05"],
2 changes: 1 addition & 1 deletion tests/fixtures/csv/release.csv
@@ -3,7 +3,7 @@ id,status,title,country,released,notes,data_quality,master_id,format
1002,Accepted,OK Computer,US,1997-07-01,,Correct,500,Vinyl
1003,Accepted,OK Computer,JP,1997,,Correct,500,Cassette
2001,Accepted,Unknown Pleasures,UK,1979-06-15,,Correct,600,LP
-2002,Accepted,Unknown Pleasures,US,1979,,Correct,600,CD
+2002,Accepted,Unknown Pleasures,DE,1979,,Correct,600,CD
3001,Accepted,Kid A,UK,2000-10-02,,Correct,700,CD
4001,Accepted,Amnesiac,UK,2001-06-05,,Correct,,CD
5001,Accepted,Unknown Album,US,2020-01-01,,Correct,800,CD
34 changes: 17 additions & 17 deletions tests/fixtures/csv/release_label.csv
@@ -1,17 +1,17 @@
-release_id,label,catno
-1001,Parlophone,7243 8 55229 2 8
-1001,Capitol Records,CDP 7243 8 55229 2 8
-1002,Capitol Records,C1-55229
-1003,EMI,TOCP-50201
-2001,Factory Records,FACT 10
-2002,Qwest Records,1-25840
-3001,Parlophone,7243 5 27753 2 3
-4001,Parlophone,7243 5 32764 2 8
-5001,Unknown Label,UNK-001
-5002,Mystery Records,MYS-002
-6001,One Little Indian,TPLP 71 CD
-8001,Sugar Hill Records,SH-542
-9001,Apple Records,PCS 7088
-9002,Columbia,KCS 9914
-10001,Random Label,RL-001
-10002,Obscure Label,OL-002
+release_id,label,catno
+1001,Parlophone,7243 8 55229 2 8
+1001,Capitol Records,CDP 7243 8 55229 2 8
+1002,Capitol Records,C1-55229
+1003,EMI,TOCP-50201
+2001,Factory Records,FACT 10
+2002,Qwest Records,1-25840
+3001,Parlophone,7243 5 27753 2 3
+4001,Parlophone,7243 5 32764 2 8
+5001,Unknown Label,UNK-001
+5002,Mystery Records,MYS-002
+6001,One Little Indian,TPLP 71 CD
+8001,Sugar Hill Records,SH-542
+9001,Apple Records,PCS 7088
+9002,Columbia,KCS 9914
+10001,Random Label,RL-001
+10002,Obscure Label,OL-002
4 changes: 2 additions & 2 deletions tests/fixtures/csv/release_track.csv
@@ -2,11 +2,11 @@ release_id,sequence,position,title,duration
1001,1,1,Airbag,4:44
1001,2,2,Paranoid Android,6:23
1001,3,3,Subterranean Homesick Alien,4:27
+1001,4,4,Exit Music (For a Film),4:24
+1001,5,5,Let Down,4:59
1002,1,A1,Airbag,4:44
1002,2,A2,Paranoid Android,6:23
1002,3,A3,Subterranean Homesick Alien,4:27
-1002,4,B1,Exit Music (For a Film),4:24
-1002,5,B2,Let Down,4:59
1003,1,1,Airbag,4:44
2001,1,A1,Disorder,3:29
2001,2,A2,Day of the Lords,4:48
Binary file modified tests/fixtures/library.db
11 changes: 10 additions & 1 deletion tests/integration/test_copy_to_target.py
@@ -339,5 +339,14 @@ def test_target_no_master_id_column(self) -> None:
# Actually the target gets the full schema including master_id,
# but since we only copy id, title, release_year, artwork_url,
# master_id will be NULL. That's acceptable.
expected = {"id", "title", "release_year", "artwork_url"}
expected = {"id", "title", "release_year", "country", "artwork_url"}
assert expected.issubset(columns)

+def test_target_country_data_copied(self) -> None:
+"""Country data is actually present in the target (not just the column)."""
+conn = psycopg.connect(self.target_url)
+with conn.cursor() as cur:
+cur.execute("SELECT country FROM release WHERE country IS NOT NULL LIMIT 1")
+result = cur.fetchone()
+conn.close()
+assert result is not None, "No country data found in target releases"
37 changes: 33 additions & 4 deletions tests/integration/test_dedup.py
@@ -106,7 +106,7 @@ def _run_dedup(db_url: str) -> None:
if delete_count > 0:
# Only base tables + cache_metadata (no track tables)
tables = [
("release", "new_release", "id, title, release_year, artwork_url", "id"),
("release", "new_release", "id, title, release_year, country, artwork_url", "id"),
(
"release_artist",
"new_release_artist",
@@ -192,7 +192,7 @@ def _connect(self):
return psycopg.connect(self.db_url)

def test_correct_release_kept_for_master_500(self) -> None:
"""Release 1002 (5 tracks) kept over 1001 (3 tracks) and 1003 (1 track)."""
"""Release 1002 (US, 3 tracks) kept over 1001 (UK, 5 tracks) by country preference."""
conn = self._connect()
with conn.cursor() as cur:
cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id")
@@ -201,7 +201,7 @@ def test_correct_release_kept_for_master_500(self) -> None:
assert ids == [1002]

def test_correct_release_kept_for_master_600(self) -> None:
"""Release 2002 (4 tracks) kept over 2001 (2 tracks)."""
"""Release 2002 (DE, 4 tracks) kept over 2001 (UK, 2 tracks) by track count fallback."""
conn = self._connect()
with conn.cursor() as cur:
cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id")
@@ -269,7 +269,36 @@ def test_kept_release_tracks_preserved(self) -> None:
cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002")
count = cur.fetchone()[0]
conn.close()
-assert count == 5
+assert count == 3

+def test_country_column_preserved(self) -> None:
+"""country column exists after dedup copy-swap and has the expected value."""
+conn = self._connect()
+with conn.cursor() as cur:
+cur.execute("SELECT country FROM release WHERE id = 1002")
+country = cur.fetchone()[0]
+conn.close()
+assert country == "US"

+def test_us_preferred_over_track_count(self) -> None:
+"""US release (1002, 3 tracks) kept over UK release (1001, 5 tracks).
+
+Proves country preference is the deciding factor: the kept release has
+fewer tracks than the removed one.
+"""
+conn = self._connect()
+with conn.cursor() as cur:
+# 1002 should be kept (US, 3 tracks)
+cur.execute("SELECT count(*) FROM release WHERE id = 1002")
+assert cur.fetchone()[0] == 1
+# 1001 should be removed (UK, 5 tracks -- more tracks but not US)
+cur.execute("SELECT count(*) FROM release WHERE id = 1001")
+assert cur.fetchone()[0] == 0
+# Verify the kept release has fewer tracks (proving country was decisive)
+cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002")
+kept_tracks = cur.fetchone()[0]
+conn.close()
+assert kept_tracks == 3, "Kept US release should have 3 tracks (fewer than removed UK's 5)"

def test_master_id_column_dropped(self) -> None:
"""master_id column no longer exists after copy-swap (not in SELECT list)."""
10 changes: 5 additions & 5 deletions tests/integration/test_import.py
@@ -312,7 +312,7 @@ def test_correct_counts(self) -> None:
cur.execute("SELECT track_count FROM release_track_count WHERE release_id = 1002")
count = cur.fetchone()[0]
conn.close()
-assert count == 5
+assert count == 3

def test_track_tables_empty(self) -> None:
"""Base-only import should not populate track tables."""
@@ -396,13 +396,13 @@ def test_excluded_release_has_no_tracks(self) -> None:
assert count == 0

def test_included_release_has_correct_track_count(self) -> None:
"""Release 1002 should have all 5 tracks."""
"""Release 1002 should have all 3 tracks."""
conn = self._connect()
with conn.cursor() as cur:
cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002")
count = cur.fetchone()[0]
conn.close()
-assert count == 5
+assert count == 3

def test_total_track_count(self) -> None:
"""Total tracks should be the sum for the filtered releases."""
@@ -411,5 +411,5 @@
cur.execute("SELECT count(*) FROM release_track")
count = cur.fetchone()[0]
conn.close()
-# 1002: 5, 3001: 2, 4001: 2 = 9
-assert count == 9
+# 1002: 3, 3001: 2, 4001: 2 = 7
+assert count == 7
2 changes: 1 addition & 1 deletion tests/integration/test_schema.py
@@ -58,7 +58,7 @@ def test_all_tables_exist(self) -> None:
@pytest.mark.parametrize(
"table, expected_columns",
[
("release", {"id", "title", "release_year", "artwork_url", "master_id"}),
("release", {"id", "title", "release_year", "country", "artwork_url", "master_id"}),
("release_artist", {"release_id", "artist_name", "extra"}),
("release_label", {"release_id", "label_name"}),
("release_track", {"release_id", "sequence", "position", "title", "duration"}),
2 changes: 1 addition & 1 deletion tests/unit/test_filter_csv.py
@@ -209,4 +209,4 @@ def test_filters_child_table(self, tmp_path: Path) -> None:
output_path = tmp_path / "release_track_filtered.csv"

_, output_count = filter_csv_file(input_path, output_path, matching_ids, "release_id")
-assert output_count == 3 # Release 1001 has 3 tracks
+assert output_count == 5 # Release 1001 has 5 tracks
10 changes: 8 additions & 2 deletions tests/unit/test_import_csv.py
@@ -103,6 +103,12 @@ def test_release_table_includes_master_id(self) -> None:
assert "master_id" in release_config["csv_columns"]
assert "master_id" in release_config["db_columns"]

+def test_release_table_includes_country(self) -> None:
+"""The release table must import country for US-preferred dedup ranking."""
+release_config = next(t for t in TABLES if t["table"] == "release")
+assert "country" in release_config["csv_columns"]
+assert "country" in release_config["db_columns"]

def test_release_table_transforms_released_to_year(self) -> None:
"""The released field should be transformed via extract_year."""
release_config = next(t for t in TABLES if t["table"] == "release")
@@ -205,8 +211,8 @@ def test_counts_tracks_per_release(self) -> None:
"""Returns a dict mapping release_id -> track count."""
csv_path = CSV_DIR / "release_track.csv"
counts = count_tracks_from_csv(csv_path)
-# Release 1002 has 5 tracks in the fixture data
-assert counts[1002] == 5
+# Release 1002 (US) has 3 tracks in the fixture data
+assert counts[1002] == 3

def test_all_releases_counted(self) -> None:
"""Every release_id in the CSV has an entry."""