From 820db5f1792225651c8317a2fbf00d2393ab8a08 Mon Sep 17 00:00:00 2001 From: Andy Date: Wed, 25 Feb 2026 19:02:18 -0700 Subject: [PATCH] refactor(providers): extract metadata providers into modular system - Create `unshackle/core/providers/` package with abstract base class, IMDBApi (free, no key), SIMKL, and TMDB provider implementations - Add consensus-based ID enrichment: cross-references IMDB IDs with TMDB and SIMKL, drops all data from providers that disagree on tmdb_id (likely resolved to wrong title) - Cache enriched IDs alongside raw provider data so they survive cache round-trips - Genericize TitleCacher with `cache_provider()`/`get_cached_provider()` replacing provider-specific methods; respect `--no-cache` flag - Add `--imdb` CLI flag to dl command for direct IMDB ID lookup --- .pre-commit-config.yaml | 6 +- docs/SERVICE_CONFIG.md | 4 +- unshackle/commands/dl.py | 93 ++-- unshackle/core/providers/__init__.py | 428 ++++++++++++++++++ unshackle/core/providers/_base.py | 97 +++++ unshackle/core/providers/imdbapi.py | 123 ++++++ unshackle/core/providers/simkl.py | 172 ++++++++ unshackle/core/providers/tmdb.py | 199 +++++++++ unshackle/core/title_cacher.py | 205 ++++----- unshackle/core/utils/tags.py | 629 +++------------------------ 10 files changed, 1207 insertions(+), 749 deletions(-) create mode 100644 unshackle/core/providers/__init__.py create mode 100644 unshackle/core/providers/_base.py create mode 100644 unshackle/core/providers/imdbapi.py create mode 100644 unshackle/core/providers/simkl.py create mode 100644 unshackle/core/providers/tmdb.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a2f5adb..40804fa 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,10 +7,10 @@ repos: hooks: - id: conventional-pre-commit stages: [commit-msg] - - repo: https://github.com/mtkennerly/pre-commit-hooks - rev: v0.4.0 + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.11.3 hooks: - - id: poetry-ruff-check + - 
id: ruff args: [--fix] - repo: https://github.com/pycqa/isort rev: 6.0.1 diff --git a/docs/SERVICE_CONFIG.md b/docs/SERVICE_CONFIG.md index 0d108ad..5711b3d 100644 --- a/docs/SERVICE_CONFIG.md +++ b/docs/SERVICE_CONFIG.md @@ -94,7 +94,7 @@ For example, tmdb_api_key: cf66bf18956kca5311ada3bebb84eb9a # Not a real key ``` -**Note**: Keep your API key secure and do not share it publicly. This key is used by the core/utils/tags.py module to fetch metadata from TMDB for proper file tagging. +**Note**: Keep your API key secure and do not share it publicly. This key is used by the `core/providers/tmdb.py` metadata provider to fetch metadata from TMDB for proper file tagging and ID enrichment. --- @@ -115,7 +115,7 @@ For example, simkl_client_id: "your_client_id_here" ``` -**Note**: While optional, having a SIMKL Client ID improves metadata lookup reliability. SIMKL serves as an alternative or fallback metadata source to TMDB. This is used by the `core/utils/tags.py` module. +**Note**: While optional, having a SIMKL Client ID improves metadata lookup reliability. SIMKL serves as an alternative or fallback metadata source to TMDB. This is used by the `core/providers/simkl.py` metadata provider. --- diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index ab6e6df..a4aa6c8 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -42,7 +42,7 @@ from rich.table import Table from rich.text import Text from rich.tree import Tree -from unshackle.core import binaries +from unshackle.core import binaries, providers from unshackle.core.cdm import CustomRemoteCDM, DecryptLabsRemoteCDM from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm from unshackle.core.config import config @@ -429,6 +429,13 @@ class dl: default=False, help="Use the release year from TMDB for naming and tagging.", ) + @click.option( + "--imdb", + "imdb_id", + type=str, + default=None, + help="Use this IMDB ID (e.g. 
tt1375666) for tagging instead of automatic lookup.", + ) @click.option( "--sub-format", type=SubtitleCodecChoice(Subtitle.Codec), @@ -523,6 +530,7 @@ class dl: tmdb_id: Optional[int] = None, tmdb_name: bool = False, tmdb_year: bool = False, + imdb_id: Optional[str] = None, output_dir: Optional[Path] = None, *_: Any, **__: Any, @@ -569,6 +577,7 @@ class dl: self.tmdb_id = tmdb_id self.tmdb_name = tmdb_name self.tmdb_year = tmdb_year + self.imdb_id = imdb_id self.output_dir = output_dir # Initialize debug logger with service name if debug logging is enabled @@ -595,10 +604,11 @@ class dl: "tmdb_id": tmdb_id, "tmdb_name": tmdb_name, "tmdb_year": tmdb_year, + "imdb_id": imdb_id, "cli_params": { k: v for k, v in ctx.params.items() - if k not in ["profile", "proxy", "tag", "tmdb_id", "tmdb_name", "tmdb_year"] + if k not in ["profile", "proxy", "tag", "tmdb_id", "tmdb_name", "tmdb_year", "imdb_id"] }, }, ) @@ -622,9 +632,7 @@ class dl: ) version = (r.stdout or r.stderr or "").strip() elif name in ("ffmpeg", "ffprobe"): - r = subprocess.run( - [str(binary), "-version"], capture_output=True, text=True, timeout=5 - ) + r = subprocess.run([str(binary), "-version"], capture_output=True, text=True, timeout=5) version = (r.stdout or "").split("\n")[0].strip() elif name == "mkvmerge": r = subprocess.run( @@ -632,9 +640,7 @@ class dl: ) version = (r.stdout or "").strip() elif name == "mp4decrypt": - r = subprocess.run( - [str(binary)], capture_output=True, text=True, timeout=5 - ) + r = subprocess.run([str(binary)], capture_output=True, text=True, timeout=5) output = (r.stdout or "") + (r.stderr or "") lines = [line.strip() for line in output.split("\n") if line.strip()] version = " | ".join(lines[:2]) if lines else None @@ -1087,12 +1093,12 @@ class dl: tmdb_name_val = None if self.tmdb_year: - tmdb_year_val = tags.get_year( + tmdb_year_val = providers.get_year_by_id( self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash ) if self.tmdb_name: - 
tmdb_name_val = tags.get_title( + tmdb_name_val = providers.get_title_by_id( self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash ) @@ -1214,15 +1220,20 @@ class dl: if isinstance(title, Episode) and not self.tmdb_searched: kind = "tv" + tmdb_title: Optional[str] = None if self.tmdb_id: - tmdb_title = tags.get_title( + tmdb_title = providers.get_title_by_id( self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash ) else: - self.tmdb_id, tmdb_title, self.search_source = tags.search_show_info( + result = providers.search_metadata( title.title, title.year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash ) - if not (self.tmdb_id and tmdb_title and tags.fuzzy_match(tmdb_title, title.title)): + if result and result.title and providers.fuzzy_match(result.title, title.title): + self.tmdb_id = result.external_ids.tmdb_id + tmdb_title = result.title + self.search_source = result.source + else: self.tmdb_id = None if list_ or list_titles: if self.tmdb_id: @@ -1237,22 +1248,25 @@ class dl: self.tmdb_searched = True if isinstance(title, Movie) and (list_ or list_titles) and not self.tmdb_id: - movie_id, movie_title, _ = tags.search_show_info( + movie_result = providers.search_metadata( title.name, title.year, "movie", title_cacher, cache_title_id, cache_region, cache_account_hash ) - if movie_id: + if movie_result and movie_result.external_ids.tmdb_id: console.print( Padding( - f"Search -> {movie_title or '?'} [bright_black](ID {movie_id})", + f"Search -> {movie_result.title or '?'} " + f"[bright_black](ID {movie_result.external_ids.tmdb_id})", (0, 5), ) ) else: console.print(Padding("Search -> [bright_black]No match found[/]", (0, 5))) - if self.tmdb_id and getattr(self, "search_source", None) != "simkl": + if self.tmdb_id and getattr(self, "search_source", None) not in ("simkl", "imdbapi"): kind = "tv" if isinstance(title, Episode) else "movie" - tags.external_ids(self.tmdb_id, kind, title_cacher, 
cache_title_id, cache_region, cache_account_hash) + providers.fetch_external_ids( + self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash + ) if slow and i != 0: delay = random.randint(60, 120) @@ -1460,11 +1474,13 @@ class dl: if has_hybrid: # Split tracks: hybrid candidates vs non-hybrid hybrid_candidate_tracks = [ - v for v in title.tracks.videos + v + for v in title.tracks.videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV) ] non_hybrid_tracks = [ - v for v in title.tracks.videos + v + for v in title.tracks.videos if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV) ] @@ -1475,11 +1491,9 @@ class dl: if non_hybrid_ranges and non_hybrid_tracks: # Also filter non-hybrid tracks by resolution non_hybrid_selected = [ - v for v in non_hybrid_tracks - if any( - v.height == res or int(v.width * (9 / 16)) == res - for res in quality - ) + v + for v in non_hybrid_tracks + if any(v.height == res or int(v.width * (9 / 16)) == res for res in quality) ] title.tracks.videos = hybrid_selected + non_hybrid_selected else: @@ -1513,29 +1527,25 @@ class dl: if has_hybrid: # Apply hybrid selection for HYBRID tracks hybrid_candidate_tracks = [ - v for v in title.tracks.videos + v + for v in title.tracks.videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV) ] non_hybrid_tracks = [ - v for v in title.tracks.videos + v + for v in title.tracks.videos if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV) ] if not quality: - best_resolution = max( - (v.height for v in hybrid_candidate_tracks), default=None - ) + best_resolution = max((v.height for v in hybrid_candidate_tracks), default=None) if best_resolution: - hybrid_filter = title.tracks.select_hybrid( - hybrid_candidate_tracks, [best_resolution] - ) + hybrid_filter = title.tracks.select_hybrid(hybrid_candidate_tracks, [best_resolution]) hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks)) else: 
hybrid_selected = [] else: - hybrid_filter = title.tracks.select_hybrid( - hybrid_candidate_tracks, quality - ) + hybrid_filter = title.tracks.select_hybrid(hybrid_candidate_tracks, quality) hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks)) # For non-hybrid ranges, apply Cartesian product selection @@ -1588,8 +1598,7 @@ class dl: # validate hybrid mode requirements if any(r == Video.Range.HYBRID for r in range_): base_tracks = [ - v for v in title.tracks.videos - if v.range in (Video.Range.HDR10, Video.Range.HDR10P) + v for v in title.tracks.videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P) ] dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV] @@ -1617,8 +1626,7 @@ class dl: if best_available and other_ranges: self.log.warning(msg) self.log.warning( - f"Continuing with remaining range(s): " - f"{', '.join(r.name for r in other_ranges)}" + f"Continuing with remaining range(s): {', '.join(r.name for r in other_ranges)}" ) range_ = other_ranges else: @@ -2150,8 +2158,7 @@ class dl: # Group video tracks by resolution (prefer HDR10+ over HDR10 as base) resolutions_processed = set() base_tracks_list = [ - v for v in title.tracks.videos - if v.range in (Video.Range.HDR10P, Video.Range.HDR10) + v for v in title.tracks.videos if v.range in (Video.Range.HDR10P, Video.Range.HDR10) ] dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV] @@ -2399,7 +2406,7 @@ class dl: final_path.unlink() shutil.move(muxed_path, final_path) used_final_paths.add(final_path) - tags.tag_file(final_path, title, self.tmdb_id) + tags.tag_file(final_path, title, self.tmdb_id, self.imdb_id) title_dl_time = time_elapsed_since(dl_start_time) console.print( diff --git a/unshackle/core/providers/__init__.py b/unshackle/core/providers/__init__.py new file mode 100644 index 0000000..5ec7343 --- /dev/null +++ b/unshackle/core/providers/__init__.py @@ -0,0 +1,428 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, 
Optional + +import requests + +from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, fuzzy_match, log +from unshackle.core.providers.imdbapi import IMDBApiProvider +from unshackle.core.providers.simkl import SimklProvider +from unshackle.core.providers.tmdb import TMDBProvider + +if TYPE_CHECKING: + from unshackle.core.title_cacher import TitleCacher + +# Ordered by priority: IMDBApi (free), SIMKL, TMDB +ALL_PROVIDERS: list[type[MetadataProvider]] = [IMDBApiProvider, SimklProvider, TMDBProvider] + + +def get_available_providers() -> list[MetadataProvider]: + """Return instantiated providers that have valid credentials.""" + return [cls() for cls in ALL_PROVIDERS if cls().is_available()] + + +def get_provider(name: str) -> Optional[MetadataProvider]: + """Get a specific provider by name.""" + for cls in ALL_PROVIDERS: + if cls.NAME == name: + p = cls() + return p if p.is_available() else None + return None + + +# -- Public API (replaces tags.py functions) -- + + +def search_metadata( + title: str, + year: Optional[int], + kind: str, + title_cacher: Optional[TitleCacher] = None, + cache_title_id: Optional[str] = None, + cache_region: Optional[str] = None, + cache_account_hash: Optional[str] = None, +) -> Optional[MetadataResult]: + """Search all available providers for metadata. 
Returns best match.""" + # Check cache first + if title_cacher and cache_title_id: + for cls in ALL_PROVIDERS: + p = cls() + if not p.is_available(): + continue + cached = title_cacher.get_cached_provider(p.NAME, cache_title_id, kind, cache_region, cache_account_hash) + if cached: + result = _cached_to_result(cached, p.NAME, kind) + if result and result.title and fuzzy_match(result.title, title): + log.debug("Using cached %s data for %r", p.NAME, title) + return result + + # Search providers in priority order + for cls in ALL_PROVIDERS: + p = cls() + if not p.is_available(): + continue + try: + result = p.search(title, year, kind) + except (requests.RequestException, ValueError, KeyError) as exc: + log.debug("%s search failed: %s", p.NAME, exc) + continue + if result and result.title and fuzzy_match(result.title, title): + # Enrich with cross-referenced IDs if we have IMDB but missing TMDB/TVDB + enrich_ids(result) + # Cache the result (include enriched IDs so they survive round-trip) + if title_cacher and cache_title_id and result.raw: + try: + cache_data = result.raw + if result.external_ids.tmdb_id or result.external_ids.tvdb_id: + cache_data = { + **result.raw, + "_enriched_ids": _external_ids_to_dict(result.external_ids), + } + title_cacher.cache_provider( + p.NAME, cache_title_id, cache_data, kind, cache_region, cache_account_hash + ) + except Exception as exc: + log.debug("Failed to cache %s data: %s", p.NAME, exc) + return result + + return None + + +def get_title_by_id( + tmdb_id: int, + kind: str, + title_cacher: Optional[TitleCacher] = None, + cache_title_id: Optional[str] = None, + cache_region: Optional[str] = None, + cache_account_hash: Optional[str] = None, +) -> Optional[str]: + """Get title name by TMDB ID.""" + # Check cache first + if title_cacher and cache_title_id: + cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash) + if cached and cached.get("detail"): + detail = cached["detail"] + 
tmdb_title = detail.get("title") or detail.get("name") + if tmdb_title: + log.debug("Using cached TMDB title: %r", tmdb_title) + return tmdb_title + + tmdb = get_provider("tmdb") + if not tmdb: + return None + result = tmdb.get_by_id(tmdb_id, kind) + if not result: + return None + + # Cache if possible + if title_cacher and cache_title_id and result.raw: + try: + ext_ids = tmdb.get_external_ids(tmdb_id, kind) + title_cacher.cache_provider( + "tmdb", + cache_title_id, + {"detail": result.raw, "external_ids": _external_ids_to_dict(ext_ids)}, + kind, + cache_region, + cache_account_hash, + ) + except Exception as exc: + log.debug("Failed to cache TMDB data: %s", exc) + + return result.title + + +def get_year_by_id( + tmdb_id: int, + kind: str, + title_cacher: Optional[TitleCacher] = None, + cache_title_id: Optional[str] = None, + cache_region: Optional[str] = None, + cache_account_hash: Optional[str] = None, +) -> Optional[int]: + """Get release year by TMDB ID.""" + # Check cache first + if title_cacher and cache_title_id: + cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash) + if cached and cached.get("detail"): + detail = cached["detail"] + date = detail.get("release_date") or detail.get("first_air_date") + if date and len(date) >= 4 and date[:4].isdigit(): + year = int(date[:4]) + log.debug("Using cached TMDB year: %d", year) + return year + + tmdb = get_provider("tmdb") + if not tmdb: + return None + result = tmdb.get_by_id(tmdb_id, kind) + if not result: + return None + + # Cache if possible + if title_cacher and cache_title_id and result.raw: + try: + ext_ids = tmdb.get_external_ids(tmdb_id, kind) + title_cacher.cache_provider( + "tmdb", + cache_title_id, + {"detail": result.raw, "external_ids": _external_ids_to_dict(ext_ids)}, + kind, + cache_region, + cache_account_hash, + ) + except Exception as exc: + log.debug("Failed to cache TMDB data: %s", exc) + + return result.year + + +def fetch_external_ids( + 
tmdb_id: int, + kind: str, + title_cacher: Optional[TitleCacher] = None, + cache_title_id: Optional[str] = None, + cache_region: Optional[str] = None, + cache_account_hash: Optional[str] = None, +) -> ExternalIds: + """Get external IDs by TMDB ID.""" + # Check cache first + if title_cacher and cache_title_id: + cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash) + if cached and cached.get("external_ids"): + log.debug("Using cached TMDB external IDs") + raw = cached["external_ids"] + return ExternalIds( + imdb_id=raw.get("imdb_id"), + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=raw.get("tvdb_id"), + ) + + tmdb = get_provider("tmdb") + if not tmdb: + return ExternalIds() + ext = tmdb.get_external_ids(tmdb_id, kind) + + # Cache if possible + if title_cacher and cache_title_id: + try: + detail = None + result = tmdb.get_by_id(tmdb_id, kind) + if result and result.raw: + detail = result.raw + if detail: + title_cacher.cache_provider( + "tmdb", + cache_title_id, + {"detail": detail, "external_ids": _external_ids_to_dict(ext)}, + kind, + cache_region, + cache_account_hash, + ) + except Exception as exc: + log.debug("Failed to cache TMDB data: %s", exc) + + return ext + + +# -- Internal helpers -- + + +# Provider authority ranking for tie-breaking (lower index = more authoritative) +_ENRICHMENT_PROVIDERS = ("tmdb", "simkl") +_ENRICHMENT_AUTHORITY: dict[str, int] = {name: i for i, name in enumerate(_ENRICHMENT_PROVIDERS)} + + +def enrich_ids(result: MetadataResult) -> None: + """Enrich a MetadataResult by cross-referencing IMDB ID with available providers. + + Queries all available providers, cross-validates tmdb_id as anchor. + If a provider returns a different tmdb_id than the authoritative source, + ALL of that provider's data is dropped (likely resolved to wrong title). 
+ """ + ids = result.external_ids + if not ids.imdb_id: + return + if ids.tmdb_id and ids.tvdb_id: + return # already have everything + + kind = result.kind or "movie" + + # Step 1: Collect enrichment results from all available providers + enrichments: list[tuple[str, ExternalIds]] = [] + for provider_name in _ENRICHMENT_PROVIDERS: + p = get_provider(provider_name) + if not p: + continue + try: + enriched = p.find_by_imdb_id(ids.imdb_id, kind) # type: ignore[union-attr] + except Exception as exc: + log.debug("Enrichment via %s failed: %s", provider_name, exc) + continue + if enriched: + enrichments.append((provider_name, enriched)) + + if not enrichments: + return + + # Step 2: Cross-validate using tmdb_id as anchor — drop providers that disagree + validated = _validate_enrichments(enrichments) + + # Step 3: Merge validated data (fill gaps only) + for _provider_name, ext in validated: + if not ids.tmdb_id and ext.tmdb_id: + ids.tmdb_id = ext.tmdb_id + ids.tmdb_kind = ext.tmdb_kind or kind + if not ids.tvdb_id and ext.tvdb_id: + ids.tvdb_id = ext.tvdb_id + + +def _validate_enrichments( + enrichments: list[tuple[str, ExternalIds]], +) -> list[tuple[str, ExternalIds]]: + """Drop providers whose tmdb_id conflicts with the authoritative value. + + If providers disagree on tmdb_id, the more authoritative source wins + and ALL data from disagreeing providers is discarded (different tmdb_id + means the provider likely resolved to a different title entirely). 
+ """ + from collections import Counter + + # Collect tmdb_id votes + tmdb_votes: dict[str, int] = {} + for provider_name, ext in enrichments: + if ext.tmdb_id is not None: + tmdb_votes[provider_name] = ext.tmdb_id + + if len(set(tmdb_votes.values())) <= 1: + return enrichments # all agree or only one voted — no conflict + + # Find the authoritative tmdb_id + value_counts = Counter(tmdb_votes.values()) + most_common_val, most_common_count = value_counts.most_common(1)[0] + + if most_common_count > 1: + anchor_tmdb_id = most_common_val + else: + # No majority — pick the most authoritative provider + best_provider = min( + tmdb_votes.keys(), + key=lambda name: _ENRICHMENT_AUTHORITY.get(name, 99), + ) + anchor_tmdb_id = tmdb_votes[best_provider] + + # Drop any provider that disagrees + validated: list[tuple[str, ExternalIds]] = [] + for provider_name, ext in enrichments: + if ext.tmdb_id is not None and ext.tmdb_id != anchor_tmdb_id: + log.debug( + "Dropping %s enrichment data: tmdb_id %s conflicts with " + "authoritative value %s (likely resolved to wrong title)", + provider_name, + ext.tmdb_id, + anchor_tmdb_id, + ) + continue + validated.append((provider_name, ext)) + + return validated + + +def _external_ids_to_dict(ext: ExternalIds) -> dict: + """Convert ExternalIds to a dict for caching.""" + result: dict = {} + if ext.imdb_id: + result["imdb_id"] = ext.imdb_id + if ext.tmdb_id: + result["tmdb_id"] = ext.tmdb_id + if ext.tmdb_kind: + result["tmdb_kind"] = ext.tmdb_kind + if ext.tvdb_id: + result["tvdb_id"] = ext.tvdb_id + return result + + +def _cached_to_result(cached: dict, provider_name: str, kind: str) -> Optional[MetadataResult]: + """Convert a cached provider dict back to a MetadataResult.""" + if provider_name == "tmdb": + detail = cached.get("detail", {}) + ext_raw = cached.get("external_ids", {}) + title = detail.get("title") or detail.get("name") + date = detail.get("release_date") or detail.get("first_air_date") + year = int(date[:4]) if date and 
len(date) >= 4 and date[:4].isdigit() else None + tmdb_id = detail.get("id") + return MetadataResult( + title=title, + year=year, + kind=kind, + external_ids=ExternalIds( + imdb_id=ext_raw.get("imdb_id"), + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=ext_raw.get("tvdb_id"), + ), + source="tmdb", + raw=cached, + ) + elif provider_name == "simkl": + response = cached.get("response", cached) + if response.get("type") == "episode" and "show" in response: + info = response["show"] + elif response.get("type") == "movie" and "movie" in response: + info = response["movie"] + else: + return None + ids = info.get("ids", {}) + tmdb_id = ids.get("tmdbtv") or ids.get("tmdb") or ids.get("moviedb") + if tmdb_id: + tmdb_id = int(tmdb_id) + return MetadataResult( + title=info.get("title"), + year=info.get("year"), + kind=kind, + external_ids=ExternalIds( + imdb_id=ids.get("imdb"), + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=ids.get("tvdb"), + ), + source="simkl", + raw=cached, + ) + elif provider_name == "imdbapi": + title = cached.get("primaryTitle") or cached.get("originalTitle") + year = cached.get("startYear") + imdb_id = cached.get("id") + # Restore enriched IDs that were saved alongside the raw data + enriched = cached.get("_enriched_ids", {}) + return MetadataResult( + title=title, + year=year, + kind=kind, + external_ids=ExternalIds( + imdb_id=imdb_id, + tmdb_id=enriched.get("tmdb_id"), + tmdb_kind=enriched.get("tmdb_kind"), + tvdb_id=enriched.get("tvdb_id"), + ), + source="imdbapi", + raw=cached, + ) + return None + + +__all__ = [ + "ALL_PROVIDERS", + "ExternalIds", + "MetadataProvider", + "MetadataResult", + "enrich_ids", + "fetch_external_ids", + "fuzzy_match", + "get_available_providers", + "get_provider", + "get_title_by_id", + "get_year_by_id", + "search_metadata", +] diff --git a/unshackle/core/providers/_base.py b/unshackle/core/providers/_base.py new file mode 100644 index 0000000..6222786 --- /dev/null +++ b/unshackle/core/providers/_base.py @@ -0,0 +1,97 @@ 
+from __future__ import annotations + +import logging +import re +from abc import ABCMeta, abstractmethod +from dataclasses import dataclass, field +from difflib import SequenceMatcher +from typing import Optional, Union + +import requests +from requests.adapters import HTTPAdapter, Retry + +log = logging.getLogger("METADATA") + +HEADERS = {"User-Agent": "unshackle-tags/1.0"} + +STRIP_RE = re.compile(r"[^a-z0-9]+", re.I) +YEAR_RE = re.compile(r"\s*\(?[12][0-9]{3}\)?$") + + +@dataclass +class ExternalIds: + """Normalized external IDs across providers.""" + + imdb_id: Optional[str] = None + tmdb_id: Optional[int] = None + tmdb_kind: Optional[str] = None # "movie" or "tv" + tvdb_id: Optional[int] = None + + +@dataclass +class MetadataResult: + """Unified metadata result from any provider.""" + + title: Optional[str] = None + year: Optional[int] = None + kind: Optional[str] = None # "movie" or "tv" + external_ids: ExternalIds = field(default_factory=ExternalIds) + source: str = "" # provider name, e.g. 
"tmdb", "simkl", "imdbapi" + raw: Optional[dict] = None # original API response for caching + + +class MetadataProvider(metaclass=ABCMeta): + """Abstract base for metadata providers.""" + + NAME: str = "" + REQUIRES_KEY: bool = True + + def __init__(self) -> None: + self.log = logging.getLogger(f"METADATA.{self.NAME.upper()}") + self._session: Optional[requests.Session] = None + + @property + def session(self) -> requests.Session: + if self._session is None: + self._session = requests.Session() + self._session.headers.update(HEADERS) + retry = Retry( + total=3, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["GET", "POST"], + ) + adapter = HTTPAdapter(max_retries=retry) + self._session.mount("https://", adapter) + self._session.mount("http://", adapter) + return self._session + + @abstractmethod + def is_available(self) -> bool: + """Return True if this provider has the credentials/keys it needs.""" + + @abstractmethod + def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]: + """Search for a title and return metadata, or None on failure/no match.""" + + @abstractmethod + def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]: + """Fetch metadata by this provider's native ID.""" + + @abstractmethod + def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds: + """Fetch external IDs for a title by this provider's native ID.""" + + +def _clean(s: str) -> str: + return STRIP_RE.sub("", s).lower() + + +def _strip_year(s: str) -> str: + return YEAR_RE.sub("", s).strip() + + +def fuzzy_match(a: str, b: str, threshold: float = 0.8) -> bool: + """Return True if ``a`` and ``b`` are a close match.""" + ratio = SequenceMatcher(None, _clean(a), _clean(b)).ratio() + return ratio >= threshold diff --git a/unshackle/core/providers/imdbapi.py b/unshackle/core/providers/imdbapi.py new file mode 100644 index 0000000..b818927 --- /dev/null +++ 
b/unshackle/core/providers/imdbapi.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from difflib import SequenceMatcher +from typing import Optional, Union + +import requests + +from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, _clean, fuzzy_match + +# Mapping from our kind ("movie"/"tv") to imdbapi.dev title types +KIND_TO_TYPES: dict[str, list[str]] = { + "movie": ["movie"], + "tv": ["tvSeries", "tvMiniSeries"], +} + + +class IMDBApiProvider(MetadataProvider): + """IMDb metadata provider using imdbapi.dev (free, no API key).""" + + NAME = "imdbapi" + REQUIRES_KEY = False + BASE_URL = "https://api.imdbapi.dev" + + def is_available(self) -> bool: + return True # no key needed + + def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]: + self.log.debug("Searching IMDBApi for %r (%s, %s)", title, kind, year) + + try: + params: dict[str, str | int] = {"query": title, "limit": 20} + r = self.session.get( + f"{self.BASE_URL}/search/titles", + params=params, + timeout=30, + ) + r.raise_for_status() + data = r.json() + except (requests.RequestException, ValueError) as exc: + self.log.debug("IMDBApi search failed: %s", exc) + return None + + results = data.get("titles") or data.get("results") or [] + if not results: + self.log.debug("IMDBApi returned no results for %r", title) + return None + + # Filter by type if possible + type_filter = KIND_TO_TYPES.get(kind, []) + filtered = [r for r in results if r.get("type") in type_filter] if type_filter else results + candidates = filtered if filtered else results + + # Find best fuzzy match, optionally filtered by year + best_match: Optional[dict] = None + best_ratio = 0.0 + + for candidate in candidates: + primary = candidate.get("primaryTitle") or "" + original = candidate.get("originalTitle") or "" + + for name in [primary, original]: + if not name: + continue + ratio = SequenceMatcher(None, _clean(title), _clean(name)).ratio() + if ratio > 
best_ratio: + # If year provided, prefer matches within 1 year + candidate_year = candidate.get("startYear") + if year and candidate_year and abs(year - candidate_year) > 1: + continue + best_ratio = ratio + best_match = candidate + + if not best_match: + self.log.debug("No matching result found in IMDBApi for %r", title) + return None + + result_title = best_match.get("primaryTitle") or best_match.get("originalTitle") + if not result_title or not fuzzy_match(result_title, title): + self.log.debug("IMDBApi title mismatch: searched %r, got %r", title, result_title) + return None + + imdb_id = best_match.get("id") + result_year = best_match.get("startYear") + + self.log.debug("IMDBApi -> %s (ID %s)", result_title, imdb_id) + + return MetadataResult( + title=result_title, + year=result_year, + kind=kind, + external_ids=ExternalIds(imdb_id=imdb_id), + source="imdbapi", + raw=best_match, + ) + + def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]: + """Fetch metadata by IMDB ID (e.g. 'tt1375666').""" + imdb_id = str(provider_id) + self.log.debug("Fetching IMDBApi title %s", imdb_id) + + try: + r = self.session.get(f"{self.BASE_URL}/titles/{imdb_id}", timeout=30) + r.raise_for_status() + data = r.json() + except (requests.RequestException, ValueError) as exc: + self.log.debug("IMDBApi get_by_id failed: %s", exc) + return None + + title = data.get("primaryTitle") or data.get("originalTitle") + result_year = data.get("startYear") + + return MetadataResult( + title=title, + year=result_year, + kind=kind, + external_ids=ExternalIds(imdb_id=data.get("id")), + source="imdbapi", + raw=data, + ) + + def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds: + """Return external IDs. 
For IMDB, the provider_id IS the IMDB ID.""" + return ExternalIds(imdb_id=str(provider_id)) diff --git a/unshackle/core/providers/simkl.py b/unshackle/core/providers/simkl.py new file mode 100644 index 0000000..00bad34 --- /dev/null +++ b/unshackle/core/providers/simkl.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from typing import Optional, Union + +import requests + +from unshackle.core.config import config +from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, fuzzy_match + + +class SimklProvider(MetadataProvider): + """SIMKL metadata provider (filename-based search).""" + + NAME = "simkl" + REQUIRES_KEY = True + BASE_URL = "https://api.simkl.com" + + def is_available(self) -> bool: + return bool(config.simkl_client_id) + + def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]: + self.log.debug("Searching Simkl for %r (%s, %s)", title, kind, year) + + # Construct appropriate filename based on type + filename = f"{title}" + if year: + filename = f"{title} {year}" + if kind == "tv": + filename += " S01E01.mkv" + else: + filename += " 2160p.mkv" + + try: + headers = {"simkl-api-key": config.simkl_client_id} + resp = self.session.post( + f"{self.BASE_URL}/search/file", json={"file": filename}, headers=headers, timeout=30 + ) + resp.raise_for_status() + data = resp.json() + self.log.debug("Simkl API response received") + except (requests.RequestException, ValueError) as exc: + self.log.debug("Simkl search failed: %s", exc) + return None + + # Handle case where SIMKL returns empty list (no results) + if isinstance(data, list): + self.log.debug("Simkl returned list (no matches) for %r", filename) + return None + + return self._parse_response(data, title, year, kind) + + def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]: + return None # SIMKL has no direct ID lookup used here + + def get_external_ids(self, provider_id: Union[int, str], kind: str) 
-> ExternalIds: + return ExternalIds() # IDs come from search() response + + def find_by_imdb_id(self, imdb_id: str, kind: str) -> Optional[ExternalIds]: + """Look up TMDB/TVDB IDs from an IMDB ID using SIMKL's /search/id and detail endpoints.""" + self.log.debug("Looking up IMDB ID %s on SIMKL", imdb_id) + headers = {"simkl-api-key": config.simkl_client_id} + + try: + r = self.session.get(f"{self.BASE_URL}/search/id", params={"imdb": imdb_id}, headers=headers, timeout=30) + r.raise_for_status() + data = r.json() + except (requests.RequestException, ValueError) as exc: + self.log.debug("SIMKL search/id failed: %s", exc) + return None + + if not isinstance(data, list) or not data: + self.log.debug("No SIMKL results for IMDB ID %s", imdb_id) + return None + + entry = data[0] + simkl_id = entry.get("ids", {}).get("simkl") + if not simkl_id: + return None + + # Map SIMKL type to endpoint + simkl_type = entry.get("type", "") + endpoint = "tv" if simkl_type in ("tv", "anime") else "movies" + + # Fetch full details to get cross-referenced IDs + try: + r2 = self.session.get( + f"{self.BASE_URL}/{endpoint}/{simkl_id}", + params={"extended": "full"}, + headers=headers, + timeout=30, + ) + r2.raise_for_status() + detail = r2.json() + except (requests.RequestException, ValueError) as exc: + self.log.debug("SIMKL detail fetch failed: %s", exc) + return None + + ids = detail.get("ids", {}) + tmdb_id: Optional[int] = None + raw_tmdb = ids.get("tmdb") + if raw_tmdb: + tmdb_id = int(raw_tmdb) + + tvdb_id: Optional[int] = None + raw_tvdb = ids.get("tvdb") + if raw_tvdb: + tvdb_id = int(raw_tvdb) + + self.log.debug("SIMKL find -> TMDB %s, TVDB %s for IMDB %s", tmdb_id, tvdb_id, imdb_id) + + return ExternalIds( + imdb_id=imdb_id, + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=tvdb_id, + ) + + def _parse_response( + self, data: dict, search_title: str, search_year: Optional[int], kind: str + ) -> Optional[MetadataResult]: + """Parse a SIMKL response into a MetadataResult.""" + if 
data.get("type") == "episode" and "show" in data: + info = data["show"] + content_type = "tv" + elif data.get("type") == "movie" and "movie" in data: + info = data["movie"] + content_type = "movie" + else: + return None + + result_title = info.get("title") + result_year = info.get("year") + + # Verify title matches + if not result_title or not fuzzy_match(result_title, search_title): + self.log.debug("Simkl title mismatch: searched %r, got %r", search_title, result_title) + return None + + # Verify year if provided (allow 1 year difference) + if search_year and result_year and abs(search_year - result_year) > 1: + self.log.debug("Simkl year mismatch: searched %d, got %d", search_year, result_year) + return None + + ids = info.get("ids", {}) + tmdb_id: Optional[int] = None + if content_type == "tv": + raw_tmdb = ids.get("tmdbtv") + else: + raw_tmdb = ids.get("tmdb") or ids.get("moviedb") + if raw_tmdb: + tmdb_id = int(raw_tmdb) + + tvdb_id: Optional[int] = None + raw_tvdb = ids.get("tvdb") + if raw_tvdb: + tvdb_id = int(raw_tvdb) + + self.log.debug("Simkl -> %s (TMDB ID %s)", result_title, tmdb_id) + + return MetadataResult( + title=result_title, + year=result_year, + kind=kind, + external_ids=ExternalIds( + imdb_id=ids.get("imdb"), + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=tvdb_id, + ), + source="simkl", + raw=data, + ) diff --git a/unshackle/core/providers/tmdb.py b/unshackle/core/providers/tmdb.py new file mode 100644 index 0000000..f50a3b1 --- /dev/null +++ b/unshackle/core/providers/tmdb.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +from difflib import SequenceMatcher +from typing import Optional, Union + +import requests + +from unshackle.core.config import config +from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, _clean, _strip_year + + +class TMDBProvider(MetadataProvider): + """TMDB (The Movie Database) metadata provider.""" + + NAME = "tmdb" + REQUIRES_KEY = True + BASE_URL = 
"https://api.themoviedb.org/3" + + def is_available(self) -> bool: + return bool(config.tmdb_api_key) + + @property + def _api_key(self) -> str: + return config.tmdb_api_key + + def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]: + search_title = _strip_year(title) + self.log.debug("Searching TMDB for %r (%s, %s)", search_title, kind, year) + + params: dict[str, str | int] = {"api_key": self._api_key, "query": search_title} + if year is not None: + params["year" if kind == "movie" else "first_air_date_year"] = year + + try: + r = self.session.get(f"{self.BASE_URL}/search/{kind}", params=params, timeout=30) + r.raise_for_status() + results = r.json().get("results") or [] + self.log.debug("TMDB returned %d results", len(results)) + if not results: + return None + except requests.RequestException as exc: + self.log.warning("Failed to search TMDB for %s: %s", title, exc) + return None + + best_ratio = 0.0 + best_id: Optional[int] = None + best_title: Optional[str] = None + for result in results: + candidates = [ + result.get("title"), + result.get("name"), + result.get("original_title"), + result.get("original_name"), + ] + candidates = [c for c in candidates if c] + + for candidate in candidates: + ratio = SequenceMatcher(None, _clean(search_title), _clean(candidate)).ratio() + if ratio > best_ratio: + best_ratio = ratio + best_id = result.get("id") + best_title = candidate + + self.log.debug("Best candidate ratio %.2f for %r (ID %s)", best_ratio, best_title, best_id) + + if best_id is None: + first = results[0] + best_id = first.get("id") + best_title = first.get("title") or first.get("name") + + if best_id is None: + return None + + # Fetch full detail for caching + detail = self._fetch_detail(best_id, kind) + ext_raw = self._fetch_external_ids_raw(best_id, kind) + + date = (detail or {}).get("release_date") or (detail or {}).get("first_air_date") + result_year = int(date[:4]) if date and len(date) >= 4 and date[:4].isdigit() 
else None + + ext = ExternalIds( + imdb_id=ext_raw.get("imdb_id") if ext_raw else None, + tmdb_id=best_id, + tmdb_kind=kind, + tvdb_id=ext_raw.get("tvdb_id") if ext_raw else None, + ) + + return MetadataResult( + title=best_title, + year=result_year, + kind=kind, + external_ids=ext, + source="tmdb", + raw={"detail": detail or {}, "external_ids": ext_raw or {}}, + ) + + def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]: + detail = self._fetch_detail(int(provider_id), kind) + if not detail: + return None + + title = detail.get("title") or detail.get("name") + date = detail.get("release_date") or detail.get("first_air_date") + year = int(date[:4]) if date and len(date) >= 4 and date[:4].isdigit() else None + + return MetadataResult( + title=title, + year=year, + kind=kind, + external_ids=ExternalIds(tmdb_id=int(provider_id), tmdb_kind=kind), + source="tmdb", + raw=detail, + ) + + def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds: + raw = self._fetch_external_ids_raw(int(provider_id), kind) + if not raw: + return ExternalIds(tmdb_id=int(provider_id), tmdb_kind=kind) + return ExternalIds( + imdb_id=raw.get("imdb_id"), + tmdb_id=int(provider_id), + tmdb_kind=kind, + tvdb_id=raw.get("tvdb_id"), + ) + + def find_by_imdb_id(self, imdb_id: str, kind: str) -> Optional[ExternalIds]: + """Look up TMDB/TVDB IDs from an IMDB ID using TMDB's /find endpoint.""" + self.log.debug("Looking up IMDB ID %s on TMDB", imdb_id) + try: + r = self.session.get( + f"{self.BASE_URL}/find/{imdb_id}", + params={"api_key": self._api_key, "external_source": "imdb_id"}, + timeout=30, + ) + r.raise_for_status() + data = r.json() + except requests.RequestException as exc: + self.log.debug("TMDB find by IMDB ID failed: %s", exc) + return None + + # Check movie_results or tv_results based on kind + if kind == "movie": + results = data.get("movie_results") or [] + else: + results = data.get("tv_results") or [] + + if not results: + # 
Try the other type as fallback + fallback_key = "tv_results" if kind == "movie" else "movie_results" + results = data.get(fallback_key) or [] + if results: + kind = "tv" if kind == "movie" else "movie" + + if not results: + self.log.debug("No TMDB results found for IMDB ID %s", imdb_id) + return None + + match = results[0] + tmdb_id = match.get("id") + if not tmdb_id: + return None + + self.log.debug("TMDB find -> ID %s (%s) for IMDB %s", tmdb_id, kind, imdb_id) + + # Now fetch the full external IDs from TMDB to get TVDB etc. + ext_raw = self._fetch_external_ids_raw(tmdb_id, kind) + + return ExternalIds( + imdb_id=imdb_id, + tmdb_id=tmdb_id, + tmdb_kind=kind, + tvdb_id=ext_raw.get("tvdb_id") if ext_raw else None, + ) + + def _fetch_detail(self, tmdb_id: int, kind: str) -> Optional[dict]: + try: + r = self.session.get( + f"{self.BASE_URL}/{kind}/{tmdb_id}", + params={"api_key": self._api_key}, + timeout=30, + ) + r.raise_for_status() + return r.json() + except requests.RequestException as exc: + self.log.debug("Failed to fetch TMDB detail: %s", exc) + return None + + def _fetch_external_ids_raw(self, tmdb_id: int, kind: str) -> Optional[dict]: + try: + r = self.session.get( + f"{self.BASE_URL}/{kind}/{tmdb_id}/external_ids", + params={"api_key": self._api_key}, + timeout=30, + ) + r.raise_for_status() + return r.json() + except requests.RequestException as exc: + self.log.debug("Failed to fetch TMDB external IDs: %s", exc) + return None diff --git a/unshackle/core/title_cacher.py b/unshackle/core/title_cacher.py index 76ca639..da386c3 100644 --- a/unshackle/core/title_cacher.py +++ b/unshackle/core/title_cacher.py @@ -26,6 +26,7 @@ class TitleCacher: self.log = logging.getLogger(f"{service_name}.TitleCache") self.cacher = Cacher(service_name) self.stats = {"hits": 0, "misses": 0, "fallbacks": 0} + self.no_cache = False def _generate_cache_key( self, title_id: str, region: Optional[str] = None, account_hash: Optional[str] = None @@ -59,9 +60,6 @@ class TitleCacher: # 
Join with underscores cache_key = "_".join(key_parts) - # Log the mapping for debugging - self.log.debug(f"Cache key mapping: {title_id} -> {cache_key}") - return cache_key def get_cached_titles( @@ -89,6 +87,7 @@ class TitleCacher: """ # If caching is globally disabled or no_cache flag is set if not config.title_cache_enabled or no_cache: + self.no_cache = True self.log.debug("Cache bypassed, fetching fresh titles") return fetch_function() @@ -113,7 +112,7 @@ class TitleCacher: # Cache miss or expired, try to fetch fresh data self.stats["misses"] += 1 - self.log.debug(f"Cache miss for {title_id}, fetching fresh data") + self.log.debug(f"Cache miss for {title_id}, fetching fresh data") try: # Attempt to fetch fresh titles @@ -180,22 +179,18 @@ class TitleCacher: "hit_rate": f"{hit_rate:.1f}%", } - def get_cached_tmdb( - self, title_id: str, kind: str, region: Optional[str] = None, account_hash: Optional[str] = None + # -- Generic provider cache methods -- + + def get_cached_provider( + self, + provider_name: str, + title_id: str, + kind: Optional[str] = None, + region: Optional[str] = None, + account_hash: Optional[str] = None, ) -> Optional[dict]: - """ - Get cached TMDB data for a title. 
- - Args: - title_id: The title identifier - kind: "movie" or "tv" - region: The region/proxy identifier - account_hash: Hash of account credentials - - Returns: - Dict with 'detail' and 'external_ids' if cached and valid, None otherwise - """ - if not config.title_cache_enabled: + """Get cached metadata for any provider.""" + if not config.title_cache_enabled or self.no_cache: return None cache_key = self._generate_cache_key(title_id, region, account_hash) @@ -204,142 +199,90 @@ class TitleCacher: if not cache or not cache.data: return None - tmdb_data = getattr(cache.data, "tmdb_data", None) - if not tmdb_data: + provider_data = getattr(cache.data, f"{provider_name}_data", None) + if not provider_data: return None - tmdb_expiration = tmdb_data.get("expires_at") - if not tmdb_expiration or datetime.now() >= tmdb_expiration: - self.log.debug(f"TMDB cache expired for {title_id}") + expiration = provider_data.get("expires_at") + if not expiration or datetime.now() >= expiration: + self.log.debug(f"{provider_name} cache expired for {title_id}") return None - if tmdb_data.get("kind") != kind: - self.log.debug(f"TMDB cache kind mismatch for {title_id}: cached {tmdb_data.get('kind')}, requested {kind}") + if kind and provider_data.get("kind") != kind: + self.log.debug( + f"{provider_name} cache kind mismatch for {title_id}: " + f"cached {provider_data.get('kind')}, requested {kind}" + ) return None - self.log.debug(f"TMDB cache hit for {title_id}") - return { - "detail": tmdb_data.get("detail"), - "external_ids": tmdb_data.get("external_ids"), - "fetched_at": tmdb_data.get("fetched_at"), - } + self.log.debug(f"{provider_name} cache hit for {title_id}") - def cache_tmdb( + # Return the inner data (provider-specific format) + response = provider_data.get("response") + if response is not None: + return response + + # For TMDB-style caches that store detail + external_ids at top level + result: dict = {} + if "detail" in provider_data: + result["detail"] = 
provider_data["detail"] + if "external_ids" in provider_data: + result["external_ids"] = provider_data["external_ids"] + if "fetched_at" in provider_data: + result["fetched_at"] = provider_data["fetched_at"] + return result if result else provider_data + + def cache_provider( self, + provider_name: str, title_id: str, - detail_response: dict, - external_ids_response: dict, - kind: str, + data: dict, + kind: Optional[str] = None, region: Optional[str] = None, account_hash: Optional[str] = None, + ttl_days: int = 7, ) -> None: - """ - Cache TMDB data for a title. - - Args: - title_id: The title identifier - detail_response: Full TMDB detail API response - external_ids_response: Full TMDB external_ids API response - kind: "movie" or "tv" - region: The region/proxy identifier - account_hash: Hash of account credentials - """ - if not config.title_cache_enabled: + """Cache metadata from any provider.""" + if not config.title_cache_enabled or self.no_cache: return cache_key = self._generate_cache_key(title_id, region, account_hash) cache = self.cacher.get(cache_key, version=1) if not cache or not cache.data: - self.log.debug(f"Cannot cache TMDB data: no title cache exists for {title_id}") + self.log.debug(f"Cannot cache {provider_name} data: no title cache exists for {title_id}") return now = datetime.now() - tmdb_data = { - "detail": detail_response, - "external_ids": external_ids_response, - "kind": kind, - "fetched_at": now, - "expires_at": now + timedelta(days=7), # 7-day expiration - } - cache.data.tmdb_data = tmdb_data + # Build cache entry in a format compatible with legacy methods + if provider_name == "tmdb" and "detail" in data: + # TMDB stores detail + external_ids at top level + cache_entry = { + **data, + "kind": kind, + "fetched_at": now, + "expires_at": now + timedelta(days=ttl_days), + } + elif provider_name == "simkl": + # SIMKL wraps in a "response" key + cache_entry = { + "response": data, + "fetched_at": now, + "expires_at": now + 
timedelta(days=ttl_days), + } + else: + # Generic format: store data directly with metadata + cache_entry = { + "response": data, + "kind": kind, + "fetched_at": now, + "expires_at": now + timedelta(days=ttl_days), + } + setattr(cache.data, f"{provider_name}_data", cache_entry) cache.set(cache.data, expiration=cache.expiration) - self.log.debug(f"Cached TMDB data for {title_id} (kind={kind})") - - def get_cached_simkl( - self, title_id: str, region: Optional[str] = None, account_hash: Optional[str] = None - ) -> Optional[dict]: - """ - Get cached Simkl data for a title. - - Args: - title_id: The title identifier - region: The region/proxy identifier - account_hash: Hash of account credentials - - Returns: - Simkl response dict if cached and valid, None otherwise - """ - if not config.title_cache_enabled: - return None - - cache_key = self._generate_cache_key(title_id, region, account_hash) - cache = self.cacher.get(cache_key, version=1) - - if not cache or not cache.data: - return None - - simkl_data = getattr(cache.data, "simkl_data", None) - if not simkl_data: - return None - - simkl_expiration = simkl_data.get("expires_at") - if not simkl_expiration or datetime.now() >= simkl_expiration: - self.log.debug(f"Simkl cache expired for {title_id}") - return None - - self.log.debug(f"Simkl cache hit for {title_id}") - return simkl_data.get("response") - - def cache_simkl( - self, - title_id: str, - simkl_response: dict, - region: Optional[str] = None, - account_hash: Optional[str] = None, - ) -> None: - """ - Cache Simkl data for a title. 
- - Args: - title_id: The title identifier - simkl_response: Full Simkl API response - region: The region/proxy identifier - account_hash: Hash of account credentials - """ - if not config.title_cache_enabled: - return - - cache_key = self._generate_cache_key(title_id, region, account_hash) - cache = self.cacher.get(cache_key, version=1) - - if not cache or not cache.data: - self.log.debug(f"Cannot cache Simkl data: no title cache exists for {title_id}") - return - - now = datetime.now() - simkl_data = { - "response": simkl_response, - "fetched_at": now, - "expires_at": now + timedelta(days=7), - } - - cache.data.simkl_data = simkl_data - - cache.set(cache.data, expiration=cache.expiration) - self.log.debug(f"Cached Simkl data for {title_id}") + self.log.debug(f"Cached {provider_name} data for {title_id}") def get_region_from_proxy(proxy_url: Optional[str]) -> Optional[str]: diff --git a/unshackle/core/utils/tags.py b/unshackle/core/utils/tags.py index 5fad48c..3058f92 100644 --- a/unshackle/core/utils/tags.py +++ b/unshackle/core/utils/tags.py @@ -1,488 +1,23 @@ from __future__ import annotations import logging -import re import subprocess import tempfile -from difflib import SequenceMatcher from pathlib import Path -from typing import Optional, Tuple +from typing import Optional from xml.sax.saxutils import escape -import requests -from requests.adapters import HTTPAdapter, Retry - from unshackle.core import binaries from unshackle.core.config import config +from unshackle.core.providers import (ExternalIds, MetadataResult, enrich_ids, fetch_external_ids, fuzzy_match, + get_available_providers, get_provider, search_metadata) from unshackle.core.titles.episode import Episode from unshackle.core.titles.movie import Movie from unshackle.core.titles.title import Title -STRIP_RE = re.compile(r"[^a-z0-9]+", re.I) -YEAR_RE = re.compile(r"\s*\(?[12][0-9]{3}\)?$") -HEADERS = {"User-Agent": "unshackle-tags/1.0"} - - log = logging.getLogger("TAGS") -def _get_session() -> 
requests.Session: - """Create a requests session with retry logic for network failures.""" - session = requests.Session() - session.headers.update(HEADERS) - - retry = Retry( - total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST"] - ) - - adapter = HTTPAdapter(max_retries=retry) - session.mount("https://", adapter) - session.mount("http://", adapter) - - return session - - -def _api_key() -> Optional[str]: - return config.tmdb_api_key - - -def _simkl_client_id() -> Optional[str]: - return config.simkl_client_id - - -def _clean(s: str) -> str: - return STRIP_RE.sub("", s).lower() - - -def _strip_year(s: str) -> str: - return YEAR_RE.sub("", s).strip() - - -def fuzzy_match(a: str, b: str, threshold: float = 0.8) -> bool: - """Return True if ``a`` and ``b`` are a close match.""" - - ratio = SequenceMatcher(None, _clean(a), _clean(b)).ratio() - return ratio >= threshold - - -def search_simkl( - title: str, - year: Optional[int], - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> Tuple[Optional[dict], Optional[str], Optional[int]]: - """Search Simkl API for show information by filename.""" - - if title_cacher and cache_title_id: - cached_simkl = title_cacher.get_cached_simkl(cache_title_id, cache_region, cache_account_hash) - if cached_simkl: - log.debug("Using cached Simkl data") - if cached_simkl.get("type") == "episode" and "show" in cached_simkl: - show_info = cached_simkl["show"] - show_title = show_info.get("title") - tmdb_id = show_info.get("ids", {}).get("tmdbtv") - if tmdb_id: - tmdb_id = int(tmdb_id) - return cached_simkl, show_title, tmdb_id - elif cached_simkl.get("type") == "movie" and "movie" in cached_simkl: - movie_info = cached_simkl["movie"] - movie_title = movie_info.get("title") - ids = movie_info.get("ids", {}) - tmdb_id = ids.get("tmdb") or ids.get("moviedb") - if tmdb_id: - tmdb_id = 
int(tmdb_id) - return cached_simkl, movie_title, tmdb_id - - log.debug("Searching Simkl for %r (%s, %s)", title, kind, year) - - client_id = _simkl_client_id() - if not client_id: - log.debug("No SIMKL client ID configured; skipping SIMKL search") - return None, None, None - - # Construct appropriate filename based on type - filename = f"{title}" - if year: - filename = f"{title} {year}" - - if kind == "tv": - filename += " S01E01.mkv" - else: # movie - filename += " 2160p.mkv" - - try: - session = _get_session() - headers = {"simkl-api-key": client_id} - resp = session.post("https://api.simkl.com/search/file", json={"file": filename}, headers=headers, timeout=30) - resp.raise_for_status() - data = resp.json() - log.debug("Simkl API response received") - - # Handle case where SIMKL returns empty list (no results) - if isinstance(data, list): - log.debug("Simkl returned list (no matches) for %r", filename) - return None, None, None - - # Handle TV show responses - if data.get("type") == "episode" and "show" in data: - show_info = data["show"] - show_title = show_info.get("title") - show_year = show_info.get("year") - - # Verify title matches and year if provided - if not fuzzy_match(show_title, title): - log.debug("Simkl title mismatch: searched %r, got %r", title, show_title) - return None, None, None - if year and show_year and abs(year - show_year) > 1: # Allow 1 year difference - log.debug("Simkl year mismatch: searched %d, got %d", year, show_year) - return None, None, None - - if title_cacher and cache_title_id: - try: - title_cacher.cache_simkl(cache_title_id, data, cache_region, cache_account_hash) - except Exception as exc: - log.debug("Failed to cache Simkl data: %s", exc) - - tmdb_id = show_info.get("ids", {}).get("tmdbtv") - if tmdb_id: - tmdb_id = int(tmdb_id) - log.debug("Simkl -> %s (TMDB ID %s)", show_title, tmdb_id) - return data, show_title, tmdb_id - - elif data.get("type") == "movie" and "movie" in data: - movie_info = data["movie"] - movie_title 
= movie_info.get("title") - movie_year = movie_info.get("year") - - if not fuzzy_match(movie_title, title): - log.debug("Simkl title mismatch: searched %r, got %r", title, movie_title) - return None, None, None - if year and movie_year and abs(year - movie_year) > 1: # Allow 1 year difference - log.debug("Simkl year mismatch: searched %d, got %d", year, movie_year) - return None, None, None - - if title_cacher and cache_title_id: - try: - title_cacher.cache_simkl(cache_title_id, data, cache_region, cache_account_hash) - except Exception as exc: - log.debug("Failed to cache Simkl data: %s", exc) - - ids = movie_info.get("ids", {}) - tmdb_id = ids.get("tmdb") or ids.get("moviedb") - if tmdb_id: - tmdb_id = int(tmdb_id) - log.debug("Simkl -> %s (TMDB ID %s)", movie_title, tmdb_id) - return data, movie_title, tmdb_id - - except (requests.RequestException, ValueError, KeyError) as exc: - log.debug("Simkl search failed: %s", exc) - - return None, None, None - - -def search_show_info( - title: str, - year: Optional[int], - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> Tuple[Optional[int], Optional[str], Optional[str]]: - """Search for show information, trying Simkl first, then TMDB fallback. 
Returns (tmdb_id, title, source).""" - simkl_data, simkl_title, simkl_tmdb_id = search_simkl( - title, year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash - ) - - if simkl_data and simkl_title and fuzzy_match(simkl_title, title): - return simkl_tmdb_id, simkl_title, "simkl" - - tmdb_id, tmdb_title = search_tmdb(title, year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash) - return tmdb_id, tmdb_title, "tmdb" - - -def _fetch_tmdb_detail(tmdb_id: int, kind: str) -> Optional[dict]: - """Fetch full TMDB detail response for caching.""" - api_key = _api_key() - if not api_key: - return None - - try: - session = _get_session() - r = session.get( - f"https://api.themoviedb.org/3/{kind}/{tmdb_id}", - params={"api_key": api_key}, - timeout=30, - ) - r.raise_for_status() - return r.json() - except requests.RequestException as exc: - log.debug("Failed to fetch TMDB detail: %s", exc) - return None - - -def _fetch_tmdb_external_ids(tmdb_id: int, kind: str) -> Optional[dict]: - """Fetch full TMDB external_ids response for caching.""" - api_key = _api_key() - if not api_key: - return None - - try: - session = _get_session() - r = session.get( - f"https://api.themoviedb.org/3/{kind}/{tmdb_id}/external_ids", - params={"api_key": api_key}, - timeout=30, - ) - r.raise_for_status() - return r.json() - except requests.RequestException as exc: - log.debug("Failed to fetch TMDB external IDs: %s", exc) - return None - - -def search_tmdb( - title: str, - year: Optional[int], - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> Tuple[Optional[int], Optional[str]]: - if title_cacher and cache_title_id: - cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash) - if cached_tmdb and cached_tmdb.get("detail"): - detail = cached_tmdb["detail"] - tmdb_id = detail.get("id") - tmdb_title = 
detail.get("title") or detail.get("name") - log.debug("Using cached TMDB data: %r (ID %s)", tmdb_title, tmdb_id) - return tmdb_id, tmdb_title - - api_key = _api_key() - if not api_key: - return None, None - - search_title = _strip_year(title) - log.debug("Searching TMDB for %r (%s, %s)", search_title, kind, year) - - params = {"api_key": api_key, "query": search_title} - if year is not None: - params["year" if kind == "movie" else "first_air_date_year"] = year - - try: - session = _get_session() - r = session.get( - f"https://api.themoviedb.org/3/search/{kind}", - params=params, - timeout=30, - ) - r.raise_for_status() - js = r.json() - results = js.get("results") or [] - log.debug("TMDB returned %d results", len(results)) - if not results: - return None, None - except requests.RequestException as exc: - log.warning("Failed to search TMDB for %s: %s", title, exc) - return None, None - - best_ratio = 0.0 - best_id: Optional[int] = None - best_title: Optional[str] = None - for result in results: - candidates = [ - result.get("title"), - result.get("name"), - result.get("original_title"), - result.get("original_name"), - ] - candidates = [c for c in candidates if c] # Filter out None/empty values - - if not candidates: - continue - - # Find the best matching candidate from all available titles - for candidate in candidates: - ratio = SequenceMatcher(None, _clean(search_title), _clean(candidate)).ratio() - if ratio > best_ratio: - best_ratio = ratio - best_id = result.get("id") - best_title = candidate - log.debug( - "Best candidate ratio %.2f for %r (ID %s)", - best_ratio, - best_title, - best_id, - ) - - if best_id is not None: - if title_cacher and cache_title_id: - try: - detail_response = _fetch_tmdb_detail(best_id, kind) - external_ids_response = _fetch_tmdb_external_ids(best_id, kind) - if detail_response and external_ids_response: - title_cacher.cache_tmdb( - cache_title_id, detail_response, external_ids_response, kind, cache_region, cache_account_hash - ) - 
except Exception as exc: - log.debug("Failed to cache TMDB data: %s", exc) - - return best_id, best_title - - first = results[0] - return first.get("id"), first.get("title") or first.get("name") - - -def get_title( - tmdb_id: int, - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> Optional[str]: - """Fetch the name/title of a TMDB entry by ID.""" - - if title_cacher and cache_title_id: - cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash) - if cached_tmdb and cached_tmdb.get("detail"): - detail = cached_tmdb["detail"] - tmdb_title = detail.get("title") or detail.get("name") - log.debug("Using cached TMDB title: %r", tmdb_title) - return tmdb_title - - api_key = _api_key() - if not api_key: - return None - - try: - session = _get_session() - r = session.get( - f"https://api.themoviedb.org/3/{kind}/{tmdb_id}", - params={"api_key": api_key}, - timeout=30, - ) - r.raise_for_status() - js = r.json() - - if title_cacher and cache_title_id: - try: - external_ids_response = _fetch_tmdb_external_ids(tmdb_id, kind) - if external_ids_response: - title_cacher.cache_tmdb( - cache_title_id, js, external_ids_response, kind, cache_region, cache_account_hash - ) - except Exception as exc: - log.debug("Failed to cache TMDB data: %s", exc) - - return js.get("title") or js.get("name") - except requests.RequestException as exc: - log.debug("Failed to fetch TMDB title: %s", exc) - return None - - -def get_year( - tmdb_id: int, - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> Optional[int]: - """Fetch the release year of a TMDB entry by ID.""" - - if title_cacher and cache_title_id: - cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash) - if cached_tmdb and 
cached_tmdb.get("detail"): - detail = cached_tmdb["detail"] - date = detail.get("release_date") or detail.get("first_air_date") - if date and len(date) >= 4 and date[:4].isdigit(): - year = int(date[:4]) - log.debug("Using cached TMDB year: %d", year) - return year - - api_key = _api_key() - if not api_key: - return None - - try: - session = _get_session() - r = session.get( - f"https://api.themoviedb.org/3/{kind}/{tmdb_id}", - params={"api_key": api_key}, - timeout=30, - ) - r.raise_for_status() - js = r.json() - - if title_cacher and cache_title_id: - try: - external_ids_response = _fetch_tmdb_external_ids(tmdb_id, kind) - if external_ids_response: - title_cacher.cache_tmdb( - cache_title_id, js, external_ids_response, kind, cache_region, cache_account_hash - ) - except Exception as exc: - log.debug("Failed to cache TMDB data: %s", exc) - - date = js.get("release_date") or js.get("first_air_date") - if date and len(date) >= 4 and date[:4].isdigit(): - return int(date[:4]) - return None - except requests.RequestException as exc: - log.debug("Failed to fetch TMDB year: %s", exc) - return None - - -def external_ids( - tmdb_id: int, - kind: str, - title_cacher=None, - cache_title_id: Optional[str] = None, - cache_region: Optional[str] = None, - cache_account_hash: Optional[str] = None, -) -> dict: - if title_cacher and cache_title_id: - cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash) - if cached_tmdb and cached_tmdb.get("external_ids"): - log.debug("Using cached TMDB external IDs") - return cached_tmdb["external_ids"] - - api_key = _api_key() - if not api_key: - return {} - url = f"https://api.themoviedb.org/3/{kind}/{tmdb_id}/external_ids" - log.debug("Fetching external IDs for %s %s", kind, tmdb_id) - - try: - session = _get_session() - r = session.get( - url, - params={"api_key": api_key}, - timeout=30, - ) - r.raise_for_status() - js = r.json() - log.debug("External IDs response: %s", js) - - if title_cacher and 
cache_title_id: - try: - detail_response = _fetch_tmdb_detail(tmdb_id, kind) - if detail_response: - title_cacher.cache_tmdb(cache_title_id, detail_response, js, kind, cache_region, cache_account_hash) - except Exception as exc: - log.debug("Failed to cache TMDB data: %s", exc) - - return js - except requests.RequestException as exc: - log.warning("Failed to fetch external IDs for %s %s: %s", kind, tmdb_id, exc) - return {} - - def apply_tags(path: Path, tags: dict[str, str]) -> None: if not tags: return @@ -509,9 +44,26 @@ def apply_tags(path: Path, tags: dict[str, str]) -> None: tmp_path.unlink(missing_ok=True) -def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> None: +def _build_tags_from_ids(ids: ExternalIds, kind: str) -> dict[str, str]: + """Build standard MKV tags from external IDs.""" + tags: dict[str, str] = {} + if ids.imdb_id: + tags["IMDB"] = ids.imdb_id + if ids.tmdb_id and ids.tmdb_kind: + tags["TMDB"] = f"{ids.tmdb_kind}/{ids.tmdb_id}" + if ids.tvdb_id: + prefix = "movies" if kind == "movie" else "series" + tags["TVDB2"] = f"{prefix}/{ids.tvdb_id}" + return tags + + +def tag_file( + path: Path, + title: Title, + tmdb_id: Optional[int] = None, + imdb_id: Optional[str] = None, +) -> None: log.debug("Tagging file %s with title %r", path, title) - standard_tags: dict[str, str] = {} custom_tags: dict[str, str] = {} if config.tag and config.tag_group_name: @@ -537,115 +89,52 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> apply_tags(path, custom_tags) return - if config.tag_imdb_tmdb: - # Check if we have any API keys available for metadata lookup - api_key = _api_key() - simkl_client = _simkl_client_id() + standard_tags: dict[str, str] = {} - if not api_key and not simkl_client: - log.debug("No TMDB API key or Simkl client ID configured; skipping IMDB/TMDB tag lookup") + if config.tag_imdb_tmdb: + providers = get_available_providers() + if not providers: + log.debug("No metadata providers 
available; skipping tag lookup") apply_tags(path, custom_tags) return + + result: Optional[MetadataResult] = None + + # Direct ID lookup path + if imdb_id: + imdbapi = get_provider("imdbapi") + if imdbapi: + result = imdbapi.get_by_id(imdb_id, kind) + if result: + result.external_ids.imdb_id = imdb_id + enrich_ids(result) + elif tmdb_id is not None: + tmdb = get_provider("tmdb") + if tmdb: + result = tmdb.get_by_id(tmdb_id, kind) + if result: + ext = tmdb.get_external_ids(tmdb_id, kind) + result.external_ids = ext else: - # If tmdb_id is provided (via --tmdb), skip Simkl and use TMDB directly - if tmdb_id is not None: - log.debug("Using provided TMDB ID %s for tags", tmdb_id) - else: - # Try Simkl first for automatic lookup (only if client ID is available) - if simkl_client: - simkl_data, simkl_title, simkl_tmdb_id = search_simkl(name, year, kind) + # Search across providers in priority order + result = search_metadata(name, year, kind) - if simkl_data and simkl_title and fuzzy_match(simkl_title, name): - log.debug("Using Simkl data for tags") - if simkl_tmdb_id: - tmdb_id = simkl_tmdb_id + # If we got a TMDB ID from search but no full external IDs, fetch them + if result and result.external_ids.tmdb_id and not result.external_ids.imdb_id: + ext = fetch_external_ids(result.external_ids.tmdb_id, kind) + if ext.imdb_id: + result.external_ids.imdb_id = ext.imdb_id + if ext.tvdb_id: + result.external_ids.tvdb_id = ext.tvdb_id - # Handle TV show data from Simkl - if simkl_data.get("type") == "episode" and "show" in simkl_data: - show_ids = simkl_data.get("show", {}).get("ids", {}) - if show_ids.get("imdb"): - standard_tags["IMDB"] = show_ids["imdb"] - if show_ids.get("tvdb"): - standard_tags["TVDB2"] = f"series/{show_ids['tvdb']}" - if show_ids.get("tmdbtv"): - standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}" + if result and result.external_ids: + standard_tags = _build_tags_from_ids(result.external_ids, kind) - # Handle movie data from Simkl - elif 
simkl_data.get("type") == "movie" and "movie" in simkl_data: - movie_ids = simkl_data.get("movie", {}).get("ids", {}) - if movie_ids.get("imdb"): - standard_tags["IMDB"] = movie_ids["imdb"] - if movie_ids.get("tvdb"): - standard_tags["TVDB2"] = f"movies/{movie_ids['tvdb']}" - if movie_ids.get("tmdb"): - standard_tags["TMDB"] = f"movie/{movie_ids['tmdb']}" - - # Use TMDB API for additional metadata (either from provided ID or Simkl lookup) - if api_key: - tmdb_title: Optional[str] = None - if tmdb_id is None: - tmdb_id, tmdb_title = search_tmdb(name, year, kind) - log.debug("TMDB search result: %r (ID %s)", tmdb_title, tmdb_id) - if not tmdb_id or not tmdb_title or not fuzzy_match(tmdb_title, name): - log.debug("TMDB search did not match; skipping external ID lookup") - else: - prefix = "movie" if kind == "movie" else "tv" - standard_tags["TMDB"] = f"{prefix}/{tmdb_id}" - try: - ids = external_ids(tmdb_id, kind) - except requests.RequestException as exc: - log.debug("Failed to fetch external IDs: %s", exc) - ids = {} - else: - log.debug("External IDs found: %s", ids) - - imdb_id = ids.get("imdb_id") - if imdb_id: - standard_tags["IMDB"] = imdb_id - tvdb_id = ids.get("tvdb_id") - if tvdb_id: - if kind == "movie": - standard_tags["TVDB2"] = f"movies/{tvdb_id}" - else: - standard_tags["TVDB2"] = f"series/{tvdb_id}" - elif tmdb_id is not None: - # tmdb_id was provided or found via Simkl - prefix = "movie" if kind == "movie" else "tv" - standard_tags["TMDB"] = f"{prefix}/{tmdb_id}" - try: - ids = external_ids(tmdb_id, kind) - except requests.RequestException as exc: - log.debug("Failed to fetch external IDs: %s", exc) - ids = {} - else: - log.debug("External IDs found: %s", ids) - - imdb_id = ids.get("imdb_id") - if imdb_id: - standard_tags["IMDB"] = imdb_id - tvdb_id = ids.get("tvdb_id") - if tvdb_id: - if kind == "movie": - standard_tags["TVDB2"] = f"movies/{tvdb_id}" - else: - standard_tags["TVDB2"] = f"series/{tvdb_id}" - else: - log.debug("No TMDB API key 
configured; skipping TMDB external ID lookup") - - merged_tags = { - **custom_tags, - **standard_tags, - } - apply_tags(path, merged_tags) + apply_tags(path, {**custom_tags, **standard_tags}) __all__ = [ - "search_simkl", - "search_show_info", - "search_tmdb", - "get_title", - "get_year", - "external_ids", - "tag_file", + "apply_tags", "fuzzy_match", + "tag_file", ]