Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ All 9 steps are automated by `run_pipeline.py` (or Docker Compose). The script s
| 4. Create schema | `schema/create_database.sql`, `schema/create_functions.sql` | Set up tables, extensions, and functions |
| 5. Import | `scripts/import_csv.py` | Bulk load CSVs via psycopg COPY |
| 6. Create indexes | `schema/create_indexes.sql` | Accent-insensitive trigram GIN indexes for fuzzy search |
| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (US first, then most tracks) |
| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (label match, US, most tracks) |
| 8. Prune/Copy | `scripts/verify_cache.py` | Remove non-library releases or copy matches to target DB |
| 9. Vacuum | `VACUUM FULL` | Reclaim disk space |

Expand Down Expand Up @@ -95,8 +95,38 @@ python scripts/run_pipeline.py \
```

- `--library-db` is optional; if omitted, the prune step is skipped
- `--library-labels` accepts a pre-generated `library_labels.csv` for label-aware dedup (see below)
- `--database-url` defaults to `DATABASE_URL` env var or `postgresql://localhost:5432/discogs`

### Label-Aware Dedup

By default, dedup keeps the release with the most tracks per `master_id` group. When WXYC label preferences are available, dedup instead prefers the release whose Discogs label matches WXYC's known pressing -- ensuring the cached edition matches what the station actually owns.

Label preferences come from WXYC's `FLOWSHEET_ENTRY_PROD` MySQL table (rotation play entries include `LABEL_NAME`). The extraction script `scripts/extract_library_labels.py` produces a `library_labels.csv` with `(artist_name, release_title, label_name)` triples.

There are two ways to enable label-aware dedup:

1. **Automatic extraction** (when `--wxyc-db-url` is provided): the pipeline extracts labels from MySQL before the dedup step.

2. **Pre-generated CSV** (when `--library-labels` is provided): the pipeline uses the CSV directly, no MySQL connection needed.

```bash
# Automatic: extract labels from WXYC MySQL and use for dedup
python scripts/run_pipeline.py \
--csv-dir /path/to/filtered/ \
--library-db /path/to/library.db \
--wxyc-db-url mysql://user:pass@host:port/wxycmusic \
--database-url postgresql://localhost:5432/discogs

# Pre-generated: use an existing library_labels.csv
python scripts/run_pipeline.py \
--csv-dir /path/to/filtered/ \
--library-labels /path/to/library_labels.csv \
--database-url postgresql://localhost:5432/discogs
```

