feat(cdm): normalize CDM detection for local and remote implementations

Add unshackle.core.cdm.detect helpers to classify CDMs consistently across local and remote backends.

- Add is_playready_cdm/is_widevine_cdm for DRM selection across pyplayready, pywidevine, and wrappers

- Add is_remote_cdm/is_local_cdm/cdm_location so services can branch on CDM execution location

- Switch core DASH/HLS parsing, track DRM selection, and dl CDM switching away from brittle isinstance/DecryptLabs-only checks

- Make unshackle.core.cdm import-light via lazy __getattr__ so optional CDM deps are only imported when needed
This commit is contained in:
Andy
2026-02-08 00:37:53 -07:00
parent b9fb928292
commit 6b8a8ba8a8
6 changed files with 258 additions and 43 deletions

View File

@@ -43,6 +43,7 @@ from rich.tree import Tree
from unshackle.core import binaries
from unshackle.core.cdm import CustomRemoteCDM, DecryptLabsRemoteCDM
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import DOWNLOAD_LICENCE_ONLY, AnyTrack, context_settings
@@ -1601,9 +1602,7 @@ class dl:
if video_tracks:
highest_quality = max((track.height for track in video_tracks if track.height), default=0)
if highest_quality > 0:
if isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) and not (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
if is_widevine_cdm(self.cdm):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="widevine", quality=highest_quality
)
@@ -1612,9 +1611,7 @@ class dl:
f"Pre-selecting Widevine CDM based on highest quality {highest_quality}p across all video tracks"
)
self.cdm = quality_based_cdm
elif isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) and (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
elif is_playready_cdm(self.cdm):
quality_based_cdm = self.get_cdm(
self.service, self.profile, drm="playready", quality=highest_quality
)
@@ -1646,10 +1643,7 @@ class dl:
licence=partial(
service.get_playready_license
if (
isinstance(self.cdm, PlayReadyCdm)
or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
)
is_playready_cdm(self.cdm)
)
and hasattr(service, "get_playready_license")
else service.get_widevine_license,
@@ -2186,9 +2180,7 @@ class dl:
track_quality = track.height
if isinstance(drm, Widevine):
if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready
):
if not is_widevine_cdm(self.cdm):
widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine", quality=track_quality)
if widevine_cdm:
if track_quality:
@@ -2198,9 +2190,7 @@ class dl:
self.cdm = widevine_cdm
elif isinstance(drm, PlayReady):
if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or (
isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready
):
if not is_playready_cdm(self.cdm):
playready_cdm = self.get_cdm(self.service, self.profile, drm="playready", quality=track_quality)
if playready_cdm:
if track_quality:

View File

