5 Commits

Author SHA1 Message Date
Andy
6b8a8ba8a8 feat(cdm): normalize CDM detection for local and remote implementations
Add unshackle.core.cdm.detect helpers to classify CDMs consistently across local and remote backends.

- Add is_playready_cdm/is_widevine_cdm for DRM selection across pyplayready, pywidevine, and wrappers

- Add is_remote_cdm/is_local_cdm/cdm_location so services can branch on CDM execution location

- Switch core DASH/HLS parsing, track DRM selection, and dl CDM switching away from brittle isinstance/DecryptLabs-only checks

- Make unshackle.core.cdm import-light via lazy __getattr__ so optional CDM deps are only imported when needed
2026-02-08 00:37:53 -07:00
Andy
b9fb928292 fix(service): redact proxy credentials in logs
Proxy URIs may contain embedded userinfo (username/password). Add a small sanitizer helper and use it for proxy mapping and proxy selection logs to avoid leaking credentials.
2026-02-07 20:36:25 -07:00
Andy
984a8b9efa fix(proxies): harden surfshark and windscribe selection 2026-02-07 20:34:31 -07:00
Andy
71adee4ec6 fix(api): log PSSH extraction failures 2026-02-07 20:29:53 -07:00
Andy
ee8f7cb650 docs(config): clarify sdh_method uses subtitle-filter 2026-02-07 20:29:31 -07:00
11 changed files with 348 additions and 61 deletions

View File

@@ -1433,20 +1433,21 @@ Control subtitle conversion, SDH (hearing-impaired) stripping behavior, and form
- `sdh_method`: How to strip SDH cues. Default: `auto`.
- `auto`: Try subby for SRT first, then SubtitleEdit, then subtitle-filter.
- `auto`: Try subby for SRT first, then SubtitleEdit, then `filter-subs` (the `subtitle-filter` library).
- `subby`: Use subby's SDHStripper (SRT only).
- `subtitleedit`: Use SubtitleEdit's RemoveTextForHI when available.
- `filter-subs`: Use the subtitle-filter library directly.
- `filter-subs`: Use the `subtitle-filter` library directly.
Note: `filter-subs` is the canonical `sdh_method` config value; it maps to the `subtitle-filter` library.
- `strip_sdh`: Automatically create stripped (non-SDH) versions of SDH subtitles. Default: `true`.
Set to `false` to disable automatic SDH stripping entirely. When `true`, unshackle will automatically
detect SDH subtitles and create clean versions alongside the originals.
- `convert_before_strip`: Auto-convert VTT/other formats to SRT before using subtitle-filter. Default: `true`.
- `convert_before_strip`: Auto-convert VTT/other formats to SRT before using `subtitle-filter`
(via `sdh_method: filter-subs`, including as the final fallback in `sdh_method: auto`). Default: `true`.
This ensures compatibility when subtitle-filter is used as the fallback SDH stripping method, as
subtitle-filter works best with SRT format.
This ensures compatibility when `subtitle-filter` is used, as it works best with SRT format.
- `preserve_formatting`: Preserve original subtitle formatting (tags, positioning, styling). Default: `true`.

View File

@@ -43,6 +43,7 @@ from rich.tree import Tree
from unshackle.core import binaries
from unshackle.core.cdm import CustomRemoteCDM, DecryptLabsRemoteCDM
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import DOWNLOAD_LICENCE_ONLY, AnyTrack, context_settings
@@ -1601,9 +1602,7 @@ class dl:
if video_tracks:
highest_quality = max((track.height for track in video_tracks if track.height), default=0)
if highest_quality > 0:
if isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) and not (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
if is_widevine_cdm(self.cdm):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="widevine", quality=highest_quality
)
@@ -1612,9 +1611,7 @@ class dl:
f"Pre-selecting Widevine CDM based on highest quality {highest_quality}p across all video tracks"
)
self.cdm = quality_based_cdm
elif isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) and (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
elif is_playready_cdm(self.cdm):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="playready", quality=highest_quality
)
@@ -1646,10 +1643,7 @@ class dl:
licence=partial(
service.get_playready_license
if (
isinstance(self.cdm, PlayReadyCdm)
or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
)
is_playready_cdm(self.cdm)
)
and hasattr(service, "get_playready_license")
else service.get_widevine_license,
@@ -2186,9 +2180,7 @@ class dl:
track_quality = track.height
if isinstance(drm, Widevine):
if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
if not is_widevine_cdm(self.cdm):
widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine", quality=track_quality)
if widevine_cdm:
if track_quality:
@@ -2198,9 +2190,7 @@ class dl:
self.cdm = widevine_cdm
elif isinstance(drm, PlayReady):
if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready
):
if not is_playready_cdm(self.cdm):
playready_cdm = self.get_cdm(self.service, self.profile, drm="playready", quality=track_quality)
if playready_cdm:
if track_quality:

