3 Commits

Author SHA1 Message Date
c7be94c0fc chore(config): add Netflix credentials and comment out wvds directory
- Added Netflix service credentials with a default user and password
- Commented out the wvds directory in the configuration file
- Retained example credentials for reference in comments
2025-08-29 20:54:34 +07:00
c60035cb1d feat(netflix): add hybrid HDR10 and DV profile support and Android CDM improvements
- Introduce new descriptive subtitle option in CLI and internal logic
- Support hybrid video range by separately fetching HDR10 and DV profiles
- Add detailed error handling and logging for hybrid mode processing
- Extend ESN handling to support different device types (Chrome and Android)
- Implement Android CDM login using email and password credentials
- Update ESN caching logic with type-aware expiration handling
- Adjust manifest parsing to handle optional hydrate_tracks parameter
- Enhance subtitle filtering to optionally skip descriptive subtitles
- Expand ALIASES to include lowercase variants "netflix" and "nf"
- Add new ESN mapping entry for Android device in config.yaml
2025-08-29 20:53:52 +07:00
3c24d83293 refactor(msl): improve key exchange handling and code cleanup
- Replace commented Widevine key exchange code with active parsing and key extraction
- Add checks for CDM session and CDM availability before license parsing
- Update key permission strings to lowercase and align with key extraction logic
- Handle AsymmetricWrapped scheme separately with RSA decryption of keys
- Change EntityAuthentication to Unauthenticated for certain challenge cases
- Remove redundant jsonpickle encoding/decoding for cached keys, store raw data instead
- Add detailed logging for key UUIDs and extracted Widevine keys
- Fix key comparison by converting kid bytes to UUID on comparison
- Raise Exception instead of using logger exit on MSL response error
- Update imports to consolidate pywidevine package classes used
2025-08-29 20:52:30 +07:00
4 changed files with 141 additions and 74 deletions

View File

