Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions lib/db_introspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def column_exists(db_url: str, table_name: str, column_name: str) -> bool:
return result


def trigram_indexes_exist(db_url: str) -> bool:
"""Return True if trigram GIN indexes exist on the expected tables."""
def _get_trigram_indexes(db_url: str) -> set[str]:
"""Return the set of trigram index names in the public schema."""
conn = psycopg.connect(db_url)
with conn.cursor() as cur:
cur.execute(
Expand All @@ -63,15 +63,37 @@ def trigram_indexes_exist(db_url: str) -> bool:
)
indexes = {row[0] for row in cur.fetchall()}
conn.close()
return indexes


def base_trigram_indexes_exist(db_url: str) -> bool:
    """Return True if the base trigram GIN indexes exist.

    "Base" covers only the indexes on the ``release`` and ``release_artist``
    tables. The indexes on ``release_track`` / ``release_track_artist`` are
    deliberately not required here; ``track_trigram_indexes_exist`` checks
    those separately.

    Args:
        db_url: Connection string passed through to ``_get_trigram_indexes``.

    Returns:
        True when every expected base index name is in the set returned by
        ``_get_trigram_indexes``.
    """
    expected = {
        "idx_release_artist_name_trgm",
        "idx_release_title_trgm",
    }
    return expected.issubset(_get_trigram_indexes(db_url))


def track_trigram_indexes_exist(db_url: str) -> bool:
    """Report whether both track-level trigram GIN indexes are present.

    The names checked are the title index on ``release_track`` and the
    artist-name index on ``release_track_artist``.
    """
    present = _get_trigram_indexes(db_url)
    required = (
        "idx_release_track_title_trgm",
        "idx_release_track_artist_name_trgm",
    )
    return all(name in present for name in required)


def trigram_indexes_exist(db_url: str) -> bool:
    """Return True only when the base and the track trigram indexes all exist.

    Kept as a backward-compatible convenience wrapper around the two
    narrower checks.
    """
    # Skip the track check entirely when the base indexes are missing.
    if not base_trigram_indexes_exist(db_url):
        return False
    return track_trigram_indexes_exist(db_url)


