3 Commits

Author SHA1 Message Date
kenzuya
1244141df2 fix(netflix): align MSL manifest payload with Chrome Widevine
Update Netflix manifest request construction to better match current
Widevine-on-Chrome behavior by:
- setting top-level and param `clientVersion` to `9999999`
- sending `challenge` only for Chrome Widevine requests
- removing hardcoded device/platform fields from params

Also refresh Android TV ESN mappings in config by replacing ESN `7110`
and adding ESN `16401` for Hisense devices to improve request validity.
2026-03-10 12:45:59 +07:00
kenzuya
5dde031bd8 feat(netflix-msl): support UserIDToken auth and raw responses
Add `UserAuthentication.UserIDToken()` to build MSL user auth payloads
for token-based Netflix authentication flows.

Extend MSL message handling to be more flexible by:
- allowing custom HTTP headers in `send_message()`
- adding `unwrap_result` to `send_message()`, `parse_message()`, and
  `decrypt_payload_chunks()` so callers can receive either full payload
  data or only `result`

Also lower key/KID and payload logging from `info` to `debug` to reduce
noisy and sensitive runtime logs while keeping diagnostics available.
2026-03-10 00:54:59 +07:00
kenzuya
a07302cb88 chore(gitignore): ignore capitalized Logs directory too
Add `Logs` to `.gitignore` so log output from environments that use an uppercase directory name is not accidentally staged or committed.
2026-03-10 00:54:47 +07:00
7 changed files with 210 additions and 36 deletions

1
.gitignore vendored
View File

