feat(netflix-msl): support UserIDToken auth and raw responses

Add `UserAuthentication.UserIDToken()` to build MSL user auth payloads
for token-based Netflix authentication flows.

Extend MSL message handling to be more flexible by:
- allowing custom HTTP headers in `send_message()`
- adding `unwrap_result` to `send_message()`, `parse_message()`, and
  `decrypt_payload_chunks()` so callers can receive either full payload
  data or only `result`

Also lower key/KID and payload logging from `info` to `debug` to reduce
noisy and sensitive runtime logs while keeping diagnostics available.
This commit is contained in:
kenzuya
2026-03-10 00:54:59 +07:00
parent a07302cb88
commit 5dde031bd8
6 changed files with 199 additions and 23 deletions

View File

@@ -129,20 +129,20 @@ class MSL:
raise Exception("- No CDM available") raise Exception("- No CDM available")
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
keys = cdm.get_keys(msl_keys.cdm_session) keys = cdm.get_keys(msl_keys.cdm_session)
cls.log.info(f"Keys: {keys}") cls.log.debug(f"Keys: {keys}")
encryption_key = MSL.get_widevine_key( encryption_key = MSL.get_widevine_key(
kid=base64.b64decode(key_data["encryptionkeyid"]), kid=base64.b64decode(key_data["encryptionkeyid"]),
keys=keys, keys=keys,
permissions=["allow_encrypt", "allow_decrypt"] permissions=["allow_encrypt", "allow_decrypt"]
) )
msl_keys.encryption = encryption_key msl_keys.encryption = encryption_key
cls.log.info(f"Encryption key: {encryption_key}") cls.log.debug(f"Encryption key: {encryption_key}")
sign = MSL.get_widevine_key( sign = MSL.get_widevine_key(
kid=base64.b64decode(key_data["hmackeyid"]), kid=base64.b64decode(key_data["hmackeyid"]),
keys=keys, keys=keys,
permissions=["allow_sign", "allow_signature_verify"] permissions=["allow_sign", "allow_signature_verify"]
) )
cls.log.info(f"Sign key: {sign}") cls.log.debug(f"Sign key: {sign}")
msl_keys.sign = sign msl_keys.sign = sign
elif scheme == KeyExchangeSchemes.AsymmetricWrapped: elif scheme == KeyExchangeSchemes.AsymmetricWrapped:
@@ -244,7 +244,7 @@ class MSL:
@classmethod @classmethod
def get_widevine_key(cls, kid, keys: list[Key], permissions): def get_widevine_key(cls, kid, keys: list[Key], permissions):
cls.log.info(f"KID: {Key.kid_to_uuid(kid)}") cls.log.debug(f"KID: {Key.kid_to_uuid(kid)}")
for key in keys: for key in keys:
# cls.log.info(f"KEY: {key.kid_to_uuid}") # cls.log.info(f"KEY: {key.kid_to_uuid}")
if key.kid != Key.kid_to_uuid(kid): if key.kid != Key.kid_to_uuid(kid):
@@ -258,10 +258,10 @@ class MSL:
return key.key return key.key
return None return None
def send_message(self, endpoint, params, application_data, userauthdata=None): def send_message(self, endpoint, params, application_data, userauthdata=None, headers=None, unwrap_result=True):
message = self.create_message(application_data, userauthdata) message = self.create_message(application_data, userauthdata)
res = self.session.post(url=endpoint, data=message, params=params) res = self.session.post(url=endpoint, data=message, params=params, headers=headers)
header, payload_data = self.parse_message(res.text) header, payload_data = self.parse_message(res.text, unwrap_result=unwrap_result)
if "errordata" in header: if "errordata" in header:
raise Exception( raise Exception(
"- MSL response message contains an error: {}".format( "- MSL response message contains an error: {}".format(
@@ -302,7 +302,7 @@ class MSL:
return message return message
def decrypt_payload_chunks(self, payload_chunks): def decrypt_payload_chunks(self, payload_chunks, unwrap_result=True):
""" """
Decrypt and extract data from payload chunks Decrypt and extract data from payload chunks
@@ -310,7 +310,6 @@ class MSL:
:return: json object :return: json object
""" """
raw_data = "" raw_data = ""
for payload_chunk in payload_chunks: for payload_chunk in payload_chunks:
# todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"] # todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"]
# expecting base64-encoded json string # expecting base64-encoded json string
@@ -344,10 +343,12 @@ class MSL:
self.log.critical(f"- {error}") self.log.critical(f"- {error}")
raise Exception(f"- MSL response message contains an error: {error}") raise Exception(f"- MSL response message contains an error: {error}")
# sys.exit(1) # sys.exit(1)
self.log.debug(f"Payload Chunks: {data}")
if unwrap_result:
return data["result"] return data["result"]
return data
def parse_message(self, message): def parse_message(self, message, unwrap_result=True):
""" """
Parse an MSL message into a header and list of payload chunks Parse an MSL message into a header and list of payload chunks
@@ -359,7 +360,7 @@ class MSL:
header = parsed_message[0] header = parsed_message[0]
encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else [] encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else []
if encrypted_payload_chunks: if encrypted_payload_chunks:
payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks) payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks, unwrap_result=unwrap_result)
else: else:
payload_chunks = {} payload_chunks = {}

View File

@@ -31,6 +31,19 @@ class UserAuthentication(MSLObject):
} }
) )
@classmethod
def UserIDToken(cls, token_data, signature, master_token):
return cls(
scheme=UserAuthenticationSchemes.UserIDToken,
authdata={
"useridtoken": {
"tokendata": token_data,
"signature": signature
},
"mastertoken": master_token
}
)
@classmethod @classmethod
def NetflixIDCookies(cls, netflixid, securenetflixid): def NetflixIDCookies(cls, netflixid, securenetflixid):
""" """