@@ -27,8 +27,7 @@ from .schemes import EntityAuthenticationSchemes # noqa: F401
from .schemes import KeyExchangeSchemes from .schemes import KeyExchangeSchemes
from .schemes.EntityAuthentication import EntityAuthentication from .schemes.EntityAuthentication import EntityAuthentication
from .schemes.KeyExchangeRequest import KeyExchangeRequest from .schemes.KeyExchangeRequest import KeyExchangeRequest
from pywidevine.cdm import Cdm from pywidevine import Cdm, PSSH, Key
from pywidevine.pssh import PSSH
class MSL: class MSL:
log = logging.getLogger("MSL") log = logging.getLogger("MSL")
@@ -70,7 +69,8 @@ class MSL:
) )
keyrequestdata = KeyExchangeRequest.Widevine(challenge) keyrequestdata = KeyExchangeRequest.Widevine(challenge)
entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode()) entityauthdata = EntityAuthentication.Unauthenticated(sender)
# entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode())
else: else:
entityauthdata = EntityAuthentication.Unauthenticated(sender) entityauthdata = EntityAuthentication.Unauthenticated(sender)
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped( keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
@@ -122,40 +122,42 @@ class MSL:
raise Exception("- Key exchange scheme mismatch occurred") raise Exception("- Key exchange scheme mismatch occurred")
key_data = key_response_data["keydata"] key_data = key_response_data["keydata"]
# if scheme == KeyExchangeSchemes.Widevine: if scheme == KeyExchangeSchemes.Widevine:
# if isinstance(cdm.device, RemoteDevice): if not msl_keys.cdm_session:
# msl_keys.encryption, msl_keys.sign = cdm.device.exchange( raise Exception("- No CDM session available")
# cdm.sessions[msl_keys.cdm_session], if not cdm:
# license_res=key_data["cdmkeyresponse"], raise Exception("- No CDM available")
# enc_key_id=base64.b64decode(key_data["encryptionkeyid"]), cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# hmac_key_id=base64.b64decode(key_data["hmackeyid"]) keys = cdm.get_keys(msl_keys.cdm_session)
# ) cls.log.info(f"Keys: {keys}")
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) encryption_key = MSL.get_widevine_key(
# else: kid=base64.b64decode(key_data["encryptionkeyid"]),
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"]) keys=keys,
# keys = cdm.get_keys(msl_keys.cdm_session) permissions=["allow_encrypt", "allow_decrypt"]
# msl_keys.encryption = MSL.get_widevine_key( )
# kid=base64.b64decode(key_data["encryptionkeyid"]), msl_keys.encryption = encryption_key
# keys=keys, cls.log.info(f"Encryption key: {encryption_key}")
# permissions=["AllowEncrypt", "AllowDecrypt"] sign = MSL.get_widevine_key(
# ) kid=base64.b64decode(key_data["hmackeyid"]),
# msl_keys.sign = MSL.get_widevine_key( keys=keys,
# kid=base64.b64decode(key_data["hmackeyid"]), permissions=["allow_sign", "allow_signature_verify"]
# keys=keys, )
# permissions=["AllowSign", "AllowSignatureVerify"] cls.log.info(f"Sign key: {sign}")
# ) msl_keys.sign = sign
# else:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa) elif scheme == KeyExchangeSchemes.AsymmetricWrapped:
msl_keys.encryption = MSL.base64key_decode( cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
json.JSONDecoder().decode(cipher_rsa.decrypt( msl_keys.encryption = MSL.base64key_decode(
base64.b64decode(key_data["encryptionkey"]) json.JSONDecoder().decode(cipher_rsa.decrypt(
).decode("utf-8"))["k"] base64.b64decode(key_data["encryptionkey"])
) ).decode("utf-8"))["k"]
msl_keys.sign = MSL.base64key_decode( )
json.JSONDecoder().decode(cipher_rsa.decrypt( msl_keys.sign = MSL.base64key_decode(
base64.b64decode(key_data["hmackey"]) json.JSONDecoder().decode(cipher_rsa.decrypt(
).decode("utf-8"))["k"] base64.b64decode(key_data["hmackey"])
) ).decode("utf-8"))["k"]
)
msl_keys.mastertoken = key_response_data["mastertoken"] msl_keys.mastertoken = key_response_data["mastertoken"]
MSL.cache_keys(msl_keys, cache) MSL.cache_keys(msl_keys, cache)
@@ -174,7 +176,7 @@ class MSL:
return None return None
# with open(msl_keys_path, encoding="utf-8") as fd: # with open(msl_keys_path, encoding="utf-8") as fd:
# msl_keys = jsonpickle.decode(fd.read()) # msl_keys = jsonpickle.decode(fd.read())
msl_keys = jsonpickle.decode(cacher.data) msl_keys = cacher.data
if msl_keys.rsa: if msl_keys.rsa:
# noinspection PyTypeChecker # noinspection PyTypeChecker
# expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object # expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object
@@ -196,7 +198,7 @@ class MSL:
msl_keys.rsa = msl_keys.rsa.export_key() msl_keys.rsa = msl_keys.rsa.export_key()
# with open(cache, "w", encoding="utf-8") as fd: # with open(cache, "w", encoding="utf-8") as fd:
# fd.write() # fd.write()
cache.set(jsonpickle.encode(msl_keys)) cache.set(msl_keys)
if msl_keys.rsa: if msl_keys.rsa:
# re-import now # re-import now
msl_keys.rsa = RSA.importKey(msl_keys.rsa) msl_keys.rsa = RSA.importKey(msl_keys.rsa)
@@ -241,9 +243,11 @@ class MSL:
return jsonpickle.encode(header_data, unpicklable=False) return jsonpickle.encode(header_data, unpicklable=False)
@classmethod @classmethod
def get_widevine_key(cls, kid, keys, permissions): def get_widevine_key(cls, kid, keys: list[Key], permissions):
cls.log.info(f"KID: {Key.kid_to_uuid(kid)}")
for key in keys: for key in keys:
if key.kid != kid: # cls.log.info(f"KEY: {key.kid_to_uuid}")
if key.kid != Key.kid_to_uuid(kid):
continue continue
if key.type != "OPERATOR_SESSION": if key.type != "OPERATOR_SESSION":
cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}") cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}")
@@ -259,7 +263,7 @@ class MSL:
res = self.session.post(url=endpoint, data=message, params=params) res = self.session.post(url=endpoint, data=message, params=params)
header, payload_data = self.parse_message(res.text) header, payload_data = self.parse_message(res.text)
if "errordata" in header: if "errordata" in header:
raise self.log.exit( raise Exception(
"- MSL response message contains an error: {}".format( "- MSL response message contains an error: {}".format(
json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8")) json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8"))
) )