@@ -27,6 +27,7 @@ temp/
logs/
Temp/
binaries/
Logs
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@@ -129,20 +129,20 @@ class MSL:
raise Exception("- No CDM available")
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
keys = cdm.get_keys(msl_keys.cdm_session)
cls.log.info(f"Keys: {keys}")
cls.log.debug(f"Keys: {keys}")
encryption_key = MSL.get_widevine_key(
kid=base64.b64decode(key_data["encryptionkeyid"]),
keys=keys,
permissions=["allow_encrypt", "allow_decrypt"]
)
msl_keys.encryption = encryption_key
cls.log.info(f"Encryption key: {encryption_key}")
cls.log.debug(f"Encryption key: {encryption_key}")
sign = MSL.get_widevine_key(
kid=base64.b64decode(key_data["hmackeyid"]),
keys=keys,
permissions=["allow_sign", "allow_signature_verify"]
)
cls.log.info(f"Sign key: {sign}")
cls.log.debug(f"Sign key: {sign}")
msl_keys.sign = sign
elif scheme == KeyExchangeSchemes.AsymmetricWrapped:
@@ -244,7 +244,7 @@ class MSL:
@classmethod
def get_widevine_key(cls, kid, keys: list[Key], permissions):
cls.log.info(f"KID: {Key.kid_to_uuid(kid)}")
cls.log.debug(f"KID: {Key.kid_to_uuid(kid)}")
for key in keys:
# cls.log.info(f"KEY: {key.kid_to_uuid}")
if key.kid != Key.kid_to_uuid(kid):
@@ -258,10 +258,10 @@ class MSL:
return key.key
return None
def send_message(self, endpoint, params, application_data, userauthdata=None):
def send_message(self, endpoint, params, application_data, userauthdata=None, headers=None, unwrap_result=True):
message = self.create_message(application_data, userauthdata)
res = self.session.post(url=endpoint, data=message, params=params)
header, payload_data = self.parse_message(res.text)
res = self.session.post(url=endpoint, data=message, params=params, headers=headers)
header, payload_data = self.parse_message(res.text, unwrap_result=unwrap_result)
if "errordata" in header:
raise Exception(
"- MSL response message contains an error: {}".format(
@@ -302,7 +302,7 @@ class MSL:
return message
def decrypt_payload_chunks(self, payload_chunks):
def decrypt_payload_chunks(self, payload_chunks, unwrap_result=True):
"""
Decrypt and extract data from payload chunks
@@ -310,7 +310,6 @@ class MSL:
:return: json object
"""
raw_data = ""
for payload_chunk in payload_chunks:
# todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"]
# expecting base64-encoded json string
@@ -344,10 +343,12 @@ class MSL:
self.log.critical(f"- {error}")
raise Exception(f"- MSL response message contains an error: {error}")
# sys.exit(1)
self.log.debug(f"Payload Chunks: {data}")
if unwrap_result:
return data["result"]
return data
return data["result"]
def parse_message(self, message):
def parse_message(self, message, unwrap_result=True):
"""
Parse an MSL message into a header and list of payload chunks
@@ -359,7 +360,7 @@ class MSL:
header = parsed_message[0]
encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else []
if encrypted_payload_chunks:
payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks)
payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks, unwrap_result=unwrap_result)
else:
payload_chunks = {}

View File

@@ -31,6 +31,19 @@ class UserAuthentication(MSLObject):
}
)
@classmethod
def UserIDToken(cls, token_data, signature, master_token):
return cls(
scheme=UserAuthenticationSchemes.UserIDToken,
authdata={
"useridtoken": {
"tokendata": token_data,
"signature": signature
},
"mastertoken": master_token
}
)
@classmethod
def NetflixIDCookies(cls, netflixid, securenetflixid):
"""

View File

@@ -15,6 +15,7 @@ class EntityAuthenticationSchemes(Scheme):
class UserAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29"""
EmailPassword = "EMAIL_PASSWORD"
UserIDToken = "USER_ID_TOKEN"
NetflixIDCookies = "NETFLIXID"

View File

@@ -20,6 +20,8 @@ from pymp4.parser import Box
from pywidevine import PSSH, Cdm as WidevineCDM, DeviceTypes
from pyplayready import PSSH as PlayReadyPSSH
import requests
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
from langcodes import Language
from unshackle.core.constants import AnyTrack
@@ -60,6 +62,7 @@ class Netflix(Service):
"es": "es-419",
"pt": "pt-PT",
}
ANDROID_CONFIG_ENDPOINT = "https://android.prod.ftl.netflix.com/nq/androidui/samurai/v1/config"
@staticmethod
@click.command(name="Netflix", short_help="https://netflix.com")
@@ -481,14 +484,166 @@ class Netflix(Service):
securenetflixid=cookie["SecureNetflixId"]
)
else:
# Android like way login to Netflix using email and password
if not self.credential:
raise Exception(" - Credentials are required for Android CDMs, and none were provided.")
self.userauthdata = UserAuthentication.EmailPassword(
email=self.credential.username,
password=self.credential.password
raise click.ClickException("Android sign-in requires credentials.")
self.userauthdata = self.get_android_userauthdata()
def get_android_userauthdata(self) -> UserAuthentication:
token_cache = self.get_android_user_token_cache()
token_data = token_cache.data if token_cache and isinstance(token_cache.data, dict) else None
if not token_data or not token_data.get("tokendata") or not token_data.get("signature"):
self.log.info("Requesting Android useridtoken")
token_data = self.fetch_android_user_id_token()
token_cache.set(token_data, expiration=self.resolve_android_token_expiration(token_data))
else:
self.log.info("Using cached Android useridtoken")
return UserAuthentication.UserIDToken(
token_data=token_data["tokendata"],
signature=token_data["signature"],
master_token=self.msl.keys.mastertoken
)
def get_android_user_token_cache(self):
return self.cache.get(f"ANDROID_USER_ID_TOKEN/{self.credential.sha1}/{self.esn.data['esn']}")
def fetch_android_user_id_token(self) -> dict:
try:
header, payload_data = self.msl.send_message(
endpoint=self.ANDROID_CONFIG_ENDPOINT,
params=self.build_android_sign_in_query(),
application_data="",
headers=self.build_android_sign_in_headers(),
unwrap_result=False
)
self.log.info(f"userauthdata: {self.userauthdata}")
except Exception as exc:
raise click.ClickException(f"Android sign-in request failed: {exc}") from exc
header_data = self.decrypt_android_header(header["headerdata"])
tokens = header_data.get("useridtoken")
if not tokens:
self.log.debug(f"Android sign-in header keys: {list(header_data.keys())}")
sign_in_value = self.extract_android_sign_in_value(payload_data)
error_code = self.extract_android_sign_in_error_code(sign_in_value)
if error_code:
raise click.ClickException(f"Android sign-in failed: {error_code}")
raise click.ClickException("Android sign-in did not return a useridtoken.")
return tokens
@staticmethod
def extract_android_sign_in_value(payload_data: dict) -> Optional[dict]:
if not isinstance(payload_data, dict):
return None
json_graph = payload_data.get("jsonGraph")
if not isinstance(json_graph, dict):
return None
sign_in_verify = json_graph.get("signInVerify")
if not isinstance(sign_in_verify, dict):
return None
value = sign_in_verify.get("value")
return value if isinstance(value, dict) else None
@staticmethod
def extract_android_sign_in_error_code(sign_in_value: Optional[dict]) -> Optional[str]:
if not isinstance(sign_in_value, dict):
return None
fields = sign_in_value.get("fields")
if not isinstance(fields, dict):
return None
error_code = fields.get("errorCode")
if not isinstance(error_code, dict):
return None
value = error_code.get("value")
return value if isinstance(value, str) and value else None
def build_android_sign_in_query(self) -> dict:
cookie = self.session.cookies.get_dict()
return {
"api": "33",
"appType": "samurai",
"appVer": "62902",
"appVersion": "9.18.0",
"chipset": "sm6150",
"chipsetHardware": "qcom",
"clientAppState": "FOREGROUND",
"clientAppVersionState": "NORMAL",
"countryCode": "+385",
"countryIsoCode": "HR",
"ctgr": "phone",
"dbg": "false",
"deviceLocale": "hr",
"devmod": "samsung_SM-A705FN",
"ffbc": "phone",
"flwssn": "c3100219-d002-40c5-80a7-055c00407246",
"installType": "regular",
"isAutomation": "false",
"isConsumptionOnly": "true",
"isNetflixPreloaded": "false",
"isPlayBillingEnabled": "true",
"isStubInSystemPartition": "false",
"lackLocale": "false",
"landingOrigin": "https://www.netflix.com",
"mId": "SAMSUSM-A705FNS",
"memLevel": "HIGH",
"method": "get",
"mnf": "samsung",
"model": "SM-A705FN",
"netflixClientPlatform": "androidNative",
"netflixId": cookie["NetflixId"],
"networkType": "wifi",
"osBoard": "sm6150",
"osDevice": "a70q",
"osDisplay": "TQ1A.230205.002",
"password": self.credential.password,
"path": "[\"signInVerify\"]",
"pathFormat": "hierarchical",
"platform": "android",
"preloadSignupRoValue": "",
"progressive": "false",
"qlty": "hd",
"recaptchaResponseTime": 244,
"recaptchaResponseToken": "",
"responseFormat": "json",
"roBspVer": "Q6150-17263-1",
"secureNetflixId": cookie["SecureNetflixId"],
"sid": "7176",
"store": "google",
"userLoginId": self.credential.username
}
def build_android_sign_in_headers(self) -> dict:
return {
"X-Netflix.Request.NqTracking": "VerifyLoginMslRequest",
"X-Netflix.Client.Request.Name": "VerifyLoginMslRequest",
"X-Netflix.Request.Client.Context": "{\"appState\":\"foreground\"}",
"X-Netflix-Esn": self.esn.data["esn"],
"X-Netflix.EsnPrefix": "NFANDROID1-PRV-P-",
"X-Netflix.msl-header-friendly-client": "true",
"content-encoding": "msl_v1"
}
def decrypt_android_header(self, encrypted_header_b64: str) -> dict:
encrypted_header = json.loads(base64.b64decode(encrypted_header_b64))
iv = base64.b64decode(encrypted_header["iv"])
ciphertext = base64.b64decode(encrypted_header["ciphertext"])
cipher = AES.new(self.msl.keys.encryption, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(ciphertext), AES.block_size)
return json.loads(decrypted.decode("utf-8"))
def resolve_android_token_expiration(self, token_data: dict):
for source in (token_data, self.msl.keys.mastertoken):
if not isinstance(source, dict):
continue
tokendata = source.get("tokendata")
if not tokendata:
continue
try:
parsed = json.loads(base64.b64decode(tokendata).decode("utf-8"))
except (TypeError, ValueError, json.JSONDecodeError):
continue
if parsed.get("expiration"):
return parsed["expiration"]
return None
def get_profiles(self):
@@ -676,17 +831,17 @@ class Netflix(Service):
"id": int(time.time()),
"esn": self.esn.data["esn"],
"languages": ["en-US"],
"clientVersion": "6.0026.291.011",
"clientVersion": "9999999",
"params": {
"clientVersion": "6.0051.090.911",
"challenge": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"],
# "challenge": base64.b64encode(challenge).decode(),
"challanges": {
# "default": base64.b64encode(challenge).decode()
"default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"]
},
"clientVersion": "9999999",
**({
"challenge": self.config["payload_challenge"]
} if self.drm_system == "widevine" and self.cdm.device_type == DeviceTypes.CHROME else {}),
# "challanges": {
# # "default": base64.b64encode(challenge).decode()
# "default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"]
# },
"contentPlaygraph": ["v2"],
"deviceSecurityLevel": "3000",
"drmVersion": 25,
"desiredVmaf": "plus_lts",
"desiredSegmentVmaf": "plus_lts",
@@ -699,10 +854,6 @@ class Netflix(Service):
"licenseType": "standard",
"liveAdsCapability": "replace",
"liveMetadataFormat": "INDEXED_SEGMENT_TEMPLATE",
"manifestVersion": "v2",
"osName": "windows",
"osVersion": "10.0",
"platform": "145.0.0.0",
"profilesGroups": [{
"name": "default",
"profiles": video_profiles

File diff suppressed because one or more lines are too long

View File

@@ -64,7 +64,7 @@ directories:
cache: Cache
# cookies: Cookies
dcsl: DCSL # Device Certificate Status List
downloads: Downloads
downloads: /home/kenzuya/Mounts/ketuakenzuya/Downloads/
logs: Logs
temp: Temp
# wvds: WVDs
@@ -79,7 +79,7 @@ directories:
# Pre-define which Widevine or PlayReady device to use for each Service
cdm:
# Global default CDM device (fallback for all services/profiles)
default: chromecdm
default: lg_50ut73006la.cekukh_17.0.0_22163355_7110_l1
# Direct service-specific CDM
DIFFERENT_EXAMPLE: PRD_1