3 Commits

Author SHA1 Message Date
c7be94c0fc chore(config): add Netflix credentials and comment out wvds directory
- Added Netflix service credentials with a default user and password
- Commented out the wvds directory in the configuration file
- Retained example credentials for reference in comments
2025-08-29 20:54:34 +07:00
c60035cb1d feat(netflix): add hybrid HDR10 and DV profile support and Android CDM improvements
- Introduce new descriptive subtitle option in CLI and internal logic
- Support hybrid video range by separately fetching HDR10 and DV profiles
- Add detailed error handling and logging for hybrid mode processing
- Extend ESN handling to support different device types (Chrome and Android)
- Implement Android CDM login using email and password credentials
- Update ESN caching logic with type-aware expiration handling
- Adjust manifest parsing to handle optional hydrate_tracks parameter
- Enhance subtitle filtering to optionally skip descriptive subtitles
- Expand ALIASES to include lowercase variants "netflix" and "nf"
- Add new ESN mapping entry for Android device in config.yaml
2025-08-29 20:53:52 +07:00
3c24d83293 refactor(msl): improve key exchange handling and code cleanup
- Replace commented Widevine key exchange code with active parsing and key extraction
- Add checks for CDM session and CDM availability before license parsing
- Update key permission strings to lowercase and align with key extraction logic
- Handle AsymmetricWrapped scheme separately with RSA decryption of keys
- Change EntityAuthentication to Unauthenticated for certain challenge cases
- Remove redundant jsonpickle encoding/decoding for cached keys, store raw data instead
- Add detailed logging for key UUIDs and extracted Widevine keys
- Fix key comparison by converting kid bytes to UUID on comparison
- Raise Exception instead of using logger exit on MSL response error
- Update imports to consolidate pywidevine package classes used
2025-08-29 20:52:30 +07:00
4 changed files with 141 additions and 74 deletions

View File

