refactor(providers): extract metadata providers into modular system

- Create `unshackle/core/providers/` package with abstract base class, IMDBApi (free, no key), SIMKL, and TMDB provider implementations
- Add consensus-based ID enrichment: cross-references IMDB IDs with TMDB and SIMKL, drops all data from providers that disagree on tmdb_id (likely resolved to wrong title)
- Cache enriched IDs alongside raw provider data so they survive cache round-trips
- Genericize TitleCacher with `cache_provider()`/`get_cached_provider()` replacing provider-specific methods; respect `--no-cache` flag
- Add `--imdb` CLI flag to dl command for direct IMDB ID lookup
This commit is contained in:
Andy
2026-02-25 19:02:18 -07:00
parent 42d6ef5765
commit 820db5f179
10 changed files with 1207 additions and 749 deletions

View File

@@ -7,10 +7,10 @@ repos:
hooks:
- id: conventional-pre-commit
stages: [commit-msg]
- repo: https://github.com/mtkennerly/pre-commit-hooks
rev: v0.4.0
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.3
hooks:
- id: poetry-ruff-check
- id: ruff
args: [--fix]
- repo: https://github.com/pycqa/isort
rev: 6.0.1

View File

@@ -94,7 +94,7 @@ For example,
tmdb_api_key: cf66bf18956kca5311ada3bebb84eb9a # Not a real key
```
**Note**: Keep your API key secure and do not share it publicly. This key is used by the core/utils/tags.py module to fetch metadata from TMDB for proper file tagging.
**Note**: Keep your API key secure and do not share it publicly. This key is used by the `core/providers/tmdb.py` metadata provider to fetch metadata from TMDB for proper file tagging and ID enrichment.
---
@@ -115,7 +115,7 @@ For example,
simkl_client_id: "your_client_id_here"
```
**Note**: While optional, having a SIMKL Client ID improves metadata lookup reliability. SIMKL serves as an alternative or fallback metadata source to TMDB. This is used by the `core/utils/tags.py` module.
**Note**: While optional, having a SIMKL Client ID improves metadata lookup reliability. SIMKL serves as an alternative or fallback metadata source to TMDB. This is used by the `core/providers/simkl.py` metadata provider.
---

View File