View File

@@ -15,6 +15,7 @@ class EntityAuthenticationSchemes(Scheme):
class UserAuthenticationSchemes(Scheme): class UserAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29""" """https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29"""
EmailPassword = "EMAIL_PASSWORD" EmailPassword = "EMAIL_PASSWORD"
UserIDToken = "USER_ID_TOKEN"
NetflixIDCookies = "NETFLIXID" NetflixIDCookies = "NETFLIXID"

View File

@@ -20,6 +20,8 @@ from pymp4.parser import Box
from pywidevine import PSSH, Cdm as WidevineCDM, DeviceTypes from pywidevine import PSSH, Cdm as WidevineCDM, DeviceTypes
from pyplayready import PSSH as PlayReadyPSSH from pyplayready import PSSH as PlayReadyPSSH
import requests import requests
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
from langcodes import Language from langcodes import Language
from unshackle.core.constants import AnyTrack from unshackle.core.constants import AnyTrack
@@ -60,6 +62,7 @@ class Netflix(Service):
"es": "es-419", "es": "es-419",
"pt": "pt-PT", "pt": "pt-PT",
} }
ANDROID_CONFIG_ENDPOINT = "https://android.prod.ftl.netflix.com/nq/androidui/samurai/v1/config"
@staticmethod @staticmethod
@click.command(name="Netflix", short_help="https://netflix.com") @click.command(name="Netflix", short_help="https://netflix.com")
@@ -481,14 +484,166 @@ class Netflix(Service):
securenetflixid=cookie["SecureNetflixId"] securenetflixid=cookie["SecureNetflixId"]
) )
else: else:
# Android like way login to Netflix using email and password
if not self.credential: if not self.credential:
raise Exception(" - Credentials are required for Android CDMs, and none were provided.") raise click.ClickException("Android sign-in requires credentials.")
self.userauthdata = UserAuthentication.EmailPassword( self.userauthdata = self.get_android_userauthdata()
email=self.credential.username,
password=self.credential.password def get_android_userauthdata(self) -> UserAuthentication:
token_cache = self.get_android_user_token_cache()
token_data = token_cache.data if token_cache and isinstance(token_cache.data, dict) else None
if not token_data or not token_data.get("tokendata") or not token_data.get("signature"):
self.log.info("Requesting Android useridtoken")
token_data = self.fetch_android_user_id_token()
token_cache.set(token_data, expiration=self.resolve_android_token_expiration(token_data))
else:
self.log.info("Using cached Android useridtoken")
return UserAuthentication.UserIDToken(
token_data=token_data["tokendata"],
signature=token_data["signature"],
master_token=self.msl.keys.mastertoken
) )
self.log.info(f"userauthdata: {self.userauthdata}")
def get_android_user_token_cache(self):
return self.cache.get(f"ANDROID_USER_ID_TOKEN/{self.credential.sha1}/{self.esn.data['esn']}")
def fetch_android_user_id_token(self) -> dict:
try:
header, payload_data = self.msl.send_message(
endpoint=self.ANDROID_CONFIG_ENDPOINT,
params=self.build_android_sign_in_query(),
application_data="",
headers=self.build_android_sign_in_headers(),
unwrap_result=False
)
except Exception as exc:
raise click.ClickException(f"Android sign-in request failed: {exc}") from exc
header_data = self.decrypt_android_header(header["headerdata"])
tokens = header_data.get("useridtoken")
if not tokens:
self.log.debug(f"Android sign-in header keys: {list(header_data.keys())}")
sign_in_value = self.extract_android_sign_in_value(payload_data)
error_code = self.extract_android_sign_in_error_code(sign_in_value)
if error_code:
raise click.ClickException(f"Android sign-in failed: {error_code}")
raise click.ClickException("Android sign-in did not return a useridtoken.")
return tokens
@staticmethod
def extract_android_sign_in_value(payload_data: dict) -> Optional[dict]:
if not isinstance(payload_data, dict):
return None
json_graph = payload_data.get("jsonGraph")
if not isinstance(json_graph, dict):
return None
sign_in_verify = json_graph.get("signInVerify")
if not isinstance(sign_in_verify, dict):
return None
value = sign_in_verify.get("value")
return value if isinstance(value, dict) else None
@staticmethod
def extract_android_sign_in_error_code(sign_in_value: Optional[dict]) -> Optional[str]:
if not isinstance(sign_in_value, dict):
return None
fields = sign_in_value.get("fields")
if not isinstance(fields, dict):
return None
error_code = fields.get("errorCode")
if not isinstance(error_code, dict):
return None
value = error_code.get("value")
return value if isinstance(value, str) and value else None
def build_android_sign_in_query(self) -> dict:
cookie = self.session.cookies.get_dict()
return {
"api": "33",
"appType": "samurai",
"appVer": "62902",
"appVersion": "9.18.0",
"chipset": "sm6150",
"chipsetHardware": "qcom",
"clientAppState": "FOREGROUND",
"clientAppVersionState": "NORMAL",
"countryCode": "+385",
"countryIsoCode": "HR",
"ctgr": "phone",
"dbg": "false",
"deviceLocale": "hr",
"devmod": "samsung_SM-A705FN",
"ffbc": "phone",
"flwssn": "c3100219-d002-40c5-80a7-055c00407246",
"installType": "regular",
"isAutomation": "false",
"isConsumptionOnly": "true",
"isNetflixPreloaded": "false",
"isPlayBillingEnabled": "true",
"isStubInSystemPartition": "false",
"lackLocale": "false",
"landingOrigin": "https://www.netflix.com",
"mId": "SAMSUSM-A705FNS",
"memLevel": "HIGH",
"method": "get",
"mnf": "samsung",
"model": "SM-A705FN",
"netflixClientPlatform": "androidNative",
"netflixId": cookie["NetflixId"],
"networkType": "wifi",
"osBoard": "sm6150",
"osDevice": "a70q",
"osDisplay": "TQ1A.230205.002",
"password": self.credential.password,
"path": "[\"signInVerify\"]",
"pathFormat": "hierarchical",
"platform": "android",
"preloadSignupRoValue": "",
"progressive": "false",
"qlty": "hd",
"recaptchaResponseTime": 244,
"recaptchaResponseToken": "",
"responseFormat": "json",
"roBspVer": "Q6150-17263-1",
"secureNetflixId": cookie["SecureNetflixId"],
"sid": "7176",
"store": "google",
"userLoginId": self.credential.username
}
def build_android_sign_in_headers(self) -> dict:
return {
"X-Netflix.Request.NqTracking": "VerifyLoginMslRequest",
"X-Netflix.Client.Request.Name": "VerifyLoginMslRequest",
"X-Netflix.Request.Client.Context": "{\"appState\":\"foreground\"}",
"X-Netflix-Esn": self.esn.data["esn"],
"X-Netflix.EsnPrefix": "NFANDROID1-PRV-P-",
"X-Netflix.msl-header-friendly-client": "true",
"content-encoding": "msl_v1"
}
def decrypt_android_header(self, encrypted_header_b64: str) -> dict:
encrypted_header = json.loads(base64.b64decode(encrypted_header_b64))
iv = base64.b64decode(encrypted_header["iv"])
ciphertext = base64.b64decode(encrypted_header["ciphertext"])
cipher = AES.new(self.msl.keys.encryption, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size)
return json.loads(decrypted.decode("utf-8"))
def resolve_android_token_expiration(self, token_data: dict):
for source in (token_data, self.msl.keys.mastertoken):
if not isinstance(source, dict):
continue
tokendata = source.get("tokendata")
if not tokendata:
continue
try:
parsed = json.loads(base64.b64decode(tokendata).decode("utf-8"))
except (TypeError, ValueError, json.JSONDecodeError):
continue
if parsed.get("expiration"):
return parsed["expiration"]
return None
def get_profiles(self): def get_profiles(self):

File diff suppressed because one or more lines are too long

View File

@@ -64,7 +64,7 @@ directories:
cache: Cache cache: Cache
# cookies: Cookies # cookies: Cookies
dcsl: DCSL # Device Certificate Status List dcsl: DCSL # Device Certificate Status List
downloads: Downloads downloads: /home/kenzuya/Mounts/ketuakenzuya/Downloads/
logs: Logs logs: Logs
temp: Temp temp: Temp
# wvds: WVDs # wvds: WVDs
@@ -79,7 +79,7 @@ directories:
# Pre-define which Widevine or PlayReady device to use for each Service # Pre-define which Widevine or PlayReady device to use for each Service
cdm: cdm:
# Global default CDM device (fallback for all services/profiles) # Global default CDM device (fallback for all services/profiles)
default: chromecdm default: lg_50ut73006la.cekukh_17.0.0_22163355_7110_l1
# Direct service-specific CDM # Direct service-specific CDM
DIFFERENT_EXAMPLE: PRD_1 DIFFERENT_EXAMPLE: PRD_1