View File

@@ -207,6 +207,7 @@ def serialize_drm(drm_list) -> Optional[List[Dict[str, Any]]]:
# Get PSSH - handle both Widevine and PlayReady
if hasattr(drm, "_pssh") and drm._pssh:
pssh_obj = None
try:
pssh_obj = drm._pssh
# Try to get base64 representation
@@ -225,8 +226,24 @@ def serialize_drm(drm_list) -> Optional[List[Dict[str, Any]]]:
# Check if it's already base64-like or an object repr
if not pssh_str.startswith("<"):
drm_info["pssh"] = pssh_str
except (ValueError, TypeError, KeyError):
# Some PSSH implementations can fail to parse/serialize; log and continue.
pssh_type = type(pssh_obj).__name__ if pssh_obj is not None else None
log.warning(
"Failed to extract/serialize PSSH for DRM type=%s pssh_type=%s",
drm_class,
pssh_type,
exc_info=True,
)
except Exception:
pass
# Don't silently swallow unexpected failures; make them visible and propagate.
pssh_type = type(pssh_obj).__name__ if pssh_obj is not None else None
log.exception(
"Unexpected error while extracting/serializing PSSH for DRM type=%s pssh_type=%s",
drm_class,
pssh_type,
)
raise
# Get KIDs
if hasattr(drm, "kids") and drm.kids:

View File