@@ -42,7 +42,7 @@ from rich.table import Table
from rich.text import Text
from rich.tree import Tree
from unshackle.core import binaries
from unshackle.core import binaries, providers
from unshackle.core.cdm import CustomRemoteCDM, DecryptLabsRemoteCDM
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.config import config
@@ -429,6 +429,13 @@ class dl:
default=False,
help="Use the release year from TMDB for naming and tagging.",
)
@click.option(
"--imdb",
"imdb_id",
type=str,
default=None,
help="Use this IMDB ID (e.g. tt1375666) for tagging instead of automatic lookup.",
)
@click.option(
"--sub-format",
type=SubtitleCodecChoice(Subtitle.Codec),
@@ -523,6 +530,7 @@ class dl:
tmdb_id: Optional[int] = None,
tmdb_name: bool = False,
tmdb_year: bool = False,
imdb_id: Optional[str] = None,
output_dir: Optional[Path] = None,
*_: Any,
**__: Any,
@@ -569,6 +577,7 @@ class dl:
self.tmdb_id = tmdb_id
self.tmdb_name = tmdb_name
self.tmdb_year = tmdb_year
self.imdb_id = imdb_id
self.output_dir = output_dir
# Initialize debug logger with service name if debug logging is enabled
@@ -595,10 +604,11 @@ class dl:
"tmdb_id": tmdb_id,
"tmdb_name": tmdb_name,
"tmdb_year": tmdb_year,
"imdb_id": imdb_id,
"cli_params": {
k: v
for k, v in ctx.params.items()
if k not in ["profile", "proxy", "tag", "tmdb_id", "tmdb_name", "tmdb_year"]
if k not in ["profile", "proxy", "tag", "tmdb_id", "tmdb_name", "tmdb_year", "imdb_id"]
},
},
)
@@ -622,9 +632,7 @@ class dl:
)
version = (r.stdout or r.stderr or "").strip()
elif name in ("ffmpeg", "ffprobe"):
r = subprocess.run(
[str(binary), "-version"], capture_output=True, text=True, timeout=5
)
r = subprocess.run([str(binary), "-version"], capture_output=True, text=True, timeout=5)
version = (r.stdout or "").split("\n")[0].strip()
elif name == "mkvmerge":
r = subprocess.run(
@@ -632,9 +640,7 @@ class dl:
)
version = (r.stdout or "").strip()
elif name == "mp4decrypt":
r = subprocess.run(
[str(binary)], capture_output=True, text=True, timeout=5
)
r = subprocess.run([str(binary)], capture_output=True, text=True, timeout=5)
output = (r.stdout or "") + (r.stderr or "")
lines = [line.strip() for line in output.split("\n") if line.strip()]
version = " | ".join(lines[:2]) if lines else None
@@ -1087,12 +1093,12 @@ class dl:
tmdb_name_val = None
if self.tmdb_year:
tmdb_year_val = tags.get_year(
tmdb_year_val = providers.get_year_by_id(
self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
)
if self.tmdb_name:
tmdb_name_val = tags.get_title(
tmdb_name_val = providers.get_title_by_id(
self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
)
@@ -1214,15 +1220,20 @@ class dl:
if isinstance(title, Episode) and not self.tmdb_searched:
kind = "tv"
tmdb_title: Optional[str] = None
if self.tmdb_id:
tmdb_title = tags.get_title(
tmdb_title = providers.get_title_by_id(
self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
)
else:
self.tmdb_id, tmdb_title, self.search_source = tags.search_show_info(
result = providers.search_metadata(
title.title, title.year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
)
if not (self.tmdb_id and tmdb_title and tags.fuzzy_match(tmdb_title, title.title)):
if result and result.title and providers.fuzzy_match(result.title, title.title):
self.tmdb_id = result.external_ids.tmdb_id
tmdb_title = result.title
self.search_source = result.source
else:
self.tmdb_id = None
if list_ or list_titles:
if self.tmdb_id:
@@ -1237,22 +1248,25 @@ class dl:
self.tmdb_searched = True
if isinstance(title, Movie) and (list_ or list_titles) and not self.tmdb_id:
movie_id, movie_title, _ = tags.search_show_info(
movie_result = providers.search_metadata(
title.name, title.year, "movie", title_cacher, cache_title_id, cache_region, cache_account_hash
)
if movie_id:
if movie_result and movie_result.external_ids.tmdb_id:
console.print(
Padding(
f"Search -> {movie_title or '?'} [bright_black](ID {movie_id})",
f"Search -> {movie_result.title or '?'} "
f"[bright_black](ID {movie_result.external_ids.tmdb_id})",
(0, 5),
)
)
else:
console.print(Padding("Search -> [bright_black]No match found[/]", (0, 5)))
if self.tmdb_id and getattr(self, "search_source", None) != "simkl":
if self.tmdb_id and getattr(self, "search_source", None) not in ("simkl", "imdbapi"):
kind = "tv" if isinstance(title, Episode) else "movie"
tags.external_ids(self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash)
providers.fetch_external_ids(
self.tmdb_id, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
)
if slow and i != 0:
delay = random.randint(60, 120)
@@ -1460,11 +1474,13 @@ class dl:
if has_hybrid:
# Split tracks: hybrid candidates vs non-hybrid
hybrid_candidate_tracks = [
v for v in title.tracks.videos
v
for v in title.tracks.videos
if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
non_hybrid_tracks = [
v for v in title.tracks.videos
v
for v in title.tracks.videos
if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
@@ -1475,11 +1491,9 @@ class dl:
if non_hybrid_ranges and non_hybrid_tracks:
# Also filter non-hybrid tracks by resolution
non_hybrid_selected = [
v for v in non_hybrid_tracks
if any(
v.height == res or int(v.width * (9 / 16)) == res
for res in quality
)
v
for v in non_hybrid_tracks
if any(v.height == res or int(v.width * (9 / 16)) == res for res in quality)
]
title.tracks.videos = hybrid_selected + non_hybrid_selected
else:
@@ -1513,29 +1527,25 @@ class dl:
if has_hybrid:
# Apply hybrid selection for HYBRID tracks
hybrid_candidate_tracks = [
v for v in title.tracks.videos
v
for v in title.tracks.videos
if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
non_hybrid_tracks = [
v for v in title.tracks.videos
v
for v in title.tracks.videos
if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
if not quality:
best_resolution = max(
(v.height for v in hybrid_candidate_tracks), default=None
)
best_resolution = max((v.height for v in hybrid_candidate_tracks), default=None)
if best_resolution:
hybrid_filter = title.tracks.select_hybrid(
hybrid_candidate_tracks, [best_resolution]
)
hybrid_filter = title.tracks.select_hybrid(hybrid_candidate_tracks, [best_resolution])
hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks))
else:
hybrid_selected = []
else:
hybrid_filter = title.tracks.select_hybrid(
hybrid_candidate_tracks, quality
)
hybrid_filter = title.tracks.select_hybrid(hybrid_candidate_tracks, quality)
hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks))
# For non-hybrid ranges, apply Cartesian product selection
@@ -1588,8 +1598,7 @@ class dl:
# validate hybrid mode requirements
if any(r == Video.Range.HYBRID for r in range_):
base_tracks = [
v for v in title.tracks.videos
if v.range in (Video.Range.HDR10, Video.Range.HDR10P)
v for v in title.tracks.videos if v.range in (Video.Range.HDR10, Video.Range.HDR10P)
]
dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV]
@@ -1617,8 +1626,7 @@ class dl:
if best_available and other_ranges:
self.log.warning(msg)
self.log.warning(
f"Continuing with remaining range(s): "
f"{', '.join(r.name for r in other_ranges)}"
f"Continuing with remaining range(s): {', '.join(r.name for r in other_ranges)}"
)
range_ = other_ranges
else:
@@ -2150,8 +2158,7 @@ class dl:
# Group video tracks by resolution (prefer HDR10+ over HDR10 as base)
resolutions_processed = set()
base_tracks_list = [
v for v in title.tracks.videos
if v.range in (Video.Range.HDR10P, Video.Range.HDR10)
v for v in title.tracks.videos if v.range in (Video.Range.HDR10P, Video.Range.HDR10)
]
dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV]
@@ -2399,7 +2406,7 @@ class dl:
final_path.unlink()
shutil.move(muxed_path, final_path)
used_final_paths.add(final_path)
tags.tag_file(final_path, title, self.tmdb_id)
tags.tag_file(final_path, title, self.tmdb_id, self.imdb_id)
title_dl_time = time_elapsed_since(dl_start_time)
console.print(

View File

@@ -0,0 +1,428 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
import requests
from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, fuzzy_match, log
from unshackle.core.providers.imdbapi import IMDBApiProvider
from unshackle.core.providers.simkl import SimklProvider
from unshackle.core.providers.tmdb import TMDBProvider
if TYPE_CHECKING:
from unshackle.core.title_cacher import TitleCacher
# Ordered by priority: IMDBApi (free), SIMKL, TMDB
ALL_PROVIDERS: list[type[MetadataProvider]] = [IMDBApiProvider, SimklProvider, TMDBProvider]
def get_available_providers() -> list[MetadataProvider]:
    """Return instantiated providers that have valid credentials.

    Providers come back in ``ALL_PROVIDERS`` priority order. Each class is
    instantiated exactly once; the original comprehension constructed every
    class twice (once for the ``is_available()`` check, once for the list).
    """
    return [provider for cls in ALL_PROVIDERS if (provider := cls()).is_available()]
def get_provider(name: str) -> Optional[MetadataProvider]:
    """Return the provider registered under ``name``, or None.

    None is returned both for an unknown name and for a known provider
    whose credentials are missing (``is_available()`` is False).
    """
    matching_cls = next((cls for cls in ALL_PROVIDERS if cls.NAME == name), None)
    if matching_cls is None:
        return None
    provider = matching_cls()
    return provider if provider.is_available() else None
# -- Public API (replaces tags.py functions) --
def search_metadata(
    title: str,
    year: Optional[int],
    kind: str,
    title_cacher: Optional[TitleCacher] = None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Optional[MetadataResult]:
    """Search all available providers for metadata. Returns best match.

    Providers are tried in ``ALL_PROVIDERS`` priority order. A cached result
    is returned first when it still fuzzy-matches ``title``; on a fresh hit
    the result is enriched with cross-referenced IDs (``enrich_ids``) and
    written back to the cache before being returned.

    Args:
        title: Title name to search for.
        year: Optional release year that providers may use to narrow matches.
        kind: "movie" or "tv".
        title_cacher: Optional cache; both cache read and write are skipped
            when this or ``cache_title_id`` is falsy.
        cache_title_id: Cache key for this title.
        cache_region: Optional region component of the cache key.
        cache_account_hash: Optional account component of the cache key.

    Returns:
        The first fuzzy-matching MetadataResult, or None if no provider
        produced a match.
    """
    # Check cache first
    if title_cacher and cache_title_id:
        for cls in ALL_PROVIDERS:
            p = cls()
            if not p.is_available():
                continue
            cached = title_cacher.get_cached_provider(p.NAME, cache_title_id, kind, cache_region, cache_account_hash)
            if cached:
                result = _cached_to_result(cached, p.NAME, kind)
                # Re-validate the cached title against the requested one so a
                # stale/wrong cache entry cannot short-circuit a live search.
                if result and result.title and fuzzy_match(result.title, title):
                    log.debug("Using cached %s data for %r", p.NAME, title)
                    return result
    # Search providers in priority order
    for cls in ALL_PROVIDERS:
        p = cls()
        if not p.is_available():
            continue
        try:
            result = p.search(title, year, kind)
        except (requests.RequestException, ValueError, KeyError) as exc:
            # A single provider failure is non-fatal; fall through to the next.
            log.debug("%s search failed: %s", p.NAME, exc)
            continue
        if result and result.title and fuzzy_match(result.title, title):
            # Enrich with cross-referenced IDs if we have IMDB but missing TMDB/TVDB
            enrich_ids(result)
            # Cache the result (include enriched IDs so they survive round-trip)
            if title_cacher and cache_title_id and result.raw:
                try:
                    cache_data = result.raw
                    if result.external_ids.tmdb_id or result.external_ids.tvdb_id:
                        # Enriched IDs are stored under "_enriched_ids" so
                        # _cached_to_result can restore them on a cache hit.
                        cache_data = {
                            **result.raw,
                            "_enriched_ids": _external_ids_to_dict(result.external_ids),
                        }
                    title_cacher.cache_provider(
                        p.NAME, cache_title_id, cache_data, kind, cache_region, cache_account_hash
                    )
                except Exception as exc:
                    # Caching is best-effort; never fail the search over it.
                    log.debug("Failed to cache %s data: %s", p.NAME, exc)
            return result
    return None
def get_title_by_id(
    tmdb_id: int,
    kind: str,
    title_cacher: Optional[TitleCacher] = None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Optional[str]:
    """Look up a title's display name from its TMDB ID.

    Checks the provider cache first; on a miss, queries TMDB directly and
    best-effort writes the detail plus external IDs back to the cache.
    Returns None when TMDB is unavailable or the lookup fails.
    """
    can_cache = bool(title_cacher and cache_title_id)
    if can_cache:
        cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash)
        detail = (cached or {}).get("detail")
        if detail:
            # Movies use "title", TV shows use "name" in TMDB payloads.
            cached_name = detail.get("title") or detail.get("name")
            if cached_name:
                log.debug("Using cached TMDB title: %r", cached_name)
                return cached_name
    provider = get_provider("tmdb")
    if provider is None:
        return None
    result = provider.get_by_id(tmdb_id, kind)
    if result is None:
        return None
    # Cache if possible
    if can_cache and result.raw:
        try:
            ext = provider.get_external_ids(tmdb_id, kind)
            payload = {"detail": result.raw, "external_ids": _external_ids_to_dict(ext)}
            title_cacher.cache_provider(
                "tmdb", cache_title_id, payload, kind, cache_region, cache_account_hash
            )
        except Exception as exc:
            # Best-effort: a cache failure must not break the lookup.
            log.debug("Failed to cache TMDB data: %s", exc)
    return result.title
def get_year_by_id(
    tmdb_id: int,
    kind: str,
    title_cacher: Optional[TitleCacher] = None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Optional[int]:
    """Get release year by TMDB ID.

    Checks the provider cache first; on a miss, queries TMDB and best-effort
    writes the detail plus external IDs back to the cache. Returns None when
    TMDB is unavailable or the lookup fails.
    """
    # Check cache first
    if title_cacher and cache_title_id:
        cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash)
        if cached and cached.get("detail"):
            detail = cached["detail"]
            # Movies carry "release_date", TV shows "first_air_date" (ISO date).
            date = detail.get("release_date") or detail.get("first_air_date")
            # Guard against short/malformed dates before slicing out the year.
            if date and len(date) >= 4 and date[:4].isdigit():
                year = int(date[:4])
                log.debug("Using cached TMDB year: %d", year)
                return year
    tmdb = get_provider("tmdb")
    if not tmdb:
        return None
    result = tmdb.get_by_id(tmdb_id, kind)
    if not result:
        return None
    # Cache if possible
    if title_cacher and cache_title_id and result.raw:
        try:
            # External IDs are fetched here purely so the cache entry has the
            # same {"detail", "external_ids"} shape the readers expect.
            ext_ids = tmdb.get_external_ids(tmdb_id, kind)
            title_cacher.cache_provider(
                "tmdb",
                cache_title_id,
                {"detail": result.raw, "external_ids": _external_ids_to_dict(ext_ids)},
                kind,
                cache_region,
                cache_account_hash,
            )
        except Exception as exc:
            # Caching is best-effort only.
            log.debug("Failed to cache TMDB data: %s", exc)
    return result.year
def fetch_external_ids(
    tmdb_id: int,
    kind: str,
    title_cacher: Optional[TitleCacher] = None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> ExternalIds:
    """Get external IDs by TMDB ID.

    Returns an empty ``ExternalIds`` (all fields None) when the TMDB
    provider is unavailable; never returns None itself.
    """
    # Check cache first
    if title_cacher and cache_title_id:
        cached = title_cacher.get_cached_provider("tmdb", cache_title_id, kind, cache_region, cache_account_hash)
        if cached and cached.get("external_ids"):
            log.debug("Using cached TMDB external IDs")
            raw = cached["external_ids"]
            # tmdb_id/tmdb_kind come from the caller, not the cached payload —
            # the cache entry is keyed by this TMDB ID already.
            return ExternalIds(
                imdb_id=raw.get("imdb_id"),
                tmdb_id=tmdb_id,
                tmdb_kind=kind,
                tvdb_id=raw.get("tvdb_id"),
            )
    tmdb = get_provider("tmdb")
    if not tmdb:
        return ExternalIds()
    ext = tmdb.get_external_ids(tmdb_id, kind)
    # Cache if possible
    if title_cacher and cache_title_id:
        try:
            # The detail payload is fetched as well so the cache entry has the
            # full {"detail", "external_ids"} shape; skip caching without it.
            detail = None
            result = tmdb.get_by_id(tmdb_id, kind)
            if result and result.raw:
                detail = result.raw
            if detail:
                title_cacher.cache_provider(
                    "tmdb",
                    cache_title_id,
                    {"detail": detail, "external_ids": _external_ids_to_dict(ext)},
                    kind,
                    cache_region,
                    cache_account_hash,
                )
        except Exception as exc:
            # Best-effort caching; the fetched IDs are still returned.
            log.debug("Failed to cache TMDB data: %s", exc)
    return ext
# -- Internal helpers --
# Provider authority ranking for tie-breaking (lower index = more authoritative)
_ENRICHMENT_PROVIDERS = ("tmdb", "simkl")
_ENRICHMENT_AUTHORITY: dict[str, int] = {name: i for i, name in enumerate(_ENRICHMENT_PROVIDERS)}
def enrich_ids(result: MetadataResult) -> None:
    """Enrich a MetadataResult by cross-referencing IMDB ID with available providers.

    Queries all available providers, cross-validates tmdb_id as anchor.
    If a provider returns a different tmdb_id than the authoritative source,
    ALL of that provider's data is dropped (likely resolved to wrong title).

    Mutates ``result.external_ids`` in place; returns nothing. A no-op when
    there is no IMDB ID to pivot on, or when both TMDB and TVDB IDs are
    already present.
    """
    ids = result.external_ids
    if not ids.imdb_id:
        return
    if ids.tmdb_id and ids.tvdb_id:
        return  # already have everything
    kind = result.kind or "movie"
    # Step 1: Collect enrichment results from all available providers
    enrichments: list[tuple[str, ExternalIds]] = []
    for provider_name in _ENRICHMENT_PROVIDERS:
        p = get_provider(provider_name)
        if not p:
            continue
        try:
            # find_by_imdb_id is not part of the MetadataProvider ABC (hence
            # the type: ignore); enrichment providers are expected to have it.
            enriched = p.find_by_imdb_id(ids.imdb_id, kind)  # type: ignore[union-attr]
        except Exception as exc:
            log.debug("Enrichment via %s failed: %s", provider_name, exc)
            continue
        if enriched:
            enrichments.append((provider_name, enriched))
    if not enrichments:
        return
    # Step 2: Cross-validate using tmdb_id as anchor — drop providers that disagree
    validated = _validate_enrichments(enrichments)
    # Step 3: Merge validated data (fill gaps only)
    for _provider_name, ext in validated:
        if not ids.tmdb_id and ext.tmdb_id:
            ids.tmdb_id = ext.tmdb_id
            ids.tmdb_kind = ext.tmdb_kind or kind
        if not ids.tvdb_id and ext.tvdb_id:
            ids.tvdb_id = ext.tvdb_id
def _validate_enrichments(
    enrichments: list[tuple[str, ExternalIds]],
) -> list[tuple[str, ExternalIds]]:
    """Drop providers whose tmdb_id conflicts with the authoritative value.

    When providers disagree on tmdb_id, the anchor is the majority value, or
    (without a majority) the value from the most authoritative provider per
    ``_ENRICHMENT_AUTHORITY``. Everything from a disagreeing provider is
    discarded, since a different tmdb_id means it likely resolved to a
    different title entirely.
    """
    from collections import Counter

    # Map provider -> its proposed tmdb_id (only providers that voted).
    votes = {name: ext.tmdb_id for name, ext in enrichments if ext.tmdb_id is not None}
    if len(set(votes.values())) <= 1:
        return enrichments  # all agree or only one voted — no conflict
    # Determine the authoritative tmdb_id.
    top_value, top_count = Counter(votes.values()).most_common(1)[0]
    if top_count > 1:
        anchor_tmdb_id = top_value
    else:
        # No majority — fall back to provider authority ranking.
        most_trusted = min(votes, key=lambda name: _ENRICHMENT_AUTHORITY.get(name, 99))
        anchor_tmdb_id = votes[most_trusted]
    # Keep only entries that either abstained or agree with the anchor.
    kept: list[tuple[str, ExternalIds]] = []
    for provider_name, ext in enrichments:
        if ext.tmdb_id is None or ext.tmdb_id == anchor_tmdb_id:
            kept.append((provider_name, ext))
            continue
        log.debug(
            "Dropping %s enrichment data: tmdb_id %s conflicts with "
            "authoritative value %s (likely resolved to wrong title)",
            provider_name,
            ext.tmdb_id,
            anchor_tmdb_id,
        )
    return kept
def _external_ids_to_dict(ext: ExternalIds) -> dict:
    """Convert ExternalIds to a dict for caching, omitting unset fields."""
    candidates = (
        ("imdb_id", ext.imdb_id),
        ("tmdb_id", ext.tmdb_id),
        ("tmdb_kind", ext.tmdb_kind),
        ("tvdb_id", ext.tvdb_id),
    )
    # Truthiness filter matches the original: None/0/"" are all dropped.
    return {key: value for key, value in candidates if value}
def _cached_to_result(cached: dict, provider_name: str, kind: str) -> Optional[MetadataResult]:
    """Convert a cached provider dict back to a MetadataResult.

    Each provider caches a different payload shape, so reconstruction is
    branched per provider. Returns None for an unknown provider name or an
    unparseable payload.
    """
    if provider_name == "tmdb":
        # TMDB cache shape: {"detail": <tmdb detail>, "external_ids": {...}}.
        detail = cached.get("detail", {})
        ext_raw = cached.get("external_ids", {})
        # Movies use "title"/"release_date"; TV uses "name"/"first_air_date".
        title = detail.get("title") or detail.get("name")
        date = detail.get("release_date") or detail.get("first_air_date")
        year = int(date[:4]) if date and len(date) >= 4 and date[:4].isdigit() else None
        tmdb_id = detail.get("id")
        return MetadataResult(
            title=title,
            year=year,
            kind=kind,
            external_ids=ExternalIds(
                imdb_id=ext_raw.get("imdb_id"),
                tmdb_id=tmdb_id,
                tmdb_kind=kind,
                tvdb_id=ext_raw.get("tvdb_id"),
            ),
            source="tmdb",
            raw=cached,
        )
    elif provider_name == "simkl":
        # SIMKL payloads may be stored raw or wrapped under "response";
        # accept both shapes.
        response = cached.get("response", cached)
        if response.get("type") == "episode" and "show" in response:
            info = response["show"]
        elif response.get("type") == "movie" and "movie" in response:
            info = response["movie"]
        else:
            return None
        ids = info.get("ids", {})
        # TV entries use "tmdbtv"; movies use "tmdb" or legacy "moviedb".
        tmdb_id = ids.get("tmdbtv") or ids.get("tmdb") or ids.get("moviedb")
        if tmdb_id:
            # SIMKL may return the ID as a string — normalize to int.
            tmdb_id = int(tmdb_id)
        return MetadataResult(
            title=info.get("title"),
            year=info.get("year"),
            kind=kind,
            external_ids=ExternalIds(
                imdb_id=ids.get("imdb"),
                tmdb_id=tmdb_id,
                tmdb_kind=kind,
                tvdb_id=ids.get("tvdb"),
            ),
            source="simkl",
            raw=cached,
        )
    elif provider_name == "imdbapi":
        title = cached.get("primaryTitle") or cached.get("originalTitle")
        year = cached.get("startYear")
        imdb_id = cached.get("id")
        # Restore enriched IDs that were saved alongside the raw data
        # (written by search_metadata under "_enriched_ids").
        enriched = cached.get("_enriched_ids", {})
        return MetadataResult(
            title=title,
            year=year,
            kind=kind,
            external_ids=ExternalIds(
                imdb_id=imdb_id,
                tmdb_id=enriched.get("tmdb_id"),
                tmdb_kind=enriched.get("tmdb_kind"),
                tvdb_id=enriched.get("tvdb_id"),
            ),
            source="imdbapi",
            raw=cached,
        )
    return None
__all__ = [
"ALL_PROVIDERS",
"ExternalIds",
"MetadataProvider",
"MetadataResult",
"enrich_ids",
"fetch_external_ids",
"fuzzy_match",
"get_available_providers",
"get_provider",
"get_title_by_id",
"get_year_by_id",
"search_metadata",
]

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
import logging
import re
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Optional, Union
import requests
from requests.adapters import HTTPAdapter, Retry
log = logging.getLogger("METADATA")
HEADERS = {"User-Agent": "unshackle-tags/1.0"}
STRIP_RE = re.compile(r"[^a-z0-9]+", re.I)
YEAR_RE = re.compile(r"\s*\(?[12][0-9]{3}\)?$")
@dataclass
class ExternalIds:
    """Normalized external IDs across providers.

    All fields default to None; a field stays None when the source
    provider did not supply that ID.
    """

    imdb_id: Optional[str] = None  # e.g. "tt1375666"
    tmdb_id: Optional[int] = None  # TMDB numeric ID
    tmdb_kind: Optional[str] = None  # "movie" or "tv"
    tvdb_id: Optional[int] = None  # TheTVDB numeric ID
@dataclass
class MetadataResult:
    """Unified metadata result from any provider."""

    title: Optional[str] = None  # display title as reported by the provider
    year: Optional[int] = None  # release / first-air year
    kind: Optional[str] = None  # "movie" or "tv"
    external_ids: ExternalIds = field(default_factory=ExternalIds)  # cross-provider IDs
    source: str = ""  # provider name, e.g. "tmdb", "simkl", "imdbapi"
    raw: Optional[dict] = None  # original API response for caching
class MetadataProvider(metaclass=ABCMeta):
    """Abstract base for metadata providers.

    Subclasses set NAME/REQUIRES_KEY and implement the four abstract
    methods. A lazily-built, retry-enabled requests session is shared by
    all calls on one provider instance.
    """

    # Short identifier used for registry lookups and cache keys.
    NAME: str = ""
    # Whether the provider needs an API key/credential to function.
    REQUIRES_KEY: bool = True

    def __init__(self) -> None:
        self.log = logging.getLogger(f"METADATA.{self.NAME.upper()}")
        # Session is created on first access via the `session` property.
        self._session: Optional[requests.Session] = None

    @property
    def session(self) -> requests.Session:
        """Lazily-created requests session with retries on 429/5xx."""
        if self._session is None:
            self._session = requests.Session()
            self._session.headers.update(HEADERS)
            # Up to 3 retries with exponential backoff on transient HTTP errors.
            # NOTE(review): POST is in allowed_methods, so POST requests are
            # retried too — fine for the search POSTs used here, but confirm
            # if non-idempotent POSTs are ever added.
            retry = Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
                allowed_methods=["GET", "POST"],
            )
            adapter = HTTPAdapter(max_retries=retry)
            self._session.mount("https://", adapter)
            self._session.mount("http://", adapter)
        return self._session

    @abstractmethod
    def is_available(self) -> bool:
        """Return True if this provider has the credentials/keys it needs."""

    @abstractmethod
    def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]:
        """Search for a title and return metadata, or None on failure/no match."""

    @abstractmethod
    def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]:
        """Fetch metadata by this provider's native ID."""

    @abstractmethod
    def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds:
        """Fetch external IDs for a title by this provider's native ID."""
def _clean(s: str) -> str:
    # Normalize for fuzzy comparison: lowercase and strip every
    # non-alphanumeric character (STRIP_RE is case-insensitive, so
    # lowering before or after the substitution is equivalent).
    return STRIP_RE.sub("", s.lower())
def _strip_year(s: str) -> str:
    # Drop a trailing 4-digit year (optionally parenthesized), then trim
    # any surrounding whitespace left behind.
    without_year = YEAR_RE.sub("", s)
    return without_year.strip()
def fuzzy_match(a: str, b: str, threshold: float = 0.8) -> bool:
    """Return True if ``a`` and ``b`` are a close match.

    Both strings are normalized via ``_clean`` before comparing their
    SequenceMatcher similarity ratio against ``threshold``.
    """
    left, right = _clean(a), _clean(b)
    similarity = SequenceMatcher(None, left, right).ratio()
    return similarity >= threshold

View File

@@ -0,0 +1,123 @@
from __future__ import annotations
from difflib import SequenceMatcher
from typing import Optional, Union
import requests
from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, _clean, fuzzy_match
# Mapping from our kind ("movie"/"tv") to imdbapi.dev title types
KIND_TO_TYPES: dict[str, list[str]] = {
"movie": ["movie"],
"tv": ["tvSeries", "tvMiniSeries"],
}
class IMDBApiProvider(MetadataProvider):
    """IMDb metadata provider using imdbapi.dev (free, no API key)."""

    NAME = "imdbapi"
    REQUIRES_KEY = False
    BASE_URL = "https://api.imdbapi.dev"

    def is_available(self) -> bool:
        return True  # no key needed

    def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]:
        """Search imdbapi.dev and return the best fuzzy-matching title.

        Results are filtered by type (per KIND_TO_TYPES) when possible, then
        the highest SequenceMatcher ratio wins; when ``year`` is given,
        candidates more than 1 year off are excluded from becoming the best
        match. Returns None on network failure, no results, or a final match
        that fails ``fuzzy_match``.
        """
        self.log.debug("Searching IMDBApi for %r (%s, %s)", title, kind, year)
        try:
            params: dict[str, str | int] = {"query": title, "limit": 20}
            r = self.session.get(
                f"{self.BASE_URL}/search/titles",
                params=params,
                timeout=30,
            )
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("IMDBApi search failed: %s", exc)
            return None
        # Accept either payload key — presumably API versions differ; the
        # fallback chain covers both. TODO confirm against current API docs.
        results = data.get("titles") or data.get("results") or []
        if not results:
            self.log.debug("IMDBApi returned no results for %r", title)
            return None
        # Filter by type if possible
        type_filter = KIND_TO_TYPES.get(kind, [])
        filtered = [r for r in results if r.get("type") in type_filter] if type_filter else results
        # Fall back to unfiltered results if the type filter removed everything.
        candidates = filtered if filtered else results
        # Find best fuzzy match, optionally filtered by year
        best_match: Optional[dict] = None
        best_ratio = 0.0
        for candidate in candidates:
            primary = candidate.get("primaryTitle") or ""
            original = candidate.get("originalTitle") or ""
            # Score against both the primary and original titles.
            for name in [primary, original]:
                if not name:
                    continue
                ratio = SequenceMatcher(None, _clean(title), _clean(name)).ratio()
                if ratio > best_ratio:
                    # If year provided, prefer matches within 1 year
                    candidate_year = candidate.get("startYear")
                    if year and candidate_year and abs(year - candidate_year) > 1:
                        continue
                    best_ratio = ratio
                    best_match = candidate
        if not best_match:
            self.log.debug("No matching result found in IMDBApi for %r", title)
            return None
        result_title = best_match.get("primaryTitle") or best_match.get("originalTitle")
        # Final guard: the winning candidate must still pass the shared
        # fuzzy_match threshold against the searched title.
        if not result_title or not fuzzy_match(result_title, title):
            self.log.debug("IMDBApi title mismatch: searched %r, got %r", title, result_title)
            return None
        imdb_id = best_match.get("id")
        result_year = best_match.get("startYear")
        self.log.debug("IMDBApi -> %s (ID %s)", result_title, imdb_id)
        # Only the IMDB ID is known here; TMDB/TVDB IDs are filled in later
        # by enrichment.
        return MetadataResult(
            title=result_title,
            year=result_year,
            kind=kind,
            external_ids=ExternalIds(imdb_id=imdb_id),
            source="imdbapi",
            raw=best_match,
        )

    def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]:
        """Fetch metadata by IMDB ID (e.g. 'tt1375666')."""
        imdb_id = str(provider_id)
        self.log.debug("Fetching IMDBApi title %s", imdb_id)
        try:
            r = self.session.get(f"{self.BASE_URL}/titles/{imdb_id}", timeout=30)
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("IMDBApi get_by_id failed: %s", exc)
            return None
        title = data.get("primaryTitle") or data.get("originalTitle")
        result_year = data.get("startYear")
        return MetadataResult(
            title=title,
            year=result_year,
            kind=kind,
            external_ids=ExternalIds(imdb_id=data.get("id")),
            source="imdbapi",
            raw=data,
        )

    def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds:
        """Return external IDs. For IMDB, the provider_id IS the IMDB ID."""
        return ExternalIds(imdb_id=str(provider_id))

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
from typing import Optional, Union
import requests
from unshackle.core.config import config
from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, fuzzy_match
class SimklProvider(MetadataProvider):
    """SIMKL metadata provider (filename-based search)."""

    NAME = "simkl"
    REQUIRES_KEY = True
    BASE_URL = "https://api.simkl.com"

    def is_available(self) -> bool:
        # Usable only when a SIMKL client ID is configured.
        return bool(config.simkl_client_id)

    def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]:
        """Search SIMKL by synthesizing a plausible release filename.

        SIMKL's /search/file endpoint identifies titles from filenames, so a
        fake one is built from title/year plus an "S01E01" or "2160p" suffix
        to steer SIMKL toward the right content type. Returns None on
        network failure or no match.
        """
        self.log.debug("Searching Simkl for %r (%s, %s)", title, kind, year)
        # Construct appropriate filename based on type
        filename = f"{title}"
        if year:
            filename = f"{title} {year}"
        if kind == "tv":
            filename += " S01E01.mkv"
        else:
            filename += " 2160p.mkv"
        try:
            headers = {"simkl-api-key": config.simkl_client_id}
            resp = self.session.post(
                f"{self.BASE_URL}/search/file", json={"file": filename}, headers=headers, timeout=30
            )
            resp.raise_for_status()
            data = resp.json()
            self.log.debug("Simkl API response received")
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("Simkl search failed: %s", exc)
            return None
        # Handle case where SIMKL returns empty list (no results)
        if isinstance(data, list):
            self.log.debug("Simkl returned list (no matches) for %r", filename)
            return None
        return self._parse_response(data, title, year, kind)

    def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]:
        # Intentionally unimplemented; callers use search()/find_by_imdb_id().
        return None  # SIMKL has no direct ID lookup used here

    def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds:
        return ExternalIds()  # IDs come from search() response

    def find_by_imdb_id(self, imdb_id: str, kind: str) -> Optional[ExternalIds]:
        """Look up TMDB/TVDB IDs from an IMDB ID using SIMKL's /search/id and detail endpoints.

        Two round-trips: /search/id resolves the IMDB ID to a SIMKL ID, then
        the tv/movies detail endpoint supplies the cross-referenced IDs.
        Returns None on any failure or when SIMKL has no match.
        """
        self.log.debug("Looking up IMDB ID %s on SIMKL", imdb_id)
        headers = {"simkl-api-key": config.simkl_client_id}
        try:
            r = self.session.get(f"{self.BASE_URL}/search/id", params={"imdb": imdb_id}, headers=headers, timeout=30)
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("SIMKL search/id failed: %s", exc)
            return None
        if not isinstance(data, list) or not data:
            self.log.debug("No SIMKL results for IMDB ID %s", imdb_id)
            return None
        # Take the first hit; an IMDB ID should resolve to a single title.
        entry = data[0]
        simkl_id = entry.get("ids", {}).get("simkl")
        if not simkl_id:
            return None
        # Map SIMKL type to endpoint
        simkl_type = entry.get("type", "")
        endpoint = "tv" if simkl_type in ("tv", "anime") else "movies"
        # Fetch full details to get cross-referenced IDs
        try:
            r2 = self.session.get(
                f"{self.BASE_URL}/{endpoint}/{simkl_id}",
                params={"extended": "full"},
                headers=headers,
                timeout=30,
            )
            r2.raise_for_status()
            detail = r2.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("SIMKL detail fetch failed: %s", exc)
            return None
        ids = detail.get("ids", {})
        # IDs may arrive as strings — normalize to int where present.
        tmdb_id: Optional[int] = None
        raw_tmdb = ids.get("tmdb")
        if raw_tmdb:
            tmdb_id = int(raw_tmdb)
        tvdb_id: Optional[int] = None
        raw_tvdb = ids.get("tvdb")
        if raw_tvdb:
            tvdb_id = int(raw_tvdb)
        self.log.debug("SIMKL find -> TMDB %s, TVDB %s for IMDB %s", tmdb_id, tvdb_id, imdb_id)
        return ExternalIds(
            imdb_id=imdb_id,
            tmdb_id=tmdb_id,
            tmdb_kind=kind,
            tvdb_id=tvdb_id,
        )

    def _parse_response(
        self, data: dict, search_title: str, search_year: Optional[int], kind: str
    ) -> Optional[MetadataResult]:
        """Parse a SIMKL response into a MetadataResult.

        Rejects the response when the title fails the fuzzy match or the
        year differs from the searched year by more than 1.
        """
        # SIMKL nests the actual title info under "show" or "movie"
        # depending on what the filename resolved to.
        if data.get("type") == "episode" and "show" in data:
            info = data["show"]
            content_type = "tv"
        elif data.get("type") == "movie" and "movie" in data:
            info = data["movie"]
            content_type = "movie"
        else:
            return None
        result_title = info.get("title")
        result_year = info.get("year")
        # Verify title matches
        if not result_title or not fuzzy_match(result_title, search_title):
            self.log.debug("Simkl title mismatch: searched %r, got %r", search_title, result_title)
            return None
        # Verify year if provided (allow 1 year difference)
        if search_year and result_year and abs(search_year - result_year) > 1:
            self.log.debug("Simkl year mismatch: searched %d, got %d", search_year, result_year)
            return None
        ids = info.get("ids", {})
        tmdb_id: Optional[int] = None
        # TV entries carry the TMDB ID under "tmdbtv"; movies under "tmdb"
        # (or legacy "moviedb").
        if content_type == "tv":
            raw_tmdb = ids.get("tmdbtv")
        else:
            raw_tmdb = ids.get("tmdb") or ids.get("moviedb")
        if raw_tmdb:
            tmdb_id = int(raw_tmdb)
        tvdb_id: Optional[int] = None
        raw_tvdb = ids.get("tvdb")
        if raw_tvdb:
            tvdb_id = int(raw_tvdb)
        self.log.debug("Simkl -> %s (TMDB ID %s)", result_title, tmdb_id)
        return MetadataResult(
            title=result_title,
            year=result_year,
            kind=kind,
            external_ids=ExternalIds(
                imdb_id=ids.get("imdb"),
                tmdb_id=tmdb_id,
                tmdb_kind=kind,
                tvdb_id=tvdb_id,
            ),
            source="simkl",
            raw=data,
        )