The ranking order is: **label match** (prefer WXYC's pressing) > **US country** (domestic pressing) > **track count** (quality tiebreaker) > **release ID** (deterministic fallback).

### Copy to Target Database

Instead of pruning releases in place (which destroys the full imported dataset), you can copy only matched releases to a separate target database:
Expand Down Expand Up @@ -172,8 +202,9 @@ python scripts/import_csv.py /path/to/filtered/ [database_url]
# 6. Create indexes (10-30 min on large datasets)
psql -d discogs -f schema/create_indexes.sql

# 7. Deduplicate
# 7. Deduplicate (optionally with label matching)
python scripts/dedup_releases.py [database_url]
python scripts/dedup_releases.py --library-labels /path/to/library_labels.csv [database_url]

# 8. Prune (dry run first, then with --prune or --copy-to)
python scripts/verify_cache.py /path/to/library.db [database_url]
Expand All @@ -195,6 +226,7 @@ The schema files in `schema/` define the shared contract between this ETL pipeli
|-------|-------------|
| `release` | Release metadata: id, title, release_year, country, artwork_url |
| `release_artist` | Artists on releases (main + extra credits) |
| `release_label` | Label names per release (e.g., Parlophone, Factory Records) |
| `release_track` | Tracks on releases with position and duration |
| `release_track_artist` | Artists on specific tracks (for compilations) |
| `cache_metadata` | Data freshness tracking (cached_at, source) |
Expand Down
27 changes: 27 additions & 0 deletions lib/wxyc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""WXYC MySQL connection utilities."""

from __future__ import annotations

from urllib.parse import urlparse

import pymysql


def connect_mysql(db_url: str):
"""Connect to MySQL using a URL string.

Args:
db_url: MySQL connection URL (mysql://user:pass@host:port/dbname).

Returns:
A pymysql connection object.
"""
parsed = urlparse(db_url)
return pymysql.connect(
host=parsed.hostname or "localhost",
port=parsed.port or 3306,
user=parsed.username or "root",
password=parsed.password or "",
database=parsed.path.lstrip("/"),
charset="utf8",
)
162 changes: 156 additions & 6 deletions scripts/dedup_releases.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,22 @@
Expects dedup_delete_ids table to already exist (from a previous run).
If not, creates it from the ROW_NUMBER query.

When --library-labels is provided, WXYC label preferences influence the
ranking: releases whose label matches WXYC's known pressing are preferred
over releases with more tracks but a different label.

Usage:
python dedup_releases.py [database_url]
python dedup_releases.py [database_url] [--library-labels <csv>]

database_url defaults to postgresql:///discogs
"""

import argparse
import csv
import logging
import sys
import time
from pathlib import Path

import psycopg

Expand All @@ -39,6 +46,99 @@ def _track_count_table_exists(conn) -> bool:
return cur.fetchone()[0]


def _label_match_table_exists(conn) -> bool:
"""Return True if the release_label_match table exists."""
with conn.cursor() as cur:
cur.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = 'release_label_match'
)
""")
return cur.fetchone()[0]


def load_library_labels(conn, csv_path: Path) -> int:
"""Load WXYC label preferences from CSV into an UNLOGGED table.

Creates the ``wxyc_label_pref`` table with columns
(artist_name, release_title, label_name) and bulk-loads the CSV.

Args:
conn: psycopg connection (autocommit=True).
csv_path: Path to library_labels.csv.

Returns:
Number of rows loaded.
"""
logger.info("Loading WXYC label preferences from %s", csv_path)

with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS wxyc_label_pref")
cur.execute("""
CREATE UNLOGGED TABLE wxyc_label_pref (
artist_name text NOT NULL,
release_title text NOT NULL,
label_name text NOT NULL
)
""")

rows = []
with open(csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
rows.append((row["artist_name"], row["release_title"], row["label_name"]))

if rows:
with conn.cursor() as cur:
cur.executemany(
"INSERT INTO wxyc_label_pref (artist_name, release_title, label_name) "
"VALUES (%s, %s, %s)",
rows,
)

logger.info("Loaded %d label preferences", len(rows))
return len(rows)


def create_label_match_table(conn) -> int:
"""Create release_label_match table by joining Discogs labels to WXYC preferences.

Marks which release_ids have a label matching WXYC's known pressing
for that (artist, title) pair. Uses lowercased exact matching.

Requires wxyc_label_pref table to exist (from load_library_labels).

Args:
conn: psycopg connection (autocommit=True).

Returns:
Number of releases matched.
"""
logger.info("Creating release_label_match table...")

with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS release_label_match")
cur.execute("""
CREATE UNLOGGED TABLE release_label_match AS
SELECT DISTINCT rl.release_id, 1 AS label_match
FROM release_label rl
JOIN release r ON r.id = rl.release_id
JOIN release_artist ra ON ra.release_id = r.id AND ra.extra = 0
JOIN wxyc_label_pref wlp
ON lower(ra.artist_name) = lower(wlp.artist_name)
AND lower(r.title) = lower(wlp.release_title)
AND lower(rl.label_name) = lower(wlp.label_name)
WHERE r.master_id IS NOT NULL
""")
cur.execute("ALTER TABLE release_label_match ADD PRIMARY KEY (release_id)")
cur.execute("SELECT count(*) FROM release_label_match")
count = int(cur.fetchone()[0])

logger.info("Matched %d releases to WXYC label preferences", count)
return count


def ensure_dedup_ids(conn) -> int:
"""Ensure dedup_delete_ids table exists. Create if needed.

Expand Down Expand Up @@ -83,19 +183,33 @@ def ensure_dedup_ids(conn) -> int:
") tc ON tc.release_id = r.id"
)