@@ -1,5 +1,57 @@
from .custom_remote_cdm import CustomRemoteCDM
from .decrypt_labs_remote_cdm import DecryptLabsRemoteCDM
from .monalisa import MonaLisaCDM
"""
CDM helpers and implementations.
__all__ = ["DecryptLabsRemoteCDM", "CustomRemoteCDM", "MonaLisaCDM"]
Keep this module import-light: downstream code frequently imports helpers from
`unshackle.core.cdm.detect`, which requires importing this package first.
Some CDM implementations pull in optional/heavy dependencies, so we lazily
import them via `__getattr__` (PEP 562).
"""
from __future__ import annotations
from typing import Any
__all__ = [
"DecryptLabsRemoteCDM",
"CustomRemoteCDM",
"MonaLisaCDM",
"is_remote_cdm",
"is_local_cdm",
"cdm_location",
"is_playready_cdm",
"is_widevine_cdm",
]
def __getattr__(name: str) -> Any:
if name == "DecryptLabsRemoteCDM":
from .decrypt_labs_remote_cdm import DecryptLabsRemoteCDM
return DecryptLabsRemoteCDM
if name == "CustomRemoteCDM":
from .custom_remote_cdm import CustomRemoteCDM
return CustomRemoteCDM
if name == "MonaLisaCDM":
from .monalisa import MonaLisaCDM
return MonaLisaCDM
if name in {
"is_remote_cdm",
"is_local_cdm",
"cdm_location",
"is_playready_cdm",
"is_widevine_cdm",
}:
from .detect import cdm_location, is_local_cdm, is_playready_cdm, is_remote_cdm, is_widevine_cdm
return {
"is_remote_cdm": is_remote_cdm,
"is_local_cdm": is_local_cdm,
"cdm_location": cdm_location,
"is_playready_cdm": is_playready_cdm,
"is_widevine_cdm": is_widevine_cdm,
}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
from typing import Any
def is_remote_cdm(cdm: Any) -> bool:
"""
Return True if the CDM instance is backed by a remote/service CDM.
This is useful for service logic that needs to know whether the CDM runs
locally (in-process) vs over HTTP/RPC (remote).
"""
if cdm is None:
return False
if hasattr(cdm, "is_remote_cdm"):
try:
return bool(getattr(cdm, "is_remote_cdm"))
except Exception:
pass
try:
from pyplayready.remote.remotecdm import RemoteCdm as PlayReadyRemoteCdm
except Exception:
PlayReadyRemoteCdm = None
if PlayReadyRemoteCdm is not None:
try:
if isinstance(cdm, PlayReadyRemoteCdm):
return True
except Exception:
pass
try:
from pywidevine.remotecdm import RemoteCdm as WidevineRemoteCdm
except Exception:
WidevineRemoteCdm = None
if WidevineRemoteCdm is not None:
try:
if isinstance(cdm, WidevineRemoteCdm):
return True
except Exception:
pass
cls = getattr(cdm, "__class__", None)
mod = getattr(cls, "__module__", "") or ""
name = getattr(cls, "__name__", "") or ""
if mod == "unshackle.core.cdm.decrypt_labs_remote_cdm" and name == "DecryptLabsRemoteCDM":
return True
if mod == "unshackle.core.cdm.custom_remote_cdm" and name == "CustomRemoteCDM":
return True
if mod.startswith("pyplayready.remote") or mod.startswith("pywidevine.remote"):
return True
if "remote" in mod.lower() and name.lower().endswith("cdm"):
return True
if name.lower().endswith("remotecdm"):
return True
return False
def is_local_cdm(cdm: Any) -> bool:
"""
Return True if the CDM instance is local/in-process.
Unknown CDM types return False (use `cdm_location()` if you need 3-state).
"""
if cdm is None:
return False
if is_remote_cdm(cdm):
return False
if is_playready_cdm(cdm) or is_widevine_cdm(cdm):
return True
cls = getattr(cdm, "__class__", None)
mod = getattr(cls, "__module__", "") or ""
name = getattr(cls, "__name__", "") or ""
if mod == "unshackle.core.cdm.monalisa.monalisa_cdm" and name == "MonaLisaCDM":
return True
return False
def cdm_location(cdm: Any) -> str:
"""
Return one of: "local", "remote", "unknown".
"""
if is_remote_cdm(cdm):
return "remote"
if is_local_cdm(cdm):
return "local"
return "unknown"
def is_playready_cdm(cdm: Any) -> bool:
"""
Return True if the given CDM should be treated as PlayReady.
This intentionally supports both:
- Local PlayReady CDMs (pyplayready.cdm.Cdm)
- Remote/wrapper CDMs (e.g. DecryptLabsRemoteCDM) that expose `is_playready`
"""
if cdm is None:
return False
if hasattr(cdm, "is_playready"):
try:
return bool(getattr(cdm, "is_playready"))
except Exception:
pass
try:
from pyplayready.cdm import Cdm as PlayReadyCdm
except Exception:
PlayReadyCdm = None
if PlayReadyCdm is not None:
try:
return isinstance(cdm, PlayReadyCdm)
except Exception:
pass
try:
from pyplayready.remote.remotecdm import RemoteCdm as PlayReadyRemoteCdm
except Exception:
PlayReadyRemoteCdm = None
if PlayReadyRemoteCdm is not None:
try:
return isinstance(cdm, PlayReadyRemoteCdm)
except Exception:
pass
mod = getattr(getattr(cdm, "__class__", None), "__module__", "") or ""
return "pyplayready" in mod
def is_widevine_cdm(cdm: Any) -> bool:
"""
Return True if the given CDM should be treated as Widevine.
Note: for remote/wrapper CDMs that expose `is_playready`, Widevine is treated
as the logical opposite.
"""
if cdm is None:
return False
if hasattr(cdm, "is_playready"):
try:
return not bool(getattr(cdm, "is_playready"))
except Exception:
pass
try:
from pywidevine.cdm import Cdm as WidevineCdm
except Exception:
WidevineCdm = None
if WidevineCdm is not None:
try:
return isinstance(cdm, WidevineCdm)
except Exception:
pass
try:
from pywidevine.remotecdm import RemoteCdm as WidevineRemoteCdm
except Exception:
WidevineRemoteCdm = None
if WidevineRemoteCdm is not None:
try:
return isinstance(cdm, WidevineRemoteCdm)
except Exception:
pass
mod = getattr(getattr(cdm, "__class__", None), "__module__", "") or ""
return "pywidevine" in mod

View File