@@ -1,5 +1,57 @@
from .custom_remote_cdm import CustomRemoteCDM
from .decrypt_labs_remote_cdm import DecryptLabsRemoteCDM
from .monalisa import MonaLisaCDM
"""
CDM helpers and implementations.
__all__ = ["DecryptLabsRemoteCDM", "CustomRemoteCDM", "MonaLisaCDM"]
Keep this module import-light: downstream code frequently imports helpers from
`unshackle.core.cdm.detect`, which requires importing this package first.
Some CDM implementations pull in optional/heavy dependencies, so we lazily
import them via `__getattr__` (PEP 562).
"""
from __future__ import annotations
from typing import Any
__all__ = [
"DecryptLabsRemoteCDM",
"CustomRemoteCDM",
"MonaLisaCDM",
"is_remote_cdm",
"is_local_cdm",
"cdm_location",
"is_playready_cdm",
"is_widevine_cdm",
]
def __getattr__(name: str) -> Any:
if name == "DecryptLabsRemoteCDM":
from .decrypt_labs_remote_cdm import DecryptLabsRemoteCDM
return DecryptLabsRemoteCDM
if name == "CustomRemoteCDM":
from .custom_remote_cdm import CustomRemoteCDM
return CustomRemoteCDM
if name == "MonaLisaCDM":
from .monalisa import MonaLisaCDM
return MonaLisaCDM
if name in {
"is_remote_cdm",
"is_local_cdm",
"cdm_location",
"is_playready_cdm",
"is_widevine_cdm",
}:
from .detect import cdm_location, is_local_cdm, is_playready_cdm, is_remote_cdm, is_widevine_cdm
return {
"is_remote_cdm": is_remote_cdm,
"is_local_cdm": is_local_cdm,
"cdm_location": cdm_location,
"is_playready_cdm": is_playready_cdm,
"is_widevine_cdm": is_widevine_cdm,
}[name]
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
from typing import Any
def is_remote_cdm(cdm: Any) -> bool:
"""
Return True if the CDM instance is backed by a remote/service CDM.
This is useful for service logic that needs to know whether the CDM runs
locally (in-process) vs over HTTP/RPC (remote).
"""
if cdm is None:
return False
if hasattr(cdm, "is_remote_cdm"):
try:
return bool(getattr(cdm, "is_remote_cdm"))
except Exception:
pass
try:
from pyplayready.remote.remotecdm import RemoteCdm as PlayReadyRemoteCdm
except Exception:
PlayReadyRemoteCdm = None
if PlayReadyRemoteCdm is not None:
try:
if isinstance(cdm, PlayReadyRemoteCdm):
return True
except Exception:
pass
try:
from pywidevine.remotecdm import RemoteCdm as WidevineRemoteCdm
except Exception:
WidevineRemoteCdm = None
if WidevineRemoteCdm is not None:
try:
if isinstance(cdm, WidevineRemoteCdm):
return True
except Exception:
pass
cls = getattr(cdm, "__class__", None)
mod = getattr(cls, "__module__", "") or ""
name = getattr(cls, "__name__", "") or ""
if mod == "unshackle.core.cdm.decrypt_labs_remote_cdm" and name == "DecryptLabsRemoteCDM":
return True
if mod == "unshackle.core.cdm.custom_remote_cdm" and name == "CustomRemoteCDM":
return True
if mod.startswith("pyplayready.remote") or mod.startswith("pywidevine.remote"):
return True
if "remote" in mod.lower() and name.lower().endswith("cdm"):
return True
if name.lower().endswith("remotecdm"):
return True
return False
def is_local_cdm(cdm: Any) -> bool:
"""
Return True if the CDM instance is local/in-process.
Unknown CDM types return False (use `cdm_location()` if you need 3-state).
"""
if cdm is None:
return False
if is_remote_cdm(cdm):
return False
if is_playready_cdm(cdm) or is_widevine_cdm(cdm):
return True
cls = getattr(cdm, "__class__", None)
mod = getattr(cls, "__module__", "") or ""
name = getattr(cls, "__name__", "") or ""
if mod == "unshackle.core.cdm.monalisa.monalisa_cdm" and name == "MonaLisaCDM":
return True
return False
def cdm_location(cdm: Any) -> str:
"""
Return one of: "local", "remote", "unknown".
"""
if is_remote_cdm(cdm):
return "remote"
if is_local_cdm(cdm):
return "local"
return "unknown"
def is_playready_cdm(cdm: Any) -> bool:
"""
Return True if the given CDM should be treated as PlayReady.
This intentionally supports both:
- Local PlayReady CDMs (pyplayready.cdm.Cdm)
- Remote/wrapper CDMs (e.g. DecryptLabsRemoteCDM) that expose `is_playready`
"""
if cdm is None:
return False
if hasattr(cdm, "is_playready"):
try:
return bool(getattr(cdm, "is_playready"))
except Exception:
pass
try:
from pyplayready.cdm import Cdm as PlayReadyCdm
except Exception:
PlayReadyCdm = None
if PlayReadyCdm is not None:
try:
return isinstance(cdm, PlayReadyCdm)
except Exception:
pass
try:
from pyplayready.remote.remotecdm import RemoteCdm as PlayReadyRemoteCdm
except Exception:
PlayReadyRemoteCdm = None
if PlayReadyRemoteCdm is not None:
try:
return isinstance(cdm, PlayReadyRemoteCdm)
except Exception:
pass
mod = getattr(getattr(cdm, "__class__", None), "__module__", "") or ""
return "pyplayready" in mod
def is_widevine_cdm(cdm: Any) -> bool:
"""
Return True if the given CDM should be treated as Widevine.
Note: for remote/wrapper CDMs that expose `is_playready`, Widevine is treated
as the logical opposite.
"""
if cdm is None:
return False
if hasattr(cdm, "is_playready"):
try:
return not bool(getattr(cdm, "is_playready"))
except Exception:
pass
try:
from pywidevine.cdm import Cdm as WidevineCdm
except Exception:
WidevineCdm = None
if WidevineCdm is not None:
try:
return isinstance(cdm, WidevineCdm)
except Exception:
pass
try:
from pywidevine.remotecdm import RemoteCdm as WidevineRemoteCdm
except Exception:
WidevineRemoteCdm = None
if WidevineRemoteCdm is not None:
try:
return isinstance(cdm, WidevineRemoteCdm)
except Exception:
pass
mod = getattr(getattr(cdm, "__class__", None), "__module__", "") or ""
return "pywidevine" in mod

