From 7b71d6631c82bf3eb50562a1a838b19e34283a09 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 26 Aug 2025 22:49:46 +0000 Subject: [PATCH 1/9] fix(main): As requested old devine version removed from banner to avoid any confusion the developer of this software. Original GNU is still applys. --- unshackle/core/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unshackle/core/__main__.py b/unshackle/core/__main__.py index 9a5da25..4b17b1f 100644 --- a/unshackle/core/__main__.py +++ b/unshackle/core/__main__.py @@ -69,7 +69,7 @@ def main(version: bool, debug: bool, log_path: Path) -> None: r" ▀▀▀ ▀▀ █▪ ▀▀▀▀ ▀▀▀ · ▀ ▀ ·▀▀▀ ·▀ ▀.▀▀▀ ▀▀▀ ", style="ascii.art", ), - "v 3.3.3 Copyright © 2019-2025 rlaphoenix" + f"\nv [repr.number]{__version__}[/] - unshackle", + f"v [repr.number]{__version__}[/] - unshackle", ), (1, 11, 1, 10), expand=True, From eb306206268bbf24cbfe85ccc897ce40d0c834d6 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 26 Aug 2025 23:16:00 +0000 Subject: [PATCH 2/9] fix(main): As requested old devine version removed from banner to avoid any confusion the developer of this software. Original GNU is still applys. --- unshackle/core/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unshackle/core/__main__.py b/unshackle/core/__main__.py index 4b17b1f..e4717fa 100644 --- a/unshackle/core/__main__.py +++ b/unshackle/core/__main__.py @@ -69,7 +69,7 @@ def main(version: bool, debug: bool, log_path: Path) -> None: r" ▀▀▀ ▀▀ █▪ ▀▀▀▀ ▀▀▀ · ▀ ▀ ·▀▀▀ ·▀ ▀.▀▀▀ ▀▀▀ ", style="ascii.art", ), - f"v [repr.number]{__version__}[/] - unshackle", + f"v [repr.number]{__version__}[/] - © 2025 - github.com/unshackle-dl/unshackle", ), (1, 11, 1, 10), expand=True, From b4efdf3f2c35ed1c97e016b04ffea4793d330fd7 Mon Sep 17 00:00:00 2001 From: Andy Date: Thu, 28 Aug 2025 17:09:55 +0000 Subject: [PATCH 3/9] feat(cdm): Enhance DecryptLabsRemoteCDM to support cached keys and improve license handling --- unshackle/core/cdm/decrypt_labs_remote_cdm.py | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/unshackle/core/cdm/decrypt_labs_remote_cdm.py b/unshackle/core/cdm/decrypt_labs_remote_cdm.py index c040362..960b25c 100644 --- a/unshackle/core/cdm/decrypt_labs_remote_cdm.py +++ b/unshackle/core/cdm/decrypt_labs_remote_cdm.py @@ -26,7 +26,9 @@ class DecryptLabsRemoteCDM(RemoteCdm): self.api_session_ids = {} self.license_request = None self.service_name = service_name + self.device_name = device_name self.keys = {} + self.scheme = "L1" if device_name == "L1" else "widevine" try: super().__init__(device_type, system_id, security_level, host, secret, device_name) except Exception: @@ -66,16 +68,49 @@ class DecryptLabsRemoteCDM(RemoteCdm): ) -> bytes: self.pssh = pssh + request_data = { + "init_data": self.pssh.dumps(), + "service_certificate": self.req_session.signed_device_certificate, + "scheme": self.scheme, + "service": self.service_name, + } + # Add required parameter for L1 scheme + if self.scheme == "L1": + request_data["get_cached_keys_if_exists"] = True res = self.session( self.host + "/get-request", - { - "init_data": self.pssh.dumps(), - "service_certificate": self.req_session.signed_device_certificate, - "scheme": "widevine", - "service": self.service_name, - }, + request_data, ) + # Check if we got cached keys instead of a challenge + if res.get("message_type") == "cached-keys": + # Store cached keys directly + if session_id not in self.keys: + self.keys[session_id] = [] + session_keys = self.keys[session_id] + + for cached_key in res.get("cached_keys", []): + # Handle KID format - could be hex string or UUID string + kid_str = cached_key["kid"] + try: + # Try as UUID string first + kid_uuid = UUID(kid_str) + except ValueError: + try: + # Try as hex string (like the existing code) + kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) + except ValueError: + # Fallback: use Key.kid_to_uuid + kid_uuid = Key.kid_to_uuid(kid_str) + + session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) + + # Return empty challenge since we already have the keys + self.license_request = "" + self.api_session_ids[session_id] = None + return b"" + + # Normal challenge response self.license_request = res["challenge"] self.api_session_ids[session_id] = res["session_id"] @@ -87,6 +122,10 @@ class DecryptLabsRemoteCDM(RemoteCdm): self.keys[session_id] = [] session_keys = self.keys[session_id] + # If we already have cached keys and no session_id_api, skip processing + if session_id_api is None and session_keys: + return + if isinstance(license_message, dict) and "keys" in license_message: session_keys.extend( [ @@ -96,14 +135,21 @@ class DecryptLabsRemoteCDM(RemoteCdm): ) else: + # Ensure license_message is base64 encoded + if isinstance(license_message, bytes): + license_response_b64 = base64.b64encode(license_message).decode() + elif isinstance(license_message, str): + license_response_b64 = license_message + else: + license_response_b64 = str(license_message) res = self.session( self.host + "/decrypt-response", { "session_id": session_id_api, "init_data": self.pssh.dumps(), "license_request": self.license_request, - "license_response": license_message, - "scheme": "widevine", + "license_response": license_response_b64, + "scheme": self.scheme, }, ) From 26851cbe7c9e544d5ffb111e8f7e5ae88a019f3c Mon Sep 17 00:00:00 2001 From: Andy Date: Mon, 1 Sep 2025 00:28:25 +0000 Subject: [PATCH 4/9] feat(cdm): Enhance DecryptLabsRemoteCDM with improved session management and caching support and better support for remote WV/PR --- unshackle/commands/dl.py | 55 +- unshackle/core/cdm/decrypt_labs_remote_cdm.py | 568 ++++++++++++++++-- unshackle/core/drm/playready.py | 49 +- unshackle/core/drm/widevine.py | 20 +- 4 files changed, 631 insertions(+), 61 deletions(-) diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index 72752fc..254b807 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -299,21 +299,6 @@ class dl: if getattr(config, "decryption_map", None): config.decryption = config.decryption_map.get(self.service, config.decryption) - with console.status("Loading DRM CDM...", spinner="dots"): - try: - self.cdm = self.get_cdm(self.service, self.profile) - except ValueError as e: - self.log.error(f"Failed to load CDM, {e}") - sys.exit(1) - - if self.cdm: - if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: - self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})") - else: - self.log.info( - f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})" - ) - with console.status("Loading Key Vaults...", spinner="dots"): self.vaults = Vaults(self.service) total_vaults = len(config.key_vaults) @@ -352,6 +337,21 @@ class dl: else: self.log.debug("No vaults are currently active") + with console.status("Loading DRM CDM...", spinner="dots"): + try: + self.cdm = self.get_cdm(self.service, self.profile) + except ValueError as e: + self.log.error(f"Failed to load CDM, {e}") + sys.exit(1) + + if self.cdm: + if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: + self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})") + else: + self.log.info( + f"Loaded PlayReady CDM: {self.cdm.certificate_chain.get_name()} (L{self.cdm.security_level})" + ) + self.proxy_providers = [] if no_proxy: ctx.params["proxy"] = None @@ -1442,8 +1442,8 @@ class dl: return Credential(*credentials) return Credential.loads(credentials) # type: ignore - @staticmethod def get_cdm( + self, service: str, profile: Optional[str] = None, drm: Optional[str] = None, @@ -1478,9 +1478,26 @@ class dl: cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) if cdm_api: is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False - del cdm_api["name"] - del cdm_api["type"] - return DecryptLabsRemoteCDM(service_name=service, **cdm_api) if is_decrypt_lab else RemoteCdm(**cdm_api) + if is_decrypt_lab: + device_type = cdm_api.get("device_type") + del cdm_api["name"] + del cdm_api["type"] + + # Use the appropriate DecryptLabs CDM class based on device type + if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]: + from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM + + # Remove unused parameters for PlayReady CDM + cdm_params = cdm_api.copy() + cdm_params.pop("device_type", None) + cdm_params.pop("system_id", None) + return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params) + else: + return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) + else: + del cdm_api["name"] + del cdm_api["type"] + return RemoteCdm(**cdm_api) prd_path = config.directories.prds / f"{cdm_name}.prd" if not prd_path.is_file(): diff --git a/unshackle/core/cdm/decrypt_labs_remote_cdm.py b/unshackle/core/cdm/decrypt_labs_remote_cdm.py index 960b25c..f356e40 100644 --- a/unshackle/core/cdm/decrypt_labs_remote_cdm.py +++ b/unshackle/core/cdm/decrypt_labs_remote_cdm.py @@ -4,13 +4,21 @@ from typing import Optional, Type, Union from uuid import UUID import requests +from pyplayready.cdm import Cdm as PlayReadyCdm from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage # Copyright 2024 by DevYukine. +# Copyright 2025 by sp4rk.y. class DecryptLabsRemoteCDM(RemoteCdm): + """Remote CDM implementation for DecryptLabs KeyXtractor API. + + Provides CDM functionality through DecryptLabs' remote API service, + supporting multiple DRM schemes including Widevine and PlayReady. + """ + def __init__( self, device_type: Union[DeviceTypes, str], @@ -20,7 +28,20 @@ class DecryptLabsRemoteCDM(RemoteCdm): secret: str, device_name: str, service_name: str, + vaults=None, ): + """Initialize DecryptLabs Remote CDM. + + Args: + device_type: Type of device to emulate + system_id: System identifier + security_level: DRM security level + host: DecryptLabs API host URL + secret: DecryptLabs API key for authentication + device_name: Device/scheme name (used as scheme identifier) + service_name: Service/platform name + vaults: Optional vaults reference for caching keys + """ self.response_counter = 0 self.pssh = None self.api_session_ids = {} @@ -28,7 +49,28 @@ class DecryptLabsRemoteCDM(RemoteCdm): self.service_name = service_name self.device_name = device_name self.keys = {} - self.scheme = "L1" if device_name == "L1" else "widevine" + self.scheme = device_name + self._has_cached_keys = False + self.vaults = vaults + self.security_level = security_level + self.host = host + + class MockCertificateChain: + """Mock certificate chain for DecryptLabs remote CDM compatibility.""" + + def __init__(self, scheme: str, security_level: int): + self.scheme = scheme + self.security_level = security_level + + def get_name(self) -> str: + """Return the certificate chain name for logging.""" + return f"DecryptLabs-{self.scheme}" + + def get_security_level(self) -> int: + """Return the security level.""" + return self.security_level + + self.certificate_chain = MockCertificateChain(self.scheme, security_level) try: super().__init__(device_type, system_id, security_level, host, secret, device_name) except Exception: @@ -41,22 +83,36 @@ class DecryptLabsRemoteCDM(RemoteCdm): raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.") def open(self) -> bytes: - # We stub this method to return a random session ID for now, later we save the api session id and resolve by our random generated one. + """Open a new CDM session. + + Returns: + Random session ID bytes for internal tracking + """ return bytes.fromhex(secrets.token_hex(16)) def close(self, session_id: bytes) -> None: - # We stub this method to do nothing. + """Close a CDM session. + + Args: + session_id: Session identifier to close + """ pass def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str: + """Set service certificate for L1/L2 schemes. + + Args: + session_id: Session identifier + certificate: Service certificate (bytes or base64 string) + + Returns: + Success status string + """ if isinstance(certificate, bytes): certificate = base64.b64encode(certificate).decode() - # certificate needs to be base64 to be sent off to the API. - # it needs to intentionally be kept as base64 encoded SignedMessage. - - self.req_session.signed_device_certificate = certificate - self.req_session.privacy_mode = True + self.service_certificate = certificate + self.privacy_mode = True return "success" @@ -66,63 +122,114 @@ class DecryptLabsRemoteCDM(RemoteCdm): def get_license_challenge( self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True ) -> bytes: + """Generate license challenge using DecryptLabs API. + + Args: + session_id: Session identifier + pssh: PSSH initialization data + license_type: Type of license (default: "STREAMING") + privacy_mode: Enable privacy mode + + Returns: + License challenge bytes or empty bytes if using cached keys + """ self.pssh = pssh + scheme_to_use = self.scheme + try: + pssh_data = pssh.dumps() + if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data: + if self.scheme in ["SL2", "SL3"]: + scheme_to_use = "L1" if self.scheme == "SL2" else "L1" + else: + scheme_to_use = self.scheme + except Exception: + scheme_to_use = self.scheme + request_data = { "init_data": self.pssh.dumps(), - "service_certificate": self.req_session.signed_device_certificate, - "scheme": self.scheme, + "scheme": scheme_to_use, "service": self.service_name, } - # Add required parameter for L1 scheme - if self.scheme == "L1": - request_data["get_cached_keys_if_exists"] = True + + if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"): + request_data["service_certificate"] = self.service_certificate + elif scheme_to_use in ["L1", "L2"]: + pass + + request_data["get_cached_keys_if_exists"] = True + + if not hasattr(self, "session_schemes"): + self.session_schemes = {} + self.session_schemes[session_id] = scheme_to_use res = self.session( self.host + "/get-request", request_data, ) - # Check if we got cached keys instead of a challenge if res.get("message_type") == "cached-keys": - # Store cached keys directly if session_id not in self.keys: self.keys[session_id] = [] session_keys = self.keys[session_id] + cached_keys_for_vault = {} + for cached_key in res.get("cached_keys", []): - # Handle KID format - could be hex string or UUID string kid_str = cached_key["kid"] try: - # Try as UUID string first kid_uuid = UUID(kid_str) except ValueError: try: - # Try as hex string (like the existing code) kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) except ValueError: - # Fallback: use Key.kid_to_uuid kid_uuid = Key.kid_to_uuid(kid_str) session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) + cached_keys_for_vault[kid_uuid] = cached_key["key"] + + if self.vaults and cached_keys_for_vault: + try: + self.vaults.add_keys(cached_keys_for_vault) + except Exception: + pass + + if self.service_name == "NF" or "netflix" in self.service_name.lower(): + request_data_no_cache = request_data.copy() + request_data_no_cache["get_cached_keys_if_exists"] = False + + res_challenge = self.session( + self.host + "/get-request", + request_data_no_cache, + ) + + if res_challenge.get("challenge"): + self.license_request = res_challenge["challenge"] + self.api_session_ids[session_id] = res_challenge.get("session_id") + return base64.b64decode(self.license_request) - # Return empty challenge since we already have the keys self.license_request = "" self.api_session_ids[session_id] = None + self._has_cached_keys = True return b"" - # Normal challenge response self.license_request = res["challenge"] self.api_session_ids[session_id] = res["session_id"] + self._has_cached_keys = False return base64.b64decode(self.license_request) def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: + """Parse license response and extract decryption keys. + + Args: + session_id: Session identifier + license_message: License response from DRM server + """ session_id_api = self.api_session_ids[session_id] if session_id not in self.keys: self.keys[session_id] = [] session_keys = self.keys[session_id] - # If we already have cached keys and no session_id_api, skip processing if session_id_api is None and session_keys: return @@ -135,13 +242,14 @@ class DecryptLabsRemoteCDM(RemoteCdm): ) else: - # Ensure license_message is base64 encoded if isinstance(license_message, bytes): license_response_b64 = base64.b64encode(license_message).decode() elif isinstance(license_message, str): license_response_b64 = license_message else: license_response_b64 = str(license_message) + scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme) + res = self.session( self.host + "/decrypt-response", { @@ -149,32 +257,95 @@ class DecryptLabsRemoteCDM(RemoteCdm): "init_data": self.pssh.dumps(), "license_request": self.license_request, "license_response": license_response_b64, - "scheme": self.scheme, + "scheme": scheme_for_session, }, ) - original_keys = res["keys"].replace("\n", " ") - keys_separated = original_keys.split("--key ") - formatted_keys = [] - for k in keys_separated: - if ":" in k: - key = k.strip() - formatted_keys.append(key) - for keys in formatted_keys: - session_keys.append( - ( + if scheme_for_session in ["SL2", "SL3"]: + if "keys" in res and res["keys"]: + keys_data = res["keys"] + if isinstance(keys_data, str): + original_keys = keys_data.replace("\n", " ") + keys_separated = original_keys.split("--key ") + for k in keys_separated: + if ":" in k: + key_parts = k.strip().split(":") + if len(key_parts) == 2: + try: + kid_hex, key_hex = key_parts + session_keys.append( + Key( + kid=UUID(bytes=bytes.fromhex(kid_hex)), + type_="CONTENT", + key=bytes.fromhex(key_hex), + ) + ) + except (ValueError, TypeError): + continue + elif isinstance(keys_data, list): + for key_info in keys_data: + if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: + session_keys.append( + Key( + kid=Key.kid_to_uuid(key_info["kid"]), + type_=key_info.get("type", "CONTENT"), + key=bytes.fromhex(key_info["key"]), + ) + ) + else: + original_keys = res["keys"].replace("\n", " ") + keys_separated = original_keys.split("--key ") + formatted_keys = [] + for k in keys_separated: + if ":" in k: + key = k.strip() + formatted_keys.append(key) + for keys in formatted_keys: + session_keys.append( Key( kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])), type_="CONTENT", key=bytes.fromhex(keys.split(":")[1]), ) ) - ) def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]: + """Get decryption keys for a session. + + Args: + session_id: Session identifier + type_: Key type filter (optional) + + Returns: + List of decryption keys for the session + """ return self.keys[session_id] + def has_cached_keys(self, session_id: bytes) -> bool: + """Check if this session has cached keys and doesn't need license request. + + Args: + session_id: Session identifier to check + + Returns: + True if session has cached keys, False otherwise + """ + return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 + def session(self, url, data, retries=3): + """Make authenticated request to DecryptLabs API. + + Args: + url: API endpoint URL + data: Request payload data + retries: Number of retry attempts for failed requests + + Returns: + API response JSON data + + Raises: + ValueError: If API returns an error after retries + """ res = self.req_session.post(url, json=data).json() if res.get("message") != "success": @@ -187,3 +358,330 @@ class DecryptLabsRemoteCDM(RemoteCdm): raise ValueError(f"CDM API returned an error: {res['Error']}") return res + + def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: + """Use cached keys from DecryptLabs as a fallback when license server fails. + + Args: + session_id: Session identifier + + Returns: + True if cached keys were successfully applied, False otherwise + """ + if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: + return False + + if session_id not in self.keys: + self.keys[session_id] = [] + session_keys = self.keys[session_id] + + cached_keys_for_vault = {} + + for cached_key in self._cached_keys_available: + kid_str = cached_key["kid"] + try: + kid_uuid = UUID(kid_str) + except ValueError: + try: + kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) + except ValueError: + kid_uuid = Key.kid_to_uuid(kid_str) + + session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) + cached_keys_for_vault[kid_uuid] = cached_key["key"] + + if self.vaults and cached_keys_for_vault: + try: + self.vaults.add_keys(cached_keys_for_vault) + except Exception: + pass + + self._has_cached_keys = True + return True + + +class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm): + """PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API. + + Provides PlayReady CDM functionality through DecryptLabs' remote API service, + supporting PlayReady DRM schemes like SL2 and SL3. + """ + + def __init__( + self, + security_level: int, + host: str, + secret: str, + device_name: str, + service_name: str, + vaults=None, + client_version: str = "10.0.16384.10011", + ): + """Initialize DecryptLabs Remote PlayReady CDM. + + Args: + security_level: DRM security level + host: DecryptLabs API host URL + secret: DecryptLabs API key for authentication + device_name: Device/scheme name (used as scheme identifier) + service_name: Service/platform name + vaults: Optional vaults reference for caching keys + client_version: PlayReady client version + """ + super().__init__( + security_level=security_level, + certificate_chain=None, + encryption_key=None, + signing_key=None, + client_version=client_version, + ) + + self.host = host + self.service_name = service_name + self.device_name = device_name + self.scheme = device_name + self.vaults = vaults + self.keys = {} + self.api_session_ids = {} + self.pssh_b64 = None + self.license_request = None + self._has_cached_keys = False + + self.req_session = requests.Session() + self.req_session.headers.update({"decrypt-labs-api-key": secret}) + + class MockCertificateChain: + """Mock certificate chain for DecryptLabs remote CDM compatibility.""" + + def __init__(self, scheme: str, security_level: int): + self.scheme = scheme + self.security_level = security_level + + def get_name(self) -> str: + """Return the certificate chain name for logging.""" + return f"DecryptLabs-{self.scheme}" + + def get_security_level(self) -> int: + """Return the security level.""" + return self.security_level + + self.certificate_chain = MockCertificateChain(self.scheme, security_level) + + def set_pssh_b64(self, pssh_b64: str): + """Set the original base64-encoded PSSH box for DecryptLabs API. + + Args: + pssh_b64: Base64-encoded PSSH box from the manifest + """ + self.pssh_b64 = pssh_b64 + + def open(self) -> bytes: + """Open a new CDM session. + + Returns: + Random session ID bytes for internal tracking + """ + return bytes.fromhex(secrets.token_hex(16)) + + def close(self, session_id: bytes) -> None: + """Close a CDM session. + + Args: + session_id: Session identifier to close + """ + pass + + def get_license_challenge(self, session_id: bytes, _) -> str: + """Generate license challenge using DecryptLabs API for PlayReady. + + Args: + session_id: Session identifier + + Returns: + License challenge as XML string + """ + if not (hasattr(self, "pssh_b64") and self.pssh_b64): + raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") + + init_data = self.pssh_b64 + + request_data = { + "init_data": init_data, + "scheme": self.scheme, + "service": self.service_name, + "get_cached_keys_if_exists": False, + } + + res = self.session( + self.host + "/get-request", + request_data, + ) + + if res.get("message_type") == "cached-keys": + self._cached_keys_available = res.get("cached_keys", []) + else: + self._cached_keys_available = None + + self.license_request = res["challenge"] + self.api_session_ids[session_id] = res["session_id"] + self._has_cached_keys = False + + try: + return base64.b64decode(self.license_request).decode() + except Exception: + return self.license_request + + def parse_license(self, session_id: bytes, license_message: str) -> None: + """Parse license response and extract decryption keys. + + Args: + session_id: Session identifier + license_message: License response from DRM server (XML string) + """ + session_id_api = self.api_session_ids[session_id] + if session_id not in self.keys: + self.keys[session_id] = [] + session_keys = self.keys[session_id] + + if session_id_api is None and session_keys: + return + + try: + license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8") + except Exception: + return + + if not (hasattr(self, "pssh_b64") and self.pssh_b64): + raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") + init_data = self.pssh_b64 + + res = self.session( + self.host + "/decrypt-response", + { + "session_id": session_id_api, + "init_data": init_data, + "license_request": self.license_request, + "license_response": license_response_b64, + "scheme": self.scheme, + }, + ) + + if "keys" in res and res["keys"]: + keys_data = res["keys"] + if isinstance(keys_data, str): + original_keys = keys_data.replace("\n", " ") + keys_separated = original_keys.split("--key ") + for k in keys_separated: + if ":" in k: + key_parts = k.strip().split(":") + if len(key_parts) == 2: + try: + kid_hex, key_hex = key_parts + session_keys.append( + Key( + kid=UUID(bytes=bytes.fromhex(kid_hex)), + type_="CONTENT", + key=bytes.fromhex(key_hex), + ) + ) + except (ValueError, TypeError): + continue + elif isinstance(keys_data, list): + for key_info in keys_data: + if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: + session_keys.append( + Key( + kid=Key.kid_to_uuid(key_info["kid"]), + type_=key_info.get("type", "CONTENT"), + key=bytes.fromhex(key_info["key"]), + ) + ) + + def get_keys(self, session_id: bytes) -> list: + """Get decryption keys for a session. + + Args: + session_id: Session identifier + + Returns: + List of decryption keys for the session + """ + return self.keys.get(session_id, []) + + def has_cached_keys(self, session_id: bytes) -> bool: + """Check if this session has cached keys and doesn't need license request. + + Args: + session_id: Session identifier to check + + Returns: + True if session has cached keys, False otherwise + """ + return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 + + def session(self, url, data, retries=3): + """Make authenticated request to DecryptLabs API. + + Args: + url: API endpoint URL + data: Request payload data + retries: Number of retry attempts for failed requests + + Returns: + API response JSON data + + Raises: + ValueError: If API returns an error after retries + """ + res = self.req_session.post(url, json=data).json() + + if res.get("message") != "success": + if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""): + if retries > 0: + return self.session(url, data, retries=retries - 1) + else: + raise ValueError(f"CDM API returned an error: {res['Error']}") + else: + raise ValueError(f"CDM API returned an error: {res['Error']}") + + return res + + def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: + """Use cached keys from DecryptLabs as a fallback when license server fails. + + Args: + session_id: Session identifier + + Returns: + True if cached keys were successfully applied, False otherwise + """ + if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: + return False + + if session_id not in self.keys: + self.keys[session_id] = [] + session_keys = self.keys[session_id] + + cached_keys_for_vault = {} + + for cached_key in self._cached_keys_available: + kid_str = cached_key["kid"] + try: + kid_uuid = UUID(kid_str) + except ValueError: + try: + kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) + except ValueError: + kid_uuid = Key.kid_to_uuid(kid_str) + + session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) + cached_keys_for_vault[kid_uuid] = cached_key["key"] + + if self.vaults and cached_keys_for_vault: + try: + self.vaults.add_keys(cached_keys_for_vault) + except Exception: + pass + + self._has_cached_keys = True + return True diff --git a/unshackle/core/drm/playready.py b/unshackle/core/drm/playready.py index dee91ca..8678783 100644 --- a/unshackle/core/drm/playready.py +++ b/unshackle/core/drm/playready.py @@ -224,14 +224,59 @@ class PlayReady: def kids(self) -> list[UUID]: return self._kids + def _extract_keys_from_cdm(self, cdm: PlayReadyCdm, session_id: bytes) -> dict: + """Extract keys from CDM session with cross-library compatibility. + + Args: + cdm: CDM instance + session_id: Session identifier + + Returns: + Dictionary mapping KID UUIDs to hex keys + """ + keys = {} + for key in cdm.get_keys(session_id): + if hasattr(key, "key_id"): + kid = key.key_id + elif hasattr(key, "kid"): + kid = key.kid + else: + continue + + if hasattr(key, "key") and hasattr(key.key, "hex"): + key_hex = key.key.hex() + elif hasattr(key, "key") and isinstance(key.key, bytes): + key_hex = key.key.hex() + elif hasattr(key, "key") and isinstance(key.key, str): + key_hex = key.key + else: + continue + + keys[kid] = key_hex + return keys + def get_content_keys(self, cdm: PlayReadyCdm, certificate: Callable, licence: Callable) -> None: for kid in self.kids: if kid in self.content_keys: continue + session_id = cdm.open() try: + if hasattr(cdm, "set_pssh_b64") and self.pssh_b64: + cdm.set_pssh_b64(self.pssh_b64) + challenge = cdm.get_license_challenge(session_id, self.pssh.wrm_headers[0]) - license_res = licence(challenge=challenge) + + try: + license_res = licence(challenge=challenge) + except Exception: + if hasattr(cdm, "use_cached_keys_as_fallback"): + if cdm.use_cached_keys_as_fallback(session_id): + keys = self._extract_keys_from_cdm(cdm, session_id) + self.content_keys.update(keys) + continue + + raise if isinstance(license_res, bytes): license_str = license_res.decode(errors="ignore") @@ -245,7 +290,7 @@ class PlayReady: pass cdm.parse_license(session_id, license_str) - keys = {key.key_id: key.key.hex() for key in cdm.get_keys(session_id)} + keys = self._extract_keys_from_cdm(cdm, session_id) self.content_keys.update(keys) finally: cdm.close(session_id) diff --git a/unshackle/core/drm/widevine.py b/unshackle/core/drm/widevine.py index 825b1b1..8234dc8 100644 --- a/unshackle/core/drm/widevine.py +++ b/unshackle/core/drm/widevine.py @@ -185,7 +185,12 @@ class Widevine: if cert and hasattr(cdm, "set_service_certificate"): cdm.set_service_certificate(session_id, cert) - cdm.parse_license(session_id, licence(challenge=cdm.get_license_challenge(session_id, self.pssh))) + challenge = cdm.get_license_challenge(session_id, self.pssh) + + if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id): + pass + else: + cdm.parse_license(session_id, licence(challenge=challenge)) self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} if not self.content_keys: @@ -213,10 +218,15 @@ class Widevine: if cert and hasattr(cdm, "set_service_certificate"): cdm.set_service_certificate(session_id, cert) - cdm.parse_license( - session_id, - licence(session_id=session_id, challenge=cdm.get_license_challenge(session_id, self.pssh)), - ) + challenge = cdm.get_license_challenge(session_id, self.pssh) + + if hasattr(cdm, "has_cached_keys") and cdm.has_cached_keys(session_id): + pass + else: + cdm.parse_license( + session_id, + licence(session_id=session_id, challenge=challenge), + ) self.content_keys = {key.kid: key.key.hex() for key in cdm.get_keys(session_id, "CONTENT")} if not self.content_keys: From 3ef43afeed08528ab298a3491d720d1daab4eee6 Mon Sep 17 00:00:00 2001 From: Andy Date: Mon, 1 Sep 2025 00:34:07 +0000 Subject: [PATCH 5/9] feat(cdm): Add DecryptLabs CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate --- unshackle/unshackle-example.yaml | 53 +++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/unshackle/unshackle-example.yaml b/unshackle/unshackle-example.yaml index 1d0bfc0..aa8819b 100644 --- a/unshackle/unshackle-example.yaml +++ b/unshackle/unshackle-example.yaml @@ -105,6 +105,50 @@ remote_cdm: host: https://domain-2.com/api secret: secret_key + - name: "decrypt_labs_chrome" + type: "decrypt_labs" # Required to identify as DecryptLabs CDM + device_name: "ChromeCDM" # Scheme identifier - must match exactly + device_type: CHROME + system_id: 4464 # Doesn't matter + security_level: 3 + host: "https://keyxtractor.decryptlabs.com" + secret: "your_decrypt_labs_api_key_here" # Replace with your API key + - name: "decrypt_labs_l1" + type: "decrypt_labs" + device_name: "L1" # Scheme identifier - must match exactly + device_type: ANDROID + system_id: 4464 + security_level: 1 + host: "https://keyxtractor.decryptlabs.com" + secret: "your_decrypt_labs_api_key_here" + + - name: "decrypt_labs_l2" + type: "decrypt_labs" + device_name: "L2" # Scheme identifier - must match exactly + device_type: ANDROID + system_id: 4464 + security_level: 2 + host: "https://keyxtractor.decryptlabs.com" + secret: "your_decrypt_labs_api_key_here" + + - name: "decrypt_labs_playready_sl2" + type: "decrypt_labs" + device_name: "SL2" # Scheme identifier - must match exactly + device_type: PLAYREADY + system_id: 0 + security_level: 2000 + host: "https://keyxtractor.decryptlabs.com" + secret: "your_decrypt_labs_api_key_here" + + - name: "decrypt_labs_playready_sl3" + type: "decrypt_labs" + device_name: "SL3" # Scheme identifier - must match exactly + device_type: PLAYREADY + system_id: 0 + security_level: 3000 + host: "https://keyxtractor.decryptlabs.com" + secret: "your_decrypt_labs_api_key_here" + # Key Vaults store your obtained Content Encryption Keys (CEKs) # Use 'no_push: true' to prevent a vault from receiving pushed keys # while still allowing it to provide keys when requested @@ -171,7 +215,7 @@ chapter_fallback_name: "Chapter {j:02}" # Case-Insensitive dictionary of headers for all Services headers: Accept-Language: "en-US,en;q=0.8" - User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36" + User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36" # Override default filenames used across unshackle filenames: @@ -213,6 +257,13 @@ services: # Global service config api_key: "service_api_key" + # Service certificate for Widevine L1/L2 (base64 encoded) + # This certificate is automatically used when L1/L2 schemes are selected + # Services obtain this from their DRM provider or license server + certificate: | + CAUSwwUKvQIIAxIQ5US6QAvBDzfTtjb4tU/7QxiH8c+TBSKOAjCCAQoCggEBAObzvlu2hZRsapAPx4Aa4GUZj4/GjxgXUtBH4THSkM40x63wQeyVxlEEo + # ... (full base64 certificate here) + # Profile-specific device configurations profiles: john_sd: From ed744205ade0a4cd7d56409f26d5ae429dc92525 Mon Sep 17 00:00:00 2001 From: Andy Date: Mon, 1 Sep 2025 21:02:08 +0000 Subject: [PATCH 6/9] =?UTF-8?q?fix(tags):=20=F0=9F=90=9B=20Fix=20Matroska?= =?UTF-8?q?=20tag=20compliance=20with=20official=20specification?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update IMDB tags to use ID only (tt123456) instead of URLs - Update TMDB tags to use prefix/id format (movie/123456, tv/123456) - Update TVDB tags to use numeric ID only - Add XML escaping for tag values - Fix XML declaration to use double quotes Fixes #15 --- unshackle/core/utils/tags.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/unshackle/core/utils/tags.py b/unshackle/core/utils/tags.py index d0753f4..f6c8c91 100644 --- a/unshackle/core/utils/tags.py +++ b/unshackle/core/utils/tags.py @@ -11,6 +11,7 @@ from typing import Optional, Tuple import requests from requests.adapters import HTTPAdapter, Retry +from xml.sax.saxutils import escape from unshackle.core import binaries from unshackle.core.config import config @@ -289,9 +290,9 @@ def _apply_tags(path: Path, tags: dict[str, str]) -> None: log.debug("mkvpropedit not found on PATH; skipping tags") return log.debug("Applying tags to %s: %s", path, tags) - xml_lines = ["", "", " ", " "] + xml_lines = ['', "", " ", " "] for name, value in tags.items(): - xml_lines.append(f" {name}{value}") + xml_lines.append(f" {escape(name)}{escape(value)}") xml_lines.extend([" ", ""]) with tempfile.NamedTemporaryFile("w", suffix=".xml", delete=False) as f: f.write("\n".join(xml_lines)) @@ -351,11 +352,11 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> show_ids = simkl_data.get("show", {}).get("ids", {}) if show_ids.get("imdb"): - standard_tags["IMDB"] = f"https://www.imdb.com/title/{show_ids['imdb']}" + standard_tags["IMDB"] = show_ids["imdb"] if show_ids.get("tvdb"): - standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/series/{show_ids['tvdb']}" + standard_tags["TVDB"] = str(show_ids["tvdb"]) if show_ids.get("tmdbtv"): - standard_tags["TMDB"] = f"https://www.themoviedb.org/tv/{show_ids['tmdbtv']}" + standard_tags["TMDB"] = f"tv/{show_ids['tmdbtv']}" # Use TMDB API for additional metadata (either from provided ID or Simkl lookup) api_key = _api_key() @@ -373,8 +374,8 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> _apply_tags(path, custom_tags) return - tmdb_url = f"https://www.themoviedb.org/{'movie' if kind == 'movie' else 'tv'}/{tmdb_id}" - standard_tags["TMDB"] = tmdb_url + prefix = "movie" if kind == "movie" else "tv" + standard_tags["TMDB"] = f"{prefix}/{tmdb_id}" try: ids = external_ids(tmdb_id, kind) except requests.RequestException as exc: @@ -385,11 +386,10 @@ def tag_file(path: Path, title: Title, tmdb_id: Optional[int] | None = None) -> imdb_id = ids.get("imdb_id") if imdb_id: - standard_tags["IMDB"] = f"https://www.imdb.com/title/{imdb_id}" + standard_tags["IMDB"] = imdb_id tvdb_id = ids.get("tvdb_id") if tvdb_id: - tvdb_prefix = "movies" if kind == "movie" else "series" - standard_tags["TVDB"] = f"https://thetvdb.com/dereferrer/{tvdb_prefix}/{tvdb_id}" + standard_tags["TVDB"] = str(tvdb_id) merged_tags = { **custom_tags, From 9fd08951284366072d24f5a9ee70ab6369a44070 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 2 Sep 2025 04:02:52 +0000 Subject: [PATCH 7/9] feat(cdm): Refactor DecryptLabsRemoteCDM full support for Widevine/Playready and ChromeCDM --- pyproject.toml | 2 +- unshackle/commands/dl.py | 51 +- unshackle/core/__init__.py | 2 +- unshackle/core/cdm/decrypt_labs_remote_cdm.py | 1062 ++++++++--------- unshackle/core/tracks/track.py | 9 + uv.lock | 2 +- 6 files changed, 523 insertions(+), 605 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2a0b84f..2c4c67c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "unshackle" -version = "1.4.3" +version = "1.4.4" description = "Modular Movie, TV, and Music Archival Software." authors = [{ name = "unshackle team" }] requires-python = ">=3.10,<3.13" diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index 254b807..eac5e7d 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -345,7 +345,10 @@ class dl: sys.exit(1) if self.cdm: - if hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: + if isinstance(self.cdm, DecryptLabsRemoteCDM): + drm_type = "PlayReady" if self.cdm.is_playready else "Widevine" + self.log.info(f"Loaded {drm_type} Remote CDM: DecryptLabs (L{self.cdm.security_level})") + elif hasattr(self.cdm, "device_type") and self.cdm.device_type.name in ["ANDROID", "CHROME"]: self.log.info(f"Loaded Widevine CDM: {self.cdm.system_id} (L{self.cdm.security_level})") else: self.log.info( @@ -874,7 +877,12 @@ class dl: ), licence=partial( service.get_playready_license - if isinstance(self.cdm, PlayReadyCdm) + if ( + isinstance(self.cdm, PlayReadyCdm) + or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready + ) + ) and hasattr(service, "get_playready_license") else service.get_widevine_license, title=title, @@ -1201,10 +1209,22 @@ class dl: if not drm: return - if isinstance(drm, Widevine) and not isinstance(self.cdm, WidevineCdm): - self.cdm = self.get_cdm(self.service, self.profile, drm="widevine") - elif isinstance(drm, PlayReady) and not isinstance(self.cdm, PlayReadyCdm): - self.cdm = self.get_cdm(self.service, self.profile, drm="playready") + if isinstance(drm, Widevine): + if not isinstance(self.cdm, (WidevineCdm, DecryptLabsRemoteCDM)) or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and self.cdm.is_playready + ): + widevine_cdm = self.get_cdm(self.service, self.profile, drm="widevine") + if widevine_cdm: + self.log.info("Switching to Widevine CDM for Widevine content") + self.cdm = widevine_cdm + elif isinstance(drm, PlayReady): + if not isinstance(self.cdm, (PlayReadyCdm, DecryptLabsRemoteCDM)) or ( + isinstance(self.cdm, DecryptLabsRemoteCDM) and not self.cdm.is_playready + ): + playready_cdm = self.get_cdm(self.service, self.profile, drm="playready") + if playready_cdm: + self.log.info("Switching to PlayReady CDM for PlayReady content") + self.cdm = playready_cdm if isinstance(drm, Widevine): with self.DRM_TABLE_LOCK: @@ -1477,26 +1497,17 @@ class dl: cdm_api = next(iter(x for x in config.remote_cdm if x["name"] == cdm_name), None) if cdm_api: - is_decrypt_lab = True if cdm_api["type"] == "decrypt_labs" else False + is_decrypt_lab = True if cdm_api.get("type") == "decrypt_labs" else False if is_decrypt_lab: - device_type = cdm_api.get("device_type") del cdm_api["name"] del cdm_api["type"] - # Use the appropriate DecryptLabs CDM class based on device type - if device_type == "PLAYREADY" or cdm_api.get("device_name") in ["SL2", "SL3"]: - from unshackle.core.cdm.decrypt_labs_remote_cdm import DecryptLabsRemotePlayReadyCDM - - # Remove unused parameters for PlayReady CDM - cdm_params = cdm_api.copy() - cdm_params.pop("device_type", None) - cdm_params.pop("system_id", None) - return DecryptLabsRemotePlayReadyCDM(service_name=service, vaults=self.vaults, **cdm_params) - else: - return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) + # All DecryptLabs CDMs use DecryptLabsRemoteCDM + return DecryptLabsRemoteCDM(service_name=service, vaults=self.vaults, **cdm_api) else: del cdm_api["name"] - del cdm_api["type"] + if "type" in cdm_api: + del cdm_api["type"] return RemoteCdm(**cdm_api) prd_path = config.directories.prds / f"{cdm_name}.prd" diff --git a/unshackle/core/__init__.py b/unshackle/core/__init__.py index aa56ed4..c0f285b 100644 --- a/unshackle/core/__init__.py +++ b/unshackle/core/__init__.py @@ -1 +1 @@ -__version__ = "1.4.3" +__version__ = "1.4.4" diff --git a/unshackle/core/cdm/decrypt_labs_remote_cdm.py b/unshackle/core/cdm/decrypt_labs_remote_cdm.py index f356e40..b9f4250 100644 --- a/unshackle/core/cdm/decrypt_labs_remote_cdm.py +++ b/unshackle/core/cdm/decrypt_labs_remote_cdm.py @@ -1,687 +1,585 @@ +from __future__ import annotations + import base64 import secrets -from typing import Optional, Type, Union +from typing import Any, Dict, List, Optional, Union from uuid import UUID import requests -from pyplayready.cdm import Cdm as PlayReadyCdm -from pywidevine import PSSH, Device, DeviceTypes, Key, RemoteCdm -from pywidevine.license_protocol_pb2 import SignedDrmCertificate, SignedMessage +from pywidevine.device import DeviceTypes +from requests import Session -# Copyright 2024 by DevYukine. -# Copyright 2025 by sp4rk.y. +from unshackle.core.vaults import Vaults -class DecryptLabsRemoteCDM(RemoteCdm): - """Remote CDM implementation for DecryptLabs KeyXtractor API. +class MockCertificateChain: + """Mock certificate chain for PlayReady compatibility.""" - Provides CDM functionality through DecryptLabs' remote API service, - supporting multiple DRM schemes including Widevine and PlayReady. + def __init__(self, name: str): + self._name = name + + def get_name(self) -> str: + return self._name + + +class Key: + """Key object compatible with pywidevine.""" + + def __init__(self, kid: str, key: str, type_: str = "CONTENT"): + if isinstance(kid, str): + clean_kid = kid.replace("-", "") + if len(clean_kid) == 32: + self.kid = UUID(hex=clean_kid) + else: + self.kid = UUID(hex=clean_kid.ljust(32, "0")) + else: + self.kid = kid + + if isinstance(key, str): + self.key = bytes.fromhex(key) + else: + self.key = key + + self.type = type_ + + +class DecryptLabsRemoteCDMExceptions: + """Exception classes for compatibility with pywidevine CDM.""" + + class InvalidSession(Exception): + """Raised when session ID is invalid.""" + + class TooManySessions(Exception): + """Raised when session limit is reached.""" + + class InvalidInitData(Exception): + """Raised when PSSH/init data is invalid.""" + + class InvalidLicenseType(Exception): + """Raised when license type is invalid.""" + + class InvalidLicenseMessage(Exception): + """Raised when license message is invalid.""" + + class InvalidContext(Exception): + """Raised when session has no context data.""" + + class SignatureMismatch(Exception): + """Raised when signature verification fails.""" + + +class DecryptLabsRemoteCDM: """ + Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface. + + This class provides a drop-in replacement for pywidevine's local CDM using + Decrypt Labs' KeyXtractor API service. + """ + + service_certificate_challenge = b"\x08\x04" def __init__( self, - device_type: Union[DeviceTypes, str], - system_id: int, - security_level: int, - host: str, secret: str, - device_name: str, - service_name: str, - vaults=None, + host: str = "https://keyxtractor.decryptlabs.com", + device_name: str = "ChromeCDM", + service_name: Optional[str] = None, + vaults: Optional[Vaults] = None, + device_type: Optional[str] = None, + system_id: Optional[int] = None, + security_level: Optional[int] = None, + **kwargs, ): - """Initialize DecryptLabs Remote CDM. + """ + Initialize Decrypt Labs Remote CDM for Widevine and PlayReady schemes. Args: - device_type: Type of device to emulate - system_id: System identifier - security_level: DRM security level - host: DecryptLabs API host URL - secret: DecryptLabs API key for authentication - device_name: Device/scheme name (used as scheme identifier) - service_name: Service/platform name - vaults: Optional vaults reference for caching keys + secret: Decrypt Labs API key (matches config format) + host: Decrypt Labs API host URL (matches config format) + device_name: DRM scheme (ChromeCDM, L1, L2 for Widevine; SL2, SL3 for PlayReady) + service_name: Service name for key caching and vault operations + vaults: Vaults instance for local key caching + device_type: Device type (CHROME, ANDROID, PLAYREADY) - for compatibility + system_id: System ID - for compatibility + security_level: Security level - for compatibility """ - self.response_counter = 0 - self.pssh = None - self.api_session_ids = {} - self.license_request = None - self.service_name = service_name + _ = kwargs + + self.secret = secret + self.host = host.rstrip("/") self.device_name = device_name - self.keys = {} - self.scheme = device_name - self._has_cached_keys = False + self.service_name = service_name or "" self.vaults = vaults - self.security_level = security_level - self.host = host - class MockCertificateChain: - """Mock certificate chain for DecryptLabs remote CDM compatibility.""" + self._device_type_str = device_type + if device_type: + self.device_type = self._get_device_type_enum(device_type) - def __init__(self, scheme: str, security_level: int): - self.scheme = scheme - self.security_level = security_level + self._is_playready = (device_type and device_type.upper() == "PLAYREADY") or (device_name in ["SL2", "SL3"]) - def get_name(self) -> str: - """Return the certificate chain name for logging.""" - return f"DecryptLabs-{self.scheme}" + if self._is_playready: + self.system_id = system_id or 0 + self.security_level = security_level or (2000 if device_name == "SL2" else 3000) + else: + self.system_id = system_id or 26830 + self.security_level = security_level or 3 - def get_security_level(self) -> int: - """Return the security level.""" - return self.security_level + self._sessions: Dict[bytes, Dict[str, Any]] = {} + self._pssh_b64 = None + self._http_session = Session() + self._http_session.headers.update( + { + "decrypt-labs-api-key": self.secret, + "Content-Type": "application/json", + "User-Agent": "unshackle-decrypt-labs-cdm/1.0", + } + ) - self.certificate_chain = MockCertificateChain(self.scheme, security_level) - try: - super().__init__(device_type, system_id, security_level, host, secret, device_name) - except Exception: - pass - self.req_session = requests.Session() - self.req_session.headers.update({"decrypt-labs-api-key": secret}) + def _get_device_type_enum(self, device_type: str): + """Convert device type string to enum for compatibility.""" + device_type_upper = device_type.upper() + if device_type_upper == "ANDROID": + return DeviceTypes.ANDROID + elif device_type_upper == "CHROME": + return DeviceTypes.CHROME + else: + return DeviceTypes.CHROME - @classmethod - def from_device(cls, device: Device) -> Type["DecryptLabsRemoteCDM"]: - raise NotImplementedError("You cannot load a DecryptLabsRemoteCDM from a local Device file.") + @property + def is_playready(self) -> bool: + """Check if this CDM is in PlayReady mode.""" + return self._is_playready + + @property + def certificate_chain(self) -> MockCertificateChain: + """Mock certificate chain for PlayReady compatibility.""" + return MockCertificateChain(f"{self.device_name}_Remote") + + def set_pssh_b64(self, pssh_b64: str) -> None: + """Store base64-encoded PSSH data for PlayReady compatibility.""" + self._pssh_b64 = pssh_b64 + + def _generate_session_id(self) -> bytes: + """Generate a unique session ID.""" + return secrets.token_bytes(16) + + def _get_init_data_from_pssh(self, pssh: Any) -> str: + """Extract init data from various PSSH formats.""" + if self.is_playready and self._pssh_b64: + return self._pssh_b64 + + if hasattr(pssh, "dumps"): + dumps_result = pssh.dumps() + + if isinstance(dumps_result, str): + try: + base64.b64decode(dumps_result) + return dumps_result + except Exception: + return base64.b64encode(dumps_result.encode("utf-8")).decode("utf-8") + else: + return base64.b64encode(dumps_result).decode("utf-8") + elif hasattr(pssh, "raw"): + raw_data = pssh.raw + if isinstance(raw_data, str): + raw_data = raw_data.encode("utf-8") + return base64.b64encode(raw_data).decode("utf-8") + elif hasattr(pssh, "__class__") and "WrmHeader" in pssh.__class__.__name__: + if self.is_playready: + raise ValueError("PlayReady WRM header received but no PSSH B64 was set via set_pssh_b64()") + + if hasattr(pssh, "raw_bytes"): + return base64.b64encode(pssh.raw_bytes).decode("utf-8") + elif hasattr(pssh, "bytes"): + return base64.b64encode(pssh.bytes).decode("utf-8") + else: + raise ValueError(f"Cannot extract PSSH data from WRM header type: {type(pssh)}") + else: + raise ValueError(f"Unsupported PSSH type: {type(pssh)}") def open(self) -> bytes: - """Open a new CDM session. + """ + Open a new CDM session. Returns: - Random session ID bytes for internal tracking + Session identifier as bytes """ - return bytes.fromhex(secrets.token_hex(16)) + session_id = self._generate_session_id() + self._sessions[session_id] = { + "service_certificate": None, + "keys": [], + "pssh": None, + "challenge": None, + "decrypt_labs_session_id": None, + } + return session_id def close(self, session_id: bytes) -> None: - """Close a CDM session. + """ + Close a CDM session. Args: - session_id: Session identifier to close + session_id: Session identifier + + Raises: + ValueError: If session ID is invalid """ - pass + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + del self._sessions[session_id] + + def get_service_certificate(self, session_id: bytes) -> Optional[bytes]: + """ + Get the service certificate for a session. + + Args: + session_id: Session identifier + + Returns: + Service certificate if set, None otherwise + + Raises: + ValueError: If session ID is invalid + """ + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + return self._sessions[session_id]["service_certificate"] def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str: - """Set service certificate for L1/L2 schemes. + """ + Set the service certificate for a session. Args: session_id: Session identifier certificate: Service certificate (bytes or base64 string) Returns: - Success status string + Certificate status message + + Raises: + ValueError: If session ID is invalid """ - if isinstance(certificate, bytes): - certificate = base64.b64encode(certificate).decode() + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - self.service_certificate = certificate - self.privacy_mode = True + if certificate is None: + self._sessions[session_id]["service_certificate"] = None + return "Removed" - return "success" + if isinstance(certificate, str): + certificate = base64.b64decode(certificate) - def get_service_certificate(self, session_id: bytes) -> Optional[SignedDrmCertificate]: - raise NotImplementedError("This method is not implemented in this CDM") + self._sessions[session_id]["service_certificate"] = certificate + return "Successfully set Service Certificate" - def get_license_challenge( - self, session_id: bytes, pssh: PSSH, license_type: str = "STREAMING", privacy_mode: bool = True - ) -> bytes: - """Generate license challenge using DecryptLabs API. + def has_cached_keys(self, session_id: bytes) -> bool: + """ + Check if cached keys are available for the session. Args: session_id: Session identifier - pssh: PSSH initialization data - license_type: Type of license (default: "STREAMING") - privacy_mode: Enable privacy mode Returns: - License challenge bytes or empty bytes if using cached keys + True if cached keys are available + + Raises: + ValueError: If session ID is invalid """ - self.pssh = pssh + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - scheme_to_use = self.scheme - try: - pssh_data = pssh.dumps() - if b"edef8ba9-79d6-4ace-a3c8-27dcd51d21ed" in pssh_data or "edef8ba979d64acea3c827dcd51d21ed" in pssh_data: - if self.scheme in ["SL2", "SL3"]: - scheme_to_use = "L1" if self.scheme == "SL2" else "L1" - else: - scheme_to_use = self.scheme - except Exception: - scheme_to_use = self.scheme + session = self._sessions[session_id] + pssh = session.get("pssh") - request_data = { - "init_data": self.pssh.dumps(), - "scheme": scheme_to_use, - "service": self.service_name, - } + if not pssh: + return False - if scheme_to_use in ["L1", "L2"] and hasattr(self, "service_certificate"): - request_data["service_certificate"] = self.service_certificate - elif scheme_to_use in ["L1", "L2"]: - pass + if self.vaults: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = pssh.key_ids + elif hasattr(pssh, "kids"): + key_ids = pssh.kids - request_data["get_cached_keys_if_exists"] = True + for kid in key_ids: + key, _ = self.vaults.get_key(kid) + if key and key.count("0") != len(key): + return True - if not hasattr(self, "session_schemes"): - self.session_schemes = {} - self.session_schemes[session_id] = scheme_to_use - res = self.session( - self.host + "/get-request", - request_data, - ) + if self.service_name: + try: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = [kid.hex for kid in pssh.key_ids] + elif hasattr(pssh, "kids"): + key_ids = [kid.hex for kid in pssh.kids] - if res.get("message_type") == "cached-keys": - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - cached_keys_for_vault = {} - - for cached_key in res.get("cached_keys", []): - kid_str = cached_key["kid"] - try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) - - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] - - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) - except Exception: - pass - - if self.service_name == "NF" or "netflix" in self.service_name.lower(): - request_data_no_cache = request_data.copy() - request_data_no_cache["get_cached_keys_if_exists"] = False - - res_challenge = self.session( - self.host + "/get-request", - request_data_no_cache, - ) - - if res_challenge.get("challenge"): - self.license_request = res_challenge["challenge"] - self.api_session_ids[session_id] = res_challenge.get("session_id") - return base64.b64decode(self.license_request) - - self.license_request = "" - self.api_session_ids[session_id] = None - self._has_cached_keys = True - return b"" - - self.license_request = res["challenge"] - self.api_session_ids[session_id] = res["session_id"] - self._has_cached_keys = False - - return base64.b64decode(self.license_request) - - def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None: - """Parse license response and extract decryption keys. - - Args: - session_id: Session identifier - license_message: License response from DRM server - """ - session_id_api = self.api_session_ids[session_id] - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - if session_id_api is None and session_keys: - return - - if isinstance(license_message, dict) and "keys" in license_message: - session_keys.extend( - [ - Key(kid=Key.kid_to_uuid(x["kid"]), type_=x.get("type", "CONTENT"), key=bytes.fromhex(x["key"])) - for x in license_message["keys"] - ] - ) - - else: - if isinstance(license_message, bytes): - license_response_b64 = base64.b64encode(license_message).decode() - elif isinstance(license_message, str): - license_response_b64 = license_message - else: - license_response_b64 = str(license_message) - scheme_for_session = getattr(self, "session_schemes", {}).get(session_id, self.scheme) - - res = self.session( - self.host + "/decrypt-response", - { - "session_id": session_id_api, - "init_data": self.pssh.dumps(), - "license_request": self.license_request, - "license_response": license_response_b64, - "scheme": scheme_for_session, - }, - ) - - if scheme_for_session in ["SL2", "SL3"]: - if "keys" in res and res["keys"]: - keys_data = res["keys"] - if isinstance(keys_data, str): - original_keys = keys_data.replace("\n", " ") - keys_separated = original_keys.split("--key ") - for k in keys_separated: - if ":" in k: - key_parts = k.strip().split(":") - if len(key_parts) == 2: - try: - kid_hex, key_hex = key_parts - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(kid_hex)), - type_="CONTENT", - key=bytes.fromhex(key_hex), - ) - ) - except (ValueError, TypeError): - continue - elif isinstance(keys_data, list): - for key_info in keys_data: - if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: - session_keys.append( - Key( - kid=Key.kid_to_uuid(key_info["kid"]), - type_=key_info.get("type", "CONTENT"), - key=bytes.fromhex(key_info["key"]), - ) - ) - else: - original_keys = res["keys"].replace("\n", " ") - keys_separated = original_keys.split("--key ") - formatted_keys = [] - for k in keys_separated: - if ":" in k: - key = k.strip() - formatted_keys.append(key) - for keys in formatted_keys: - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(keys.split(":")[0])), - type_="CONTENT", - key=bytes.fromhex(keys.split(":")[1]), - ) + if key_ids: + response = self._http_session.post( + f"{self.host}/get-cached-keys", + json={"service": self.service_name, "kid": key_ids}, + timeout=30, ) - def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]: - """Get decryption keys for a session. + if response.status_code == 200: + data = response.json() + return ( + data.get("message") == "success" + and "cached_keys" in data + and isinstance(data["cached_keys"], list) + and len(data["cached_keys"]) > 0 + ) - Args: - session_id: Session identifier - type_: Key type filter (optional) - - Returns: - List of decryption keys for the session - """ - return self.keys[session_id] - - def has_cached_keys(self, session_id: bytes) -> bool: - """Check if this session has cached keys and doesn't need license request. - - Args: - session_id: Session identifier to check - - Returns: - True if session has cached keys, False otherwise - """ - return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 - - def session(self, url, data, retries=3): - """Make authenticated request to DecryptLabs API. - - Args: - url: API endpoint URL - data: Request payload data - retries: Number of retry attempts for failed requests - - Returns: - API response JSON data - - Raises: - ValueError: If API returns an error after retries - """ - res = self.req_session.post(url, json=data).json() - - if res.get("message") != "success": - if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""): - if retries > 0: - return self.session(url, data, retries=retries - 1) - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - - return res - - def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: - """Use cached keys from DecryptLabs as a fallback when license server fails. - - Args: - session_id: Session identifier - - Returns: - True if cached keys were successfully applied, False otherwise - """ - if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: - return False - - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - cached_keys_for_vault = {} - - for cached_key in self._cached_keys_available: - kid_str = cached_key["kid"] - try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) - - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] - - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) except Exception: pass - self._has_cached_keys = True - return True + return False - -class DecryptLabsRemotePlayReadyCDM(PlayReadyCdm): - """PlayReady Remote CDM implementation for DecryptLabs KeyXtractor API. - - Provides PlayReady CDM functionality through DecryptLabs' remote API service, - supporting PlayReady DRM schemes like SL2 and SL3. - """ - - def __init__( - self, - security_level: int, - host: str, - secret: str, - device_name: str, - service_name: str, - vaults=None, - client_version: str = "10.0.16384.10011", - ): - """Initialize DecryptLabs Remote PlayReady CDM. - - Args: - security_level: DRM security level - host: DecryptLabs API host URL - secret: DecryptLabs API key for authentication - device_name: Device/scheme name (used as scheme identifier) - service_name: Service/platform name - vaults: Optional vaults reference for caching keys - client_version: PlayReady client version + def get_license_challenge( + self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True + ) -> bytes: """ - super().__init__( - security_level=security_level, - certificate_chain=None, - encryption_key=None, - signing_key=None, - client_version=client_version, - ) - - self.host = host - self.service_name = service_name - self.device_name = device_name - self.scheme = device_name - self.vaults = vaults - self.keys = {} - self.api_session_ids = {} - self.pssh_b64 = None - self.license_request = None - self._has_cached_keys = False - - self.req_session = requests.Session() - self.req_session.headers.update({"decrypt-labs-api-key": secret}) - - class MockCertificateChain: - """Mock certificate chain for DecryptLabs remote CDM compatibility.""" - - def __init__(self, scheme: str, security_level: int): - self.scheme = scheme - self.security_level = security_level - - def get_name(self) -> str: - """Return the certificate chain name for logging.""" - return f"DecryptLabs-{self.scheme}" - - def get_security_level(self) -> int: - """Return the security level.""" - return self.security_level - - self.certificate_chain = MockCertificateChain(self.scheme, security_level) - - def set_pssh_b64(self, pssh_b64: str): - """Set the original base64-encoded PSSH box for DecryptLabs API. - - Args: - pssh_b64: Base64-encoded PSSH box from the manifest - """ - self.pssh_b64 = pssh_b64 - - def open(self) -> bytes: - """Open a new CDM session. - - Returns: - Random session ID bytes for internal tracking - """ - return bytes.fromhex(secrets.token_hex(16)) - - def close(self, session_id: bytes) -> None: - """Close a CDM session. - - Args: - session_id: Session identifier to close - """ - pass - - def get_license_challenge(self, session_id: bytes, _) -> str: - """Generate license challenge using DecryptLabs API for PlayReady. + Generate a license challenge using Decrypt Labs API. Args: session_id: Session identifier + pssh_or_wrm: PSSH object or WRM header (for PlayReady compatibility) + license_type: Type of license (STREAMING, OFFLINE, AUTOMATIC) - for compatibility only + privacy_mode: Whether to use privacy mode - for compatibility only Returns: - License challenge as XML string - """ - if not (hasattr(self, "pssh_b64") and self.pssh_b64): - raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") + License challenge as bytes - init_data = self.pssh_b64 + Raises: + InvalidSession: If session ID is invalid + requests.RequestException: If API request fails + """ + _ = license_type, privacy_mode + + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + session = self._sessions[session_id] + + session["pssh"] = pssh_or_wrm + init_data = self._get_init_data_from_pssh(pssh_or_wrm) + + if self.has_cached_keys(session_id): + self._load_cached_keys(session_id) + return b"" + + request_data = {"scheme": self.device_name, "init_data": init_data} + + if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name: + request_data["service"] = self.service_name + + if session["service_certificate"]: + request_data["service_certificate"] = base64.b64encode(session["service_certificate"]).decode("utf-8") + + response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30) + + if response.status_code != 200: + raise requests.RequestException(f"API request failed: {response.status_code} {response.text}") + + data = response.json() + + if data.get("message") != "success": + error_msg = data.get("message", "Unknown error") + if "details" in data: + error_msg += f" - Details: {data['details']}" + if "error" in data: + error_msg += f" - Error: {data['error']}" + raise requests.RequestException(f"API error: {error_msg}") + + if data.get("message_type") == "cached-keys" or "cached_keys" in data: + cached_keys = data.get("cached_keys", []) + session["keys"] = self._parse_cached_keys(cached_keys) + return b"" + + challenge = base64.b64decode(data["challenge"]) + session["challenge"] = challenge + session["decrypt_labs_session_id"] = data["session_id"] + + return challenge + + def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None: + """ + Parse license response using Decrypt Labs API. + + Args: + session_id: Session identifier + license_message: License response from license server + + Raises: + ValueError: If session ID is invalid or no challenge available + requests.RequestException: If API request fails + """ + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") + + session = self._sessions[session_id] + + if session["keys"]: + return + + if not session.get("challenge") or not session.get("decrypt_labs_session_id"): + raise ValueError("No challenge available - call get_license_challenge first") + + if isinstance(license_message, str): + if self.is_playready and license_message.strip().startswith(" None: - """Parse license response and extract decryption keys. - - Args: - session_id: Session identifier - license_message: License response from DRM server (XML string) - """ - session_id_api = self.api_session_ids[session_id] - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] - - if session_id_api is None and session_keys: - return - - try: - license_response_b64 = base64.b64encode(license_message.encode("utf-8")).decode("utf-8") - except Exception: - return - - if not (hasattr(self, "pssh_b64") and self.pssh_b64): - raise ValueError("DecryptLabs CDM requires original PSSH box data. Call set_pssh_b64() first.") - init_data = self.pssh_b64 - - res = self.session( - self.host + "/decrypt-response", - { - "session_id": session_id_api, - "init_data": init_data, - "license_request": self.license_request, - "license_response": license_response_b64, - "scheme": self.scheme, - }, - ) - - if "keys" in res and res["keys"]: - keys_data = res["keys"] - if isinstance(keys_data, str): - original_keys = keys_data.replace("\n", " ") - keys_separated = original_keys.split("--key ") - for k in keys_separated: - if ":" in k: - key_parts = k.strip().split(":") - if len(key_parts) == 2: - try: - kid_hex, key_hex = key_parts - session_keys.append( - Key( - kid=UUID(bytes=bytes.fromhex(kid_hex)), - type_="CONTENT", - key=bytes.fromhex(key_hex), - ) - ) - except (ValueError, TypeError): - continue - elif isinstance(keys_data, list): - for key_info in keys_data: - if isinstance(key_info, dict) and "kid" in key_info and "key" in key_info: - session_keys.append( - Key( - kid=Key.kid_to_uuid(key_info["kid"]), - type_=key_info.get("type", "CONTENT"), - key=bytes.fromhex(key_info["key"]), - ) - ) - - def get_keys(self, session_id: bytes) -> list: - """Get decryption keys for a session. + session["keys"] = self._parse_keys_response(data) + + if self.vaults and session["keys"]: + key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"} + self.vaults.add_keys(key_dict) + + def get_keys(self, session_id: bytes, type_: Optional[str] = None) -> List[Key]: + """ + Get keys from the session. Args: session_id: Session identifier + type_: Optional key type filter (CONTENT, SIGNING, etc.) Returns: - List of decryption keys for the session - """ - return self.keys.get(session_id, []) - - def has_cached_keys(self, session_id: bytes) -> bool: - """Check if this session has cached keys and doesn't need license request. - - Args: - session_id: Session identifier to check - - Returns: - True if session has cached keys, False otherwise - """ - return getattr(self, "_has_cached_keys", False) and session_id in self.keys and len(self.keys[session_id]) > 0 - - def session(self, url, data, retries=3): - """Make authenticated request to DecryptLabs API. - - Args: - url: API endpoint URL - data: Request payload data - retries: Number of retry attempts for failed requests - - Returns: - API response JSON data + List of Key objects Raises: - ValueError: If API returns an error after retries + InvalidSession: If session ID is invalid """ - res = self.req_session.post(url, json=data).json() + if session_id not in self._sessions: + raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}") - if res.get("message") != "success": - if "License Response Decryption Process Failed at the very beginning" in res.get("Error", ""): - if retries > 0: - return self.session(url, data, retries=retries - 1) - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") - else: - raise ValueError(f"CDM API returned an error: {res['Error']}") + key_dicts = self._sessions[session_id]["keys"] + keys = [Key(kid=k["kid"], key=k["key"], type_=k["type"]) for k in key_dicts] - return res + if type_: + keys = [key for key in keys if key.type == type_] - def use_cached_keys_as_fallback(self, session_id: bytes) -> bool: - """Use cached keys from DecryptLabs as a fallback when license server fails. + return keys - Args: - session_id: Session identifier + def _load_cached_keys(self, session_id: bytes) -> None: + """Load cached keys from vaults and Decrypt Labs API.""" + session = self._sessions[session_id] + pssh = session["pssh"] + keys = [] - Returns: - True if cached keys were successfully applied, False otherwise - """ - if not hasattr(self, "_cached_keys_available") or not self._cached_keys_available: - return False + if self.vaults: + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = pssh.key_ids + elif hasattr(pssh, "kids"): + key_ids = pssh.kids - if session_id not in self.keys: - self.keys[session_id] = [] - session_keys = self.keys[session_id] + for kid in key_ids: + key, _ = self.vaults.get_key(kid) + if key and key.count("0") != len(key): + keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"}) - cached_keys_for_vault = {} - - for cached_key in self._cached_keys_available: - kid_str = cached_key["kid"] + if not keys and self.service_name: try: - kid_uuid = UUID(kid_str) - except ValueError: - try: - kid_uuid = UUID(bytes=bytes.fromhex(kid_str)) - except ValueError: - kid_uuid = Key.kid_to_uuid(kid_str) + key_ids = [] + if hasattr(pssh, "key_ids"): + key_ids = [kid.hex for kid in pssh.key_ids] + elif hasattr(pssh, "kids"): + key_ids = [kid.hex for kid in pssh.kids] - session_keys.append(Key(kid=kid_uuid, type_="CONTENT", key=bytes.fromhex(cached_key["key"]))) - cached_keys_for_vault[kid_uuid] = cached_key["key"] + if key_ids: + response = self._http_session.post( + f"{self.host}/get-cached-keys", + json={"service": self.service_name, "kid": key_ids}, + timeout=30, + ) + + if response.status_code == 200: + data = response.json() + if data.get("message") == "success" and "cached_keys" in data: + keys = self._parse_cached_keys(data["cached_keys"]) - if self.vaults and cached_keys_for_vault: - try: - self.vaults.add_keys(cached_keys_for_vault) except Exception: pass - self._has_cached_keys = True - return True + session["keys"] = keys + + def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Parse cached keys from API response. + + Args: + cached_keys_data: List of cached key objects from API + + Returns: + List of key dictionaries + """ + keys = [] + + try: + if cached_keys_data and isinstance(cached_keys_data, list): + for key_data in cached_keys_data: + if "kid" in key_data and "key" in key_data: + keys.append({"kid": key_data["kid"], "key": key_data["key"], "type": "CONTENT"}) + except Exception: + pass + return keys + + def _parse_keys_response(self, data: Dict[str, Any]) -> List[Dict[str, Any]]: + """Parse keys from decrypt response.""" + keys = [] + + if "keys" in data and isinstance(data["keys"], str): + keys_string = data["keys"] + + for line in keys_string.split("\n"): + line = line.strip() + if line.startswith("--key "): + key_part = line[6:] + if ":" in key_part: + kid, key = key_part.split(":", 1) + keys.append({"kid": kid.strip(), "key": key.strip(), "type": "CONTENT"}) + elif "keys" in data and isinstance(data["keys"], list): + for key_data in data["keys"]: + keys.append( + {"kid": key_data.get("kid"), "key": key_data.get("key"), "type": key_data.get("type", "CONTENT")} + ) + + return keys + + +__all__ = ["DecryptLabsRemoteCDM"] diff --git a/unshackle/core/tracks/track.py b/unshackle/core/tracks/track.py index d77db62..d0c6f5a 100644 --- a/unshackle/core/tracks/track.py +++ b/unshackle/core/tracks/track.py @@ -420,6 +420,15 @@ class Track: for drm in self.drm: if isinstance(drm, PlayReady): return drm + elif hasattr(cdm, 'is_playready'): + if cdm.is_playready: + for drm in self.drm: + if isinstance(drm, PlayReady): + return drm + else: + for drm in self.drm: + if isinstance(drm, Widevine): + return drm return self.drm[0] diff --git a/uv.lock b/uv.lock index e8ff5a1..537fc74 100644 --- a/uv.lock +++ b/uv.lock @@ -1499,7 +1499,7 @@ wheels = [ [[package]] name = "unshackle" -version = "1.4.3" +version = "1.4.4" source = { editable = "." } dependencies = [ { name = "appdirs" }, From ab13dde9d2b2e1d8e101f8830f06e49bf2db0365 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 2 Sep 2025 04:10:28 +0000 Subject: [PATCH 8/9] feat(changelog): Update changelog for version 1.4.4 with enhanced CDM support, configuration options, and various improvements --- CHANGELOG.md | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67bcc36..c6886af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,43 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.4.4] - 2025-09-02 + +### Added + +- **Enhanced DecryptLabs CDM Support**: Comprehensive remote CDM functionality + - Full support for Widevine, PlayReady, and ChromeCDM through DecryptLabsRemoteCDM + - Enhanced session management and caching support for remote WV/PR operations + - Support for cached keys and improved license handling + - New CDM configurations for Chrome and PlayReady devices with updated User-Agent and service certificate +- **Advanced Configuration Options**: New device and language preferences + - Added configuration options for device certificate status list + - Enhanced language preference settings + +### Changed + +- **DRM Decryption Enhancements**: Streamlined decryption process + - Simplified decrypt method by removing unused parameter and streamlined logic + - Improved DecryptLabs CDM configurations with better device support + +### Fixed + +- **Matroska Tag Compliance**: Enhanced media container compatibility + - Fixed Matroska tag compliance with official specification +- **Application Branding**: Cleaned up version display + - Removed old devine version reference from banner to avoid developer confusion + - Updated branding while maintaining original GNU license compliance +- **IP Information Handling**: Improved geolocation services + - Enhanced get_ip_info functionality with better failover handling + - Added support for 429 error handling and multiple API provider fallback + - Implemented cached IP info retrieval with fallback tester to avoid rate limiting +- **Dependencies**: Streamlined package requirements + - Removed unnecessary data extra requirement from langcodes + +### Removed + +- Deprecated version references in application banner for clarity + ## [1.4.3] - 2025-08-20 ### Added From 1e82283133cbd1a0d1d5f6a488df0656c81f0c57 Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 2 Sep 2025 04:13:43 +0000 Subject: [PATCH 9/9] fix(tags): Fix import order. --- unshackle/core/utils/tags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unshackle/core/utils/tags.py b/unshackle/core/utils/tags.py index f6c8c91..329392f 100644 --- a/unshackle/core/utils/tags.py +++ b/unshackle/core/utils/tags.py @@ -8,10 +8,10 @@ import tempfile from difflib import SequenceMatcher from pathlib import Path from typing import Optional, Tuple +from xml.sax.saxutils import escape import requests from requests.adapters import HTTPAdapter, Retry -from xml.sax.saxutils import escape from unshackle.core import binaries from unshackle.core.config import config