View File

@@ -0,0 +1,199 @@
from __future__ import annotations
from difflib import SequenceMatcher
from typing import Optional, Union
import requests
from unshackle.core.config import config
from unshackle.core.providers._base import ExternalIds, MetadataProvider, MetadataResult, _clean, _strip_year
class TMDBProvider(MetadataProvider):
    """TMDB (The Movie Database) metadata provider.

    Requires ``config.tmdb_api_key``. Supports fuzzy title search, direct
    ID lookup, external-ID cross-referencing, and reverse lookup from an
    IMDB ID via TMDB's /find endpoint.
    """

    NAME = "tmdb"
    REQUIRES_KEY = True
    BASE_URL = "https://api.themoviedb.org/3"

    def is_available(self) -> bool:
        """Provider is usable only when a TMDB API key is configured."""
        return bool(config.tmdb_api_key)

    @property
    def _api_key(self) -> str:
        # Read lazily so config changes after construction are honored.
        return config.tmdb_api_key

    def search(self, title: str, year: Optional[int], kind: str) -> Optional[MetadataResult]:
        """Search TMDB for a title and return the best fuzzy match.

        Args:
            title: Title to look up (a trailing "(YYYY)" year is stripped).
            year: Optional release year used to narrow the search.
            kind: "movie" or "tv".

        Returns:
            MetadataResult with full detail + external IDs in ``raw``, or
            None when nothing was found or the request failed.
        """
        search_title = _strip_year(title)
        self.log.debug("Searching TMDB for %r (%s, %s)", search_title, kind, year)
        params: dict[str, str | int] = {"api_key": self._api_key, "query": search_title}
        if year is not None:
            # TMDB uses different year parameter names for movies vs TV.
            params["year" if kind == "movie" else "first_air_date_year"] = year
        try:
            r = self.session.get(f"{self.BASE_URL}/search/{kind}", params=params, timeout=30)
            r.raise_for_status()
            results = r.json().get("results") or []
            self.log.debug("TMDB returned %d results", len(results))
            if not results:
                return None
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers malformed JSON on older requests versions,
            # matching the SIMKL provider's error handling.
            self.log.warning("Failed to search TMDB for %s: %s", title, exc)
            return None
        best_ratio = 0.0
        best_id: Optional[int] = None
        best_title: Optional[str] = None
        cleaned_query = _clean(search_title)  # loop-invariant: hoisted out of the candidate loop
        for result in results:
            candidates = [
                result.get("title"),
                result.get("name"),
                result.get("original_title"),
                result.get("original_name"),
            ]
            for candidate in (c for c in candidates if c):
                ratio = SequenceMatcher(None, cleaned_query, _clean(candidate)).ratio()
                if ratio > best_ratio:
                    best_ratio = ratio
                    best_id = result.get("id")
                    best_title = candidate
        self.log.debug("Best candidate ratio %.2f for %r (ID %s)", best_ratio, best_title, best_id)
        if best_id is None:
            # No candidate scored above zero; fall back to TMDB's top hit.
            first = results[0]
            best_id = first.get("id")
            best_title = first.get("title") or first.get("name")
        if best_id is None:
            return None
        # Fetch full detail + external IDs so the result can be cached whole
        detail = self._fetch_detail(best_id, kind)
        ext_raw = self._fetch_external_ids_raw(best_id, kind)
        date = (detail or {}).get("release_date") or (detail or {}).get("first_air_date")
        result_year = int(date[:4]) if date and len(date) >= 4 and date[:4].isdigit() else None
        ext = ExternalIds(
            imdb_id=ext_raw.get("imdb_id") if ext_raw else None,
            tmdb_id=best_id,
            tmdb_kind=kind,
            tvdb_id=ext_raw.get("tvdb_id") if ext_raw else None,
        )
        return MetadataResult(
            title=best_title,
            year=result_year,
            kind=kind,
            external_ids=ext,
            source="tmdb",
            raw={"detail": detail or {}, "external_ids": ext_raw or {}},
        )

    def get_by_id(self, provider_id: Union[int, str], kind: str) -> Optional[MetadataResult]:
        """Fetch a title directly by its numeric TMDB ID.

        Note: the returned external_ids carry only the TMDB ID; callers that
        need IMDB/TVDB IDs should also call get_external_ids().
        """
        detail = self._fetch_detail(int(provider_id), kind)
        if not detail:
            return None
        title = detail.get("title") or detail.get("name")
        date = detail.get("release_date") or detail.get("first_air_date")
        year = int(date[:4]) if date and len(date) >= 4 and date[:4].isdigit() else None
        return MetadataResult(
            title=title,
            year=year,
            kind=kind,
            external_ids=ExternalIds(tmdb_id=int(provider_id), tmdb_kind=kind),
            source="tmdb",
            raw=detail,
        )

    def get_external_ids(self, provider_id: Union[int, str], kind: str) -> ExternalIds:
        """Fetch IMDB/TVDB cross-references for a TMDB ID.

        Always returns an ExternalIds carrying at least the TMDB ID itself,
        even when the external_ids request fails.
        """
        raw = self._fetch_external_ids_raw(int(provider_id), kind)
        if not raw:
            return ExternalIds(tmdb_id=int(provider_id), tmdb_kind=kind)
        return ExternalIds(
            imdb_id=raw.get("imdb_id"),
            tmdb_id=int(provider_id),
            tmdb_kind=kind,
            tvdb_id=raw.get("tvdb_id"),
        )

    def find_by_imdb_id(self, imdb_id: str, kind: str) -> Optional[ExternalIds]:
        """Look up TMDB/TVDB IDs from an IMDB ID using TMDB's /find endpoint.

        If no result exists for the requested kind, the opposite kind is
        tried as a fallback (the returned tmdb_kind reflects the match).
        """
        self.log.debug("Looking up IMDB ID %s on TMDB", imdb_id)
        try:
            r = self.session.get(
                f"{self.BASE_URL}/find/{imdb_id}",
                params={"api_key": self._api_key, "external_source": "imdb_id"},
                timeout=30,
            )
            r.raise_for_status()
            data = r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("TMDB find by IMDB ID failed: %s", exc)
            return None
        # Check movie_results or tv_results based on kind
        if kind == "movie":
            results = data.get("movie_results") or []
        else:
            results = data.get("tv_results") or []
        if not results:
            # Try the other type as fallback
            fallback_key = "tv_results" if kind == "movie" else "movie_results"
            results = data.get(fallback_key) or []
            if results:
                kind = "tv" if kind == "movie" else "movie"
        if not results:
            self.log.debug("No TMDB results found for IMDB ID %s", imdb_id)
            return None
        match = results[0]
        tmdb_id = match.get("id")
        if not tmdb_id:
            return None
        self.log.debug("TMDB find -> ID %s (%s) for IMDB %s", tmdb_id, kind, imdb_id)
        # Now fetch the full external IDs from TMDB to get TVDB etc.
        ext_raw = self._fetch_external_ids_raw(tmdb_id, kind)
        return ExternalIds(
            imdb_id=imdb_id,
            tmdb_id=tmdb_id,
            tmdb_kind=kind,
            tvdb_id=ext_raw.get("tvdb_id") if ext_raw else None,
        )

    def _fetch_detail(self, tmdb_id: int, kind: str) -> Optional[dict]:
        """Fetch the full /{kind}/{id} detail payload; None on failure."""
        try:
            r = self.session.get(
                f"{self.BASE_URL}/{kind}/{tmdb_id}",
                params={"api_key": self._api_key},
                timeout=30,
            )
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("Failed to fetch TMDB detail: %s", exc)
            return None

    def _fetch_external_ids_raw(self, tmdb_id: int, kind: str) -> Optional[dict]:
        """Fetch the raw /{kind}/{id}/external_ids payload; None on failure."""
        try:
            r = self.session.get(
                f"{self.BASE_URL}/{kind}/{tmdb_id}/external_ids",
                params={"api_key": self._api_key},
                timeout=30,
            )
            r.raise_for_status()
            return r.json()
        except (requests.RequestException, ValueError) as exc:
            self.log.debug("Failed to fetch TMDB external IDs: %s", exc)
            return None