def infer_pipeline_state(db_url: str) -> PipelineState:
"""Infer pipeline state from database structure.

Expand All @@ -91,13 +113,21 @@ def infer_pipeline_state(db_url: str) -> PipelineState:
return state
state.mark_completed("import_csv")

if not trigram_indexes_exist(db_url):
if not base_trigram_indexes_exist(db_url):
return state
state.mark_completed("create_indexes")

if column_exists(db_url, "release", "master_id"):
return state
state.mark_completed("dedup")

if not table_has_rows(db_url, "release_track"):
return state
state.mark_completed("import_tracks")

if not track_trigram_indexes_exist(db_url):
return state
state.mark_completed("create_track_indexes")

# prune and vacuum cannot be inferred from database state
return state
58 changes: 55 additions & 3 deletions lib/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,21 @@
import json
from pathlib import Path

# State-file format version accepted by PipelineState.load; version 1 files
# are migrated on load instead of being rejected.
VERSION = 2

# Step names of the v2 pipeline. import_tracks and create_track_indexes sit
# between dedup and prune.
STEP_NAMES = [
    "create_schema",
    "import_csv",
    "create_indexes",
    "dedup",
    "import_tracks",
    "create_track_indexes",
    "prune",
    "vacuum",
]

# Step names that can appear in a v1 state file. Every one of them still
# exists under the same name in v2, so migration copies them across as-is.
_V1_STEP_NAMES = ["create_schema", "import_csv", "create_indexes", "dedup", "prune", "vacuum"]


class PipelineState:
Expand Down Expand Up @@ -64,11 +76,51 @@ def save(self, path: Path) -> None:

@classmethod
def load(cls, path: Path) -> PipelineState:
    """Load state from a JSON file.

    Supports v1 state files by migrating them to v2 format.

    Args:
        path: Location of the JSON state file.

    Returns:
        A PipelineState built from the file's ``database_url``, ``csv_dir``
        and ``steps`` entries, or the result of the v1 migration.

    Raises:
        ValueError: If the file's ``version`` is neither 1 nor ``VERSION``.
    """
    data = json.loads(path.read_text())
    version = data.get("version")

    # v1 files lack the track steps; convert them rather than reject them.
    if version == 1:
        return cls._migrate_v1(data)
    if version != VERSION:
        raise ValueError(f"Unsupported state file version {version} (expected {VERSION})")

    state = cls(db_url=data["database_url"], csv_dir=data["csv_dir"])
    state._steps = data["steps"]
    return state

@classmethod
def _migrate_v1(cls, data: dict) -> PipelineState:
    """Build a v2 PipelineState from the contents of a v1 state file.

    v2 inserts two steps, import_tracks and create_track_indexes, between
    dedup and prune. The conversion works as follows:

    - every v1 step recorded in the file is carried over unchanged;
    - import_tracks is marked completed when v1 import_csv was completed
      (v1 imported tracks as part of import_csv);
    - create_track_indexes is marked completed when v1 dedup or
      create_indexes was completed (v1 created track indexes during
      those steps).
    """
    migrated = cls(db_url=data["database_url"], csv_dir=data["csv_dir"])
    old_steps = data.get("steps", {})

    def finished(name: str) -> bool:
        # A step absent from the v1 file counts as not completed.
        return old_steps.get(name, {}).get("status") == "completed"

    # Carry over whatever v1 recorded for the steps it knew about.
    for name in _V1_STEP_NAMES:
        if name in old_steps:
            migrated._steps[name] = old_steps[name]

    if finished("import_csv"):
        migrated._steps["import_tracks"] = {"status": "completed"}

    # dedup is consulted first, then create_indexes; either one is enough.
    if finished("dedup") or finished("create_indexes"):
        migrated._steps["create_track_indexes"] = {"status": "completed"}

    return migrated
7 changes: 7 additions & 0 deletions schema/create_database.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ CREATE TABLE IF NOT EXISTS release_artist (
extra integer DEFAULT 0 -- 0 = main artist, 1 = extra credit
);

-- Labels on releases.
-- Each row links one release to one label name. Rows are removed
-- automatically when the referenced release row is deleted
-- (ON DELETE CASCADE). No primary key or uniqueness constraint is declared
-- in this statement, so the same (release_id, label_name) pair can repeat.
CREATE TABLE IF NOT EXISTS release_label (
release_id integer NOT NULL REFERENCES release(id) ON DELETE CASCADE,
label_name text NOT NULL
);

-- Tracks on releases
CREATE TABLE IF NOT EXISTS release_track (
release_id integer NOT NULL REFERENCES release(id) ON DELETE CASCADE,
Expand Down Expand Up @@ -69,6 +75,7 @@ CREATE TABLE IF NOT EXISTS cache_metadata (

-- Foreign key indexes
CREATE INDEX IF NOT EXISTS idx_release_artist_release_id ON release_artist(release_id);
CREATE INDEX IF NOT EXISTS idx_release_label_release_id ON release_label(release_id);
CREATE INDEX IF NOT EXISTS idx_release_track_release_id ON release_track(release_id);
CREATE INDEX IF NOT EXISTS idx_release_track_artist_release_id ON release_track_artist(release_id);

Expand Down
28 changes: 8 additions & 20 deletions schema/create_indexes.sql
Original file line number Diff line number Diff line change
@@ -1,35 +1,23 @@
-- Create base trigram indexes for fuzzy text search
-- Run AFTER base data import (release, release_artist).
-- Track-related indexes are in create_track_indexes.sql (run after track import).
--
-- These indexes enable fast fuzzy matching using pg_trgm extension.
-- Index creation on large tables can take 10-30 minutes.
--
-- NOTE: CREATE INDEX CONCURRENTLY cannot run inside a transaction block,
-- so do not wrap this script in BEGIN/COMMIT.

-- Ensure extension is loaded
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- ============================================
-- Base trigram indexes for fuzzy text search
-- ============================================

-- 1. Artist name search on releases: "Find releases by 'New Order'"
--    Used by: search_releases_by_track() artist filter
--    Query pattern: WHERE lower(f_unaccent(artist_name)) % $1
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_artist_name_trgm
    ON release_artist USING GIN (lower(f_unaccent(artist_name)) gin_trgm_ops);

-- 2. Release title search: "Find releases named 'Power, Corruption & Lies'"
--    Used by: get_release searches
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_title_trgm
    ON release USING GIN (lower(f_unaccent(title)) gin_trgm_ops);
Expand All @@ -48,6 +36,6 @@ ON release USING GIN (lower(f_unaccent(title)) gin_trgm_ops);

-- Test trigram search (should use index)
-- EXPLAIN ANALYZE
-- SELECT * FROM release_artist
-- WHERE lower(f_unaccent(artist_name)) % 'new order'
-- LIMIT 10;
52 changes: 52 additions & 0 deletions schema/create_track_indexes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
-- Create track-related FK constraints, FK indexes, and trigram indexes.
-- Run AFTER track import (release_track, release_track_artist).
--
-- Base indexes are in create_indexes.sql (run after base import).
-- This file is idempotent: safe to run on resume.
--
-- NOTE(review): CREATE INDEX CONCURRENTLY cannot be executed inside a
-- transaction block, so run this file in autocommit mode. The PostgreSQL
-- docs also state that a failed or interrupted concurrent build leaves an
-- INVALID index behind; IF NOT EXISTS would then skip it on the next run.
-- Confirm the resume path checks for (and drops) invalid indexes.

-- Ensure extension is loaded
CREATE EXTENSION IF NOT EXISTS pg_trgm;

-- ============================================
-- FK constraints (idempotent via DO blocks)
-- ============================================
-- ALTER TABLE ... ADD CONSTRAINT has no IF NOT EXISTS form, so each ALTER is
-- wrapped in a DO block that swallows duplicate_object, i.e. a constraint
-- with that name already exists on the table.
--
-- NOTE(review): create_database.sql declares release_track.release_id with
-- an inline REFERENCES release(id) clause. If that unnamed constraint is
-- still present when this runs, the named constraint below is added in
-- addition to it rather than being skipped. Confirm the pipeline drops the
-- inline FK before track import.

DO $$
BEGIN
ALTER TABLE release_track ADD CONSTRAINT fk_release_track_release
FOREIGN KEY (release_id) REFERENCES release(id) ON DELETE CASCADE;
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

DO $$
BEGIN
ALTER TABLE release_track_artist ADD CONSTRAINT fk_release_track_artist_release
FOREIGN KEY (release_id) REFERENCES release(id) ON DELETE CASCADE;
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;

-- ============================================
-- FK indexes
-- ============================================
-- Plain b-tree indexes on the referencing column. IF NOT EXISTS makes
-- these no-ops when an index of the same name is already present.

CREATE INDEX IF NOT EXISTS idx_release_track_release_id
ON release_track(release_id);

CREATE INDEX IF NOT EXISTS idx_release_track_artist_release_id
ON release_track_artist(release_id);

-- ============================================
-- Trigram indexes for fuzzy text search
-- ============================================
-- Both are expression indexes on lower(f_unaccent(...)). A query must use
-- the same expression for the index to apply.

-- Track title search: "Find releases containing track 'Blue Monday'"
-- Used by: search_releases_by_track()
-- Query pattern: WHERE lower(f_unaccent(title)) % $1
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_track_title_trgm
ON release_track USING GIN (lower(f_unaccent(title)) gin_trgm_ops);

-- Track artist search: "Find compilation tracks by 'Joy Division'"
-- Used by: validate_track_on_release() for compilations
-- Query pattern: WHERE lower(f_unaccent(artist_name)) % $1
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_release_track_artist_name_trgm
ON release_track_artist USING GIN (lower(f_unaccent(artist_name)) gin_trgm_ops);
Loading