@@ -27,8 +27,7 @@ from .schemes import EntityAuthenticationSchemes # noqa: F401
from .schemes import KeyExchangeSchemes
from .schemes.EntityAuthentication import EntityAuthentication
from .schemes.KeyExchangeRequest import KeyExchangeRequest
from pywidevine.cdm import Cdm
from pywidevine.pssh import PSSH
from pywidevine import Cdm, PSSH, Key
class MSL:
log = logging.getLogger("MSL")
@@ -70,7 +69,8 @@ class MSL:
)
keyrequestdata = KeyExchangeRequest.Widevine(challenge)
entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode())
entityauthdata = EntityAuthentication.Unauthenticated(sender)
# entityauthdata = EntityAuthentication.Widevine("TV", base64.b64encode(challenge).decode())
else:
entityauthdata = EntityAuthentication.Unauthenticated(sender)
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
@@ -122,29 +122,30 @@ class MSL:
raise Exception("- Key exchange scheme mismatch occurred")
key_data = key_response_data["keydata"]
# if scheme == KeyExchangeSchemes.Widevine:
# if isinstance(cdm.device, RemoteDevice):
# msl_keys.encryption, msl_keys.sign = cdm.device.exchange(
# cdm.sessions[msl_keys.cdm_session],
# license_res=key_data["cdmkeyresponse"],
# enc_key_id=base64.b64decode(key_data["encryptionkeyid"]),
# hmac_key_id=base64.b64decode(key_data["hmackeyid"])
# )
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# else:
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# keys = cdm.get_keys(msl_keys.cdm_session)
# msl_keys.encryption = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["encryptionkeyid"]),
# keys=keys,
# permissions=["AllowEncrypt", "AllowDecrypt"]
# )
# msl_keys.sign = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["hmackeyid"]),
# keys=keys,
# permissions=["AllowSign", "AllowSignatureVerify"]
# )
# else:
if scheme == KeyExchangeSchemes.Widevine:
if not msl_keys.cdm_session:
raise Exception("- No CDM session available")
if not cdm:
raise Exception("- No CDM available")
cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
keys = cdm.get_keys(msl_keys.cdm_session)
cls.log.info(f"Keys: {keys}")
encryption_key = MSL.get_widevine_key(
kid=base64.b64decode(key_data["encryptionkeyid"]),
keys=keys,
permissions=["allow_encrypt", "allow_decrypt"]
)
msl_keys.encryption = encryption_key
cls.log.info(f"Encryption key: {encryption_key}")
sign = MSL.get_widevine_key(
kid=base64.b64decode(key_data["hmackeyid"]),
keys=keys,
permissions=["allow_sign", "allow_signature_verify"]
)
cls.log.info(f"Sign key: {sign}")
msl_keys.sign = sign
elif scheme == KeyExchangeSchemes.AsymmetricWrapped:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
@@ -156,6 +157,7 @@ class MSL:
base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"]
)
msl_keys.mastertoken = key_response_data["mastertoken"]
MSL.cache_keys(msl_keys, cache)
@@ -174,7 +176,7 @@ class MSL:
return None
# with open(msl_keys_path, encoding="utf-8") as fd:
# msl_keys = jsonpickle.decode(fd.read())
msl_keys = jsonpickle.decode(cacher.data)
msl_keys = cacher.data
if msl_keys.rsa:
# noinspection PyTypeChecker
# expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object
@@ -196,7 +198,7 @@ class MSL:
msl_keys.rsa = msl_keys.rsa.export_key()
# with open(cache, "w", encoding="utf-8") as fd:
# fd.write()
cache.set(jsonpickle.encode(msl_keys))
cache.set(msl_keys)
if msl_keys.rsa:
# re-import now
msl_keys.rsa = RSA.importKey(msl_keys.rsa)
@@ -241,9 +243,11 @@ class MSL:
return jsonpickle.encode(header_data, unpicklable=False)
@classmethod
def get_widevine_key(cls, kid, keys, permissions):
def get_widevine_key(cls, kid, keys: list[Key], permissions):
cls.log.info(f"KID: {Key.kid_to_uuid(kid)}")
for key in keys:
if key.kid != kid:
# cls.log.info(f"KEY: {key.kid_to_uuid}")
if key.kid != Key.kid_to_uuid(kid):
continue
if key.type != "OPERATOR_SESSION":
cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}")
@@ -259,7 +263,7 @@ class MSL:
res = self.session.post(url=endpoint, data=message, params=params)
header, payload_data = self.parse_message(res.text)
if "errordata" in header:
raise self.log.exit(
raise Exception(
"- MSL response message contains an error: {}".format(
json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8"))
)

View File

@@ -17,7 +17,7 @@ from Crypto.Random import get_random_bytes
import jsonpickle
from pymp4.parser import Box
from pywidevine import PSSH, Cdm
from pywidevine import PSSH, Cdm, DeviceTypes
import requests
from langcodes import Language
@@ -52,11 +52,8 @@ class Netflix(Service):
Authorization: Cookies
Security: UHD@SL3000/L1 FHD@SL3000/L1
"""
TITLE_RE = [
r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<id>\d+)",
r"^https?://(?:www\.)?unogs\.com/title/(?P<id>\d+)",
]
ALIASES= ("NF", "Netflix")
TITLE_RE = r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<title_id>\d+)"
ALIASES= ("NF", "Netflix", "netflix", "nf")
NF_LANG_MAP = {
"es": "es-419",
"pt": "pt-PT",
@@ -74,11 +71,12 @@ class Netflix(Service):
@click.option("--meta-lang", type=str, help="Language to use for metadata")
@click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.")
@click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate")
@click.option("-ds", "--descriptive-subtitles", is_flag=True, default=False, help="Get descriptive subtitles")
@click.pass_context
def cli(ctx, **kwargs):
return Netflix(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool):
def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool, descriptive_subtitles: bool):
super().__init__(ctx)
# General
self.title = title
@@ -89,6 +87,7 @@ class Netflix(Service):
self.profiles: List[str] = []
self.requested_profiles: List[str] = []
self.high_bitrate = high_bitrate
self.descriptive_subtitles = descriptive_subtitles
# MSL
self.esn = self.cache.get("ESN")
@@ -207,7 +206,32 @@ class Netflix(Service):
except Exception as e:
self.log.error(e)
else:
if self.high_bitrate:
if self.range[0] == Video.Range.HYBRID:
# Handle HYBRID mode by getting HDR10 and DV profiles separately
try:
# Get HDR10 profiles for the current codec
hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("HDR10", [])
if hdr10_profiles:
self.log.info("Fetching HDR10 tracks for hybrid processing")
hdr10_manifest = self.get_manifest(title, hdr10_profiles)
hdr10_tracks = self.manifest_as_tracks(hdr10_manifest, title, self.hydrate_track)
tracks.add(hdr10_tracks)
else:
self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}")
# Get DV profiles for the current codec
dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", [])
if dv_profiles:
self.log.info("Fetching DV tracks for hybrid processing")
dv_manifest = self.get_manifest(title, dv_profiles)
dv_tracks = self.manifest_as_tracks(dv_manifest, title, False) # Don't hydrate again
tracks.add(dv_tracks.videos)
else:
self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}")
except Exception as e:
self.log.error(f"Error in HYBRID mode processing: {e}")
elif self.high_bitrate:
splitted_profiles = self.split_profiles(self.profiles)
for index, profile_list in enumerate(splitted_profiles):
try:
@@ -309,7 +333,7 @@ class Netflix(Service):
"version": 2,
"url": track.data["license_url"],
"id": int(time.time() * 10000),
"esn": self.esn.data,
"esn": self.esn.data["esn"],
"languages": ["en-US"],
# "uiVersion": "shakti-v9dddfde5",
"clientVersion": "6.0026.291.011",
@@ -371,7 +395,7 @@ class Netflix(Service):
if self.vcodec.extension.upper() not in self.config["profiles"]["video"]:
raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix")
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9 and self.range[0] != Video.Range.HYBRID:
self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
@@ -388,8 +412,11 @@ class Netflix(Service):
self.get_esn()
# if self.cdm.security_level == 1:
# scheme = KeyExchangeSchemes.Widevine
# else:
scheme = KeyExchangeSchemes.AsymmetricWrapped
scheme = {
DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped,
DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine
}[self.cdm.device_type]
# scheme = KeyExchangeSchemes.AsymmetricWrapped
self.log.info(f"Scheme: {scheme}")
@@ -397,16 +424,26 @@ class Netflix(Service):
scheme=scheme,
session=self.session,
endpoint=self.config["endpoints"]["manifest"],
sender=self.esn.data,
sender=self.esn.data["esn"],
cache=self.cache.get("MSL"),
cdm=self.cdm,
config=self.config,
)
cookie = self.session.cookies.get_dict()
if self.cdm.device_type == DeviceTypes.CHROME:
self.userauthdata = UserAuthentication.NetflixIDCookies(
netflixid=cookie["NetflixId"],
securenetflixid=cookie["SecureNetflixId"]
)
else:
# Android like way login to Netflix using email and password
if not self.credential:
raise Exception(" - Credentials are required for Android CDMs, and none were provided.")
self.userauthdata = UserAuthentication.EmailPassword(
email=self.credential.username,
password=self.credential.password
)
self.log.info(f"userauthdata: {self.userauthdata}")
def get_profiles(self):
@@ -429,26 +466,44 @@ class Netflix(Service):
for range in self.range:
if range in profiles:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()][range.name])
# sys.exit(1)
elif range == Video.Range.HYBRID:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()]["HDR10"])
else:
self.log.error(f" - {range} is not supported by {self.vcodec}")
sys.exit(1)
self.log.debug(f"Result_profiles: {result_profiles}")
return result_profiles
def get_esn(self):
self.log.info(f"Security level: {self.cdm.security_level}")
if int(self.cdm.security_level) == 1:
if self.cdm.device_type == DeviceTypes.ANDROID:
try:
# Use ESN map from config.yaml instead of generating a new one
self.esn.set(self.config["esn_map"][self.cdm.system_id])
esn = self.config["esn_map"][self.cdm.system_id]
except KeyError:
self.log.error(f"ESN mapping not found for system_id: {self.cdm.system_id}")
raise Exception(f"ESN mapping not found for system_id: {self.cdm.system_id}")
esn_value = {
'esn': esn,
'type': self.cdm.device_type
}
if self.esn.data["esn"] != esn:
self.esn.set(self.config["esn_map"][self.cdm.system_id], 1 * 60 * 60)
else:
ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30))
esn_value = f"NFCDIE-03-{ESN_GEN}"
generated_esn = f"NFCDIE-03-{ESN_GEN}"
# Check if ESN is expired or doesn't exist
if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired):
if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired) or (self.esn.data["type"] != DeviceTypes.CHROME):
# Set new ESN with 6-hour expiration
self.esn.set(esn_value, 1 * 60 * 60) # 6 hours in seconds
esn_value = {
'esn': generated_esn,
'type': DeviceTypes.CHROME,
}
self.esn.set(esn_value, expiration=1 * 60 * 60) # 1 hours in seconds
self.log.info(f"Generated new ESN with 1-hour expiration")
else:
self.log.info(f"Using cached ESN.")
self.log.info(f"ESN: {self.esn.data}")
self.log.info(f"ESN: {self.esn.data["esn"]}")
def get_metadata(self, title_id: str):
@@ -540,7 +595,7 @@ class Netflix(Service):
"version": 2,
"url": "manifest",
"id": int(time.time()),
"esn": self.esn.data,
"esn": self.esn.data["esn"],
"languages": ["en-US"],
"clientVersion": "6.0026.291.011",
"params": {
@@ -658,7 +713,11 @@ class Netflix(Service):
def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str:
return self.config["certificate"]
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = False) -> Tracks:
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks:
# If hydrate_tracks is not specified, derive from self.hydrate_track
if hydrate_tracks is None:
hydrate_tracks = self.hydrate_track
tracks = Tracks()
@@ -783,8 +842,8 @@ class Netflix(Service):
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
continue
if subtitle.get("languageDescription") == 'Off':
# I don't why this subtitles is requested, i consider for skip these subtitles for now
if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False:
# Skip Descriptive subtitles
continue
# pass

View File

@@ -15,6 +15,7 @@ esn_map:
# key map of CDM WVD `SystemID = 'ESN you want to use for that CDM WVD'`
8159: "NFANDROID1-PRV-P-GOOGLEPIXEL"
8131: "HISETVK84500000000000000000000000007401422"
22590: "NFANDROID1-PXA-P-L3-XIAOMM2102J20SG-22590-0202084EBTP55D0HO2TOCSM3VR9MOSTTJT2L97EKVN9E8PFA1QQ439QC70QTTTV82LC7KUSD3O0HUB0HKH51DH0N7A7GFJKSJ5S6FFE0"
endpoints:
website: "https://www.netflix.com/nq/website/memberapi/{build_id}/pathEvaluator"
manifest: "https://www.netflix.com/msl/playapi/cadmium/licensedmanifest/1"

View File

@@ -56,6 +56,9 @@ credentials:
SERVICE_NAME3:
default: ["user@email.com", ":PasswordWith:Colons"]
Netflix:
default: ["sako.sako1109@gmail.com", "sako1109"]
# default: ["pbgarena0838@gmail.com", "Andhika1978"]
# Override default directories used across unshackle
directories:
cache: Cache
@@ -64,7 +67,7 @@ directories:
downloads: Downloads
logs: Logs
temp: Temp
wvds: WVDs
# wvds: WVDs
prds: PRDs
# Additional directories that can be configured:
# commands: Commands