View File

@@ -26,6 +26,7 @@ class TitleCacher:
self.log = logging.getLogger(f"{service_name}.TitleCache")
self.cacher = Cacher(service_name)
self.stats = {"hits": 0, "misses": 0, "fallbacks": 0}
self.no_cache = False
def _generate_cache_key(
self, title_id: str, region: Optional[str] = None, account_hash: Optional[str] = None
@@ -59,9 +60,6 @@ class TitleCacher:
# Join with underscores
cache_key = "_".join(key_parts)
# Log the mapping for debugging
self.log.debug(f"Cache key mapping: {title_id} -> {cache_key}")
return cache_key
def get_cached_titles(
@@ -89,6 +87,7 @@ class TitleCacher:
"""
# If caching is globally disabled or no_cache flag is set
if not config.title_cache_enabled or no_cache:
self.no_cache = True
self.log.debug("Cache bypassed, fetching fresh titles")
return fetch_function()
@@ -113,7 +112,7 @@ class TitleCacher:
# Cache miss or expired, try to fetch fresh data
self.stats["misses"] += 1
self.log.debug(f"Cache miss for {title_id}, fetching fresh data")
self.log.debug(f"Cache miss for {title_id} fetching fresh data")
try:
# Attempt to fetch fresh titles
@@ -180,22 +179,18 @@ class TitleCacher:
"hit_rate": f"{hit_rate:.1f}%",
}
def get_cached_tmdb(
self, title_id: str, kind: str, region: Optional[str] = None, account_hash: Optional[str] = None
# -- Generic provider cache methods --
def get_cached_provider(
self,
provider_name: str,
title_id: str,
kind: Optional[str] = None,
region: Optional[str] = None,
account_hash: Optional[str] = None,
) -> Optional[dict]:
"""
Get cached TMDB data for a title.
Args:
title_id: The title identifier
kind: "movie" or "tv"
region: The region/proxy identifier
account_hash: Hash of account credentials
Returns:
Dict with 'detail' and 'external_ids' if cached and valid, None otherwise
"""
if not config.title_cache_enabled:
"""Get cached metadata for any provider."""
if not config.title_cache_enabled or self.no_cache:
return None
cache_key = self._generate_cache_key(title_id, region, account_hash)
@@ -204,142 +199,90 @@ class TitleCacher:
if not cache or not cache.data:
return None
tmdb_data = getattr(cache.data, "tmdb_data", None)
if not tmdb_data:
provider_data = getattr(cache.data, f"{provider_name}_data", None)
if not provider_data:
return None
tmdb_expiration = tmdb_data.get("expires_at")
if not tmdb_expiration or datetime.now() >= tmdb_expiration:
self.log.debug(f"TMDB cache expired for {title_id}")
expiration = provider_data.get("expires_at")
if not expiration or datetime.now() >= expiration:
self.log.debug(f"{provider_name} cache expired for {title_id}")
return None
if tmdb_data.get("kind") != kind:
self.log.debug(f"TMDB cache kind mismatch for {title_id}: cached {tmdb_data.get('kind')}, requested {kind}")
if kind and provider_data.get("kind") != kind:
self.log.debug(
f"{provider_name} cache kind mismatch for {title_id}: "
f"cached {provider_data.get('kind')}, requested {kind}"
)
return None
self.log.debug(f"TMDB cache hit for {title_id}")
return {
"detail": tmdb_data.get("detail"),
"external_ids": tmdb_data.get("external_ids"),
"fetched_at": tmdb_data.get("fetched_at"),
}
self.log.debug(f"{provider_name} cache hit for {title_id}")
def cache_tmdb(
# Return the inner data (provider-specific format)
response = provider_data.get("response")
if response is not None:
return response
# For TMDB-style caches that store detail + external_ids at top level
result: dict = {}
if "detail" in provider_data:
result["detail"] = provider_data["detail"]
if "external_ids" in provider_data:
result["external_ids"] = provider_data["external_ids"]
if "fetched_at" in provider_data:
result["fetched_at"] = provider_data["fetched_at"]
return result if result else provider_data
def cache_provider(
self,
provider_name: str,
title_id: str,
detail_response: dict,
external_ids_response: dict,
kind: str,
data: dict,
kind: Optional[str] = None,
region: Optional[str] = None,
account_hash: Optional[str] = None,
ttl_days: int = 7,
) -> None:
"""
Cache TMDB data for a title.
Args:
title_id: The title identifier
detail_response: Full TMDB detail API response
external_ids_response: Full TMDB external_ids API response
kind: "movie" or "tv"
region: The region/proxy identifier
account_hash: Hash of account credentials
"""
if not config.title_cache_enabled:
"""Cache metadata from any provider."""
if not config.title_cache_enabled or self.no_cache:
return
cache_key = self._generate_cache_key(title_id, region, account_hash)
cache = self.cacher.get(cache_key, version=1)
if not cache or not cache.data:
self.log.debug(f"Cannot cache TMDB data: no title cache exists for {title_id}")
self.log.debug(f"Cannot cache {provider_name} data: no title cache exists for {title_id}")
return
now = datetime.now()
tmdb_data = {
"detail": detail_response,
"external_ids": external_ids_response,
"kind": kind,
"fetched_at": now,
"expires_at": now + timedelta(days=7), # 7-day expiration
}
cache.data.tmdb_data = tmdb_data
# Build cache entry in a format compatible with legacy methods
if provider_name == "tmdb" and "detail" in data:
# TMDB stores detail + external_ids at top level
cache_entry = {
**data,
"kind": kind,
"fetched_at": now,
"expires_at": now + timedelta(days=ttl_days),
}
elif provider_name == "simkl":
# SIMKL wraps in a "response" key
cache_entry = {
"response": data,
"fetched_at": now,
"expires_at": now + timedelta(days=ttl_days),
}
else:
# Generic format: store data directly with metadata
cache_entry = {
"response": data,
"kind": kind,
"fetched_at": now,
"expires_at": now + timedelta(days=ttl_days),
}
setattr(cache.data, f"{provider_name}_data", cache_entry)
cache.set(cache.data, expiration=cache.expiration)
self.log.debug(f"Cached TMDB data for {title_id} (kind={kind})")
def get_cached_simkl(
self, title_id: str, region: Optional[str] = None, account_hash: Optional[str] = None
) -> Optional[dict]:
"""
Get cached Simkl data for a title.
Args:
title_id: The title identifier
region: The region/proxy identifier
account_hash: Hash of account credentials
Returns:
Simkl response dict if cached and valid, None otherwise
"""
if not config.title_cache_enabled:
return None
cache_key = self._generate_cache_key(title_id, region, account_hash)
cache = self.cacher.get(cache_key, version=1)
if not cache or not cache.data:
return None
simkl_data = getattr(cache.data, "simkl_data", None)
if not simkl_data:
return None
simkl_expiration = simkl_data.get("expires_at")
if not simkl_expiration or datetime.now() >= simkl_expiration:
self.log.debug(f"Simkl cache expired for {title_id}")
return None
self.log.debug(f"Simkl cache hit for {title_id}")
return simkl_data.get("response")
def cache_simkl(
self,
title_id: str,
simkl_response: dict,
region: Optional[str] = None,
account_hash: Optional[str] = None,
) -> None:
"""
Cache Simkl data for a title.
Args:
title_id: The title identifier
simkl_response: Full Simkl API response
region: The region/proxy identifier
account_hash: Hash of account credentials
"""
if not config.title_cache_enabled:
return
cache_key = self._generate_cache_key(title_id, region, account_hash)
cache = self.cacher.get(cache_key, version=1)
if not cache or not cache.data:
self.log.debug(f"Cannot cache Simkl data: no title cache exists for {title_id}")
return
now = datetime.now()
simkl_data = {
"response": simkl_response,
"fetched_at": now,
"expires_at": now + timedelta(days=7),
}
cache.data.simkl_data = simkl_data
cache.set(cache.data, expiration=cache.expiration)
self.log.debug(f"Cached Simkl data for {title_id}")
self.log.debug(f"Cached {provider_name} data for {title_id}")
def get_region_from_proxy(proxy_url: Optional[str]) -> Optional[str]:

View File

@@ -1,488 +1,23 @@
from __future__ import annotations
import logging
import re
import subprocess
import tempfile
from difflib import SequenceMatcher
from pathlib import Path
from typing import Optional, Tuple
from typing import Optional
from xml.sax.saxutils import escape
import requests
from requests.adapters import HTTPAdapter, Retry
from unshackle.core import binaries
from unshackle.core.config import config
from unshackle.core.providers import (ExternalIds, MetadataResult, enrich_ids, fetch_external_ids, fuzzy_match,
get_available_providers, get_provider, search_metadata)
from unshackle.core.titles.episode import Episode
from unshackle.core.titles.movie import Movie
from unshackle.core.titles.title import Title
STRIP_RE = re.compile(r"[^a-z0-9]+", re.I)
YEAR_RE = re.compile(r"\s*\(?[12][0-9]{3}\)?$")
HEADERS = {"User-Agent": "unshackle-tags/1.0"}
log = logging.getLogger("TAGS")
def _get_session() -> requests.Session:
    """Build a requests session that retries transient network failures."""
    retry_policy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )
    adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    session.headers.update(HEADERS)
    for scheme in ("https://", "http://"):
        session.mount(scheme, adapter)
    return session
def _api_key() -> Optional[str]:
    """Return the configured TMDB API key, or None when unset."""
    return config.tmdb_api_key
def _simkl_client_id() -> Optional[str]:
    """Return the configured SIMKL client ID, or None when unset."""
    return config.simkl_client_id
def _clean(s: str) -> str:
    """Normalize a title for fuzzy comparison: strip non-alphanumerics, lowercase."""
    return STRIP_RE.sub("", s).lower()
def _strip_year(s: str) -> str:
    """Remove a trailing 4-digit year (optionally parenthesized) from a title."""
    return YEAR_RE.sub("", s).strip()
def fuzzy_match(a: str, b: str, threshold: float = 0.8) -> bool:
    """Return True when the normalized similarity of ``a`` and ``b`` meets ``threshold``."""
    return SequenceMatcher(None, _clean(a), _clean(b)).ratio() >= threshold
def search_simkl(
    title: str,
    year: Optional[int],
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Tuple[Optional[dict], Optional[str], Optional[int]]:
    """Search Simkl API for show information by filename.

    Builds a fake media filename from the title/year and asks SIMKL's
    /search/file endpoint to identify it. Results are stored through
    ``title_cacher`` when one is supplied.

    Returns:
        (raw_response, matched_title, tmdb_id) — all None when no confident
        match is found or the request fails.
    """
    # Serve from the title cache first, if available.
    if title_cacher and cache_title_id:
        cached_simkl = title_cacher.get_cached_simkl(cache_title_id, cache_region, cache_account_hash)
        if cached_simkl:
            log.debug("Using cached Simkl data")
            if cached_simkl.get("type") == "episode" and "show" in cached_simkl:
                show_info = cached_simkl["show"]
                show_title = show_info.get("title")
                tmdb_id = show_info.get("ids", {}).get("tmdbtv")
                if tmdb_id:
                    tmdb_id = int(tmdb_id)
                return cached_simkl, show_title, tmdb_id
            elif cached_simkl.get("type") == "movie" and "movie" in cached_simkl:
                movie_info = cached_simkl["movie"]
                movie_title = movie_info.get("title")
                ids = movie_info.get("ids", {})
                tmdb_id = ids.get("tmdb") or ids.get("moviedb")
                if tmdb_id:
                    tmdb_id = int(tmdb_id)
                return cached_simkl, movie_title, tmdb_id
    log.debug("Searching Simkl for %r (%s, %s)", title, kind, year)
    client_id = _simkl_client_id()
    if not client_id:
        log.debug("No SIMKL client ID configured; skipping SIMKL search")
        return None, None, None
    # Construct appropriate filename based on type
    filename = f"{title}"
    if year:
        filename = f"{title} {year}"
    if kind == "tv":
        filename += " S01E01.mkv"
    else:  # movie
        filename += " 2160p.mkv"
    try:
        session = _get_session()
        headers = {"simkl-api-key": client_id}
        resp = session.post("https://api.simkl.com/search/file", json={"file": filename}, headers=headers, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        log.debug("Simkl API response received")
        # Handle case where SIMKL returns empty list (no results)
        if isinstance(data, list):
            log.debug("Simkl returned list (no matches) for %r", filename)
            return None, None, None
        # Handle TV show responses
        if data.get("type") == "episode" and "show" in data:
            show_info = data["show"]
            show_title = show_info.get("title")
            show_year = show_info.get("year")
            # Guard against a missing title: fuzzy_match(None, ...) would
            # raise a TypeError inside the regex substitution, which the
            # except clause below does not catch.
            if not show_title or not fuzzy_match(show_title, title):
                log.debug("Simkl title mismatch: searched %r, got %r", title, show_title)
                return None, None, None
            if year and show_year and abs(year - show_year) > 1:  # Allow 1 year difference
                log.debug("Simkl year mismatch: searched %d, got %d", year, show_year)
                return None, None, None
            if title_cacher and cache_title_id:
                try:
                    title_cacher.cache_simkl(cache_title_id, data, cache_region, cache_account_hash)
                except Exception as exc:
                    log.debug("Failed to cache Simkl data: %s", exc)
            tmdb_id = show_info.get("ids", {}).get("tmdbtv")
            if tmdb_id:
                tmdb_id = int(tmdb_id)
            log.debug("Simkl -> %s (TMDB ID %s)", show_title, tmdb_id)
            return data, show_title, tmdb_id
        elif data.get("type") == "movie" and "movie" in data:
            movie_info = data["movie"]
            movie_title = movie_info.get("title")
            movie_year = movie_info.get("year")
            # Same None-title guard as the TV branch.
            if not movie_title or not fuzzy_match(movie_title, title):
                log.debug("Simkl title mismatch: searched %r, got %r", title, movie_title)
                return None, None, None
            if year and movie_year and abs(year - movie_year) > 1:  # Allow 1 year difference
                log.debug("Simkl year mismatch: searched %d, got %d", year, movie_year)
                return None, None, None
            if title_cacher and cache_title_id:
                try:
                    title_cacher.cache_simkl(cache_title_id, data, cache_region, cache_account_hash)
                except Exception as exc:
                    log.debug("Failed to cache Simkl data: %s", exc)
            ids = movie_info.get("ids", {})
            tmdb_id = ids.get("tmdb") or ids.get("moviedb")
            if tmdb_id:
                tmdb_id = int(tmdb_id)
            log.debug("Simkl -> %s (TMDB ID %s)", movie_title, tmdb_id)
            return data, movie_title, tmdb_id
    except (requests.RequestException, ValueError, KeyError) as exc:
        log.debug("Simkl search failed: %s", exc)
    return None, None, None
def search_show_info(
    title: str,
    year: Optional[int],
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Tuple[Optional[int], Optional[str], Optional[str]]:
    """Search for show information, trying Simkl first and falling back to TMDB.

    Returns:
        (tmdb_id, title, source) where source is "simkl" or "tmdb".
    """
    simkl_data, simkl_title, simkl_tmdb_id = search_simkl(
        title, year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash
    )
    # Accept the SIMKL hit only when its title fuzzily matches the query.
    simkl_usable = bool(simkl_data) and bool(simkl_title) and fuzzy_match(simkl_title, title)
    if simkl_usable:
        return simkl_tmdb_id, simkl_title, "simkl"
    tmdb_id, tmdb_title = search_tmdb(title, year, kind, title_cacher, cache_title_id, cache_region, cache_account_hash)
    return tmdb_id, tmdb_title, "tmdb"
def _fetch_tmdb_detail(tmdb_id: int, kind: str) -> Optional[dict]:
    """Fetch full TMDB detail response for caching; None on failure or no key."""
    api_key = _api_key()
    if not api_key:
        return None
    url = f"https://api.themoviedb.org/3/{kind}/{tmdb_id}"
    try:
        response = _get_session().get(url, params={"api_key": api_key}, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        log.debug("Failed to fetch TMDB detail: %s", exc)
        return None
def _fetch_tmdb_external_ids(tmdb_id: int, kind: str) -> Optional[dict]:
    """Fetch full TMDB external_ids response for caching; None on failure or no key."""
    api_key = _api_key()
    if not api_key:
        return None
    url = f"https://api.themoviedb.org/3/{kind}/{tmdb_id}/external_ids"
    try:
        response = _get_session().get(url, params={"api_key": api_key}, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        log.debug("Failed to fetch TMDB external IDs: %s", exc)
        return None
def search_tmdb(
    title: str,
    year: Optional[int],
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Tuple[Optional[int], Optional[str]]:
    """Search TMDB for a title and return (tmdb_id, matched_title).

    Checks the title cache first, then queries TMDB's search endpoint and
    picks the result whose title best fuzzy-matches the query. Falls back
    to TMDB's first result when no candidate scores above zero.
    """
    # Serve from the title cache first, if available.
    if title_cacher and cache_title_id:
        cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash)
        if cached_tmdb and cached_tmdb.get("detail"):
            detail = cached_tmdb["detail"]
            tmdb_id = detail.get("id")
            tmdb_title = detail.get("title") or detail.get("name")
            log.debug("Using cached TMDB data: %r (ID %s)", tmdb_title, tmdb_id)
            return tmdb_id, tmdb_title
    api_key = _api_key()
    if not api_key:
        return None, None
    search_title = _strip_year(title)
    log.debug("Searching TMDB for %r (%s, %s)", search_title, kind, year)
    params = {"api_key": api_key, "query": search_title}
    if year is not None:
        # TMDB uses different year parameter names for movies vs TV.
        params["year" if kind == "movie" else "first_air_date_year"] = year
    try:
        session = _get_session()
        r = session.get(
            f"https://api.themoviedb.org/3/search/{kind}",
            params=params,
            timeout=30,
        )
        r.raise_for_status()
        js = r.json()
        results = js.get("results") or []
        log.debug("TMDB returned %d results", len(results))
        if not results:
            return None, None
    except requests.RequestException as exc:
        log.warning("Failed to search TMDB for %s: %s", title, exc)
        return None, None
    best_ratio = 0.0
    best_id: Optional[int] = None
    best_title: Optional[str] = None
    cleaned_query = _clean(search_title)  # loop-invariant: hoisted out of the candidate loop
    for result in results:
        candidates = [
            result.get("title"),
            result.get("name"),
            result.get("original_title"),
            result.get("original_name"),
        ]
        candidates = [c for c in candidates if c]  # Filter out None/empty values
        if not candidates:
            continue
        # Find the best matching candidate from all available titles
        for candidate in candidates:
            ratio = SequenceMatcher(None, cleaned_query, _clean(candidate)).ratio()
            if ratio > best_ratio:
                best_ratio = ratio
                best_id = result.get("id")
                best_title = candidate
    log.debug(
        "Best candidate ratio %.2f for %r (ID %s)",
        best_ratio,
        best_title,
        best_id,
    )
    if best_id is not None:
        if title_cacher and cache_title_id:
            # Best-effort: cache the full detail + external IDs alongside the hit.
            try:
                detail_response = _fetch_tmdb_detail(best_id, kind)
                external_ids_response = _fetch_tmdb_external_ids(best_id, kind)
                if detail_response and external_ids_response:
                    title_cacher.cache_tmdb(
                        cache_title_id, detail_response, external_ids_response, kind, cache_region, cache_account_hash
                    )
            except Exception as exc:
                log.debug("Failed to cache TMDB data: %s", exc)
        return best_id, best_title
    # No candidate matched at all; fall back to TMDB's top hit (uncached).
    first = results[0]
    return first.get("id"), first.get("title") or first.get("name")
def get_title(
    tmdb_id: int,
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Optional[str]:
    """Fetch the name/title of a TMDB entry by ID.

    Serves from the title cache when possible; on a fresh fetch, also
    stores the detail + external IDs back into the cache (best-effort).
    Returns None when no API key is configured or the request fails.
    """
    # Serve from the title cache first, if available.
    if title_cacher and cache_title_id:
        cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash)
        if cached_tmdb and cached_tmdb.get("detail"):
            detail = cached_tmdb["detail"]
            tmdb_title = detail.get("title") or detail.get("name")
            log.debug("Using cached TMDB title: %r", tmdb_title)
            return tmdb_title
    api_key = _api_key()
    if not api_key:
        return None
    try:
        session = _get_session()
        r = session.get(
            f"https://api.themoviedb.org/3/{kind}/{tmdb_id}",
            params={"api_key": api_key},
            timeout=30,
        )
        r.raise_for_status()
        js = r.json()
        # Best-effort cache write; never let caching failures break the lookup.
        if title_cacher and cache_title_id:
            try:
                external_ids_response = _fetch_tmdb_external_ids(tmdb_id, kind)
                if external_ids_response:
                    title_cacher.cache_tmdb(
                        cache_title_id, js, external_ids_response, kind, cache_region, cache_account_hash
                    )
            except Exception as exc:
                log.debug("Failed to cache TMDB data: %s", exc)
        # Movies use "title", TV shows use "name".
        return js.get("title") or js.get("name")
    except requests.RequestException as exc:
        log.debug("Failed to fetch TMDB title: %s", exc)
        return None
def get_year(
    tmdb_id: int,
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> Optional[int]:
    """Fetch the release year of a TMDB entry by ID.

    Serves from the title cache when possible; on a fresh fetch, also
    stores the detail + external IDs back into the cache (best-effort).
    Returns None when no API key is configured, the request fails, or the
    entry has no parseable release date.
    """
    # Serve from the title cache first, if available.
    if title_cacher and cache_title_id:
        cached_tmdb = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash)
        if cached_tmdb and cached_tmdb.get("detail"):
            detail = cached_tmdb["detail"]
            # Movies use "release_date", TV shows use "first_air_date".
            date = detail.get("release_date") or detail.get("first_air_date")
            if date and len(date) >= 4 and date[:4].isdigit():
                year = int(date[:4])
                log.debug("Using cached TMDB year: %d", year)
                return year
    api_key = _api_key()
    if not api_key:
        return None
    try:
        session = _get_session()
        r = session.get(
            f"https://api.themoviedb.org/3/{kind}/{tmdb_id}",
            params={"api_key": api_key},
            timeout=30,
        )
        r.raise_for_status()
        js = r.json()
        # Best-effort cache write; never let caching failures break the lookup.
        if title_cacher and cache_title_id:
            try:
                external_ids_response = _fetch_tmdb_external_ids(tmdb_id, kind)
                if external_ids_response:
                    title_cacher.cache_tmdb(
                        cache_title_id, js, external_ids_response, kind, cache_region, cache_account_hash
                    )
            except Exception as exc:
                log.debug("Failed to cache TMDB data: %s", exc)
        date = js.get("release_date") or js.get("first_air_date")
        if date and len(date) >= 4 and date[:4].isdigit():
            return int(date[:4])
        return None
    except requests.RequestException as exc:
        log.debug("Failed to fetch TMDB year: %s", exc)
        return None
def external_ids(
    tmdb_id: int,
    kind: str,
    title_cacher=None,
    cache_title_id: Optional[str] = None,
    cache_region: Optional[str] = None,
    cache_account_hash: Optional[str] = None,
) -> dict:
    """Fetch TMDB's external_ids payload (IMDB/TVDB/...) for an entry.

    Serves from the title cache when possible; on a fresh fetch, also
    stores the detail + external IDs back into the cache. Returns an
    empty dict when no API key is configured or the request fails.
    """
    if title_cacher and cache_title_id:
        cached = title_cacher.get_cached_tmdb(cache_title_id, kind, cache_region, cache_account_hash)
        if cached and cached.get("external_ids"):
            log.debug("Using cached TMDB external IDs")
            return cached["external_ids"]
    api_key = _api_key()
    if not api_key:
        return {}
    url = f"https://api.themoviedb.org/3/{kind}/{tmdb_id}/external_ids"
    log.debug("Fetching external IDs for %s %s", kind, tmdb_id)
    try:
        response = _get_session().get(url, params={"api_key": api_key}, timeout=30)
        response.raise_for_status()
        payload = response.json()
    except requests.RequestException as exc:
        log.warning("Failed to fetch external IDs for %s %s: %s", kind, tmdb_id, exc)
        return {}
    log.debug("External IDs response: %s", payload)
    # Best-effort cache write; never let caching failures break the lookup.
    if title_cacher and cache_title_id:
        try:
            detail_response = _fetch_tmdb_detail(tmdb_id, kind)
            if detail_response:
                title_cacher.cache_tmdb(cache_title_id, detail_response, payload, kind, cache_region, cache_account_hash)
        except Exception as exc:
            log.debug("Failed to cache TMDB data: %s", exc)
    return payload
def apply_tags(path: Path, tags: dict[str, str]) -> None:
if not tags:
return
@@ -509,9 +44,26 @@ def apply_tags(path: Path, tags: dict[str, str]) -> None:
tmp_path.unlink(missing_ok=True)
def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> None:
def _build_tags_from_ids(ids: ExternalIds, kind: str) -> dict[str, str]:
"""Build standard MKV tags from external IDs."""
tags: dict[str, str] = {}
if ids.imdb_id:
tags["IMDB"] = ids.imdb_id
if ids.tmdb_id and ids.tmdb_kind:
tags["TMDB"] = f"{ids.tmdb_kind}/{ids.tmdb_id}"
if ids.tvdb_id:
prefix = "movies" if kind == "movie" else "series"
tags["TVDB2"] = f"{prefix}/{ids.tvdb_id}"
return tags
def tag_file(
path: Path,
title: Title,
tmdb_id: Optional[int] = None,
imdb_id: Optional[str] = None,
) -> None:
log.debug("Tagging file %s with title %r", path, title)
standard_tags: dict[str, str] = {}
custom_tags: dict[str, str] = {}
if config.tag and config.tag_group_name:
@@ -537,115 +89,52 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) ->
apply_tags(path, custom_tags)
return
if config.tag_imdb_tmdb:
# Check if we have any API keys available for metadata lookup
api_key = _api_key()
simkl_client = _simkl_client_id()
standard_tags: dict[str, str] = {}
if not api_key and not simkl_client:
log.debug("No TMDB API key or Simkl client ID configured; skipping IMDB/TMDB tag lookup")
if config.tag_imdb_tmdb:
providers = get_available_providers()
if not providers:
log.debug("No metadata providers available; skipping tag lookup")
apply_tags(path, custom_tags)
return
result: Optional[MetadataResult] = None
# Direct ID lookup path
if imdb_id:
imdbapi = get_provider("imdbapi")
if imdbapi:
result = imdbapi.get_by_id(imdb_id, kind)
if result:
result.external_ids.imdb_id = imdb_id
enrich_ids(result)
elif tmdb_id is not None:
tmdb = get_provider("tmdb")
if tmdb:
result = tmdb.get_by_id(tmdb_id, kind)
if result:
ext = tmdb.get_external_ids(tmdb_id, kind)
result.external_ids = ext
else:
# If tmdb_id is provided (via --tmdb), skip Simkl and use TMDB directly
if tmdb_id is not None:
log.debug("Using provided TMDB ID %s for tags", tmdb_id)
else:
# Try Simkl first for automatic lookup (only if client ID is available)
if simkl_client:
simkl_data, simkl_title, simkl_tmdb_id = search_simkl(name, year, kind)
# Search across providers in priority order
result = search_metadata(name, year, kind)
if simkl_data and simkl_title and fuzzy_match(simkl_title, name):
log.debug("Using Simkl data for tags")
if simkl_tmdb_id:
tmdb_id = simkl_tmdb_id
# If we got a TMDB ID from search but no full external IDs, fetch them
if result and result.external_ids.tmdb_id and not result.external_ids.imdb_id:
ext = fetch_external_ids(result.external_ids.tmdb_id, kind)
if ext.imdb_id:
result.external_ids.imdb_id = ext.imdb_id
if ext.tvdb_id:
result.external_ids.tvdb_id = ext.tvdb_id
# Handle TV show data from Simkl
if simkl_data.get("type") == "episode" and "show" in simkl_data:
show_ids = simkl_data.get("show", {}).get("ids", {})
if show_ids.get("imdb"):
standard_tags["IMDB"] = show_ids["imdb"]
if show_ids.get("tvdb"):
standard_tags["TVDB2"] = f"series/{show_ids['tvdb']}"
if show_ids.get("tmdbtv"):
standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}"
if result and result.external_ids:
standard_tags = _build_tags_from_ids(result.external_ids, kind)
# Handle movie data from Simkl
elif simkl_data.get("type") == "movie" and "movie" in simkl_data:
movie_ids = simkl_data.get("movie", {}).get("ids", {})
if movie_ids.get("imdb"):
standard_tags["IMDB"] = movie_ids["imdb"]
if movie_ids.get("tvdb"):
standard_tags["TVDB2"] = f"movies/{movie_ids['tvdb']}"
if movie_ids.get("tmdb"):
standard_tags["TMDB"] = f"movie/{movie_ids['tmdb']}"
# Use TMDB API for additional metadata (either from provided ID or Simkl lookup)
if api_key:
tmdb_title: Optional[str] = None
if tmdb_id is None:
tmdb_id, tmdb_title = search_tmdb(name, year, kind)
log.debug("TMDB search result: %r (ID %s)", tmdb_title, tmdb_id)
if not tmdb_id or not tmdb_title or not fuzzy_match(tmdb_title, name):
log.debug("TMDB search did not match; skipping external ID lookup")
else:
prefix = "movie" if kind == "movie" else "tv"
standard_tags["TMDB"] = f"{prefix}/{tmdb_id}"
try:
ids = external_ids(tmdb_id, kind)
except requests.RequestException as exc:
log.debug("Failed to fetch external IDs: %s", exc)
ids = {}
else:
log.debug("External IDs found: %s", ids)
imdb_id = ids.get("imdb_id")
if imdb_id:
standard_tags["IMDB"] = imdb_id
tvdb_id = ids.get("tvdb_id")
if tvdb_id:
if kind == "movie":
standard_tags["TVDB2"] = f"movies/{tvdb_id}"
else:
standard_tags["TVDB2"] = f"series/{tvdb_id}"
elif tmdb_id is not None:
# tmdb_id was provided or found via Simkl
prefix = "movie" if kind == "movie" else "tv"
standard_tags["TMDB"] = f"{prefix}/{tmdb_id}"
try:
ids = external_ids(tmdb_id, kind)
except requests.RequestException as exc:
log.debug("Failed to fetch external IDs: %s", exc)
ids = {}
else:
log.debug("External IDs found: %s", ids)
imdb_id = ids.get("imdb_id")
if imdb_id:
standard_tags["IMDB"] = imdb_id
tvdb_id = ids.get("tvdb_id")
if tvdb_id:
if kind == "movie":
standard_tags["TVDB2"] = f"movies/{tvdb_id}"
else:
standard_tags["TVDB2"] = f"series/{tvdb_id}"
else:
log.debug("No TMDB API key configured; skipping TMDB external ID lookup")
merged_tags = {
**custom_tags,
**standard_tags,
}
apply_tags(path, merged_tags)
apply_tags(path, {**custom_tags, **standard_tags})
# Public API of this module. NOTE(review): "tag_file" was listed twice
# (duplicate entries in __all__ are a defect — wildcard import exports the
# name once regardless); the duplicate has been dropped.
__all__ = [
    "search_simkl",
    "search_show_info",
    "search_tmdb",
    "get_title",
    "get_year",
    "external_ids",
    "tag_file",
    "apply_tags",
    "fuzzy_match",
]