diff --git a/CLAUDE.md b/CLAUDE.md index 8dd0454..08609e7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -19,7 +19,7 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release 6. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`) 7. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`) 8. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`) -9. **Deduplicate** by master_id (`scripts/dedup_releases.py`) +9. **Deduplicate** by master_id (`scripts/dedup_releases.py`) -- prefers US releases, then most tracks, then lowest ID 10. **Prune or Copy-to** -- one of: - `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB) - `--copy-to`/`--target-db-url`: copy matched releases to a separate database, preserving the full import @@ -37,6 +37,8 @@ Step 1 (download) is always manual. The `release` table includes a `master_id` column used during import and dedup. The dedup copy-swap strategy (`CREATE TABLE AS SELECT ...` without `master_id`) drops the column automatically. After dedup, `master_id` no longer exists in the schema. +The `country` column, by contrast, is permanent -- it is included in the dedup copy-swap SELECT list and persists in the final schema for consumers. 
+ ### Database Schema (Shared Contract) The SQL files in `schema/` define the contract between this ETL pipeline and all consumers: @@ -64,7 +66,7 @@ docker compose up db -d # just the database (for tests) - `scripts/enrich_library_artists.py` -- Enrich artist list with WXYC cross-references (pymysql) - `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists - `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY) -- `scripts/dedup_releases.py` -- Deduplicate releases by master_id (copy-swap with `DROP CASCADE`) +- `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring US releases (copy-swap with `DROP CASCADE`) - `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB - `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility - `scripts/fix_csv_newlines.py` -- Fix multiline CSV fields diff --git a/README.md b/README.md index e5356a2..3ea8e02 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ All 9 steps are automated by `run_pipeline.py` (or Docker Compose). The script s | 4. Create schema | `schema/create_database.sql`, `schema/create_functions.sql` | Set up tables, extensions, and functions | | 5. Import | `scripts/import_csv.py` | Bulk load CSVs via psycopg COPY | | 6. Create indexes | `schema/create_indexes.sql` | Accent-insensitive trigram GIN indexes for fuzzy search | -| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (most tracks) | +| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (US first, then most tracks) | | 8. Prune/Copy | `scripts/verify_cache.py` | Remove non-library releases or copy matches to target DB | | 9. 
Vacuum | `VACUUM FULL` | Reclaim disk space | @@ -193,7 +193,7 @@ The schema files in `schema/` define the shared contract between this ETL pipeli | Table | Description | |-------|-------------| -| `release` | Release metadata: id, title, release_year, artwork_url | +| `release` | Release metadata: id, title, release_year, country, artwork_url | | `release_artist` | Artists on releases (main + extra credits) | | `release_track` | Tracks on releases with position and duration | | `release_track_artist` | Artists on specific tracks (for compilations) | diff --git a/schema/create_database.sql b/schema/create_database.sql index 9e74311..a80591d 100644 --- a/schema/create_database.sql +++ b/schema/create_database.sql @@ -25,6 +25,7 @@ CREATE TABLE IF NOT EXISTS release ( id integer PRIMARY KEY, title text NOT NULL, release_year smallint, + country text, artwork_url text, master_id integer -- used by dedup, dropped after dedup copy-swap ); diff --git a/scripts/dedup_releases.py b/scripts/dedup_releases.py index 16decab..cc27d81 100644 --- a/scripts/dedup_releases.py +++ b/scripts/dedup_releases.py @@ -91,7 +91,8 @@ def ensure_dedup_ids(conn) -> int: SELECT r.id, r.master_id, ROW_NUMBER() OVER ( PARTITION BY r.master_id - ORDER BY tc.track_count DESC, r.id ASC + ORDER BY (r.country = 'US')::int DESC NULLS LAST, + tc.track_count DESC, r.id ASC ) as rn FROM release r {track_count_join} @@ -265,7 +266,7 @@ def main(): # Step 2: Copy each table (keeping only non-duplicate rows) # Only base tables + cache_metadata (tracks are imported after dedup) tables = [ - ("release", "new_release", "id, title, release_year, artwork_url", "id"), + ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), ("release_artist", "new_release_artist", "release_id, artist_name, extra", "release_id"), ("release_label", "new_release_label", "release_id, label_name", "release_id"), ( diff --git a/scripts/import_csv.py b/scripts/import_csv.py index e393ca7..0b47b89 100644 ---
a/scripts/import_csv.py +++ b/scripts/import_csv.py @@ -74,8 +74,8 @@ class TableConfig(TypedDict, total=False): { "csv_file": "release.csv", "table": "release", - "csv_columns": ["id", "title", "released", "master_id"], - "db_columns": ["id", "title", "release_year", "master_id"], + "csv_columns": ["id", "title", "country", "released", "master_id"], + "db_columns": ["id", "title", "country", "release_year", "master_id"], "required": ["id", "title"], "transforms": {"released": extract_year}, }, diff --git a/scripts/verify_cache.py b/scripts/verify_cache.py index c60cf16..f919329 100644 --- a/scripts/verify_cache.py +++ b/scripts/verify_cache.py @@ -660,7 +660,7 @@ async def prune_releases(conn: asyncpg.Connection, release_ids: set[int]) -> dic # Tables and their columns to copy (post-dedup: no master_id). # Each entry: (table_name, filter_column, columns_list) COPY_TABLE_SPEC = [ - ("release", "id", ["id", "title", "release_year", "artwork_url"]), + ("release", "id", ["id", "title", "release_year", "country", "artwork_url"]), ("release_artist", "release_id", ["release_id", "artist_name", "extra"]), ("release_label", "release_id", ["release_id", "label_name"]), ("release_track", "release_id", ["release_id", "sequence", "position", "title", "duration"]), diff --git a/tests/e2e/test_pipeline.py b/tests/e2e/test_pipeline.py index 1920ee2..5523148 100644 --- a/tests/e2e/test_pipeline.py +++ b/tests/e2e/test_pipeline.py @@ -127,7 +127,7 @@ def test_duplicates_removed(self) -> None: """Duplicate releases (same master_id) have been removed. In the fixture data, releases 1001, 1002, 1003 share master_id 500. - After dedup, only release 1002 (5 tracks, the most) should remain. + After dedup, only release 1002 (US pressing) should remain. 
""" conn = self._connect() with conn.cursor() as cur: @@ -171,6 +171,18 @@ def test_master_id_column_absent(self) -> None: conn.close() assert result is None, "master_id column should not exist after dedup" + def test_country_column_present(self) -> None: + """country column persists through the dedup copy-swap.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name = 'release' AND column_name = 'country'" + ) + result = cur.fetchone() + conn.close() + assert result is not None, "country column should exist after dedup" + def test_indexes_exist(self) -> None: """Trigram indexes exist on the final database.""" conn = self._connect() diff --git a/tests/fixtures/create_fixtures.py b/tests/fixtures/create_fixtures.py index 0e8a693..a3cb9d0 100644 --- a/tests/fixtures/create_fixtures.py +++ b/tests/fixtures/create_fixtures.py @@ -52,15 +52,16 @@ def create_release_csv() -> None: ] rows = [ # Group 1: duplicate master_id 500 (Radiohead - OK Computer variants) - # Release 1001 has 3 tracks, 1002 has 5 tracks, 1003 has 1 track - # Dedup should keep 1002 (most tracks) + # Release 1001 has 5 tracks (UK), 1002 has 3 tracks (US), 1003 has 1 track (JP) + # Dedup should keep 1002 (US country preference beats higher track count) [1001, "Accepted", "OK Computer", "UK", "1997-06-16", "", "Correct", 500, "CD"], [1002, "Accepted", "OK Computer", "US", "1997-07-01", "", "Correct", 500, "Vinyl"], [1003, "Accepted", "OK Computer", "JP", "1997", "", "Correct", 500, "Cassette"], # Group 2: duplicate master_id 600 (Joy Division - Unknown Pleasures) - # Release 2001 has 2 tracks, 2002 has 4 tracks + # Release 2001 has 2 tracks (UK), 2002 has 4 tracks (DE) — no US release + # Dedup should keep 2002 (most tracks, fallback when no US release) [2001, "Accepted", "Unknown Pleasures", "UK", "1979-06-15", "", "Correct", 600, "LP"], - [2002, "Accepted", "Unknown Pleasures", "US", "1979", "", "Correct", 600, 
"CD"], + [2002, "Accepted", "Unknown Pleasures", "DE", "1979", "", "Correct", 600, "CD"], # No duplicate - unique master_id [3001, "Accepted", "Kid A", "UK", "2000-10-02", "", "Correct", 700, "CD"], # No master_id (should survive dedup) @@ -125,22 +126,22 @@ def create_release_track_csv() -> None: """ headers = ["release_id", "sequence", "position", "title", "duration"] rows = [ - # Release 1001 (OK Computer UK CD) - 3 tracks + # Release 1001 (OK Computer UK CD) - 5 tracks (most tracks, but not US) [1001, 1, "1", "Airbag", "4:44"], [1001, 2, "2", "Paranoid Android", "6:23"], [1001, 3, "3", "Subterranean Homesick Alien", "4:27"], - # Release 1002 (OK Computer US Vinyl) - 5 tracks (should win dedup) + [1001, 4, "4", "Exit Music (For a Film)", "4:24"], + [1001, 5, "5", "Let Down", "4:59"], + # Release 1002 (OK Computer US Vinyl) - 3 tracks (US wins despite fewer tracks) [1002, 1, "A1", "Airbag", "4:44"], [1002, 2, "A2", "Paranoid Android", "6:23"], [1002, 3, "A3", "Subterranean Homesick Alien", "4:27"], - [1002, 4, "B1", "Exit Music (For a Film)", "4:24"], - [1002, 5, "B2", "Let Down", "4:59"], # Release 1003 (OK Computer JP Cassette) - 1 track [1003, 1, "1", "Airbag", "4:44"], # Release 2001 (Unknown Pleasures UK LP) - 2 tracks [2001, 1, "A1", "Disorder", "3:29"], [2001, 2, "A2", "Day of the Lords", "4:48"], - # Release 2002 (Unknown Pleasures US CD) - 4 tracks (should win dedup) + # Release 2002 (Unknown Pleasures DE CD) - 4 tracks (wins by track count, no US) [2002, 1, "1", "Disorder", "3:29"], [2002, 2, "2", "Day of the Lords", "4:48"], [2002, 3, "3", "Candidate", "3:05"], diff --git a/tests/fixtures/csv/release.csv b/tests/fixtures/csv/release.csv index 42cc766..167b054 100644 --- a/tests/fixtures/csv/release.csv +++ b/tests/fixtures/csv/release.csv @@ -3,7 +3,7 @@ id,status,title,country,released,notes,data_quality,master_id,format 1002,Accepted,OK Computer,US,1997-07-01,,Correct,500,Vinyl 1003,Accepted,OK Computer,JP,1997,,Correct,500,Cassette 
2001,Accepted,Unknown Pleasures,UK,1979-06-15,,Correct,600,LP -2002,Accepted,Unknown Pleasures,US,1979,,Correct,600,CD +2002,Accepted,Unknown Pleasures,DE,1979,,Correct,600,CD 3001,Accepted,Kid A,UK,2000-10-02,,Correct,700,CD 4001,Accepted,Amnesiac,UK,2001-06-05,,Correct,,CD 5001,Accepted,Unknown Album,US,2020-01-01,,Correct,800,CD diff --git a/tests/fixtures/csv/release_label.csv b/tests/fixtures/csv/release_label.csv index aaffd2d..4b313ce 100644 --- a/tests/fixtures/csv/release_label.csv +++ b/tests/fixtures/csv/release_label.csv @@ -1,17 +1,17 @@ -release_id,label,catno -1001,Parlophone,7243 8 55229 2 8 -1001,Capitol Records,CDP 7243 8 55229 2 8 -1002,Capitol Records,C1-55229 -1003,EMI,TOCP-50201 -2001,Factory Records,FACT 10 -2002,Qwest Records,1-25840 -3001,Parlophone,7243 5 27753 2 3 -4001,Parlophone,7243 5 32764 2 8 -5001,Unknown Label,UNK-001 -5002,Mystery Records,MYS-002 -6001,One Little Indian,TPLP 71 CD -8001,Sugar Hill Records,SH-542 -9001,Apple Records,PCS 7088 -9002,Columbia,KCS 9914 -10001,Random Label,RL-001 -10002,Obscure Label,OL-002 +release_id,label,catno +1001,Parlophone,7243 8 55229 2 8 +1001,Capitol Records,CDP 7243 8 55229 2 8 +1002,Capitol Records,C1-55229 +1003,EMI,TOCP-50201 +2001,Factory Records,FACT 10 +2002,Qwest Records,1-25840 +3001,Parlophone,7243 5 27753 2 3 +4001,Parlophone,7243 5 32764 2 8 +5001,Unknown Label,UNK-001 +5002,Mystery Records,MYS-002 +6001,One Little Indian,TPLP 71 CD +8001,Sugar Hill Records,SH-542 +9001,Apple Records,PCS 7088 +9002,Columbia,KCS 9914 +10001,Random Label,RL-001 +10002,Obscure Label,OL-002 diff --git a/tests/fixtures/csv/release_track.csv b/tests/fixtures/csv/release_track.csv index 502cbc9..fa9bd24 100644 --- a/tests/fixtures/csv/release_track.csv +++ b/tests/fixtures/csv/release_track.csv @@ -2,11 +2,11 @@ release_id,sequence,position,title,duration 1001,1,1,Airbag,4:44 1001,2,2,Paranoid Android,6:23 1001,3,3,Subterranean Homesick Alien,4:27 +1001,4,4,Exit Music (For a Film),4:24 +1001,5,5,Let 
Down,4:59 1002,1,A1,Airbag,4:44 1002,2,A2,Paranoid Android,6:23 1002,3,A3,Subterranean Homesick Alien,4:27 -1002,4,B1,Exit Music (For a Film),4:24 -1002,5,B2,Let Down,4:59 1003,1,1,Airbag,4:44 2001,1,A1,Disorder,3:29 2001,2,A2,Day of the Lords,4:48 diff --git a/tests/fixtures/library.db b/tests/fixtures/library.db index a0a654f..66b31c9 100644 Binary files a/tests/fixtures/library.db and b/tests/fixtures/library.db differ diff --git a/tests/integration/test_copy_to_target.py b/tests/integration/test_copy_to_target.py index 015926a..ce4285b 100644 --- a/tests/integration/test_copy_to_target.py +++ b/tests/integration/test_copy_to_target.py @@ -339,5 +339,14 @@ def test_target_no_master_id_column(self) -> None: # Actually the target gets the full schema including master_id, - # but since we only copy id, title, release_year, artwork_url, + # but since we only copy id, title, release_year, country, artwork_url, # master_id will be NULL. That's acceptable. - expected = {"id", "title", "release_year", "artwork_url"} + expected = {"id", "title", "release_year", "country", "artwork_url"} assert expected.issubset(columns) + + def test_target_country_data_copied(self) -> None: + """Country data is actually present in the target (not just the column).""" + conn = psycopg.connect(self.target_url) + with conn.cursor() as cur: + cur.execute("SELECT country FROM release WHERE country IS NOT NULL LIMIT 1") + result = cur.fetchone() + conn.close() + assert result is not None, "No country data found in target releases" diff --git a/tests/integration/test_dedup.py b/tests/integration/test_dedup.py index bb368f5..8a4c4ed 100644 --- a/tests/integration/test_dedup.py +++ b/tests/integration/test_dedup.py @@ -106,7 +106,7 @@ def _run_dedup(db_url: str) -> None: if delete_count > 0: # Only base tables + cache_metadata (no track tables) tables = [ - ("release", "new_release", "id, title, release_year, artwork_url", "id"), + ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), ( "release_artist", "new_release_artist", @@ -192,7 +192,7 
@@ def _connect(self): return psycopg.connect(self.db_url) def test_correct_release_kept_for_master_500(self) -> None: - """Release 1002 (5 tracks) kept over 1001 (3 tracks) and 1003 (1 track).""" + """Release 1002 (US, 3 tracks) kept over 1001 (UK, 5 tracks) by country preference.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") @@ -201,7 +201,7 @@ def test_correct_release_kept_for_master_500(self) -> None: assert ids == [1002] def test_correct_release_kept_for_master_600(self) -> None: - """Release 2002 (4 tracks) kept over 2001 (2 tracks).""" + """Release 2002 (DE, 4 tracks) kept over 2001 (UK, 2 tracks) by track count fallback.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") @@ -269,7 +269,36 @@ def test_kept_release_tracks_preserved(self) -> None: cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002") count = cur.fetchone()[0] conn.close() - assert count == 5 + assert count == 3 + + def test_country_column_preserved(self) -> None: + """country column exists after dedup copy-swap and has the expected value.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT country FROM release WHERE id = 1002") + country = cur.fetchone()[0] + conn.close() + assert country == "US" + + def test_us_preferred_over_track_count(self) -> None: + """US release (1002, 3 tracks) kept over UK release (1001, 5 tracks). + + Proves country preference is the deciding factor: the kept release has + fewer tracks than the removed one. 
+ """ + conn = self._connect() + with conn.cursor() as cur: + # 1002 should be kept (US, 3 tracks) + cur.execute("SELECT count(*) FROM release WHERE id = 1002") + assert cur.fetchone()[0] == 1 + # 1001 should be removed (UK, 5 tracks — more tracks but not US) + cur.execute("SELECT count(*) FROM release WHERE id = 1001") + assert cur.fetchone()[0] == 0 + # Verify the kept release has fewer tracks (proving country was decisive) + cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002") + kept_tracks = cur.fetchone()[0] + conn.close() + assert kept_tracks == 3, "Kept US release should have 3 tracks (fewer than removed UK's 5)" def test_master_id_column_dropped(self) -> None: """master_id column no longer exists after copy-swap (not in SELECT list).""" diff --git a/tests/integration/test_import.py b/tests/integration/test_import.py index 589ee01..f9c0c8b 100644 --- a/tests/integration/test_import.py +++ b/tests/integration/test_import.py @@ -312,7 +312,7 @@ def test_correct_counts(self) -> None: cur.execute("SELECT track_count FROM release_track_count WHERE release_id = 1002") count = cur.fetchone()[0] conn.close() - assert count == 5 + assert count == 3 def test_track_tables_empty(self) -> None: """Base-only import should not populate track tables.""" @@ -396,13 +396,13 @@ def test_excluded_release_has_no_tracks(self) -> None: assert count == 0 def test_included_release_has_correct_track_count(self) -> None: - """Release 1002 should have all 5 tracks.""" + """Release 1002 should have all 3 tracks.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002") count = cur.fetchone()[0] conn.close() - assert count == 5 + assert count == 3 def test_total_track_count(self) -> None: """Total tracks should be the sum for the filtered releases.""" @@ -411,5 +411,5 @@ def test_total_track_count(self) -> None: cur.execute("SELECT count(*) FROM release_track") count = cur.fetchone()[0] conn.close() 
- # 1002: 5, 3001: 2, 4001: 2 = 9 - assert count == 9 + # 1002: 3, 3001: 2, 4001: 2 = 7 + assert count == 7 diff --git a/tests/integration/test_schema.py b/tests/integration/test_schema.py index 87897eb..5c19078 100644 --- a/tests/integration/test_schema.py +++ b/tests/integration/test_schema.py @@ -58,7 +58,7 @@ def test_all_tables_exist(self) -> None: @pytest.mark.parametrize( "table, expected_columns", [ - ("release", {"id", "title", "release_year", "artwork_url", "master_id"}), + ("release", {"id", "title", "release_year", "country", "artwork_url", "master_id"}), ("release_artist", {"release_id", "artist_name", "extra"}), ("release_label", {"release_id", "label_name"}), ("release_track", {"release_id", "sequence", "position", "title", "duration"}), diff --git a/tests/unit/test_filter_csv.py b/tests/unit/test_filter_csv.py index 0b4d6fb..06981d4 100644 --- a/tests/unit/test_filter_csv.py +++ b/tests/unit/test_filter_csv.py @@ -209,4 +209,4 @@ def test_filters_child_table(self, tmp_path: Path) -> None: output_path = tmp_path / "release_track_filtered.csv" _, output_count = filter_csv_file(input_path, output_path, matching_ids, "release_id") - assert output_count == 3 # Release 1001 has 3 tracks + assert output_count == 5 # Release 1001 has 5 tracks diff --git a/tests/unit/test_import_csv.py b/tests/unit/test_import_csv.py index 27657dd..3bcff6d 100644 --- a/tests/unit/test_import_csv.py +++ b/tests/unit/test_import_csv.py @@ -103,6 +103,12 @@ def test_release_table_includes_master_id(self) -> None: assert "master_id" in release_config["csv_columns"] assert "master_id" in release_config["db_columns"] + def test_release_table_includes_country(self) -> None: + """The release table must import country for US-preferred dedup ranking.""" + release_config = next(t for t in TABLES if t["table"] == "release") + assert "country" in release_config["csv_columns"] + assert "country" in release_config["db_columns"] + def test_release_table_transforms_released_to_year(self) -> 
None: """The released field should be transformed via extract_year.""" release_config = next(t for t in TABLES if t["table"] == "release") @@ -205,8 +211,8 @@ def test_counts_tracks_per_release(self) -> None: """Returns a dict mapping release_id -> track count.""" csv_path = CSV_DIR / "release_track.csv" counts = count_tracks_from_csv(csv_path) - # Release 1002 has 5 tracks in the fixture data - assert counts[1002] == 5 + # Release 1002 (US) has 3 tracks in the fixture data + assert counts[1002] == 3 def test_all_releases_counted(self) -> None: """Every release_id in the CSV has an entry."""