refactor(msl): improve key exchange handling and code cleanup

- Replace commented Widevine key exchange code with active parsing and key extraction
- Add checks for CDM session and CDM availability before license parsing
- Update key permission strings to lowercase and align with key extraction logic
- Handle AsymmetricWrapped scheme separately with RSA decryption of keys
- Change EntityAuthentication to Unauthenticated for certain challenge cases
- Remove redundant jsonpickle encoding/decoding for cached keys, store raw data instead
- Add detailed logging for key UUIDs and extracted Widevine keys
- Fix key comparison by converting kid bytes to UUID on comparison
- Raise Exception instead of using logger exit on MSL response error
- Update imports to consolidate pywidevine package classes used
This commit is contained in:
2025-08-29 20:52:30 +07:00
parent fcd1ebcf83
commit 3c24d83293

View File

@@ -27,8 +27,7 @@ from .schemes import EntityAuthenticationSchemes # noqa: F401
from .schemes import KeyExchangeSchemes
from .schemes.EntityAuthentication import EntityAuthentication
from .schemes.KeyExchangeRequest import KeyExchangeRequest
from pywidevine.cdm import Cdm
from pywidevine.pssh import PSSH
from pywidevine import Cdm, PSSH, Key
class MSL:
log = logging.getLogger("MSL")
@@ -70,7 +69,8 @@ class MSL:
)
keyrequestdata = KeyExchangeRequest.Widevine(challenge)
entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode())
entityauthdata = EntityAuthentication.Unauthenticated(sender)
# entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode())
else:
entityauthdata = EntityAuthentication.Unauthenticated(sender)
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
@@ -122,29 +122,30 @@ class MSL:
raise Exception("- Key exchange scheme mismatch occurred")
key_data = key_response_data["keydata"]
# if scheme == KeyExchangeSchemes.Widevine:
# if isinstance(cdm.device, RemoteDevice):
# msl_keys.encryption, msl_keys.sign = cdm.device.exchange(
# cdm.sessions[msl_keys.cdm_session],
# license_res=key_data["cdmkeyresponse"],
# enc_key_id=base64.b64decode(key_data["encryptionkeyid"]),
# hmac_key_id=base64.b64decode(key_data["hmackeyid"])
# )
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# else:
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# keys = cdm.get_keys(msl_keys.cdm_session)
# msl_keys.encryption = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["encryptionkeyid"]),
# keys=keys,
# permissions=["AllowEncrypt", "AllowDecrypt"]
# )
# msl_keys.sign = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["hmackeyid"]),
# keys=keys,
# permissions=["AllowSign", "AllowSignatureVerify"]
# )
# else:
if scheme == KeyExchangeSchemes.Widevine:
if not msl_keys.cdm_session:
raise Exception("- No CDM session available")
if not cdm:
raise Exception("- No CDM available")
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
keys = cdm.get_keys(msl_keys.cdm_session)
cls.log.info(f"Keys: {keys}")
encryption_key = MSL.get_widevine_key(
kid=base64.b64decode(key_data["encryptionkeyid"]),
keys=keys,
permissions=["allow_encrypt", "allow_decrypt"]
)
msl_keys.encryption = encryption_key
cls.log.info(f"Encryption key: {encryption_key}")
sign = MSL.get_widevine_key(
kid=base64.b64decode(key_data["hmackeyid"]),
keys=keys,
permissions=["allow_sign", "allow_signature_verify"]
)
cls.log.info(f"Sign key: {sign}")
msl_keys.sign = sign
elif scheme == KeyExchangeSchemes.AsymmetricWrapped:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
@@ -156,6 +157,7 @@ class MSL:
base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"]
)
msl_keys.mastertoken = key_response_data["mastertoken"]
MSL.cache_keys(msl_keys, cache)
@@ -174,7 +176,7 @@ class MSL:
return None
# with open(msl_keys_path, encoding="utf-8") as fd:
# msl_keys = jsonpickle.decode(fd.read())
msl_keys = jsonpickle.decode(cacher.data)
msl_keys = cacher.data
if msl_keys.rsa:
# noinspection PyTypeChecker
# expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object
@@ -196,7 +198,7 @@ class MSL:
msl_keys.rsa = msl_keys.rsa.export_key()
# with open(cache, "w", encoding="utf-8") as fd:
# fd.write()
cache.set(jsonpickle.encode(msl_keys))
cache.set(msl_keys)
if msl_keys.rsa:
# re-import now
msl_keys.rsa = RSA.importKey(msl_keys.rsa)
@@ -241,9 +243,11 @@ class MSL:
return jsonpickle.encode(header_data, unpicklable=False)
@classmethod
def get_widevine_key(cls, kid, keys, permissions):
def get_widevine_key(cls, kid, keys: list[Key], permissions):
cls.log.info(f"KID: {Key.kid_to_uuid(kid)}")
for key in keys:
if key.kid != kid:
# cls.log.info(f"KEY: {key.kid_to_uuid}")
if key.kid != Key.kid_to_uuid(kid):
continue
if key.type != "OPERATOR_SESSION":
cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}")
@@ -259,7 +263,7 @@ class MSL:
res = self.session.post(url=endpoint, data=message, params=params)
header, payload_data = self.parse_message(res.text)
if "errordata" in header:
raise self.log.exit(
raise Exception(
"- MSL response message contains an error: {}".format(
json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8"))
)