View File

@@ -17,7 +17,7 @@ from Crypto.Random import get_random_bytes
import jsonpickle import jsonpickle
from pymp4.parser import Box from pymp4.parser import Box
from pywidevine import PSSH, Cdm from pywidevine import PSSH, Cdm, DeviceTypes
import requests import requests
from langcodes import Language from langcodes import Language
@@ -52,11 +52,8 @@ class Netflix(Service):
Authorization: Cookies Authorization: Cookies
Security: UHD@SL3000/L1 FHD@SL3000/L1 Security: UHD@SL3000/L1 FHD@SL3000/L1
""" """
TITLE_RE = [ TITLE_RE = r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<title_id>\d+)"
r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<id>\d+)", ALIASES= ("NF", "Netflix", "netflix", "nf")
r"^https?://(?:www\.)?unogs\.com/title/(?P<id>\d+)",
]
ALIASES= ("NF", "Netflix")
NF_LANG_MAP = { NF_LANG_MAP = {
"es": "es-419", "es": "es-419",
"pt": "pt-PT", "pt": "pt-PT",
@@ -74,11 +71,12 @@ class Netflix(Service):
@click.option("--meta-lang", type=str, help="Language to use for metadata") @click.option("--meta-lang", type=str, help="Language to use for metadata")
@click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.") @click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.")
@click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate") @click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate")
@click.option("-ds", "--descriptive-subtitles", is_flag=True, default=False, help="Get descriptive subtitles")
@click.pass_context @click.pass_context
def cli(ctx, **kwargs): def cli(ctx, **kwargs):
return Netflix(ctx, **kwargs) return Netflix(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool): def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool, descriptive_subtitles: bool):
super().__init__(ctx) super().__init__(ctx)
# General # General
self.title = title self.title = title
@@ -89,6 +87,7 @@ class Netflix(Service):
self.profiles: List[str] = [] self.profiles: List[str] = []
self.requested_profiles: List[str] = [] self.requested_profiles: List[str] = []
self.high_bitrate = high_bitrate self.high_bitrate = high_bitrate
self.descriptive_subtitles = descriptive_subtitles
# MSL # MSL
self.esn = self.cache.get("ESN") self.esn = self.cache.get("ESN")
@@ -207,7 +206,32 @@ class Netflix(Service):
except Exception as e: except Exception as e:
self.log.error(e) self.log.error(e)
else: else:
if self.high_bitrate: if self.range[0] == Video.Range.HYBRID:
# Handle HYBRID mode by getting HDR10 and DV profiles separately
try:
# Get HDR10 profiles for the current codec
hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("HDR10", [])
if hdr10_profiles:
self.log.info("Fetching HDR10 tracks for hybrid processing")
hdr10_manifest = self.get_manifest(title, hdr10_profiles)
hdr10_tracks = self.manifest_as_tracks(hdr10_manifest, title, self.hydrate_track)
tracks.add(hdr10_tracks)
else:
self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}")
# Get DV profiles for the current codec
dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", [])
if dv_profiles:
self.log.info("Fetching DV tracks for hybrid processing")
dv_manifest = self.get_manifest(title, dv_profiles)
dv_tracks = self.manifest_as_tracks(dv_manifest, title, False) # Don't hydrate again
tracks.add(dv_tracks.videos)
else:
self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}")
except Exception as e:
self.log.error(f"Error in HYBRID mode processing: {e}")
elif self.high_bitrate:
splitted_profiles = self.split_profiles(self.profiles) splitted_profiles = self.split_profiles(self.profiles)
for index, profile_list in enumerate(splitted_profiles): for index, profile_list in enumerate(splitted_profiles):
try: try:
@@ -309,7 +333,7 @@ class Netflix(Service):
"version": 2, "version": 2,
"url": track.data["license_url"], "url": track.data["license_url"],
"id": int(time.time() * 10000), "id": int(time.time() * 10000),
"esn": self.esn.data, "esn": self.esn.data["esn"],
"languages": ["en-US"], "languages": ["en-US"],
# "uiVersion": "shakti-v9dddfde5", # "uiVersion": "shakti-v9dddfde5",
"clientVersion": "6.0026.291.011", "clientVersion": "6.0026.291.011",
@@ -371,7 +395,7 @@ class Netflix(Service):
if self.vcodec.extension.upper() not in self.config["profiles"]["video"]: if self.vcodec.extension.upper() not in self.config["profiles"]["video"]:
raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix") raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix")
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9: if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9 and self.range[0] != Video.Range.HYBRID:
self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}") self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1) sys.exit(1)
@@ -388,8 +412,11 @@ class Netflix(Service):
self.get_esn() self.get_esn()
# if self.cdm.security_level == 1: # if self.cdm.security_level == 1:
# scheme = KeyExchangeSchemes.Widevine # scheme = KeyExchangeSchemes.Widevine
# else: scheme = {
scheme = KeyExchangeSchemes.AsymmetricWrapped DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped,
DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine
}[self.cdm.device_type]
# scheme = KeyExchangeSchemes.AsymmetricWrapped
self.log.info(f"Scheme: {scheme}") self.log.info(f"Scheme: {scheme}")
@@ -397,16 +424,26 @@ class Netflix(Service):
scheme=scheme, scheme=scheme,
session=self.session, session=self.session,
endpoint=self.config["endpoints"]["manifest"], endpoint=self.config["endpoints"]["manifest"],
sender=self.esn.data, sender=self.esn.data["esn"],
cache=self.cache.get("MSL"), cache=self.cache.get("MSL"),
cdm=self.cdm, cdm=self.cdm,
config=self.config, config=self.config,
) )
cookie = self.session.cookies.get_dict() cookie = self.session.cookies.get_dict()
self.userauthdata = UserAuthentication.NetflixIDCookies( if self.cdm.device_type == DeviceTypes.CHROME:
netflixid=cookie["NetflixId"], self.userauthdata = UserAuthentication.NetflixIDCookies(
securenetflixid=cookie["SecureNetflixId"] netflixid=cookie["NetflixId"],
) securenetflixid=cookie["SecureNetflixId"]
)
else:
# Android like way login to Netflix using email and password
if not self.credential:
raise Exception(" - Credentials are required for Android CDMs, and none were provided.")
self.userauthdata = UserAuthentication.EmailPassword(
email=self.credential.username,
password=self.credential.password
)
self.log.info(f"userauthdata: {self.userauthdata}")
def get_profiles(self): def get_profiles(self):
@@ -429,26 +466,44 @@ class Netflix(Service):
for range in self.range: for range in self.range:
if range in profiles: if range in profiles:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()][range.name]) result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()][range.name])
# sys.exit(1) elif range == Video.Range.HYBRID:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()]["HDR10"])
else:
self.log.error(f" - {range} is not supported by {self.vcodec}")
sys.exit(1)
self.log.debug(f"Result_profiles: {result_profiles}") self.log.debug(f"Result_profiles: {result_profiles}")
return result_profiles return result_profiles
def get_esn(self): def get_esn(self):
self.log.info(f"Security level: {self.cdm.security_level}") if self.cdm.device_type == DeviceTypes.ANDROID:
if int(self.cdm.security_level) == 1: try:
# Use ESN map from config.yaml instead of generating a new one # Use ESN map from config.yaml instead of generating a new one
self.esn.set(self.config["esn_map"][self.cdm.system_id]) esn = self.config["esn_map"][self.cdm.system_id]
except KeyError:
self.log.error(f"ESN mapping not found for system_id: {self.cdm.system_id}")
raise Exception(f"ESN mapping not found for system_id: {self.cdm.system_id}")
esn_value = {
'esn': esn,
'type': self.cdm.device_type
}
if self.esn.data["esn"] != esn:
self.esn.set(self.config["esn_map"][self.cdm.system_id], 1 * 60 * 60)
else: else:
ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30)) ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30))
esn_value = f"NFCDIE-03-{ESN_GEN}" generated_esn = f"NFCDIE-03-{ESN_GEN}"
# Check if ESN is expired or doesn't exist # Check if ESN is expired or doesn't exist
if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired): if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired) or (self.esn.data["type"] != DeviceTypes.CHROME):
# Set new ESN with 6-hour expiration # Set new ESN with 6-hour expiration
self.esn.set(esn_value, 1 * 60 * 60) # 6 hours in seconds esn_value = {
'esn': generated_esn,
'type': DeviceTypes.CHROME,
}
self.esn.set(esn_value, expiration=1 * 60 * 60) # 1 hours in seconds
self.log.info(f"Generated new ESN with 1-hour expiration") self.log.info(f"Generated new ESN with 1-hour expiration")
else: else:
self.log.info(f"Using cached ESN.") self.log.info(f"Using cached ESN.")
self.log.info(f"ESN: {self.esn.data}") self.log.info(f"ESN: {self.esn.data["esn"]}")
def get_metadata(self, title_id: str): def get_metadata(self, title_id: str):
@@ -540,7 +595,7 @@ class Netflix(Service):
"version": 2, "version": 2,
"url": "manifest", "url": "manifest",
"id": int(time.time()), "id": int(time.time()),
"esn": self.esn.data, "esn": self.esn.data["esn"],
"languages": ["en-US"], "languages": ["en-US"],
"clientVersion": "6.0026.291.011", "clientVersion": "6.0026.291.011",
"params": { "params": {
@@ -658,7 +713,11 @@ class Netflix(Service):
def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str: def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str:
return self.config["certificate"] return self.config["certificate"]
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = False) -> Tracks: def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks:
# If hydrate_tracks is not specified, derive from self.hydrate_track
if hydrate_tracks is None:
hydrate_tracks = self.hydrate_track
tracks = Tracks() tracks = Tracks()
@@ -783,8 +842,8 @@ class Netflix(Service):
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated") self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
continue continue
if subtitle.get("languageDescription") == 'Off': if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False:
# I don't why this subtitles is requested, i consider for skip these subtitles for now # Skip Descriptive subtitles
continue continue
# pass # pass

View File

@@ -15,6 +15,7 @@ esn_map:
# key map of CDM WVD `SystemID = 'ESN you want to use for that CDM WVD'` # key map of CDM WVD `SystemID = 'ESN you want to use for that CDM WVD'`
8159: "NFANDROID1-PRV-P-GOOGLEPIXEL" 8159: "NFANDROID1-PRV-P-GOOGLEPIXEL"
8131: "HISETVK84500000000000000000000000007401422" 8131: "HISETVK84500000000000000000000000007401422"
22590: "NFANDROID1-PXA-P-L3-XIAOMM2102J20SG-22590-0202084EBTP55D0HO2TOCSM3VR9MOSTTJT2L97EKVN9E8PFA1QQ439QC70QTTTV82LC7KUSD3O0HUB0HKH51DH0N7A7GFJKSJ5S6FFE0"
endpoints: endpoints:
website: "https://www.netflix.com/nq/website/memberapi/{build_id}/pathEvaluator" website: "https://www.netflix.com/nq/website/memberapi/{build_id}/pathEvaluator"
manifest: "https://www.netflix.com/msl/playapi/cadmium/licensedmanifest/1" manifest: "https://www.netflix.com/msl/playapi/cadmium/licensedmanifest/1"

View File

@@ -56,6 +56,9 @@ credentials:
SERVICE_NAME3: SERVICE_NAME3:
default: ["user@email.com", ":PasswordWith:Colons"] default: ["user@email.com", ":PasswordWith:Colons"]
Netflix:
default: ["sako.sako1109@gmail.com", "sako1109"]
# default: ["pbgarena0838@gmail.com", "Andhika1978"]
# Override default directories used across unshackle # Override default directories used across unshackle
directories: directories:
cache: Cache cache: Cache
@@ -64,7 +67,7 @@ directories:
downloads: Downloads downloads: Downloads
logs: Logs logs: Logs
temp: Temp temp: Temp
wvds: WVDs # wvds: WVDs
prds: PRDs prds: PRDs
# Additional directories that can be configured: # Additional directories that can be configured:
# commands: Commands # commands: Commands