View File

@@ -19,12 +19,12 @@ import requests
from curl_cffi.requests import Session as CurlSession
from langcodes import Language, tag_is_valid
from lxml.etree import Element, ElementTree
from pyplayready.cdm import Cdm as PlayReadyCdm
from pyplayready.system.pssh import PSSH as PR_PSSH
from pywidevine.cdm import Cdm as WidevineCdm
from pywidevine.pssh import PSSH
from requests import Session
from unshackle.core.cdm.detect import is_playready_cdm
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.downloaders import requests as requests_downloader
from unshackle.core.drm import DRM_T, PlayReady, Widevine
@@ -477,7 +477,7 @@ class DASH:
track.data["dash"]["segment_durations"] = segment_durations
if not track.drm and init_data and isinstance(track, (Video, Audio)):
prefers_playready = isinstance(cdm, PlayReadyCdm) or (hasattr(cdm, "is_playready") and cdm.is_playready)
prefers_playready = is_playready_cdm(cdm)
if prefers_playready:
try:
track.drm = [PlayReady.from_init_data(init_data)]

View File

@@ -28,6 +28,7 @@ from pywidevine.pssh import PSSH as WV_PSSH
from requests import Session
from unshackle.core import binaries
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.downloaders import requests as requests_downloader
from unshackle.core.drm import DRM_T, ClearKey, MonaLisa, PlayReady, Widevine
@@ -914,15 +915,10 @@ class HLS:
"""
playready_urn = f"urn:uuid:{PR_PSSH.SYSTEM_ID}"
playready_keyformats = {playready_urn, "com.microsoft.playready"}
if isinstance(cdm, WidevineCdm):
if is_widevine_cdm(cdm):
return [k for k in keys if k.keyformat and k.keyformat.lower() == WidevineCdm.urn]
elif isinstance(cdm, PlayReadyCdm):
elif is_playready_cdm(cdm):
return [k for k in keys if k.keyformat and k.keyformat.lower() in playready_keyformats]
elif hasattr(cdm, "is_playready"):
if cdm.is_playready:
return [k for k in keys if k.keyformat and k.keyformat.lower() in playready_keyformats]
else:
return [k for k in keys if k.keyformat and k.keyformat.lower() == WidevineCdm.urn]
return keys
@staticmethod

View File

@@ -15,11 +15,10 @@ from zlib import crc32
from curl_cffi.requests import Session as CurlSession
from langcodes import Language
from pyplayready.cdm import Cdm as PlayReadyCdm
from pywidevine.cdm import Cdm as WidevineCdm
from requests import Session
from unshackle.core import binaries
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.config import config
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY
from unshackle.core.downloaders import aria2c, curl_impersonate, n_m3u8dl_re, requests
@@ -297,7 +296,7 @@ class Track:
if not self.drm and track_type in ("Video", "Audio"):
# the service might not have explicitly defined the `drm` property
# try find DRM information from the init data of URL based on CDM type
if isinstance(cdm, PlayReadyCdm):
if is_playready_cdm(cdm):
try:
self.drm = [PlayReady.from_track(self, session)]
except PlayReady.Exceptions.PSSHNotFound:
@@ -451,23 +450,14 @@ class Track:
if not self.drm:
return None
if isinstance(cdm, WidevineCdm):
if is_widevine_cdm(cdm):
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
elif isinstance(cdm, PlayReadyCdm):
elif is_playready_cdm(cdm):
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
elif hasattr(cdm, "is_playready"):
if cdm.is_playready:
for drm in self.drm:
if isinstance(drm, PlayReady):
return drm
else:
for drm in self.drm:
if isinstance(drm, Widevine):
return drm
return self.drm[0]