# Optional label matching: prefer releases whose label matches WXYC's pressing
use_label_match = _label_match_table_exists(conn)
if use_label_match:
label_join = "LEFT JOIN release_label_match rlm ON rlm.release_id = r.id"
order_by = (
"COALESCE(rlm.label_match, 0) DESC, "
"(r.country = 'US')::int DESC, "
"tc.track_count DESC, r.id ASC"
)
logger.info(" Label matching enabled (release_label_match table found)")
else:
label_join = ""
order_by = "(r.country = 'US')::int DESC, tc.track_count DESC, r.id ASC"

with conn.cursor() as cur:
# track_count_join is built from trusted internal constants, not user input
# All SQL fragments are built from trusted internal constants, not user input
cur.execute(f"""
CREATE UNLOGGED TABLE dedup_delete_ids AS
SELECT id AS release_id FROM (
SELECT r.id, r.master_id,
ROW_NUMBER() OVER (
PARTITION BY r.master_id
ORDER BY (r.country = 'US')::int DESC,
tc.track_count DESC, r.id ASC
ORDER BY {order_by}
) as rn
FROM release r
{track_count_join}
{label_join}
WHERE r.master_id IS NOT NULL
) ranked
WHERE rn > 1
Expand Down Expand Up @@ -245,19 +359,53 @@ def add_constraints_and_indexes(conn) -> None:
add_track_constraints_and_indexes(conn)


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"database_url",
nargs="?",
default="postgresql:///discogs",
help="PostgreSQL connection URL (default: postgresql:///discogs).",
)
parser.add_argument(
"--library-labels",
type=Path,
default=None,
metavar="FILE",
help="Path to library_labels.csv with WXYC label preferences. "
"When provided, dedup ranking prefers releases whose label "
"matches WXYC's known pressing.",
)
return parser.parse_args(argv)


def main():
db_url = sys.argv[1] if len(sys.argv) > 1 else "postgresql:///discogs"
args = parse_args()
db_url = args.database_url

logger.info(f"Connecting to {db_url}")
conn = psycopg.connect(db_url, autocommit=True)

# Step 0 (optional): Load WXYC label preferences for label-aware ranking
if args.library_labels:
if not args.library_labels.exists():
logger.error("Library labels file not found: %s", args.library_labels)
sys.exit(1)
load_library_labels(conn, args.library_labels)
create_label_match_table(conn)

# Step 1: Ensure dedup IDs exist
delete_count = ensure_dedup_ids(conn)
if delete_count == 0:
logger.info("No duplicates found, nothing to do")
# Clean up release_track_count if it exists
with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS release_track_count")
cur.execute("DROP TABLE IF EXISTS wxyc_label_pref")
cur.execute("DROP TABLE IF EXISTS release_label_match")
conn.close()
return

Expand Down Expand Up @@ -303,6 +451,8 @@ def main():
with conn.cursor() as cur:
cur.execute("DROP TABLE IF EXISTS dedup_delete_ids")
cur.execute("DROP TABLE IF EXISTS release_track_count")
cur.execute("DROP TABLE IF EXISTS wxyc_label_pref")
cur.execute("DROP TABLE IF EXISTS release_label_match")

# Step 7: Report
with conn.cursor() as cur:
Expand Down
26 changes: 2 additions & 24 deletions scripts/enrich_library_artists.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@
import sqlite3
import sys
from pathlib import Path
from urllib.parse import urlparse

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Import compilation detection from lib/matching.py
# Import shared utilities from lib/
_LIB_DIR = Path(__file__).parent.parent / "lib"
sys.path.insert(0, str(_LIB_DIR.parent))
from lib.matching import is_compilation_artist # noqa: E402
from lib.wxyc import connect_mysql # noqa: E402


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
Expand Down Expand Up @@ -94,28 +94,6 @@ def extract_base_artists(library_db_path: Path) -> set[str]:
return artists


def connect_mysql(db_url: str):
"""Connect to MySQL using a URL string.

Args:
db_url: MySQL connection URL (mysql://user:pass@host:port/dbname).

Returns:
A pymysql connection object.
"""
import pymysql

parsed = urlparse(db_url)
return pymysql.connect(
host=parsed.hostname or "localhost",
port=parsed.port or 3306,
user=parsed.username or "root",
password=parsed.password or "",
database=parsed.path.lstrip("/"),
charset="utf8",
)


def extract_alternate_names(conn) -> set[str]:
"""Extract alternate artist names from LIBRARY_RELEASE.

Expand Down
Loading