diff --git a/app/api/torznab/api.py b/app/api/torznab/api.py
index ff3df63..651d519 100644
--- a/app/api/torznab/api.py
+++ b/app/api/torznab/api.py
@@ -3,6 +3,9 @@
 from datetime import datetime, timezone
 from typing import List, Optional
 import xml.etree.ElementTree as ET
+import threading
+import time
+from urllib.parse import urlencode
 
 from fastapi import Depends, Query, Request, Response
 from fastapi.responses import Response as FastAPIResponse
@@ -31,10 +34,19 @@
 )
 from app.utils.magnet import _site_prefix
 from app.utils.movie_year import get_movie_year
+from app.utils.http_client import get as http_get
 
 from . import router
 from .utils import _build_item, _caps_xml, _require_apikey, _rss_root
 
+_SKYHOOK_SEARCH_URL = "https://skyhook.sonarr.tv/v1/tvdb/search/en/"
+_SKYHOOK_SHOW_URL = "https://skyhook.sonarr.tv/v1/tvdb/shows/en/{tvdb_id}"
+_TVSEARCH_ID_CACHE_TTL_SECONDS = 300.0
+_TVSEARCH_ID_CACHE_MAX_ENTRIES = 512
+_TVSEARCH_SKYHOOK_CACHE_LOCK = threading.Lock()
+_TVSEARCH_TERM_TO_TVDB_CACHE: dict[str, tuple[float, int]] = {}
+_TVSEARCH_TVDB_TO_TITLE_CACHE: dict[int, tuple[float, str]] = {}
+
 
 def _default_languages_for_site(site: str) -> List[str]:
     """
@@ -58,6 +70,152 @@ def _default_languages_for_site(site: str) -> List[str]:
     return list(fallback)
 
 
+def _coerce_positive_int(value: object) -> Optional[int]:
+    """
+    Coerce an arbitrary value into a positive integer.
+
+    Parameters:
+        value (object): Value to convert to an integer.
+
+    Returns:
+        Optional[int]: The parsed integer when conversion succeeds and the
+        result is greater than zero, `None` otherwise.
+    """
+    try:
+        parsed = int(value)  # type: ignore[arg-type]
+    except (TypeError, ValueError):
+        return None
+    return parsed if parsed > 0 else None
+
+
+def _cache_get_term_tvdb(term: str) -> Optional[int]:
+    """Return the cached tvdb id for a SkyHook search term while the entry is fresh."""
+    now = time.time()
+    with _TVSEARCH_SKYHOOK_CACHE_LOCK:
+        entry = _TVSEARCH_TERM_TO_TVDB_CACHE.get(term)
+        if not entry:
+            return None
+        cached_at, cached_tvdb = entry
+        if now - cached_at > _TVSEARCH_ID_CACHE_TTL_SECONDS:
+            _TVSEARCH_TERM_TO_TVDB_CACHE.pop(term, None)
+            return None
+        return cached_tvdb
+
+
+def _cache_set_term_tvdb(term: str, tvdb_id: int) -> None:
+    """Cache the tvdb id for a SkyHook search term, evicting the oldest entry when full."""
+    with _TVSEARCH_SKYHOOK_CACHE_LOCK:
+        _TVSEARCH_TERM_TO_TVDB_CACHE[term] = (time.time(), tvdb_id)
+        if len(_TVSEARCH_TERM_TO_TVDB_CACHE) > _TVSEARCH_ID_CACHE_MAX_ENTRIES:
+            oldest = min(
+                _TVSEARCH_TERM_TO_TVDB_CACHE.items(), key=lambda item: item[1][0]
+            )[0]
+            _TVSEARCH_TERM_TO_TVDB_CACHE.pop(oldest, None)
+
+
+def _cache_get_tvdb_title(tvdb_id: int) -> Optional[str]:
+    """Return the cached SkyHook show title for a tvdb id while the entry is fresh."""
+    now = time.time()
+    with _TVSEARCH_SKYHOOK_CACHE_LOCK:
+        entry = _TVSEARCH_TVDB_TO_TITLE_CACHE.get(tvdb_id)
+        if not entry:
+            return None
+        cached_at, cached_title = entry
+        if now - cached_at > _TVSEARCH_ID_CACHE_TTL_SECONDS:
+            _TVSEARCH_TVDB_TO_TITLE_CACHE.pop(tvdb_id, None)
+            return None
+        return cached_title
+
+
+def _cache_set_tvdb_title(tvdb_id: int, title: str) -> None:
+    """Cache the SkyHook show title for a tvdb id, evicting the oldest entry when full."""
+    with _TVSEARCH_SKYHOOK_CACHE_LOCK:
+        _TVSEARCH_TVDB_TO_TITLE_CACHE[tvdb_id] = (time.time(), title)
+        if len(_TVSEARCH_TVDB_TO_TITLE_CACHE) > _TVSEARCH_ID_CACHE_MAX_ENTRIES:
+            oldest = min(
+                _TVSEARCH_TVDB_TO_TITLE_CACHE.items(), key=lambda item: item[1][0]
+            )[0]
+            _TVSEARCH_TVDB_TO_TITLE_CACHE.pop(oldest, None)
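+
+
+# A hedged, illustrative round-trip for the TTL caches above (the term/id pair
+# is hypothetical; the 300-second TTL and 512-entry cap come from the
+# constants at the top of the module):
+#   _cache_set_term_tvdb("tmdb:49988", 350665)
+#   _cache_get_term_tvdb("tmdb:49988")  # -> 350665 while fresh, None after TTL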
+
+
+def _resolve_tvsearch_query_from_ids(
+    *,
+    tvdbid: Optional[int],
+    tmdbid: Optional[int],
+    imdbid: Optional[str],
+) -> Optional[str]:
+    """
+    Resolve a canonical TV series title from the provided Torznab identifiers.
+
+    If a positive `tvdbid` is supplied, look up the show title for that id
+    directly. Otherwise, try to resolve a `tvdbid` by querying SkyHook with
+    `tmdbid` and/or `imdbid`, then look up the show title for the resolved id.
+
+    Returns:
+        Optional[str]: The resolved show title, or `None` when no title could
+        be resolved.
+    """
+    tvdb_id = _coerce_positive_int(tvdbid)
+    if tvdb_id is None:
+        lookup_terms: List[str] = []
+        tmdb = _coerce_positive_int(tmdbid)
+        imdb = (imdbid or "").strip()
+        if tmdb is not None:
+            lookup_terms.append(f"tmdb:{tmdb}")
+        if imdb:
+            lookup_terms.append(f"imdb:{imdb}")
+
+        for term in lookup_terms:
+            cached_tvdb = _cache_get_term_tvdb(term)
+            if cached_tvdb is not None:
+                tvdb_id = cached_tvdb
+                break
+            try:
+                query = urlencode({"term": term})
+                response = http_get(
+                    f"{_SKYHOOK_SEARCH_URL}?{query}",
+                    timeout=8.0,
+                )
+                response.raise_for_status()
+                payload = response.json()
+            except Exception as exc:
+                logger.debug("SkyHook ID search failed for '{}': {}", term, exc)
+                continue
+            if not isinstance(payload, list):
+                continue
+            for item in payload:
+                if not isinstance(item, dict):
+                    continue
+                candidate = _coerce_positive_int(item.get("tvdbId"))
+                if candidate is not None:
+                    tvdb_id = candidate
+                    _cache_set_term_tvdb(term, candidate)
+                    break
+            if tvdb_id is not None:
+                break
+
+    if tvdb_id is None:
+        return None
+
+    cached_title = _cache_get_tvdb_title(tvdb_id)
+    if cached_title is not None:
+        return cached_title
+
+    try:
+        response = http_get(_SKYHOOK_SHOW_URL.format(tvdb_id=tvdb_id), timeout=8.0)
+        response.raise_for_status()
+        payload = response.json()
+    except Exception as exc:
+        logger.debug("SkyHook show lookup failed for tvdb {}: {}", tvdb_id, exc)
+        return None
+
+    if not isinstance(payload, dict):
+        return None
+    title = str(payload.get("title") or "").strip()
+    if title:
+        _cache_set_tvdb_title(tvdb_id, title)
+    return title or None
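+
+
+# Illustrative only: for a Sonarr-style tvsearch that carries ids but no text,
+# the resolver is expected to behave like
+#   _resolve_tvsearch_query_from_ids(tvdbid=350665, tmdbid=None, imdbid=None)
+# returning "The Rookie", assuming SkyHook maps that id to the show.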
+
+
 def _try_mapped_special_probe(
     *,
     tn_module,
@@ -68,9 +226,27 @@ def _try_mapped_special_probe(
     special_map,
 ) -> tuple[bool, Optional[int], Optional[str], Optional[str], int, int, int, int]:
     """
-    Probe mapped AniWorld special coordinates using cache first, then live probe.
+    Probe availability and quality for an AniWorld special that maps to a
+    different source episode, using cached availability when possible.
 
-    Returns availability tuple together with resolved source/alias coordinates.
+    Parameters:
+        tn_module: Provider module exposing `get_availability` and
+            `probe_episode_quality`, used to fetch cached availability or
+            probe live quality.
+        session (Session): Database session used by `get_availability`.
+        slug (str): Show identifier used for probing.
+        lang (str): Language to probe (e.g., "German Dub").
+        site_found (str): Catalogue site name where the source episode is hosted.
+        special_map: Mapping object with `source_season`, `source_episode`,
+            `alias_season`, and `alias_episode` describing the source
+            coordinates and their alias.
+
+    Returns:
+        tuple: `(available, height, vcodec, provider, source_season,
+        source_episode, alias_season, alias_episode)`, where `available` is
+        `True` if the source episode is available; `height` is the video
+        height in pixels, `vcodec` the video codec, and `provider` the source
+        of the quality info (each `None` when unknown); the four trailing ints
+        are the mapped source coordinates and the requested alias coordinates.
+    """
     source_season = special_map.source_season
     source_episode = special_map.source_episode
@@ -757,10 +933,10 @@ def torznab_api(
 
         raise HTTPException(status_code=400, detail="invalid t")
 
-    # require at least q, and either both season+ep or only season (we'll default ep=1)
+    # require season, and either query text or resolvable identifier hints
    import app.api.torznab as tn
 
-    if q is None or season is None:
+    if season is None:
         rss, _channel = _rss_root()
         xml = ET.tostring(rss, encoding="utf-8", xml_declaration=True).decode("utf-8")
         logger.debug("Returning empty RSS feed due to missing parameters.")
@@ -770,10 +946,29 @@
         ep = 1
 
     # from here on, non-None
-    assert season is not None and ep is not None and q is not None
+    assert season is not None and ep is not None
     season_i = int(season)
     ep_i = int(ep)
-    q_str = str(q)
+    q_str = (q or "").strip()
+    if not q_str:
+        q_str = (
+            _resolve_tvsearch_query_from_ids(
+                tvdbid=tvdbid,
+                tmdbid=tmdbid,
+                imdbid=imdbid,
+            )
+            or ""
+        ).strip()
+        if q_str:
+            logger.debug(
+                "tvsearch: resolved missing q from identifiers to '{}'",
+                q_str,
+            )
+    if not q_str:
+        rss, _channel = _rss_root()
+        xml = ET.tostring(rss, encoding="utf-8", xml_declaration=True).decode("utf-8")
+        logger.debug("Returning empty RSS feed due to unresolved query.")
+        return Response(content=xml, media_type="application/rss+xml; charset=utf-8")
 
     logger.debug(
         f"Searching for slug for query '{q_str}' (season={season_i}, ep={ep_i})"
diff --git a/app/utils/title_resolver.py b/app/utils/title_resolver.py
index 4845ca7..04a424c 100644
--- a/app/utils/title_resolver.py
+++ b/app/utils/title_resolver.py
@@ -3,6 +3,7 @@
 import json
 import re
 import time
+from difflib import SequenceMatcher
 from pathlib import Path
 from urllib.parse import quote
 from typing import Dict, List, Optional, Set, Tuple
@@ -39,6 +40,34 @@
     "megakino": get_provider("megakino"),
 }
 
+# Common stop words across EN/DE/FR/ES used to reduce noisy token overlap.
+_MATCH_STOPWORDS = {
+    "a",
+    "an",
+    "and",
+    "das",
+    "de",
+    "del",
+    "der",
+    "die",
+    "du",
+    "el",
+    "en",
+    "et",
+    "la",
+    "le",
+    "les",
+    "los",
+    "of",
+    "the",
+    "und",
+    "y",
+}
+
+# Minimum confidence to accept an index-based title match.
+_MIN_TITLE_MATCH_SCORE = 3.5
+_SIMILARITY_MIN_F1 = 0.2
+
 # suppress repetitive logging from _extract_slug by emitting each message only once
 _extracted_any: bool = False
 _no_slug_warned: bool = False
@@ -503,16 +532,84 @@ def _normalize_tokens(s: str) -> Set[str]:
 
 
 def _normalize_alnum(s: str) -> str:
-    """Lowercase and filter to alphanumeric characters."""
+    """
+    Produce a lowercase string containing only the alphanumeric characters
+    from the input.
+
+    Returns:
+        str: The input lowercased with all non-alphanumeric characters removed.
+    """
     return "".join(ch.lower() for ch in s if ch.isalnum())
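+
+
+# Hedged examples of the two normalizers (illustrative inputs; assumes
+# _normalize_tokens lowercases and splits on non-alphanumeric boundaries):
+#   _normalize_alnum("Rick & Morty!")  -> "rickmorty"
+#   _match_tokens("The Rookie")        -> {"rookie"}  ("the" is a stopword)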
+
+
+def _match_tokens(s: str) -> Set[str]:
+    """
+    Produce the set of query/title tokens used for matching, with common
+    stopwords removed.
+
+    Returns:
+        Set[str]: Lowercased tokens split on non-alphanumeric boundaries with
+        stopwords filtered out; when filtering would leave no tokens, the
+        unfiltered normalized token set is returned instead.
+    """
+    tokens = _normalize_tokens(s)
+    if not tokens:
+        return set()
+    filtered = {token for token in tokens if token not in _MATCH_STOPWORDS}
+    return filtered or tokens
+
+
+def _score_title_candidate(
+    query_tokens: Set[str], query_norm: str, candidate_title: str
+) -> float:
+    """
+    Assign a numeric relevance score indicating how well `candidate_title`
+    matches the query.
+
+    The score grows with token overlap and the balance between precision and
+    recall (F1), and is boosted for exact or substring matches and for higher
+    normalized string similarity.
+
+    Returns:
+        float: Relevance score (higher is better); 0.0 when there is no
+        meaningful match.
+    """
+    title_tokens = _match_tokens(candidate_title)
+    title_norm = _normalize_alnum(candidate_title)
+    if not query_tokens or not title_tokens or not title_norm:
+        return 0.0
+
+    intersection = len(query_tokens & title_tokens)
+    if intersection <= 0:
+        return 0.0
+
+    precision = intersection / max(1, len(title_tokens))
+    recall = intersection / max(1, len(query_tokens))
+    f1 = (
+        (2.0 * precision * recall / (precision + recall))
+        if (precision + recall) > 0
+        else 0.0
+    )
+    ratio = 0.0
+    if query_norm and title_norm and f1 >= _SIMILARITY_MIN_F1:
+        ratio = SequenceMatcher(None, query_norm, title_norm).ratio()
+    exact = 1.0 if query_norm and query_norm == title_norm else 0.0
+    contains = (
+        1.0
+        if query_norm and (query_norm in title_norm or title_norm in query_norm)
+        else 0.0
+    )
+
+    return (
+        (8.0 * exact)
+        + (1.5 * contains)
+        + (3.0 * f1)
+        + (2.0 * precision)
+        + (1.0 * recall)
+        + ratio
+    )
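+
+
+# Worked example using the weights above (ratio is approximate): scoring
+# "Rookie Le flic de Los Angeles" against "The Rookie" gives precision=1.0,
+# recall=1/3, f1=0.5, ratio~0.36, so the total is about
+# 3.0*0.5 + 2.0*1.0 + 1.0/3 + 0.36 ~ 4.2, above _MIN_TITLE_MATCH_SCORE (3.5).
+# An exact normalized match scores 16.5; disjoint token sets score 0.0.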
+
+
 def _build_sto_search_terms(query: str) -> List[str]:
-    """Build ordered S.to search variants from a raw query.
+    """
+    Build ordered S.to search variants from a raw query.
 
-    Returns the raw query, a compact alphanumeric-only variant, and a dashed
-    variant when the compact form is numeric with length >= 3. Empty values are
-    filtered and the list is de-duplicated while preserving order.
+    Returns:
+        List[str]: Ordered, de-duplicated list of non-empty variants: the
+        trimmed query, a compact alphanumeric-only variant when it differs,
+        and a dashed numeric variant when the compact form is all digits with
+        length >= 3.
     """
     raw = (query or "").strip()
     if not raw:
@@ -584,23 +681,35 @@ def _search_sto_slug(query: str) -> Optional[str]:
 
 
 def slug_from_query(q: str, site: Optional[str] = None) -> Optional[Tuple[str, str]]:
     """
-    Find the best-matching site and slug for a free-text query by comparing token overlap with titles and alternative titles.
+    Determine the best-matching catalogue site and slug for a free-text
+    series query.
 
     Parameters:
-        q (str): Free-text query used to match against series titles.
-        site (Optional[str]): If provided, restricts the search to this site; otherwise searches all configured sites.
+        q (str): Free-text query to match against site indexes and
+            alternative titles.
+        site (Optional[str]): If provided, restrict the search to this site;
+            otherwise search the configured catalogue sites and apply
+            site-specific fallbacks.
 
     Returns:
-        Optional[Tuple[str, str]]: `(site, slug)` of the best match, `None` if the query is empty or no match is found.
+        Optional[Tuple[str, str]]: `(site, slug)` of the best match, or
+        `None` if the query is empty or no acceptable match is found.
     """
     if not q:
         return None
 
     def _search_sites(sites: List[str]) -> Optional[Tuple[str, str]]:
-        q_tokens = _normalize_tokens(q)
+        """
+        Find the best-matching `(site, slug)` for the enclosing query across
+        the given sites.
+
+        Scores each site's cached index and alternative titles with the
+        module's title-scoring logic and returns the highest-scoring
+        candidate that clears the minimum match threshold. For sites without
+        a cached index, attempts a search-only slug derivation; if no indexed
+        match is found and "s.to" is among the sites, queries the S.to
+        suggest API as a fallback.
+
+        Parameters:
+            sites (List[str]): Ordered list of site identifiers to search.
+
+        Returns:
+            Optional[Tuple[str, str]]: `(site, slug)` of the best match, or
+            `None` when no candidate meets the minimum score.
+        """
+        q_tokens = _match_tokens(q)
+        q_norm = _normalize_alnum(q)
         best_slug: Optional[str] = None
         best_site: Optional[str] = None
-        best_score = 0
+        best_score = 0.0
 
         for search_site in sites:
             index = load_or_refresh_index(search_site)  # slug -> display title
@@ -614,32 +723,36 @@ def _search_sites(sites: List[str]) -> Optional[Tuple[str, str]]:
             alts = load_or_refresh_alternatives(search_site)  # slug -> [titles]
 
             for slug, main_title in index.items():
-                # Start with main title tokens
-                titles_for_slug: List[str] = [main_title]
+                titles_for_slug: List[str] = []
+                seen_titles: Set[str] = set()
+                if main_title:
+                    titles_for_slug.append(main_title)
+                    seen_titles.add(main_title)
                 alt_list = alts.get(slug)
                 if alt_list:
-                    titles_for_slug.extend(alt_list)
+                    for alt_title in alt_list:
+                        if alt_title and alt_title not in seen_titles:
+                            titles_for_slug.append(alt_title)
+                            seen_titles.add(alt_title)
 
                 # Evaluate best overlap score across all candidate titles
-                local_best = 0
+                local_best = 0.0
                 for candidate in titles_for_slug:
-                    t_tokens = _normalize_tokens(candidate)
-                    inter = len(q_tokens & t_tokens)
-                    if inter > local_best:
-                        local_best = inter
+                    score = _score_title_candidate(q_tokens, q_norm, candidate)
+                    if score > local_best:
+                        local_best = score
 
                 if local_best > best_score:
                     best_score = local_best
                     best_slug = slug
                     best_site = search_site
 
-        if best_slug and best_site:
+        if best_slug and best_site and best_score >= _MIN_TITLE_MATCH_SCORE:
             return (best_site, best_slug)
 
-        for search_site in sites:
-            if search_site == "s.to":
-                api_slug = _search_sto_slug(q)
-                if api_slug:
-                    return (search_site, api_slug)
+        if "s.to" in sites:
+            api_slug = _search_sto_slug(q)
+            if api_slug:
+                return ("s.to", api_slug)
 
         return None
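+
+    # Hedged summary of the control flow above: indexed title matches are the
+    # primary path; the S.to suggest API is consulted only when no candidate
+    # clears _MIN_TITLE_MATCH_SCORE.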
"the-ossan-newbie-adventurer": ( + "The Ossan Newbie Adventurer, Trained to Death by the Most " + "Powerful Party, Became Invincible" + ), + "rick-and-morty-the-anime": "Rick and Morty: The Anime", + }, + "s.to": { + "the-rookie": "The Rookie", + "rick-and-morty": "Rick and Morty", + }, + } + monkeypatch.setattr(tr, "load_or_refresh_index", lambda site: index_by_site[site]) + monkeypatch.setattr(tr, "load_or_refresh_alternatives", lambda _site: {}) + monkeypatch.setattr(tr, "_search_sto_slug", lambda _query: None) + + assert tr.slug_from_query("Rookie Le flic de Los Angeles") == ( + "s.to", + "the-rookie", + ) + assert tr.slug_from_query("Rick and Morty") == ("s.to", "rick-and-morty") + + +def test_slug_from_query_rejects_low_confidence_overlap(monkeypatch) -> None: + monkeypatch.setattr(tr, "CATALOG_SITES_LIST", ["aniworld.to"]) + monkeypatch.setattr( + tr, + "load_or_refresh_index", + lambda _site: { + "the-ossan-newbie-adventurer": ( + "The Ossan Newbie Adventurer, Trained to Death by the Most " + "Powerful Party, Became Invincible" + ) + }, + ) + monkeypatch.setattr(tr, "load_or_refresh_alternatives", lambda _site: {}) + monkeypatch.setattr(tr, "_search_sto_slug", lambda _query: None) + + assert tr.slug_from_query("Rookie Le flic de Los Angeles") is None diff --git a/tests/test_torznab.py b/tests/test_torznab.py index 7f4108c..73a82d3 100644 --- a/tests/test_torznab.py +++ b/tests/test_torznab.py @@ -80,3 +80,77 @@ def test_tvsearch_empty(client): assert resp.status_code == 200 root = ET.fromstring(resp.text) assert root.find("./channel/item") is None + + +def test_tvsearch_uses_id_resolved_query_when_q_missing(client, monkeypatch): + import app.api.torznab as tn + import app.api.torznab.api as torznab_api_mod + + class Rec: + available = True + is_fresh = True + height = 1080 + vcodec = "h264" + provider = "prov" + + seen = {"query": None} + + def _slug_from_query(query, site=None): + """ + Record the provided query in the shared `seen` mapping and return a fixed (site, slug) pair. + + Parameters: + query (str): The query string to record. + site (str | None): Optional site hint (unused by this stub). + + Returns: + tuple: A two-element tuple (site, slug) where `site` is `"aniworld.to"` and `slug` is `"slug"`. + + Side effects: + Mutates the `seen` mapping by setting `seen["query"] = query`. 
+        """
+        seen["query"] = query
+        return ("aniworld.to", "slug")
+
+    monkeypatch.setattr(
+        torznab_api_mod,
+        "_resolve_tvsearch_query_from_ids",
+        lambda **_kwargs: "The Rookie",
+    )
+    monkeypatch.setattr(tn, "_slug_from_query", _slug_from_query)
+    monkeypatch.setattr(
+        tn, "resolve_series_title", lambda slug, site="aniworld.to": "Series"
+    )
+    monkeypatch.setattr(
+        tn,
+        "list_available_languages_cached",
+        lambda session, slug, season, episode, site="aniworld.to": ["German Sub"],
+    )
+    monkeypatch.setattr(
+        tn,
+        "get_availability",
+        lambda session, slug, season, episode, language, site="aniworld.to": Rec(),
+    )
+    monkeypatch.setattr(
+        tn,
+        "build_release_name",
+        lambda series_title, season, episode, height, vcodec, language, site="aniworld.to": (
+            "Title"
+        ),
+    )
+    monkeypatch.setattr(
+        tn,
+        "build_magnet",
+        lambda title, slug, season, episode, language, provider, site="aniworld.to", **_kwargs: (
+            "magnet:?xt=urn:btih:test&dn=Title&aw_slug=slug&aw_s=1&aw_e=1&aw_lang=German+Sub&aw_site=aniworld.to"
+        ),
+    )
+
+    resp = client.get(
+        "/torznab/api",
+        params={"t": "tvsearch", "season": 1, "ep": 1, "tvdbid": 350665},
+    )
+    assert resp.status_code == 200
+    root = ET.fromstring(resp.text)
+    assert root.find("./channel/item") is not None
+    assert seen["query"] == "The Rookie"
diff --git a/uv.lock b/uv.lock
index 2b6e9a5..89eb8d0 100644
--- a/uv.lock
+++ b/uv.lock
@@ -27,7 +27,7 @@ wheels = [
 
 [[package]]
 name = "anibridge"
-version = "2.4.0"
+version = "2.4.1"
 source = { editable = "." }
 dependencies = [
     { name = "alembic" },