11 changes: 6 additions & 5 deletions CLAUDE.md
@@ -16,9 +16,9 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release
3. **Fix newlines** in CSV fields (`scripts/fix_csv_newlines.py`)
4. **Enrich** `library_artists.txt` with WXYC cross-references (`scripts/enrich_library_artists.py`, optional)
5. **Filter** CSVs to library-matching artists only (`scripts/filter_csv.py`) -- ~70% data reduction
6. **Create schema** (`schema/create_database.sql`)
6. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`)
7. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`)
8. **Create indexes** including trigram GIN indexes (`schema/create_indexes.sql`)
8. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`)
9. **Deduplicate** by master_id (`scripts/dedup_releases.py`)
10. **Prune or Copy-to** -- one of:
- `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB)
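In practice, `--prune` amounts to deleting every release with no library-artist match; a minimal sketch of that kind of statement, assuming a hypothetical `library_match` table of normalized artist names (the real logic lives in `scripts/verify_cache.py`):

```sql
-- Sketch only: "library_match" and "artist_name_norm" are illustrative
-- names, not objects in this schema.
DELETE FROM release r
WHERE NOT EXISTS (
    SELECT 1
    FROM release_artist ra
    JOIN library_match lm
      ON lower(f_unaccent(ra.artist_name)) = lm.artist_name_norm
    WHERE ra.release_id = r.id
);
-- Child rows (release_artist, release_track, ...) go with the release
-- via the schema's ON DELETE CASCADE foreign keys.
```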
@@ -41,15 +41,16 @@ The `release` table includes a `master_id` column used during import and dedup.

The SQL files in `schema/` define the contract between this ETL pipeline and all consumers:

- `schema/create_database.sql` -- Tables: `release`, `release_artist`, `release_track`, `release_track_artist`, `cache_metadata`
- `schema/create_indexes.sql` -- Trigram GIN indexes for fuzzy text search (pg_trgm)
- `schema/create_database.sql` -- Tables: `release`, `release_artist`, `release_track`, `release_track_artist`, `cache_metadata`; extensions: pg_trgm, unaccent
- `schema/create_functions.sql` -- `f_unaccent()` immutable wrapper for accent-insensitive index expressions
- `schema/create_indexes.sql` -- Trigram GIN indexes for accent-insensitive fuzzy text search (pg_trgm + unaccent)

Consumers connect via the `DATABASE_URL_DISCOGS` environment variable.
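A consumer query shaped to use these indexes might look like this; a sketch, assuming the caller normalizes its search term with the same `lower(f_unaccent(...))` expression the indexes are built on:

```sql
-- Accent-insensitive fuzzy match on artist name, e.g. 'Bjork' finds 'Björk'.
SELECT r.id, r.title
FROM release r
JOIN release_artist ra ON ra.release_id = r.id
WHERE lower(f_unaccent(ra.artist_name)) % lower(f_unaccent('Björk'))
LIMIT 10;
```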

### Docker Compose

`docker-compose.yml` provides a self-contained environment:
- **`db`** service: PostgreSQL 16 with pg_trgm, port 5433:5432
- **`db`** service: PostgreSQL 16 with pg_trgm + unaccent, port 5433:5432
- **`pipeline`** service: runs `scripts/run_pipeline.py` against the db
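A quick smoke test against the `db` service (connection details below are illustrative; note the 5433 host port):

```sql
-- e.g. via psql -h localhost -p 5433
SELECT extname, extversion
FROM pg_extension
WHERE extname IN ('pg_trgm', 'unaccent');
-- expect two rows
```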

13 changes: 7 additions & 6 deletions README.md
@@ -7,13 +7,13 @@ ETL pipeline for building a PostgreSQL cache of Discogs release data, filtered t
The pipeline processes monthly Discogs data dumps (~40 GB XML) into a focused PostgreSQL database (~3 GB) containing only releases by artists in the WXYC library catalog. This provides:

- Fast local lookups instead of rate-limited Discogs API calls
- Trigram fuzzy text search via pg_trgm
- Accent-insensitive trigram fuzzy text search via pg_trgm + unaccent
- Shared data resource for multiple consuming services

## Prerequisites

- Python 3.11+
- PostgreSQL with the `pg_trgm` extension (or use [Docker Compose](#docker-compose))
- PostgreSQL with the `pg_trgm` and `unaccent` extensions (or use [Docker Compose](#docker-compose))
- Discogs monthly data dump (XML) from https://discogs-data-dumps.s3.us-west-2.amazonaws.com/index.html
- [discogs-xml2db](https://github.com/philipmat/discogs-xml2db) -- clone separately; not a PyPI package
- `library_artists.txt` and `library.db` (produced by request-parser's library sync)
@@ -34,9 +34,9 @@ All 9 steps are automated by `run_pipeline.py` (or Docker Compose). The script s
| 2. Fix newlines | `scripts/fix_csv_newlines.py` | Clean embedded newlines in CSV fields |
| 2.5. Enrich | `scripts/enrich_library_artists.py` | Enrich artist list with cross-references (optional) |
| 3. Filter | `scripts/filter_csv.py` | Keep only library artists (~70% reduction) |
| 4. Create schema | `schema/create_database.sql` | Set up tables and constraints |
| 4. Create schema | `schema/create_database.sql`, `schema/create_functions.sql` | Set up tables, extensions, and functions |
| 5. Import | `scripts/import_csv.py` | Bulk load CSVs via psycopg COPY |
| 6. Create indexes | `schema/create_indexes.sql` | Trigram GIN indexes for fuzzy search |
| 6. Create indexes | `schema/create_indexes.sql` | Accent-insensitive trigram GIN indexes for fuzzy search |
| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (most tracks) |
| 8. Prune/Copy | `scripts/verify_cache.py` | Remove non-library releases or copy matches to target DB |
| 9. Vacuum | `VACUUM FULL` | Reclaim disk space |
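Step 7's "keep best release per master_id" can be pictured as a window-function delete; a sketch only (the real logic is in `scripts/dedup_releases.py`, and this assumes releases with NULL `master_id` are left untouched):

```sql
WITH ranked AS (
    SELECT r.id,
           ROW_NUMBER() OVER (
               PARTITION BY r.master_id
               ORDER BY count(rt.release_id) DESC, r.id
           ) AS rn
    FROM release r
    LEFT JOIN release_track rt ON rt.release_id = r.id
    WHERE r.master_id IS NOT NULL
    GROUP BY r.id, r.master_id
)
DELETE FROM release
WHERE id IN (SELECT id FROM ranked WHERE rn > 1);
```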
@@ -162,8 +162,9 @@ python scripts/fix_csv_newlines.py /path/to/raw/release.csv /path/to/cleaned/rel
# 3. Filter to library artists
python scripts/filter_csv.py /path/to/library_artists.txt /path/to/cleaned/ /path/to/filtered/

# 4. Create schema
# 4. Create schema and functions
psql -d discogs -f schema/create_database.sql
psql -d discogs -f schema/create_functions.sql

# 5. Import CSVs
python scripts/import_csv.py /path/to/filtered/ [database_url]
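Step 5's bulk load uses psycopg COPY; the SQL equivalent, assuming headered CSVs (column list abbreviated for illustration):

```sql
COPY release (id, title, master_id)
FROM STDIN WITH (FORMAT csv, HEADER true);
```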
@@ -201,7 +202,7 @@ The schema files in `schema/` define the shared contract between this ETL pipeli
### Indexes

- Foreign key indexes on all child tables
- Trigram GIN indexes (`pg_trgm`) on `title` and `artist_name` columns for fuzzy text search
- Accent-insensitive trigram GIN indexes (`pg_trgm` + `unaccent`) on `title` and `artist_name` columns for fuzzy text search. Uses an immutable `f_unaccent()` wrapper to enable index expressions with `lower(f_unaccent(column))`.
- Cache metadata indexes for freshness queries
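One caveat: expression indexes only match queries that use the exact same expression, so consumers must spell out `lower(f_unaccent(column))` verbatim:

```sql
-- Can use idx_release_artist_name_trgm (expression matches the index):
EXPLAIN SELECT * FROM release_artist
WHERE lower(f_unaccent(artist_name)) % 'bjork';

-- Cannot use it (different expression) and falls back to a seq scan:
EXPLAIN SELECT * FROM release_artist
WHERE lower(artist_name) % 'bjork';
```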

### Consumer Integration
3 changes: 3 additions & 0 deletions schema/create_database.sql
@@ -13,6 +13,9 @@
-- Enable trigram extension for fuzzy text search
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- Enable unaccent extension for accent-insensitive search
CREATE EXTENSION IF NOT EXISTS unaccent;
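Why the extension alone isn't enough for indexing: `unaccent()` is only STABLE, so the `f_unaccent()` wrapper in `create_functions.sql` is required for the index expressions:

```sql
-- Both overloads report 's' (STABLE):
SELECT proname, provolatile FROM pg_proc WHERE proname = 'unaccent';

-- So indexing it directly fails:
-- CREATE INDEX idx_demo ON release USING GIN (lower(unaccent(title)) gin_trgm_ops);
-- ERROR:  functions in index expression must be marked IMMUTABLE
```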

-- ============================================
-- Core tables
-- ============================================
13 changes: 13 additions & 0 deletions schema/create_functions.sql
@@ -0,0 +1,13 @@
-- Create immutable wrapper for unaccent() to allow use in index expressions.
--
-- The unaccent extension's unaccent() function is only STABLE (its result
-- depends on search_path), so it can't be used directly in index expressions,
-- which require IMMUTABLE functions.
-- This wrapper pins the dictionary to public.unaccent, removing the search_path
-- variability.
--
-- Run AFTER create_database.sql (which creates the unaccent extension).
-- Run BEFORE create_indexes.sql (which uses f_unaccent in index expressions).

CREATE OR REPLACE FUNCTION f_unaccent(text) RETURNS text AS $$
SELECT public.unaccent('public.unaccent', $1)
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE STRICT;
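Two quick checks that the wrapper behaves as intended:

```sql
SELECT f_unaccent('Björk');  -- 'Bjork'

-- 'i' = IMMUTABLE, which is what index expressions require:
SELECT provolatile FROM pg_proc WHERE proname = 'f_unaccent';
```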
16 changes: 8 additions & 8 deletions schema/create_indexes.sql
@@ -13,26 +13,26 @@ CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- 1. Track title search: "Find releases containing track 'Blue Monday'"
-- Used by: search_releases_by_track()
-- Query pattern: WHERE lower(title) % $1 OR lower(title) ILIKE '%' || $1 || '%'
-- Query pattern: WHERE lower(f_unaccent(title)) % $1 OR lower(f_unaccent(title)) ILIKE ...
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_track_title_trgm
ON release_track USING GIN (lower(title) gin_trgm_ops);
ON release_track USING GIN (lower(f_unaccent(title)) gin_trgm_ops);

-- 2. Artist name search on releases: "Find releases by 'New Order'"
-- Used by: search_releases_by_track() artist filter
-- Query pattern: WHERE lower(artist_name) % $1
-- Query pattern: WHERE lower(f_unaccent(artist_name)) % $1
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_artist_name_trgm
ON release_artist USING GIN (lower(artist_name) gin_trgm_ops);
ON release_artist USING GIN (lower(f_unaccent(artist_name)) gin_trgm_ops);

-- 3. Track artist search: "Find compilation tracks by 'Joy Division'"
-- Used by: validate_track_on_release() for compilations
-- Query pattern: WHERE lower(artist_name) % $1
-- Query pattern: WHERE lower(f_unaccent(artist_name)) % $1
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_track_artist_name_trgm
ON release_track_artist USING GIN (lower(artist_name) gin_trgm_ops);
ON release_track_artist USING GIN (lower(f_unaccent(artist_name)) gin_trgm_ops);

-- 4. Release title search: "Find releases named 'Power, Corruption & Lies'"
-- Used by: get_release searches
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_title_trgm
ON release USING GIN (lower(title) gin_trgm_ops);
ON release USING GIN (lower(f_unaccent(title)) gin_trgm_ops);

-- ============================================
-- Verification queries
@@ -49,5 +49,5 @@ ON release USING GIN (lower(title) gin_trgm_ops);
-- Test trigram search (should use index)
-- EXPLAIN ANALYZE
-- SELECT * FROM release_track
-- WHERE lower(title) % 'blue monday'
-- WHERE lower(f_unaccent(title)) % 'blue monday'
-- LIMIT 10;
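The `%` operator filters on `pg_trgm.similarity_threshold` (default 0.3); a ranked variant of the verification query, useful when tuning that threshold per session:

```sql
SET pg_trgm.similarity_threshold = 0.3;  -- session-local; raise for stricter matching

SELECT title,
       similarity(lower(f_unaccent(title)), 'blue monday') AS score
FROM release_track
WHERE lower(f_unaccent(title)) % 'blue monday'
ORDER BY score DESC
LIMIT 10;
```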
11 changes: 6 additions & 5 deletions scripts/dedup_releases.py
@@ -141,14 +141,15 @@ def add_constraints_and_indexes(conn) -> None:
"CREATE INDEX idx_release_artist_release_id ON release_artist(release_id)",
"CREATE INDEX idx_release_track_release_id ON release_track(release_id)",
"CREATE INDEX idx_release_track_artist_release_id ON release_track_artist(release_id)",
# Trigram indexes for fuzzy search
# Trigram indexes for fuzzy search (accent-insensitive via f_unaccent)
"CREATE INDEX idx_release_track_title_trgm ON release_track "
"USING gin (lower(title) gin_trgm_ops)",
"USING gin (lower(f_unaccent(title)) gin_trgm_ops)",
"CREATE INDEX idx_release_artist_name_trgm ON release_artist "
"USING gin (lower(artist_name) gin_trgm_ops)",
"USING gin (lower(f_unaccent(artist_name)) gin_trgm_ops)",
"CREATE INDEX idx_release_track_artist_name_trgm ON release_track_artist "
"USING gin (lower(artist_name) gin_trgm_ops)",
"CREATE INDEX idx_release_title_trgm ON release USING gin (lower(title) gin_trgm_ops)",
"USING gin (lower(f_unaccent(artist_name)) gin_trgm_ops)",
"CREATE INDEX idx_release_title_trgm ON release "
"USING gin (lower(f_unaccent(title)) gin_trgm_ops)",
# Cache metadata indexes
"CREATE INDEX idx_cache_metadata_cached_at ON cache_metadata(cached_at)",
"CREATE INDEX idx_cache_metadata_source ON cache_metadata(source)",
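After this step, all four trigram indexes should reference `f_unaccent` in their definitions; a quick check:

```sql
SELECT indexname
FROM pg_indexes
WHERE schemaname = 'public'
  AND indexname LIKE '%trgm%'
  AND indexdef LIKE '%f_unaccent%';
-- expect 4 rows
```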
3 changes: 2 additions & 1 deletion scripts/run_pipeline.py
@@ -462,11 +462,12 @@ def _save_state() -> None:
# Step 4: Wait for Postgres
wait_for_postgres(db_url)

# Step 5: Create schema
# Step 5: Create schema and functions
if state and state.is_completed("create_schema"):
logger.info("Skipping create_schema (already completed)")
else:
run_sql_file(db_url, SCHEMA_DIR / "create_database.sql")
run_sql_file(db_url, SCHEMA_DIR / "create_functions.sql")
if state:
state.mark_completed("create_schema")
_save_state()
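Ordering matters here: `create_functions.sql` must run before `create_indexes.sql`, since the index expressions reference `f_unaccent`. A cheap pre-flight check:

```sql
-- Returns a row once create_functions.sql has run; without it,
-- create_indexes.sql aborts with:
--   ERROR:  function f_unaccent(text) does not exist
SELECT proname FROM pg_proc WHERE proname = 'f_unaccent';
```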
12 changes: 7 additions & 5 deletions scripts/verify_cache.py
@@ -728,15 +728,17 @@ def _create_target_schema(target_url: str) -> None:


def _create_target_indexes(target_url: str) -> None:
"""Create indexes on the target database (without CONCURRENTLY)."""
sql_text = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
sql_text = sql_text.replace(" CONCURRENTLY", "")
"""Create functions and indexes on the target database (without CONCURRENTLY)."""
functions_sql = SCHEMA_DIR.joinpath("create_functions.sql").read_text()
indexes_sql = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
indexes_sql = indexes_sql.replace(" CONCURRENTLY", "")

conn = psycopg.connect(target_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(sql_text)
cur.execute(functions_sql)
cur.execute(indexes_sql)
conn.close()
logger.info("Created indexes on target database")
logger.info("Created functions and indexes on target database")


def copy_releases_to_target(
3 changes: 2 additions & 1 deletion tests/integration/test_copy_to_target.py
@@ -55,12 +55,13 @@


def _fresh_import(db_url: str) -> None:
"""Drop everything, apply schema, and import fixture CSVs."""
"""Drop everything, apply schema and functions, and import fixture CSVs."""
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
for table in ALL_TABLES:
cur.execute(f"DROP TABLE IF EXISTS {table} CASCADE")
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
conn.close()

conn = psycopg.connect(db_url)
3 changes: 3 additions & 0 deletions tests/integration/test_db_introspect.py
@@ -123,6 +123,7 @@ def test_true_after_index_creation(self, db_url) -> None:
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
sql = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
sql = sql.replace(" CONCURRENTLY", "")
cur.execute(sql)
@@ -168,6 +169,7 @@ def test_after_indexes(self, db_url) -> None:
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
cur.execute("INSERT INTO release (id, title) VALUES (1, 'Test')")
sql = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
sql = sql.replace(" CONCURRENTLY", "")
@@ -186,6 +188,7 @@ def test_after_dedup(self, db_url) -> None:
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
cur.execute("INSERT INTO release (id, title) VALUES (1, 'Test')")
sql = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
sql = sql.replace(" CONCURRENTLY", "")
3 changes: 2 additions & 1 deletion tests/integration/test_dedup.py
@@ -57,11 +57,12 @@ def _drop_all_tables(conn) -> None:


def _fresh_import(db_url: str) -> None:
"""Drop everything, apply schema, and import fixture CSVs."""
"""Drop everything, apply schema and functions, and import fixture CSVs."""
conn = psycopg.connect(db_url, autocommit=True)
_drop_all_tables(conn)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
conn.close()

conn = psycopg.connect(db_url)
43 changes: 41 additions & 2 deletions tests/integration/test_schema.py
@@ -17,11 +17,12 @@ class TestCreateDatabase:

@pytest.fixture(autouse=True)
def _apply_schema(self, db_url):
"""Run create_database.sql against the test database."""
"""Run create_database.sql and create_functions.sql against the test database."""
self.db_url = db_url
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
conn.close()

def _connect(self):
@@ -91,6 +92,23 @@ def test_pg_trgm_extension_enabled(self) -> None:
conn.close()
assert result is not None, "pg_trgm extension not installed"

def test_unaccent_extension_enabled(self) -> None:
conn = self._connect()
with conn.cursor() as cur:
cur.execute("SELECT extname FROM pg_extension WHERE extname = 'unaccent'")
result = cur.fetchone()
conn.close()
assert result is not None, "unaccent extension not installed"

def test_f_unaccent_function_exists(self) -> None:
"""Immutable f_unaccent() wrapper is available and strips diacritics."""
conn = self._connect()
with conn.cursor() as cur:
cur.execute("SELECT f_unaccent('Björk')")
result = cur.fetchone()[0]
conn.close()
assert result == "Bjork"

def test_fk_constraints_with_cascade(self) -> None:
"""Child tables have ON DELETE CASCADE foreign keys to release."""
conn = self._connect()
@@ -146,11 +164,12 @@ class TestCreateIndexes:

@pytest.fixture(autouse=True)
def _apply_schema_and_data(self, db_url):
"""Set up schema and insert minimal sample data for index creation."""
"""Set up schema, functions, and insert minimal sample data for index creation."""
self.db_url = db_url
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
# Insert minimal data so indexes have something to work with
cur.execute(
"INSERT INTO release (id, title) VALUES (1, 'Test Album') ON CONFLICT DO NOTHING"
@@ -204,3 +223,23 @@ def test_trigram_indexes_exist(self) -> None:
"idx_release_title_trgm",
}
assert expected.issubset(indexes)

def test_trigram_indexes_use_unaccent(self) -> None:
"""All four trigram indexes use f_unaccent() for accent-insensitive matching."""
conn = psycopg.connect(self.db_url, autocommit=True)
with conn.cursor() as cur:
sql = SCHEMA_DIR.joinpath("create_indexes.sql").read_text()
sql = sql.replace(" CONCURRENTLY", "")
cur.execute(sql)

cur.execute("""
SELECT indexname, indexdef FROM pg_indexes
WHERE schemaname = 'public'
AND indexname LIKE '%trgm%'
""")
rows = cur.fetchall()
conn.close()
for indexname, indexdef in rows:
assert "f_unaccent" in indexdef, (
f"Index {indexname} should use f_unaccent(): {indexdef}"
)
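For reference, a passing index definition roughly as `pg_indexes` reports it (formatting approximate):

```sql
-- CREATE INDEX idx_release_title_trgm ON public.release
--     USING gin (lower(f_unaccent(title)) gin_trgm_ops)
```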