diff --git a/README.md b/README.md index a1255e1..5653d62 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ All 9 steps are automated by `run_pipeline.py` (or Docker Compose). The script s | 4. Create schema | `schema/create_database.sql`, `schema/create_functions.sql` | Set up tables, extensions, and functions | | 5. Import | `scripts/import_csv.py` | Bulk load CSVs via psycopg COPY | | 6. Create indexes | `schema/create_indexes.sql` | Accent-insensitive trigram GIN indexes for fuzzy search | -| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (US first, then most tracks) | +| 7. Deduplicate | `scripts/dedup_releases.py` | Keep best release per master_id (label match, US, most tracks) | | 8. Prune/Copy | `scripts/verify_cache.py` | Remove non-library releases or copy matches to target DB | | 9. Vacuum | `VACUUM FULL` | Reclaim disk space | @@ -95,8 +95,38 @@ python scripts/run_pipeline.py \ ``` - `--library-db` is optional; if omitted, the prune step is skipped +- `--library-labels` accepts a pre-generated `library_labels.csv` for label-aware dedup (see below) - `--database-url` defaults to `DATABASE_URL` env var or `postgresql://localhost:5432/discogs` +### Label-Aware Dedup + +By default, dedup keeps one release per `master_id` group, preferring a US pressing and then the release with the most tracks. When WXYC label preferences are available, a release whose Discogs label matches WXYC's known pressing is ranked ahead of both criteria, so the cached edition matches what the station actually owns. + +Label preferences come from WXYC's `FLOWSHEET_ENTRY_PROD` MySQL table (rotation play entries include `LABEL_NAME`). The extraction script `scripts/extract_library_labels.py` produces a `library_labels.csv` with `(artist_name, release_title, label_name)` triples. + +There are two ways to enable label-aware dedup: + +1. **Automatic extraction** (when `--wxyc-db-url` is provided): the pipeline extracts labels from MySQL before the dedup step. + +2. 
**Pre-generated CSV** (when `--library-labels` is provided): the pipeline uses the CSV directly, no MySQL connection needed. + +```bash +# Automatic: extract labels from WXYC MySQL and use for dedup +python scripts/run_pipeline.py \ + --csv-dir /path/to/filtered/ \ + --library-db /path/to/library.db \ + --wxyc-db-url mysql://user:pass@host:port/wxycmusic \ + --database-url postgresql://localhost:5432/discogs + +# Pre-generated: use an existing library_labels.csv +python scripts/run_pipeline.py \ + --csv-dir /path/to/filtered/ \ + --library-labels /path/to/library_labels.csv \ + --database-url postgresql://localhost:5432/discogs +``` + +The ranking order is: **label match** (prefer WXYC's pressing) > **US country** (domestic pressing) > **track count** (quality tiebreaker) > **release ID** (deterministic fallback). + ### Copy to Target Database Instead of pruning releases in place (which destroys the full imported dataset), you can copy only matched releases to a separate target database: @@ -172,8 +202,9 @@ python scripts/import_csv.py /path/to/filtered/ [database_url] # 6. Create indexes (10-30 min on large datasets) psql -d discogs -f schema/create_indexes.sql -# 7. Deduplicate +# 7. Deduplicate (optionally with label matching) python scripts/dedup_releases.py [database_url] +python scripts/dedup_releases.py --library-labels /path/to/library_labels.csv [database_url] # 8. 
Prune (dry run first, then with --prune or --copy-to) python scripts/verify_cache.py /path/to/library.db [database_url] @@ -195,6 +226,7 @@ The schema files in `schema/` define the shared contract between this ETL pipeli |-------|-------------| | `release` | Release metadata: id, title, release_year, country, artwork_url | | `release_artist` | Artists on releases (main + extra credits) | +| `release_label` | Label names per release (e.g., Parlophone, Factory Records) | | `release_track` | Tracks on releases with position and duration | | `release_track_artist` | Artists on specific tracks (for compilations) | | `cache_metadata` | Data freshness tracking (cached_at, source) | diff --git a/lib/wxyc.py b/lib/wxyc.py new file mode 100644 index 0000000..7c43a61 --- /dev/null +++ b/lib/wxyc.py @@ -0,0 +1,27 @@ +"""WXYC MySQL connection utilities.""" + +from __future__ import annotations + +from urllib.parse import urlparse + +import pymysql + + +def connect_mysql(db_url: str): + """Connect to MySQL using a URL string. + + Args: + db_url: MySQL connection URL (mysql://user:pass@host:port/dbname). + + Returns: + A pymysql connection object. + """ + parsed = urlparse(db_url) + return pymysql.connect( + host=parsed.hostname or "localhost", + port=parsed.port or 3306, + user=parsed.username or "root", + password=parsed.password or "", + database=parsed.path.lstrip("/"), + charset="utf8", + ) diff --git a/scripts/dedup_releases.py b/scripts/dedup_releases.py index cc27d81..4ab6331 100644 --- a/scripts/dedup_releases.py +++ b/scripts/dedup_releases.py @@ -8,15 +8,22 @@ Expects dedup_delete_ids table to already exist (from a previous run). If not, creates it from the ROW_NUMBER query. +When --library-labels is provided, WXYC label preferences influence the +ranking: releases whose label matches WXYC's known pressing are preferred +over releases with more tracks but a different label. 
+ Usage: - python dedup_releases.py [database_url] + python dedup_releases.py [database_url] [--library-labels ] database_url defaults to postgresql:///discogs """ +import argparse +import csv import logging import sys import time +from pathlib import Path import psycopg @@ -39,6 +46,99 @@ def _track_count_table_exists(conn) -> bool: return cur.fetchone()[0] +def _label_match_table_exists(conn) -> bool: + """Return True if the release_label_match table exists.""" + with conn.cursor() as cur: + cur.execute(""" + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'release_label_match' + ) + """) + return cur.fetchone()[0] + + +def load_library_labels(conn, csv_path: Path) -> int: + """Load WXYC label preferences from CSV into an UNLOGGED table. + + Creates the ``wxyc_label_pref`` table with columns + (artist_name, release_title, label_name) and bulk-loads the CSV. + + Args: + conn: psycopg connection (autocommit=True). + csv_path: Path to library_labels.csv. + + Returns: + Number of rows loaded. + """ + logger.info("Loading WXYC label preferences from %s", csv_path) + + with conn.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute(""" + CREATE UNLOGGED TABLE wxyc_label_pref ( + artist_name text NOT NULL, + release_title text NOT NULL, + label_name text NOT NULL + ) + """) + + rows = [] + with open(csv_path, newline="", encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + rows.append((row["artist_name"], row["release_title"], row["label_name"])) + + if rows: + with conn.cursor() as cur: + cur.executemany( + "INSERT INTO wxyc_label_pref (artist_name, release_title, label_name) " + "VALUES (%s, %s, %s)", + rows, + ) + + logger.info("Loaded %d label preferences", len(rows)) + return len(rows) + + +def create_label_match_table(conn) -> int: + """Create release_label_match table by joining Discogs labels to WXYC preferences. 
+ + Marks which release_ids have a label matching WXYC's known pressing + for that (artist, title) pair. Uses lowercased exact matching. + + Requires wxyc_label_pref table to exist (from load_library_labels). + + Args: + conn: psycopg connection (autocommit=True). + + Returns: + Number of releases matched. + """ + logger.info("Creating release_label_match table...") + + with conn.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS release_label_match") + cur.execute(""" + CREATE UNLOGGED TABLE release_label_match AS + SELECT DISTINCT rl.release_id, 1 AS label_match + FROM release_label rl + JOIN release r ON r.id = rl.release_id + JOIN release_artist ra ON ra.release_id = r.id AND ra.extra = 0 + JOIN wxyc_label_pref wlp + ON lower(ra.artist_name) = lower(wlp.artist_name) + AND lower(r.title) = lower(wlp.release_title) + AND lower(rl.label_name) = lower(wlp.label_name) + WHERE r.master_id IS NOT NULL + """) + cur.execute("ALTER TABLE release_label_match ADD PRIMARY KEY (release_id)") + cur.execute("SELECT count(*) FROM release_label_match") + count = int(cur.fetchone()[0]) + + logger.info("Matched %d releases to WXYC label preferences", count) + return count + + def ensure_dedup_ids(conn) -> int: """Ensure dedup_delete_ids table exists. Create if needed. 
@@ -83,19 +183,33 @@ def ensure_dedup_ids(conn) -> int: ") tc ON tc.release_id = r.id" ) + # Optional label matching: prefer releases whose label matches WXYC's pressing + use_label_match = _label_match_table_exists(conn) + if use_label_match: + label_join = "LEFT JOIN release_label_match rlm ON rlm.release_id = r.id" + order_by = ( + "COALESCE(rlm.label_match, 0) DESC, " + "(r.country = 'US')::int DESC, " + "tc.track_count DESC, r.id ASC" + ) + logger.info(" Label matching enabled (release_label_match table found)") + else: + label_join = "" + order_by = "(r.country = 'US')::int DESC, tc.track_count DESC, r.id ASC" + with conn.cursor() as cur: - # track_count_join is built from trusted internal constants, not user input + # All SQL fragments are built from trusted internal constants, not user input cur.execute(f""" CREATE UNLOGGED TABLE dedup_delete_ids AS SELECT id AS release_id FROM ( SELECT r.id, r.master_id, ROW_NUMBER() OVER ( PARTITION BY r.master_id - ORDER BY (r.country = 'US')::int DESC, - tc.track_count DESC, r.id ASC + ORDER BY {order_by} ) as rn FROM release r {track_count_join} + {label_join} WHERE r.master_id IS NOT NULL ) ranked WHERE rn > 1 @@ -245,19 +359,53 @@ def add_constraints_and_indexes(conn) -> None: add_track_constraints_and_indexes(conn) +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "database_url", + nargs="?", + default="postgresql:///discogs", + help="PostgreSQL connection URL (default: postgresql:///discogs).", + ) + parser.add_argument( + "--library-labels", + type=Path, + default=None, + metavar="FILE", + help="Path to library_labels.csv with WXYC label preferences. 
" + "When provided, dedup ranking prefers releases whose label " + "matches WXYC's known pressing.", + ) + return parser.parse_args(argv) + + def main(): - db_url = sys.argv[1] if len(sys.argv) > 1 else "postgresql:///discogs" + args = parse_args() + db_url = args.database_url logger.info(f"Connecting to {db_url}") conn = psycopg.connect(db_url, autocommit=True) + # Step 0 (optional): Load WXYC label preferences for label-aware ranking + if args.library_labels: + if not args.library_labels.exists(): + logger.error("Library labels file not found: %s", args.library_labels) + sys.exit(1) + load_library_labels(conn, args.library_labels) + create_label_match_table(conn) + # Step 1: Ensure dedup IDs exist delete_count = ensure_dedup_ids(conn) if delete_count == 0: logger.info("No duplicates found, nothing to do") - # Clean up release_track_count if it exists with conn.cursor() as cur: cur.execute("DROP TABLE IF EXISTS release_track_count") + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute("DROP TABLE IF EXISTS release_label_match") conn.close() return @@ -303,6 +451,8 @@ def main(): with conn.cursor() as cur: cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") cur.execute("DROP TABLE IF EXISTS release_track_count") + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute("DROP TABLE IF EXISTS release_label_match") # Step 7: Report with conn.cursor() as cur: diff --git a/scripts/enrich_library_artists.py b/scripts/enrich_library_artists.py index 94d2af4..7d6cccb 100644 --- a/scripts/enrich_library_artists.py +++ b/scripts/enrich_library_artists.py @@ -25,7 +25,6 @@ import sqlite3 import sys from pathlib import Path -from urllib.parse import urlparse logging.basicConfig( level=logging.INFO, @@ -33,10 +32,11 @@ ) logger = logging.getLogger(__name__) -# Import compilation detection from lib/matching.py +# Import shared utilities from lib/ _LIB_DIR = Path(__file__).parent.parent / "lib" sys.path.insert(0, str(_LIB_DIR.parent)) from lib.matching 
import is_compilation_artist # noqa: E402 +from lib.wxyc import connect_mysql # noqa: E402 def parse_args(argv: list[str] | None = None) -> argparse.Namespace: @@ -94,28 +94,6 @@ def extract_base_artists(library_db_path: Path) -> set[str]: return artists -def connect_mysql(db_url: str): - """Connect to MySQL using a URL string. - - Args: - db_url: MySQL connection URL (mysql://user:pass@host:port/dbname). - - Returns: - A pymysql connection object. - """ - import pymysql - - parsed = urlparse(db_url) - return pymysql.connect( - host=parsed.hostname or "localhost", - port=parsed.port or 3306, - user=parsed.username or "root", - password=parsed.password or "", - database=parsed.path.lstrip("/"), - charset="utf8", - ) - - def extract_alternate_names(conn) -> set[str]: """Extract alternate artist names from LIBRARY_RELEASE. diff --git a/scripts/extract_library_labels.py b/scripts/extract_library_labels.py new file mode 100644 index 0000000..fe3dcb0 --- /dev/null +++ b/scripts/extract_library_labels.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +"""Extract WXYC label preferences from FLOWSHEET_ENTRY_PROD. + +Queries WXYC MySQL for (artist_name, release_title, label_name) triples +from flowsheet plays linked to library releases, then writes them as a CSV +for use by dedup_releases.py --library-labels. + +The JOIN to LIBRARY_RELEASE ensures the release still exists in the library +(deleted releases are excluded). 
+ +Usage: + python scripts/extract_library_labels.py \\ + --wxyc-db-url mysql://user:pass@host:port/db \\ + --output library_labels.csv +""" + +from __future__ import annotations + +import argparse +import csv +import logging +import sys +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +sys.path.insert(0, str(Path(__file__).parent.parent)) +from lib.wxyc import connect_mysql # noqa: E402 + +EXTRACTION_QUERY = """ + SELECT DISTINCT + lc.PRESENTATION_NAME AS artist_name, + lr.TITLE AS release_title, + fe.LABEL_NAME AS label_name + FROM FLOWSHEET_ENTRY_PROD fe + JOIN LIBRARY_RELEASE lr ON fe.LIBRARY_RELEASE_ID = lr.ID + JOIN LIBRARY_CODE lc ON lr.LIBRARY_CODE_ID = lc.ID + WHERE fe.LABEL_NAME IS NOT NULL + AND fe.LABEL_NAME != '' + AND fe.LIBRARY_RELEASE_ID > 0 +""" + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--wxyc-db-url", + type=str, + required=True, + metavar="URL", + help="MySQL connection URL for WXYC catalog database " + "(e.g. mysql://user:pass@host:port/dbname).", + ) + parser.add_argument( + "--output", + type=Path, + required=True, + metavar="FILE", + help="Output path for library_labels.csv.", + ) + return parser.parse_args(argv) + + +def extract_library_labels(conn) -> set[tuple[str, str, str]]: + """Extract (artist_name, release_title, label_name) triples from WXYC MySQL. + + Args: + conn: MySQL connection (pymysql). + + Returns: + Set of (artist_name, release_title, label_name) tuples, stripped and + deduplicated. Rows with empty/null fields are excluded. 
+ """ + logger.info("Extracting library labels from FLOWSHEET_ENTRY_PROD") + with conn.cursor() as cur: + cur.execute(EXTRACTION_QUERY) + rows = cur.fetchall() + + triples: set[tuple[str, str, str]] = set() + for artist, title, label in rows: + if not artist or not title or not label: + continue + artist_s = artist.strip() + title_s = title.strip() + label_s = label.strip() + if artist_s and title_s and label_s: + triples.add((artist_s, title_s, label_s)) + + logger.info("Extracted %d unique (artist, title, label) triples", len(triples)) + return triples + + +def write_library_labels_csv(triples: set[tuple[str, str, str]], output: Path) -> None: + """Write label triples to a CSV file. + + Args: + triples: Set of (artist_name, release_title, label_name) tuples. + output: Path to the output CSV file. + """ + output.parent.mkdir(parents=True, exist_ok=True) + sorted_rows = sorted(triples) + with open(output, "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow(["artist_name", "release_title", "label_name"]) + writer.writerows(sorted_rows) + logger.info("Wrote %d label preferences to %s", len(sorted_rows), output) + + +def main() -> None: + args = parse_args() + conn = connect_mysql(args.wxyc_db_url) + try: + triples = extract_library_labels(conn) + finally: + conn.close() + write_library_labels_csv(triples, args.output) + + +if __name__ == "__main__": + main() diff --git a/scripts/run_pipeline.py b/scripts/run_pipeline.py index 18cc478..1b48bae 100644 --- a/scripts/run_pipeline.py +++ b/scripts/run_pipeline.py @@ -100,7 +100,8 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace: metavar="URL", help="MySQL connection URL for WXYC catalog database " "(e.g. mysql://user:pass@host:port/dbname). " - "Enriches library_artists.txt with alternate names and cross-references. " + "Enriches library_artists.txt with alternate names and cross-references, " + "and extracts label preferences for label-aware dedup. 
" "Requires --library-db.", ) parser.add_argument( @@ -118,6 +119,15 @@ def parse_args(argv: list[str] | None = None) -> argparse.Namespace: help="Copy matched releases to a separate target database instead of " "pruning in place. Requires --library-db.", ) + parser.add_argument( + "--library-labels", + type=Path, + default=None, + metavar="FILE", + help="Path to pre-generated library_labels.csv for label-aware dedup. " + "If omitted but --wxyc-db-url is provided, labels are extracted " + "automatically before dedup.", + ) parser.add_argument( "--resume", action="store_true", @@ -393,6 +403,10 @@ def main() -> None: logger.error("library.db not found: %s", args.library_db) sys.exit(1) + if args.library_labels and not args.library_labels.exists(): + logger.error("library_labels.csv not found: %s", args.library_labels) + sys.exit(1) + # Steps 1-3: XML conversion, newline fix, filtering (only in --xml mode) if args.xml is not None: with tempfile.TemporaryDirectory(prefix="discogs_pipeline_") as tmpdir: @@ -419,7 +433,14 @@ def main() -> None: filter_to_library_artists(library_artists_path, cleaned_csv_dir, filtered_csv_dir) # -- database build (create_schema through vacuum) - _run_database_build(db_url, filtered_csv_dir, args.library_db, python) + _run_database_build( + db_url, + filtered_csv_dir, + args.library_db, + python, + library_labels=args.library_labels, + wxyc_db_url=args.wxyc_db_url, + ) else: # Database build only (--csv-dir mode) state = _load_or_create_state(args) @@ -429,6 +450,8 @@ def main() -> None: args.library_db, python, target_db_url=args.target_db_url, + library_labels=args.library_labels, + wxyc_db_url=args.wxyc_db_url, state=state, state_file=args.state_file, ) @@ -444,6 +467,8 @@ def _run_database_build( python: str, *, target_db_url: str | None = None, + library_labels: Path | None = None, + wxyc_db_url: str | None = None, state: PipelineState | None = None, state_file: Path | None = None, ) -> None: @@ -454,6 +479,10 @@ def 
_run_database_build( When *target_db_url* is provided, matched releases are copied to the target database instead of pruning the source in place. + + When *library_labels* is provided, the CSV is passed to the dedup step + for label-aware ranking. When *wxyc_db_url* is provided but + *library_labels* is not, labels are extracted automatically before dedup. """ def _save_state() -> None: @@ -497,10 +526,28 @@ def _save_state() -> None: if state and state.is_completed("dedup"): logger.info("Skipping dedup (already completed)") else: - run_step( - "Deduplicate releases", - [python, str(SCRIPT_DIR / "dedup_releases.py"), db_url], - ) + # Resolve library labels CSV for label-aware dedup + labels_csv = library_labels + if labels_csv is None and wxyc_db_url is not None: + labels_csv = Path(tempfile.mkdtemp(prefix="discogs_labels_")) / "library_labels.csv" + run_step( + "Extract WXYC library labels", + [ + python, + str(SCRIPT_DIR / "extract_library_labels.py"), + "--wxyc-db-url", + wxyc_db_url, + "--output", + str(labels_csv), + ], + ) + + dedup_cmd = [python, str(SCRIPT_DIR / "dedup_releases.py")] + if labels_csv is not None: + dedup_cmd.extend(["--library-labels", str(labels_csv)]) + dedup_cmd.append(db_url) + + run_step("Deduplicate releases", dedup_cmd) if state: state.mark_completed("dedup") _save_state() diff --git a/tests/e2e/test_pipeline.py b/tests/e2e/test_pipeline.py index 5523148..086f30c 100644 --- a/tests/e2e/test_pipeline.py +++ b/tests/e2e/test_pipeline.py @@ -232,6 +232,100 @@ def test_null_title_release_not_imported(self) -> None: assert count == 0 +FIXTURE_LIBRARY_LABELS = CSV_DIR / "library_labels.csv" + + +class TestPipelineWithLabels: + """Run pipeline with --library-labels for label-aware dedup. + + Omits --library-db so the prune step is skipped; this test is focused + on verifying that label matching changes the dedup winner. 
+ """ + + @pytest.fixture(autouse=True, scope="class") + def _run_pipeline(self, e2e_db_url): + """Run run_pipeline.py with --library-labels (no prune).""" + self.__class__._db_url = e2e_db_url + + result = subprocess.run( + [ + sys.executable, + str(RUN_PIPELINE), + "--csv-dir", + str(CSV_DIR), + "--library-labels", + str(FIXTURE_LIBRARY_LABELS), + "--database-url", + e2e_db_url, + ], + capture_output=True, + text=True, + timeout=120, + ) + + self.__class__._stdout = result.stdout + self.__class__._stderr = result.stderr + self.__class__._returncode = result.returncode + + if result.returncode != 0: + print("STDOUT:", result.stdout) + print("STDERR:", result.stderr) + + assert result.returncode == 0, ( + f"Pipeline failed (exit {result.returncode}):\n{result.stderr}" + ) + + @pytest.fixture(autouse=True) + def _store_url(self): + self.db_url = self.__class__._db_url + + def _connect(self): + return psycopg.connect(self.db_url) + + def test_label_match_overrides_track_count_master_500(self) -> None: + """Label-aware dedup keeps release 1001 (Parlophone) over 1002 (Capitol, more tracks).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") + ids = [row[0] for row in cur.fetchall()] + conn.close() + assert ids == [1001], f"Expected only 1001 after label-aware dedup, got {ids}" + + def test_label_match_overrides_track_count_master_600(self) -> None: + """Label-aware dedup keeps release 2001 (Factory) over 2002 (Qwest, more tracks).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") + ids = [row[0] for row in cur.fetchall()] + conn.close() + assert ids == [2001], f"Expected only 2001 after label-aware dedup, got {ids}" + + def test_temp_tables_cleaned_up(self) -> None: + """wxyc_label_pref and release_label_match are dropped after dedup.""" + conn = self._connect() + with conn.cursor() as cur: + 
cur.execute(""" + SELECT table_name FROM information_schema.tables + WHERE table_name IN ('wxyc_label_pref', 'release_label_match') + """) + tables = [row[0] for row in cur.fetchall()] + conn.close() + assert tables == [], f"Temp tables should be cleaned up, found {tables}" + + def test_non_label_matched_uses_track_count(self) -> None: + """Releases without label match still use track count ranking. + + Release 3001 (unique master_id 700) and 4001 (no master_id) should + be unaffected by label matching and survive both dedup and prune. + """ + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM release WHERE id IN (3001, 4001)") + count = cur.fetchone()[0] + conn.close() + assert count == 2 + + class TestPipelineWithoutLibrary: """Run pipeline without library.db (skips prune step).""" diff --git a/tests/fixtures/create_fixtures.py b/tests/fixtures/create_fixtures.py index a3cb9d0..b3efcdd 100644 --- a/tests/fixtures/create_fixtures.py +++ b/tests/fixtures/create_fixtures.py @@ -240,6 +240,29 @@ def create_release_image_csv() -> None: write_csv("release_image.csv", headers, rows) +def create_library_labels_csv() -> None: + """Create library_labels.csv with WXYC label preferences. + + These represent labels WXYC actually owns for specific albums, + used to influence dedup ranking via --library-labels. + + The existing release_label.csv has Discogs label data per release: + - 1001: Parlophone, Capitol Records; 1002: Capitol Records; 1003: EMI + - 2001: Factory Records; 2002: Qwest Records + + This fixture says WXYC owns the Parlophone pressing of OK Computer + and the Factory Records pressing of Unknown Pleasures, which causes + label-aware dedup to prefer 1001 over 1002 and 2001 over 2002 + (overriding the default track-count ranking). 
+ """ + headers = ["artist_name", "release_title", "label_name"] + rows = [ + ["Joy Division", "Unknown Pleasures", "Factory Records"], + ["Radiohead", "OK Computer", "Parlophone"], + ] + write_csv("library_labels.csv", headers, rows) + + def create_library_db() -> None: """Create a SQLite library.db with (artist, title) pairs. @@ -331,6 +354,7 @@ def main() -> None: create_release_track_artist_csv() create_release_label_csv() create_release_image_csv() + create_library_labels_csv() print() print("Library data:") create_library_db() diff --git a/tests/fixtures/csv/library_labels.csv b/tests/fixtures/csv/library_labels.csv new file mode 100644 index 0000000..94f4346 --- /dev/null +++ b/tests/fixtures/csv/library_labels.csv @@ -0,0 +1,3 @@ +artist_name,release_title,label_name +Joy Division,Unknown Pleasures,Factory Records +Radiohead,OK Computer,Parlophone diff --git a/tests/integration/test_dedup.py b/tests/integration/test_dedup.py index 8a4c4ed..ce54aff 100644 --- a/tests/integration/test_dedup.py +++ b/tests/integration/test_dedup.py @@ -35,6 +35,8 @@ swap_tables = _dd.swap_tables add_base_constraints_and_indexes = _dd.add_base_constraints_and_indexes add_constraints_and_indexes = _dd.add_constraints_and_indexes +load_library_labels = _dd.load_library_labels +create_label_match_table = _dd.create_label_match_table pytestmark = pytest.mark.postgres @@ -410,6 +412,135 @@ def test_no_duplicates_found(self) -> None: assert count == 0 +def _run_dedup_with_labels(db_url: str, library_labels_csv: Path) -> None: + """Run the dedup pipeline with label-matching enabled.""" + conn = psycopg.connect(db_url, autocommit=True) + load_library_labels(conn, library_labels_csv) + create_label_match_table(conn) + delete_count = ensure_dedup_ids(conn) + if delete_count > 0: + tables = [ + ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), + ( + "release_artist", + "new_release_artist", + "release_id, artist_name, extra", + "release_id", + ), + ( + 
"release_label", + "new_release_label", + "release_id, label_name", + "release_id", + ), + ( + "cache_metadata", + "new_cache_metadata", + "release_id, cached_at, source, last_validated", + "release_id", + ), + ] + + for old, new, cols, id_col in tables: + copy_table(conn, old, new, cols, id_col) + + with conn.cursor() as cur: + for stmt in [ + "ALTER TABLE release_artist DROP CONSTRAINT IF EXISTS fk_release_artist_release", + "ALTER TABLE release_label DROP CONSTRAINT IF EXISTS fk_release_label_release", + "ALTER TABLE cache_metadata DROP CONSTRAINT IF EXISTS fk_cache_metadata_release", + ]: + cur.execute(stmt) + + for old, new, _, _ in tables: + swap_tables(conn, old, new) + add_base_constraints_and_indexes(conn) + + with conn.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute("DROP TABLE IF EXISTS release_label_match") + conn.close() + + +class TestDedupWithLabels: + """Dedup ranking prefers releases matching WXYC label preferences.""" + + @pytest.fixture(autouse=True, scope="class") + def _set_up_and_dedup(self, db_url): + """Import base fixtures, run label-aware dedup, then import tracks.""" + self.__class__._db_url = db_url + _fresh_import(db_url) + library_labels_csv = CSV_DIR / "library_labels.csv" + _run_dedup_with_labels(db_url, library_labels_csv) + _import_tracks_after_dedup(db_url) + + @pytest.fixture(autouse=True) + def _store_url(self): + self.db_url = self.__class__._db_url + + def _connect(self): + return psycopg.connect(self.db_url) + + def test_label_match_overrides_track_count_master_500(self) -> None: + """Release 1001 (Parlophone, 3 tracks) wins over 1002 (Capitol, 5 tracks).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") + ids = [row[0] for row in cur.fetchall()] + conn.close() + assert ids == [1001] + + 
def test_label_match_overrides_track_count_master_600(self) -> None: + """Release 2001 (Factory, 2 tracks) wins over 2002 (Qwest, 4 tracks).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") + ids = [row[0] for row in cur.fetchall()] + conn.close() + assert ids == [2001] + + def test_unmatched_releases_use_track_count(self) -> None: + """Releases with unique master_ids are not affected by label matching.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM release WHERE id IN (5001, 5002)") + count = cur.fetchone()[0] + conn.close() + assert count == 2 + + def test_unique_and_null_master_id_untouched(self) -> None: + """Releases with unique or NULL master_id survive label-aware dedup.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM release WHERE id IN (3001, 4001)") + count = cur.fetchone()[0] + conn.close() + assert count == 2 + + def test_label_match_temp_tables_cleaned_up(self) -> None: + """Temp tables wxyc_label_pref and release_label_match are dropped.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM information_schema.tables " + "WHERE table_name IN ('wxyc_label_pref', 'release_label_match')" + ) + count = cur.fetchone()[0] + conn.close() + assert count == 0 + + def test_total_release_count_after_dedup(self) -> None: + """Same total: 15 imported - 3 duplicates = 12 (just different winners).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM release") + count = cur.fetchone()[0] + conn.close() + assert count == 12 + + class TestDedupFallback: """Verify dedup falls back to release_track when release_track_count doesn't exist.""" diff --git a/tests/unit/test_extract_library_labels.py b/tests/unit/test_extract_library_labels.py new file mode 100644 index 0000000..92f7634 --- /dev/null +++ 
b/tests/unit/test_extract_library_labels.py @@ -0,0 +1,135 @@ +"""Unit tests for scripts/extract_library_labels.py.""" + +from __future__ import annotations + +import csv +import importlib.util +import sys +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +# Load module from scripts directory +_SCRIPT_PATH = Path(__file__).parent.parent.parent / "scripts" / "extract_library_labels.py" +_spec = importlib.util.spec_from_file_location("extract_library_labels", _SCRIPT_PATH) +assert _spec is not None and _spec.loader is not None +_mod = importlib.util.module_from_spec(_spec) +sys.modules["extract_library_labels"] = _mod +_spec.loader.exec_module(_mod) + +extract_library_labels = _mod.extract_library_labels +write_library_labels_csv = _mod.write_library_labels_csv +parse_args = _mod.parse_args + + +class TestExtractLibraryLabels: + """Extracting (artist, title, label) triples from a MySQL cursor.""" + + def test_returns_set_of_triples(self) -> None: + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + mock_cursor.fetchall.return_value = [ + ("Radiohead", "OK Computer", "Parlophone"), + ("Joy Division", "Unknown Pleasures", "Factory Records"), + ] + result = extract_library_labels(mock_conn) + assert result == { + ("Radiohead", "OK Computer", "Parlophone"), + ("Joy Division", "Unknown Pleasures", "Factory Records"), + } + + def test_strips_whitespace(self) -> None: + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + mock_cursor.fetchall.return_value = [ + (" Radiohead ", " OK Computer ", " Parlophone "), + ] + result = extract_library_labels(mock_conn) + assert result == {("Radiohead", "OK Computer", "Parlophone")} + + def 
test_skips_empty_fields(self) -> None: + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + mock_cursor.fetchall.return_value = [ + ("Radiohead", "OK Computer", "Parlophone"), + ("", "OK Computer", "Parlophone"), # empty artist + ("Radiohead", "", "Parlophone"), # empty title + ("Radiohead", "OK Computer", ""), # empty label + (None, "OK Computer", "Parlophone"), # null artist + ] + result = extract_library_labels(mock_conn) + assert result == {("Radiohead", "OK Computer", "Parlophone")} + + def test_empty_result_set(self) -> None: + mock_conn = MagicMock() + mock_cursor = MagicMock() + mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + mock_cursor.fetchall.return_value = [] + result = extract_library_labels(mock_conn) + assert result == set() + + +class TestWriteLibraryLabelsCsv: + """Writing label triples to CSV.""" + + def test_writes_correct_headers(self, tmp_path: Path) -> None: + output = tmp_path / "labels.csv" + write_library_labels_csv( + {("Radiohead", "OK Computer", "Parlophone")}, + output, + ) + with open(output, newline="", encoding="utf-8") as f: + reader = csv.reader(f) + headers = next(reader) + assert headers == ["artist_name", "release_title", "label_name"] + + def test_writes_sorted_rows(self, tmp_path: Path) -> None: + output = tmp_path / "labels.csv" + write_library_labels_csv( + { + ("Radiohead", "OK Computer", "Parlophone"), + ("Joy Division", "Unknown Pleasures", "Factory Records"), + }, + output, + ) + with open(output, newline="", encoding="utf-8") as f: + reader = csv.reader(f) + next(reader) # skip header + rows = list(reader) + assert rows[0] == ["Joy Division", "Unknown Pleasures", "Factory Records"] + assert rows[1] == ["Radiohead", "OK Computer", "Parlophone"] + + def 
test_empty_set_writes_header_only(self, tmp_path: Path) -> None: + output = tmp_path / "labels.csv" + write_library_labels_csv(set(), output) + with open(output, newline="", encoding="utf-8") as f: + reader = csv.reader(f) + headers = next(reader) + rows = list(reader) + assert headers == ["artist_name", "release_title", "label_name"] + assert rows == [] + + +class TestParseArgs: + """CLI argument parsing.""" + + def test_required_args(self) -> None: + args = parse_args(["--wxyc-db-url", "mysql://u:p@h/db", "--output", "out.csv"]) + assert args.wxyc_db_url == "mysql://u:p@h/db" + assert args.output == Path("out.csv") + + def test_missing_wxyc_db_url_exits(self) -> None: + with pytest.raises(SystemExit): + parse_args(["--output", "out.csv"]) + + def test_missing_output_exits(self) -> None: + with pytest.raises(SystemExit): + parse_args(["--wxyc-db-url", "mysql://u:p@h/db"]) diff --git a/tests/unit/test_run_pipeline.py b/tests/unit/test_run_pipeline.py index 366e2de..2b9dcfd 100644 --- a/tests/unit/test_run_pipeline.py +++ b/tests/unit/test_run_pipeline.py @@ -136,3 +136,30 @@ def test_target_db_url_requires_library_db(self) -> None: "postgresql://localhost/target", ] ) + + def test_library_labels_parsed(self) -> None: + args = run_pipeline.parse_args( + [ + "--csv-dir", + "/tmp/csv", + "--library-labels", + "/tmp/library_labels.csv", + ] + ) + assert args.library_labels == Path("/tmp/library_labels.csv") + + def test_library_labels_default_none(self) -> None: + args = run_pipeline.parse_args(["--csv-dir", "/tmp/csv"]) + assert args.library_labels is None + + def test_wxyc_db_url_requires_library_db(self) -> None: + """--wxyc-db-url without --library-db should error.""" + with pytest.raises(SystemExit): + run_pipeline.parse_args( + [ + "--csv-dir", + "/tmp/csv", + "--wxyc-db-url", + "mysql://user:pass@host/db", + ] + ) diff --git a/tests/unit/test_wxyc.py b/tests/unit/test_wxyc.py new file mode 100644 index 0000000..25dbe4f --- /dev/null +++ b/tests/unit/test_wxyc.py 
@@ -0,0 +1,44 @@ +"""Unit tests for lib/wxyc.py.""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import patch + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) +from lib.wxyc import connect_mysql + + +class TestConnectMysql: + """connect_mysql parses URL components and passes them to pymysql.connect.""" + + @patch("lib.wxyc.pymysql") + def test_standard_url(self, mock_pymysql) -> None: + connect_mysql("mysql://user:pass@dbhost:3307/wxycmusic") + mock_pymysql.connect.assert_called_once_with( + host="dbhost", + port=3307, + user="user", + password="pass", + database="wxycmusic", + charset="utf8", + ) + + @patch("lib.wxyc.pymysql") + def test_default_port(self, mock_pymysql) -> None: + connect_mysql("mysql://user:pass@dbhost/mydb") + call_kwargs = mock_pymysql.connect.call_args[1] + assert call_kwargs["port"] == 3306 + + @patch("lib.wxyc.pymysql") + def test_no_password(self, mock_pymysql) -> None: + connect_mysql("mysql://user@dbhost:3306/mydb") + call_kwargs = mock_pymysql.connect.call_args[1] + assert call_kwargs["user"] == "user" + assert call_kwargs["password"] == "" + + @patch("lib.wxyc.pymysql") + def test_returns_connection(self, mock_pymysql) -> None: + result = connect_mysql("mysql://user:pass@dbhost:3306/mydb") + assert result == mock_pymysql.connect.return_value