|
|
|
|
@@ -70,10 +70,24 @@ class DecryptLabsRemoteCDMExceptions:
|
|
|
|
|
|
|
|
|
|
class DecryptLabsRemoteCDM:
|
|
|
|
|
"""
|
|
|
|
|
Decrypt Labs Remote CDM implementation compatible with pywidevine's CDM interface.
|
|
|
|
|
Decrypt Labs Remote CDM implementation with intelligent caching system.
|
|
|
|
|
|
|
|
|
|
This class provides a drop-in replacement for pywidevine's local CDM using
|
|
|
|
|
Decrypt Labs' KeyXtractor API service.
|
|
|
|
|
Decrypt Labs' KeyXtractor API service, enhanced with smart caching logic
|
|
|
|
|
that minimizes unnecessary license requests.
|
|
|
|
|
|
|
|
|
|
Key Features:
|
|
|
|
|
- Compatible with both Widevine and PlayReady DRM schemes
|
|
|
|
|
- Intelligent caching that compares required vs. available keys
|
|
|
|
|
- Automatic key combination for mixed cache/license scenarios
|
|
|
|
|
- Seamless fallback to license requests when keys are missing
|
|
|
|
|
|
|
|
|
|
Intelligent Caching System:
|
|
|
|
|
1. DRM classes (PlayReady/Widevine) provide required KIDs via set_required_kids()
|
|
|
|
|
2. get_license_challenge() first checks for cached keys
|
|
|
|
|
3. If cached keys satisfy requirements, returns empty challenge (no license needed)
|
|
|
|
|
4. If keys are missing, makes targeted license request for remaining keys
|
|
|
|
|
5. parse_license() combines cached and license keys intelligently
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
service_certificate_challenge = b"\x08\x04"
|
|
|
|
|
@@ -110,6 +124,7 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
self.device_name = device_name
|
|
|
|
|
self.service_name = service_name or ""
|
|
|
|
|
self.vaults = vaults
|
|
|
|
|
self.uch = self.host != "https://keyxtractor.decryptlabs.com"
|
|
|
|
|
|
|
|
|
|
self._device_type_str = device_type
|
|
|
|
|
if device_type:
|
|
|
|
|
@@ -126,6 +141,7 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
|
|
|
|
|
self._sessions: Dict[bytes, Dict[str, Any]] = {}
|
|
|
|
|
self._pssh_b64 = None
|
|
|
|
|
self._required_kids: Optional[List[str]] = None
|
|
|
|
|
self._http_session = Session()
|
|
|
|
|
self._http_session.headers.update(
|
|
|
|
|
{
|
|
|
|
|
@@ -159,6 +175,29 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
"""Store base64-encoded PSSH data for PlayReady compatibility."""
|
|
|
|
|
self._pssh_b64 = pssh_b64
|
|
|
|
|
|
|
|
|
|
def set_required_kids(self, kids: List[Union[str, UUID]]) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Set the required Key IDs for intelligent caching decisions.
|
|
|
|
|
|
|
|
|
|
This method enables the CDM to make smart decisions about when to request
|
|
|
|
|
additional keys via license challenges. When cached keys are available,
|
|
|
|
|
the CDM will compare them against the required KIDs to determine if a
|
|
|
|
|
license request is still needed for missing keys.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
kids: List of required Key IDs as UUIDs or hex strings
|
|
|
|
|
|
|
|
|
|
Note:
|
|
|
|
|
Should be called by DRM classes (PlayReady/Widevine) before making
|
|
|
|
|
license challenge requests to enable optimal caching behavior.
|
|
|
|
|
"""
|
|
|
|
|
self._required_kids = []
|
|
|
|
|
for kid in kids:
|
|
|
|
|
if isinstance(kid, UUID):
|
|
|
|
|
self._required_kids.append(str(kid).replace("-", "").lower())
|
|
|
|
|
else:
|
|
|
|
|
self._required_kids.append(str(kid).replace("-", "").lower())
|
|
|
|
|
|
|
|
|
|
def _generate_session_id(self) -> bytes:
|
|
|
|
|
"""Generate a unique session ID."""
|
|
|
|
|
return secrets.token_bytes(16)
|
|
|
|
|
@@ -291,57 +330,25 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
raise DecryptLabsRemoteCDMExceptions.InvalidSession(f"Invalid session ID: {session_id.hex()}")
|
|
|
|
|
|
|
|
|
|
session = self._sessions[session_id]
|
|
|
|
|
pssh = session.get("pssh")
|
|
|
|
|
|
|
|
|
|
if not pssh:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
if self.vaults:
|
|
|
|
|
key_ids = []
|
|
|
|
|
if hasattr(pssh, "key_ids"):
|
|
|
|
|
key_ids = pssh.key_ids
|
|
|
|
|
elif hasattr(pssh, "kids"):
|
|
|
|
|
key_ids = pssh.kids
|
|
|
|
|
|
|
|
|
|
for kid in key_ids:
|
|
|
|
|
key, _ = self.vaults.get_key(kid)
|
|
|
|
|
if key and key.count("0") != len(key):
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
if self.service_name:
|
|
|
|
|
try:
|
|
|
|
|
key_ids = []
|
|
|
|
|
if hasattr(pssh, "key_ids"):
|
|
|
|
|
key_ids = [kid.hex for kid in pssh.key_ids]
|
|
|
|
|
elif hasattr(pssh, "kids"):
|
|
|
|
|
key_ids = [kid.hex for kid in pssh.kids]
|
|
|
|
|
|
|
|
|
|
if key_ids:
|
|
|
|
|
response = self._http_session.post(
|
|
|
|
|
f"{self.host}/get-cached-keys",
|
|
|
|
|
json={"service": self.service_name, "kid": key_ids},
|
|
|
|
|
timeout=30,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
data = response.json()
|
|
|
|
|
return (
|
|
|
|
|
data.get("message") == "success"
|
|
|
|
|
and "cached_keys" in data
|
|
|
|
|
and isinstance(data["cached_keys"], list)
|
|
|
|
|
and len(data["cached_keys"]) > 0
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
session_keys = session.get("keys", [])
|
|
|
|
|
return len(session_keys) > 0
|
|
|
|
|
|
|
|
|
|
def get_license_challenge(
|
|
|
|
|
self, session_id: bytes, pssh_or_wrm: Any, license_type: str = "STREAMING", privacy_mode: bool = True
|
|
|
|
|
) -> bytes:
|
|
|
|
|
"""
|
|
|
|
|
Generate a license challenge using Decrypt Labs API.
|
|
|
|
|
Generate a license challenge using Decrypt Labs API with intelligent caching.
|
|
|
|
|
|
|
|
|
|
This method implements smart caching logic that:
|
|
|
|
|
1. First attempts to retrieve cached keys from the API
|
|
|
|
|
2. If required KIDs are set, compares cached keys against requirements
|
|
|
|
|
3. Only makes a license request if keys are missing
|
|
|
|
|
4. Returns empty challenge if all required keys are cached
|
|
|
|
|
|
|
|
|
|
The intelligent caching works as follows:
|
|
|
|
|
- With required KIDs set: Only requests license for missing keys
|
|
|
|
|
- Without required KIDs: Returns any available cached keys
|
|
|
|
|
- For PlayReady: Combines cached keys with license keys seamlessly
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session_id: Session identifier
|
|
|
|
|
@@ -350,11 +357,14 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
privacy_mode: Whether to use privacy mode - for compatibility only
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
License challenge as bytes
|
|
|
|
|
License challenge as bytes, or empty bytes if cached keys satisfy requirements
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
InvalidSession: If session ID is invalid
|
|
|
|
|
requests.RequestException: If API request fails
|
|
|
|
|
|
|
|
|
|
Note:
|
|
|
|
|
Call set_required_kids() before this method for optimal caching behavior.
|
|
|
|
|
"""
|
|
|
|
|
_ = license_type, privacy_mode
|
|
|
|
|
|
|
|
|
|
@@ -365,12 +375,13 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
|
|
|
|
|
session["pssh"] = pssh_or_wrm
|
|
|
|
|
init_data = self._get_init_data_from_pssh(pssh_or_wrm)
|
|
|
|
|
already_tried_cache = session.get("tried_cache", False)
|
|
|
|
|
|
|
|
|
|
if self.has_cached_keys(session_id):
|
|
|
|
|
self._load_cached_keys(session_id)
|
|
|
|
|
return b""
|
|
|
|
|
|
|
|
|
|
request_data = {"scheme": self.device_name, "init_data": init_data}
|
|
|
|
|
request_data = {
|
|
|
|
|
"scheme": self.device_name,
|
|
|
|
|
"init_data": init_data,
|
|
|
|
|
"get_cached_keys_if_exists": not already_tried_cache,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if self.device_name in ["L1", "L2", "SL2", "SL3"] and self.service_name:
|
|
|
|
|
request_data["service"] = self.service_name
|
|
|
|
|
@@ -391,22 +402,86 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
error_msg += f" - Details: {data['details']}"
|
|
|
|
|
if "error" in data:
|
|
|
|
|
error_msg += f" - Error: {data['error']}"
|
|
|
|
|
|
|
|
|
|
if "service_certificate is required" in str(data) and not session["service_certificate"]:
|
|
|
|
|
error_msg += " (No service certificate was provided to the CDM session)"
|
|
|
|
|
|
|
|
|
|
raise requests.RequestException(f"API error: {error_msg}")
|
|
|
|
|
|
|
|
|
|
if data.get("message_type") == "cached-keys" or "cached_keys" in data:
|
|
|
|
|
cached_keys = data.get("cached_keys", [])
|
|
|
|
|
session["keys"] = self._parse_cached_keys(cached_keys)
|
|
|
|
|
return b""
|
|
|
|
|
message_type = data.get("message_type")
|
|
|
|
|
|
|
|
|
|
if message_type == "cached-keys" or "cached_keys" in data:
|
|
|
|
|
"""
|
|
|
|
|
Handle cached keys response from API.
|
|
|
|
|
|
|
|
|
|
When the API returns cached keys, we need to determine if they satisfy
|
|
|
|
|
our requirements or if we need to make an additional license request
|
|
|
|
|
for missing keys.
|
|
|
|
|
"""
|
|
|
|
|
cached_keys = data.get("cached_keys", [])
|
|
|
|
|
parsed_keys = self._parse_cached_keys(cached_keys)
|
|
|
|
|
session["keys"] = parsed_keys
|
|
|
|
|
session["tried_cache"] = True
|
|
|
|
|
|
|
|
|
|
if self._required_kids:
|
|
|
|
|
cached_kids = set()
|
|
|
|
|
for key in parsed_keys:
|
|
|
|
|
if isinstance(key, dict) and "kid" in key:
|
|
|
|
|
cached_kids.add(key["kid"].replace("-", "").lower())
|
|
|
|
|
|
|
|
|
|
required_kids = set(self._required_kids)
|
|
|
|
|
missing_kids = required_kids - cached_kids
|
|
|
|
|
|
|
|
|
|
if missing_kids:
|
|
|
|
|
session["cached_keys"] = parsed_keys
|
|
|
|
|
request_data["get_cached_keys_if_exists"] = False
|
|
|
|
|
response = self._http_session.post(f"{self.host}/get-request", json=request_data, timeout=30)
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
data = response.json()
|
|
|
|
|
if data.get("message") == "success" and "challenge" in data:
|
|
|
|
|
challenge = base64.b64decode(data["challenge"])
|
|
|
|
|
session["challenge"] = challenge
|
|
|
|
|
session["decrypt_labs_session_id"] = data["session_id"]
|
|
|
|
|
|
|
|
|
|
return challenge
|
|
|
|
|
|
|
|
|
|
return b""
|
|
|
|
|
else:
|
|
|
|
|
return b""
|
|
|
|
|
else:
|
|
|
|
|
return b""
|
|
|
|
|
|
|
|
|
|
if message_type == "license-request" or "challenge" in data:
|
|
|
|
|
challenge = base64.b64decode(data["challenge"])
|
|
|
|
|
session["challenge"] = challenge
|
|
|
|
|
session["decrypt_labs_session_id"] = data["session_id"]
|
|
|
|
|
return challenge
|
|
|
|
|
|
|
|
|
|
error_msg = f"Unexpected API response format. message_type={message_type}, available_fields={list(data.keys())}"
|
|
|
|
|
if data.get("message"):
|
|
|
|
|
error_msg = f"API response: {data['message']} - {error_msg}"
|
|
|
|
|
if "details" in data:
|
|
|
|
|
error_msg += f" - Details: {data['details']}"
|
|
|
|
|
if "error" in data:
|
|
|
|
|
error_msg += f" - Error: {data['error']}"
|
|
|
|
|
|
|
|
|
|
if already_tried_cache and data.get("message") == "success":
|
|
|
|
|
return b""
|
|
|
|
|
|
|
|
|
|
raise requests.RequestException(error_msg)
|
|
|
|
|
|
|
|
|
|
def parse_license(self, session_id: bytes, license_message: Union[bytes, str]) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Parse license response using Decrypt Labs API.
|
|
|
|
|
Parse license response using Decrypt Labs API with intelligent key combination.
|
|
|
|
|
|
|
|
|
|
For PlayReady content with partial cached keys, this method intelligently
|
|
|
|
|
combines the cached keys with newly obtained license keys, avoiding
|
|
|
|
|
duplicates while ensuring all required keys are available.
|
|
|
|
|
|
|
|
|
|
The key combination process:
|
|
|
|
|
1. Extracts keys from the license response
|
|
|
|
|
2. If cached keys exist (PlayReady), combines them with license keys
|
|
|
|
|
3. Removes duplicate keys by comparing normalized KIDs
|
|
|
|
|
4. Updates the session with the complete key set
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
session_id: Session identifier
|
|
|
|
|
@@ -421,7 +496,7 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
|
|
|
|
|
session = self._sessions[session_id]
|
|
|
|
|
|
|
|
|
|
if session["keys"]:
|
|
|
|
|
if session["keys"] and not (self.is_playready and "cached_keys" in session):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if not session.get("challenge") or not session.get("decrypt_labs_session_id"):
|
|
|
|
|
@@ -465,7 +540,48 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
error_msg += f" - Details: {data['details']}"
|
|
|
|
|
raise requests.RequestException(f"License decrypt error: {error_msg}")
|
|
|
|
|
|
|
|
|
|
session["keys"] = self._parse_keys_response(data)
|
|
|
|
|
license_keys = self._parse_keys_response(data)
|
|
|
|
|
|
|
|
|
|
if self.is_playready and "cached_keys" in session:
|
|
|
|
|
"""
|
|
|
|
|
Combine cached keys with license keys for PlayReady content.
|
|
|
|
|
|
|
|
|
|
This ensures we have both the cached keys (obtained earlier) and
|
|
|
|
|
any additional keys from the license response, without duplicates.
|
|
|
|
|
"""
|
|
|
|
|
cached_keys = session.get("cached_keys", [])
|
|
|
|
|
all_keys = list(cached_keys)
|
|
|
|
|
|
|
|
|
|
for license_key in license_keys:
|
|
|
|
|
already_exists = False
|
|
|
|
|
license_kid = None
|
|
|
|
|
if isinstance(license_key, dict) and "kid" in license_key:
|
|
|
|
|
license_kid = license_key["kid"].replace("-", "").lower()
|
|
|
|
|
elif hasattr(license_key, "kid"):
|
|
|
|
|
license_kid = str(license_key.kid).replace("-", "").lower()
|
|
|
|
|
elif hasattr(license_key, "key_id"):
|
|
|
|
|
license_kid = str(license_key.key_id).replace("-", "").lower()
|
|
|
|
|
|
|
|
|
|
if license_kid:
|
|
|
|
|
for cached_key in cached_keys:
|
|
|
|
|
cached_kid = None
|
|
|
|
|
if isinstance(cached_key, dict) and "kid" in cached_key:
|
|
|
|
|
cached_kid = cached_key["kid"].replace("-", "").lower()
|
|
|
|
|
elif hasattr(cached_key, "kid"):
|
|
|
|
|
cached_kid = str(cached_key.kid).replace("-", "").lower()
|
|
|
|
|
elif hasattr(cached_key, "key_id"):
|
|
|
|
|
cached_kid = str(cached_key.key_id).replace("-", "").lower()
|
|
|
|
|
|
|
|
|
|
if cached_kid == license_kid:
|
|
|
|
|
already_exists = True
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
if not already_exists:
|
|
|
|
|
all_keys.append(license_key)
|
|
|
|
|
|
|
|
|
|
session["keys"] = all_keys
|
|
|
|
|
else:
|
|
|
|
|
session["keys"] = license_keys
|
|
|
|
|
|
|
|
|
|
if self.vaults and session["keys"]:
|
|
|
|
|
key_dict = {UUID(hex=key["kid"]): key["key"] for key in session["keys"] if key["type"] == "CONTENT"}
|
|
|
|
|
@@ -496,49 +612,6 @@ class DecryptLabsRemoteCDM:
|
|
|
|
|
|
|
|
|
|
return keys
|
|
|
|
|
|
|
|
|
|
def _load_cached_keys(self, session_id: bytes) -> None:
|
|
|
|
|
"""Load cached keys from vaults and Decrypt Labs API."""
|
|
|
|
|
session = self._sessions[session_id]
|
|
|
|
|
pssh = session["pssh"]
|
|
|
|
|
keys = []
|
|
|
|
|
|
|
|
|
|
if self.vaults:
|
|
|
|
|
key_ids = []
|
|
|
|
|
if hasattr(pssh, "key_ids"):
|
|
|
|
|
key_ids = pssh.key_ids
|
|
|
|
|
elif hasattr(pssh, "kids"):
|
|
|
|
|
key_ids = pssh.kids
|
|
|
|
|
|
|
|
|
|
for kid in key_ids:
|
|
|
|
|
key, _ = self.vaults.get_key(kid)
|
|
|
|
|
if key and key.count("0") != len(key):
|
|
|
|
|
keys.append({"kid": kid.hex, "key": key, "type": "CONTENT"})
|
|
|
|
|
|
|
|
|
|
if not keys and self.service_name:
|
|
|
|
|
try:
|
|
|
|
|
key_ids = []
|
|
|
|
|
if hasattr(pssh, "key_ids"):
|
|
|
|
|
key_ids = [kid.hex for kid in pssh.key_ids]
|
|
|
|
|
elif hasattr(pssh, "kids"):
|
|
|
|
|
key_ids = [kid.hex for kid in pssh.kids]
|
|
|
|
|
|
|
|
|
|
if key_ids:
|
|
|
|
|
response = self._http_session.post(
|
|
|
|
|
f"{self.host}/get-cached-keys",
|
|
|
|
|
json={"service": self.service_name, "kid": key_ids},
|
|
|
|
|
timeout=30,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
data = response.json()
|
|
|
|
|
if data.get("message") == "success" and "cached_keys" in data:
|
|
|
|
|
keys = self._parse_cached_keys(data["cached_keys"])
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
session["keys"] = keys
|
|
|
|
|
|
|
|
|
|
def _parse_cached_keys(self, cached_keys_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
|
|
"""Parse cached keys from API response.
|
|
|
|
|
|
|
|
|
|
|