@@ -19,12 +19,12 @@ import requests
from curl_cffi.requests import Session as CurlSession
from langcodes import Language, tag_is_valid
from lxml.etree import Element, ElementTree
from pyplayready.cdm import Cdm as PlayReadyCdm
from pyplayready.system.pssh import PSSH as PR_PSSH
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.pssh import PSSH
from requests import Session
from unshackle.core.cdm.detect import is_playready_cdm
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.downloaders import requests as requests_downloader
from unshackle.core.drm import DRM_T, PlayReady, Widevine
@@ -477,7 +477,7 @@ class DASH:
track.data["dash"]["segment_durations"] = segment_durations
if not track.drm and init_data and isinstance(track, (Video, Audio)):
prefers_playready = isinstance(cdm, PlayReadyCdm) or (hasattr(cdm, "is_playready") and cdm.is_playready)
prefers_playready = is_playready_cdm(cdm)
if prefers_playready:
try:
track.drm = [PlayReady.from_init_data(init_data)]

View File

@@ -28,6 +28,7 @@ from pywidevine.pssh import PSSH as WV_PSSH
from requests import Session
from unshackle.core import binaries
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.downloaders import requests as requests_downloader
from unshackle.core.drm import DRM_T, ClearKey, MonaLisa, PlayReady, Widevine
@@ -914,15 +915,10 @@ class HLS:
"""
playready_urn = f"urn:uuid:{PR_PSSH.SYSTEM_ID}"
playready_keyformats = {playready_urn, "com.microsoft.playready"}
if isinstance(cdm, WidevineCdm):
if is_widevine_cdm(cdm):
return [k for k in keys if k.keyformat and k.keyformat.lower() == WidevineCdm.urn]
elif isinstance(cdm, PlayReadyCdm):
elif is_playready_cdm(cdm):
return [k for k in keys if k.keyformat and k.keyformat.lower() in playready_keyformats]
elif hasattr(cdm, "is_playready"):
if cdm.is_playready:
return [k for k in keys if k.keyformat and k.keyformat.lower() in playready_keyformats]
else:
return [k for k in keys if k.keyformat and k.keyformat.lower() == WidevineCdm.urn]
return keys
@staticmethod

View File

@@ -142,12 +142,17 @@ class SurfsharkVPN(Proxy):
)
# Get connection names from filtered servers
connection_names = [x["connectionName"] for x in servers]
if not servers:
raise ValueError(f"Could not get random server for country '{country_id}': no servers found.")
try:
return random.choice(connection_names)
except (IndexError, KeyError):
raise ValueError(f"Could not get random server for country '{country_id}'.")
# Only include servers that actually have a connection name to avoid KeyError.
connection_names = [x["connectionName"] for x in servers if "connectionName" in x]
if not connection_names:
raise ValueError(
f"Could not get random server for country '{country_id}': no servers with connectionName found."
)
return random.choice(connection_names)
@staticmethod
def get_countries() -> list[dict]:

View File

@@ -62,7 +62,7 @@ class WindscribeVPN(Proxy):
server_map_key = f"{query}:{city}" if city else query
if server_map_key in self.server_map:
hostname = self.server_map[server_map_key]
elif query in self.server_map and not city:
elif query in self.server_map:
hostname = self.server_map[query]
else:
server_match = re.match(r"^([a-z]{2})(\d+)$", query)

View File

@@ -5,7 +5,7 @@ from collections.abc import Generator
from http.cookiejar import CookieJar
from pathlib import Path
from typing import Optional, Union
from urllib.parse import urlparse
from urllib.parse import urlparse, urlunparse
import click
import m3u8
@@ -27,6 +27,45 @@ from unshackle.core.tracks import Chapters, Tracks
from unshackle.core.utilities import get_cached_ip_info, get_ip_info
def sanitize_proxy_for_log(uri: Optional[str]) -> Optional[str]:
"""
Sanitize a proxy URI for logs by redacting any embedded userinfo (username/password).
Examples:
- http://user:pass@host:8080 -> http://REDACTED@host:8080
- socks5h://user@host:1080 -> socks5h://REDACTED@host:1080
"""
if uri is None:
return None
if not isinstance(uri, str):
return str(uri)
if not uri:
return uri
try:
parsed = urlparse(uri)
# Handle schemeless proxies like "user:pass@host:port"
if not parsed.scheme and not parsed.netloc and "@" in uri and "://" not in uri:
# Parse as netloc using a dummy scheme, then strip scheme back out.
dummy = urlparse(f"http://{uri}")
netloc = dummy.netloc
if "@" in netloc:
netloc = f"REDACTED@{netloc.split('@', 1)[1]}"
# urlparse("http://...") sets path to "" for typical netloc-only strings; keep it just in case.
return f"{netloc}{dummy.path}"
netloc = parsed.netloc
if "@" in netloc:
netloc = f"REDACTED@{netloc.split('@', 1)[1]}"
return urlunparse(parsed._replace(netloc=netloc))
except Exception:
if "@" in uri:
return f"REDACTED@{uri.split('@', 1)[1]}"
return uri
class Service(metaclass=ABCMeta):
"""The Service Base Class."""
@@ -75,7 +114,9 @@ class Service(metaclass=ABCMeta):
# Check if there's a mapping for this query
mapped_value = proxy_map.get(full_proxy_key)
if mapped_value:
self.log.info(f"Found service-specific proxy mapping: {full_proxy_key} -> {mapped_value}")
self.log.info(
f"Found service-specific proxy mapping: {full_proxy_key} -> {sanitize_proxy_for_log(mapped_value)}"
)
# Query the proxy provider with the mapped value
if proxy_provider_name:
# Specific provider requested
@@ -87,9 +128,13 @@ class Service(metaclass=ABCMeta):
mapped_proxy_uri = proxy_provider.get_proxy(mapped_value)
if mapped_proxy_uri:
proxy = mapped_proxy_uri
self.log.info(f"Using mapped proxy from {proxy_provider.__class__.__name__}: {proxy}")
self.log.info(
f"Using mapped proxy from {proxy_provider.__class__.__name__}: {sanitize_proxy_for_log(proxy)}"
)
else:
self.log.warning(f"Failed to get proxy for mapped value '{mapped_value}', using default")
self.log.warning(
f"Failed to get proxy for mapped value '{sanitize_proxy_for_log(mapped_value)}', using default"
)
else:
self.log.warning(f"Proxy provider '{proxy_provider_name}' not found, using default proxy")
else:
@@ -98,10 +143,14 @@ class Service(metaclass=ABCMeta):
mapped_proxy_uri = proxy_provider.get_proxy(mapped_value)
if mapped_proxy_uri:
proxy = mapped_proxy_uri
self.log.info(f"Using mapped proxy from {proxy_provider.__class__.__name__}: {proxy}")
self.log.info(
f"Using mapped proxy from {proxy_provider.__class__.__name__}: {sanitize_proxy_for_log(proxy)}"
)
break
else:
self.log.warning(f"No provider could resolve mapped value '{mapped_value}', using default")
self.log.warning(
f"No provider could resolve mapped value '{sanitize_proxy_for_log(mapped_value)}', using default"
)
if not proxy:
# don't override the explicit proxy set by the user, even if they may be geoblocked

View File

@@ -15,11 +15,10 @@ from zlib import crc32
from curl_cffi.requests import Session as CurlSession
from langcodes import Language
from pyplayready.cdm import Cdm as PlayReadyCdm
from pywidevine.cdm import Cdm as WidevineCdm
from requests import Session
from unshackle.core import binaries
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.config import config
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY
from unshackle.core.downloaders import aria2c, curl_impersonate, n_m3u8dl_re, requests
@@ -297,7 +296,7 @@ class Track:
if not self.drm and track_type in ("Video", "Audio"):
# the service might not have explicitly defined the `drm` property
# try find DRM information from the init data of URL based on CDM type
if isinstance(cdm, PlayReadyCdm):
if is_playready_cdm(cdm):
try:
self.drm = [PlayReady.from_track(self, session)]
except PlayReady.Exceptions.PSSHNotFound:
@@ -451,23 +450,14 @@ class Track:
if not self.drm:
return None
if isinstance(cdm, WidevineCdm):
if is_widevine_cdm(cdm):
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
elif isinstance(cdm, PlayReadyCdm):
elif is_playready_cdm(cdm):
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
elif hasattr(cdm, "is_playready"):
if cdm.is_playready:
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
else:
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
return self.drm[0]