Files
unshackle/unshackle/services/Netflix/__init__.py
kenzuya 1244141df2 fix(netflix): align MSL manifest payload with Chrome Widevine
Update Netflix manifest request construction to better match current
Widevine-on-Chrome behavior by:
- setting top-level and param `clientVersion` to `9999999`
- sending `challenge` only for Chrome Widevine requests
- removing hardcoded device/platform fields from params

Also refresh Android TV ESN mappings in config by replacing ESN `7110`
and adding ESN `16401` for Hisense devices to improve request validity.
2026-03-10 12:45:59 +07:00

1398 lines
73 KiB
Python

import base64
from datetime import datetime
import json
from math import e
import random
import sys
import time
import typing
from uuid import UUID
import click
import re
from typing import List, Literal, Optional, Set, Union, Tuple
from http.cookiejar import CookieJar
from itertools import zip_longest
from Crypto.Random import get_random_bytes
import jsonpickle
from pymp4.parser import Box
from pywidevine import PSSH, Cdm as WidevineCDM, DeviceTypes
from pyplayready import PSSH as PlayReadyPSSH
import requests
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.drm import Widevine, PlayReady
from unshackle.core.service import Service
from unshackle.core.titles import Titles_T, Title_T
from unshackle.core.titles.episode import Episode, Series
from unshackle.core.titles.movie import Movie, Movies
from unshackle.core.titles.title import Title
from unshackle.core.tracks import Tracks, Chapters
from unshackle.core.tracks.audio import Audio
from unshackle.core.tracks.chapter import Chapter
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.tracks.video import Video
from unshackle.core.utils.collections import flatten, as_list
from unshackle.core.drm import DRM_T
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.drm.playready import PlayReady
from unshackle.core.titles.song import Song
from unshackle.utils.base62 import decode
from .MSL import MSL, KeyExchangeSchemes
from .MSL.schemes.UserAuthentication import UserAuthentication
class Netflix(Service):
"""
Service for https://netflix.com
Version: 1.0.0
Authorization: Cookies
Security: UHD@SL3000/L1 FHD@SL3000/L1
"""
# Matches either a bare numeric title ID or a netflix.com URL (optionally with a
# two-character locale path segment) that reaches the ID through `title/`,
# `watch/` or a `jbv=` query parameter. The ID is captured as `title_id`.
TITLE_RE = r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<title_id>\d+)"
# Alternative names for this service.
# NOTE(review): presumably consumed by the Service base class / CLI lookup - confirm.
ALIASES= ("NF", "Netflix", "netflix", "nf")
# Language-code overrides keyed by bare language code.
# NOTE(review): not referenced by the methods visible in this part of the file.
NF_LANG_MAP = {
    "es": "es-419",
    "pt": "pt-PT",
}
# Endpoint that fetch_android_user_id_token() posts the Android sign-in request to.
ANDROID_CONFIG_ENDPOINT = "https://android.prod.ftl.netflix.com/nq/androidui/samurai/v1/config"
# Click entry point for the service. The decorators declare the positional
# `title` argument plus the Netflix-specific options; every parsed value is
# forwarded to the constructor as a keyword argument, so option names must stay
# in sync with the parameters of __init__.
# NOTE: documented with comments rather than a docstring on purpose - click
# would surface a function docstring as the command's help text.
@staticmethod
@click.command(name="Netflix", short_help="https://netflix.com")
@click.argument("title", type=str)
@click.option("-drm", "--drm-system", type=click.Choice(["widevine", "playready"], case_sensitive=False),
              default="widevine",
              help="which drm system to use")
@click.option("-p", "--profile", type=click.Choice(["MPL", "HPL", "QC", "MPL+HPL", "MPL+HPL+QC", "MPL+QC"], case_sensitive=False),
              default=None,
              help="H.264 profile to use. Default is best available.")
@click.option("--meta-lang", type=str, help="Language to use for metadata")
@click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.")
@click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate")
@click.option("-ds", "--descriptive-subtitles", is_flag=True, default=False, help="Get descriptive subtitles")
@click.pass_context
def cli(ctx, **kwargs):
    # Build the service instance from the click context and the parsed options.
    return Netflix(ctx, **kwargs)
def __init__(
    self,
    ctx: click.Context,
    title: str,
    drm_system: Literal["widevine", "playready"],
    profile: str,
    meta_lang: str,
    hydrate_track: bool,
    high_bitrate: bool,
    descriptive_subtitles: bool,
):
    """
    Store the service CLI options and the shared download options.

    :param ctx: Click context of the service command; its parent carries the
        shared download options and ``ctx.obj.cdm`` carries the CDM.
    :param title: Title ID or URL given on the command line.
    :param drm_system: DRM system to request ("widevine" or "playready").
    :param profile: "+"-separated H.264 profile groups, or None for all.
    :param meta_lang: Language passed to the metadata request.
    :param hydrate_track: Whether missing audio/subtitle tracks are hydrated.
    :param high_bitrate: Whether profiles are requested in split groups.
    :param descriptive_subtitles: Flag stored as-is for later use.
    """
    super().__init__(ctx)

    # Service-specific command line options
    self.title = title
    self.profile = profile
    self.meta_lang = meta_lang
    self.hydrate_track = hydrate_track
    self.high_bitrate = high_bitrate
    self.descriptive_subtitles = descriptive_subtitles
    self.drm_system: Literal["widevine", "playready"] = drm_system

    # Filled in later by configure()
    self.profiles: List[str] = []
    self.requested_profiles: List[str] = []

    # MSL state
    self.esn = self.cache.get("ESN")
    self.msl: Optional[MSL] = None
    self.userauthdata = None

    # Download options shared by every service live on the parent context
    shared_options = ctx.parent.params
    self.range = shared_options.get("range_") or [Video.Range.SDR]
    self.vcodec = shared_options.get("vcodec") or Video.Codec.AVC  # H.264 by default
    self.acodec: Audio.Codec = shared_options.get("acodec") or Audio.Codec.EC3
    self.quality: List[int] = shared_options.get("quality")
    self.audio_only = shared_options.get("audio_only")
    self.subs_only = shared_options.get("subs_only")
    self.chapters_only = shared_options.get("chapters_only")

    # CDM handed over by unshackle.
    # NOTE(review): `Cdm` is not imported under that name in this file (pywidevine's
    # Cdm is imported as WidevineCDM); the annotation is never evaluated at runtime.
    self.cdm: Cdm = ctx.obj.cdm
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
    """
    Run the base-class authentication, then configure the service.

    :param cookies: Cookie jar for the Netflix session; required.
    :param credential: Optional credential, passed through to the base class.
    :returns: Whatever the base-class ``authenticate`` returned.
    :raises EnvironmentError: If no (or an empty) cookie jar was supplied.
    """
    self.log.debug("Authenticating Netflix service")
    base_result = super().authenticate(cookies, credential)
    if cookies:
        # Profiles, ESN and the MSL handshake are all prepared here, before any download.
        self.configure()
        return base_result
    raise EnvironmentError("Service requires Cookies for Authentication.")
def get_titles(self) -> Titles_T:
    """
    Build the title objects for the requested title ID.

    A metadata type of "movie" yields a single-entry ``Movies``; anything else
    is treated as a show and yields a ``Series`` with one ``Episode`` per
    season/episode entry. The original language of each title is read from a
    manifest request.

    :returns: ``Movies`` or ``Series`` for the requested title.
    """
    metadata = self.get_metadata(self.title)
    if "video" not in metadata:
        self.log.error(f"Failed to get metadata: {metadata}")
        sys.exit(1)

    if metadata["video"]["type"] == "movie":
        movie = Movie(
            id_=self.title,
            name=metadata["video"]["title"],
            year=metadata["video"]["year"],
            service=self.__class__,
            data=metadata["video"],
            description=metadata["video"]["synopsis"],
        )
        movie.language = self.get_original_language(self.get_manifest(movie, self.profiles))
        return Movies([movie])

    # NOTE(review): one manifest request is issued per episode purely to
    # resolve the original language.
    collected: List[Episode] = []
    for season in metadata["video"]["seasons"]:
        for episodes in season["episodes"]:
            episode = Episode(
                id_=self.title,
                title=metadata["video"]["title"],
                year=season["year"],
                service=self.__class__,
                season=season["seq"],
                number=episodes["seq"],
                name=episodes["title"],
                data=episodes,
                description=episodes["synopsis"],
            )
            try:
                episode.language = self.get_original_language(self.get_manifest(episode, self.profiles))
                self.log.debug(f"Episode S{episode.season:02d}E{episode.number:02d}: {episode.language}")
            except Exception as e:
                self.log.warning(f"Failed to get original language for episode S{season['seq']:02d}E{episodes['seq']:02d}: {e}")
                # Reuse the language of the first collected episode when it has
                # one; otherwise fall back to English.
                first_has_language = (
                    collected
                    and hasattr(collected[0], 'language')
                    and collected[0].language
                )
                if first_has_language:
                    episode.language = collected[0].language
                else:
                    episode.language = Language.get("en")
                self.log.info(f"Using fallback language for episode: {episode.language}")
            collected.append(episode)
    return Series(collected)
def get_tracks(self, title: Title_T) -> Tracks:
    """
    Collect video, audio and subtitle tracks (plus a poster attachment) for a title.

    For H.264 a manifest is requested with ``self.profiles``; when no explicit
    profile was requested, QC and MPL profiles are additionally requested on
    their own and only their video tracks are merged in. For other codecs each
    requested video range is processed in turn. Failures are logged and the
    affected request/range is skipped rather than raised.

    :param title: Title to fetch tracks for.
    :returns: The accumulated ``Tracks``.
    """
    tracks = Tracks()
    # When H.264 is selected and no explicit profile was given, the QC (and MPL)
    # profiles have to be requested separately from the main manifest.
    if self.vcodec == Video.Codec.AVC:
        # self.log.info(f"Profile: {self.profile}")
        try:
            manifest = self.get_manifest(title, self.profiles)
            movie_track = self.manifest_as_tracks(manifest, title, self.hydrate_track)
            tracks.add(movie_track)
            if self.profile is not None:
                self.log.info(f"Requested profiles: {self.profile}")
            else:
                requested_qualities = self.quality or []
                qc_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]["QC"]
                # When 720 is the only requested quality, drop the "l40" QC profiles.
                use_720_qc_only = len(requested_qualities) == 1 and 720 in requested_qualities
                if use_720_qc_only:
                    qc_profiles = [x for x in qc_profiles if "l40" not in x]
                qc_manifest = self.get_manifest(title, qc_profiles)
                qc_tracks = self.manifest_as_tracks(qc_manifest, title, False)
                tracks.add(qc_tracks.videos)
                # MPL profiles are always requested without the "l40" entries.
                mpl_manifest = self.get_manifest(title, [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["MPL"] if "l40" not in x])
                mpl_tracks = self.manifest_as_tracks(mpl_manifest, title, False)
                tracks.add(mpl_tracks.videos)
        except Exception as e:
            # Best effort: whatever was added before the failure is kept.
            self.log.error(e)
    else:
        # Handle multiple video ranges
        for range_index, video_range in enumerate(self.range):
            try:
                # Only hydrate tracks on the first range to avoid duplicates
                should_hydrate = self.hydrate_track and range_index == 0
                if video_range == Video.Range.HYBRID:
                    # Handle HYBRID mode by getting HDR10 and DV profiles separately
                    # Get HDR10 profiles for the current codec
                    hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("HDR10", [])
                    if hdr10_profiles:
                        self.log.info(f"Fetching HDR10 tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})")
                        hdr10_manifest = self.get_manifest(title, hdr10_profiles)
                        hdr10_tracks = self.manifest_as_tracks(hdr10_manifest, title, should_hydrate)
                        tracks.add(hdr10_tracks)
                    else:
                        self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}")
                    # Get DV profiles for the current codec
                    dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", [])
                    if dv_profiles:
                        self.log.info(f"Fetching DV tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})")
                        dv_manifest = self.get_manifest(title, dv_profiles)
                        dv_tracks = self.manifest_as_tracks(dv_manifest, title, False) # Don't hydrate DV tracks
                        tracks.add(dv_tracks.videos)
                    else:
                        self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}")
                elif self.high_bitrate:
                    # Get profiles for the current range
                    range_profiles = self.get_profiles_for_range(video_range)
                    if not range_profiles:
                        self.log.warning(f"No profiles found for range {video_range.name}")
                        continue
                    # One manifest request per profile group returned by split_profiles().
                    splitted_profiles = self.split_profiles(range_profiles)
                    for profile_index, profile_list in enumerate(splitted_profiles):
                        try:
                            self.log.debug(f"Range {range_index + 1}/{len(self.range)} ({video_range.name}), Profile Index: {profile_index}. Getting profiles: {profile_list}")
                            manifest = self.get_manifest(title, profile_list)
                            manifest_tracks = self.manifest_as_tracks(manifest, title, should_hydrate and profile_index == 0)
                            if should_hydrate and profile_index == 0:
                                tracks.add(manifest_tracks) # Add all tracks (video, audio, subtitles) on first hydrated profile
                            else:
                                tracks.add(manifest_tracks.videos) # Add only videos for additional profiles
                        except Exception:
                            self.log.error(f"Error getting profile: {profile_list} for range {video_range.name}. Skipping")
                            continue
                else:
                    # Get profiles for the current range
                    range_profiles = self.get_profiles_for_range(video_range)
                    if not range_profiles:
                        self.log.warning(f"No profiles found for range {video_range.name}")
                        continue
                    self.log.info(f"Processing range {range_index + 1}/{len(self.range)}: {video_range.name}")
                    manifest = self.get_manifest(title, range_profiles)
                    manifest_tracks = self.manifest_as_tracks(manifest, title, should_hydrate)
                    if should_hydrate:
                        tracks.add(manifest_tracks) # Add all tracks (video, audio, subtitles) when hydrating
                    elif range_index == 0:
                        tracks.add(manifest_tracks) # Add all tracks on first range even without hydration
                    else:
                        tracks.add(manifest_tracks.videos) # Add only videos for additional ranges
            except Exception as e:
                self.log.error(f"Error processing range {video_range.name}: {e}")
                continue
    # Attach a poster image: "boxart" for movies, "stills" for everything else.
    if isinstance(title, Movie):
        if title.data and "boxart" in title.data and title.data["boxart"]:
            tracks.add(
                Attachment.from_url(
                    url=title.data["boxart"][0]["url"],
                    name=f"{title.name} ({title.year}) Poster"
                )
            )
    else:
        if title.data and "stills" in title.data and title.data["stills"]:
            tracks.add(
                Attachment.from_url(
                    url=title.data["stills"][0]["url"],
                    name=f"{title.title} S{title.season:02d}E{title.number:02d}{' - ' + title.name if title.name else ''} Poster"
                )
            )
    return tracks
def split_profiles(self, profiles: List[str]) -> List[List[str]]:
    """
    Group profile names by the level tag they contain.

    H.264 uses the lowercase tags "l30", "l31" and "l40"; every other codec
    uses the uppercase tags "L30", "L31", "L40", "L41", "L50" and "L51".
    Groups are returned in tag order, empty groups are dropped, and a profile
    that contains no tag is not returned at all.

    :param profiles: Profile names to group.
    :returns: One list of profile names per tag that matched something.
    """
    if self.vcodec == Video.Codec.AVC:  # H264
        level_tags = ("l30", "l31", "l40")
    else:
        level_tags = ("L30", "L31", "L40", "L41", "L50", "L51")

    # Plain substring matching, preserving the input order inside each group.
    candidate_groups = ([name for name in profiles if tag in name] for tag in level_tags)
    return [group for group in candidate_groups if group]
def get_chapters(self, title: Title_T) -> Chapters:
    """
    Derive chapters from the skip markers and credits offset in the title data.

    An "Intro" chapter plus an unnamed follow-up chapter are added when the
    "credit" skip marker has positive start and end values; a "Credits" chapter
    is added when "creditsOffset" is present and not None. Any error while
    reading the data is logged as a warning and the chapters gathered so far
    are returned.

    :param title: Title whose ``data`` is inspected.
    :returns: The collected ``Chapters`` (possibly empty).
    """
    chapters: Chapters = Chapters()
    data = title.data
    if not data:
        return chapters

    try:
        if "skipMarkers" in data and "credit" in data["skipMarkers"]:
            marker = data["skipMarkers"]["credit"]
            has_start = marker.get("start", 0) > 0
            if has_start and marker.get("end", 0) > 0:
                intro = Chapter(
                    timestamp=marker["start"],  # Milliseconds
                    name="Intro",
                )
                chapters.add(intro)
                after_intro = Chapter(
                    timestamp=marker["end"],  # Milliseconds
                )
                chapters.add(after_intro)

        if "creditsOffset" in data and data["creditsOffset"] is not None:
            # The offset is in seconds, hence the float conversion.
            closing = Chapter(
                timestamp=float(data["creditsOffset"]),
                name="Credits",
            )
            chapters.add(closing)
    except Exception as e:
        self.log.warning(f"Failed to process chapters: {e}")
    return chapters
def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
    """
    Send a license challenge through the MSL client and return the license.

    The process exits with status 1 when the MSL client is missing, when the
    response carries no payload, or when the payload reports an error.

    :param challenge: Raw license challenge produced by the CDM.
    :param title: Title the license is for (not used by the request itself).
    :param track: Track whose ``data["license_url"]`` is sent as the request URL.
    :returns: The ``licenseResponseBase64`` value of the first payload chunk.
    """
    if not self.msl:
        self.log.error(f"MSL Client is not intialized!")
        sys.exit(1)

    # Request fields are computed in the order they appear in the payload.
    license_url = track.data["license_url"]
    request_id = int(time.time() * 10000)
    esn = self.esn.data["esn"]
    license_params = {
        "sessionId": base64.b64encode(get_random_bytes(16)).decode("utf-8"),
        "clientTime": int(time.time()),
        "challengeBase64": base64.b64encode(challenge).decode("utf-8"),
        "xid": str(int((int(time.time()) + 0.1612) * 1000)),
    }
    application_data = {
        "version": 2,
        "url": license_url,
        "id": request_id,
        "esn": esn,
        "languages": ["en-US"],
        # "uiVersion": "shakti-v9dddfde5",
        "clientVersion": "6.0026.291.011",
        "params": [license_params],
        "echo": "sessionId",
    }

    header, payload_data = self.msl.send_message(
        endpoint=self.config["endpoints"]["license"],
        params={"reqAttempt": 1, "reqName": "license"},
        application_data=application_data,
        userauthdata=self.userauthdata,
    )
    if not payload_data:
        self.log.error(f" - Failed to get license: {header['message']} [{header['code']}]")
        sys.exit(1)

    first_chunk = payload_data[0]
    if "error" in first_chunk:
        error = first_chunk["error"]
        error_display = error.get("display")
        # Strip the " (E3-...)" suffix from the detail text before logging it.
        error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", ""))
        if error_display:
            self.log.critical(f" - {error_display}")
        if error_detail:
            self.log.critical(f" - {error_detail}")
        if not (error_display or error_detail):
            self.log.critical(f" - {error}")
        sys.exit(1)
    return first_chunk["licenseResponseBase64"]
def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
    """
    Request a PlayReady license.

    Delegates to :meth:`get_widevine_license`, so the request payload and
    error handling are exactly the same for both DRM systems.

    :param challenge: Raw license challenge produced by the CDM.
    :param title: Title the license is for.
    :param track: Track the license is for.
    :returns: Whatever :meth:`get_widevine_license` returns.
    """
    return self.get_widevine_license(challenge=challenge, title=title, track=track)
    # return super().get_widevine_license(challenge=challenge, title=title, track=track)
def configure(self):
# if profile is none from argument let's use them all profile in video codec scope
if self.profile is None:
self.profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
if self.profile is not None:
self.requested_profiles = self.profile.split('+')
self.log.info(f"Requested profile: {self.requested_profiles}")
else:
# self.log.info(f"Video Range: {self.range}")
self.requested_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
# Make sure video codec is supported by Netflix
if self.vcodec.extension.upper() not in self.config["profiles"]["video"]:
raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix")
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.range[0] != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
# Validate all ranges are supported
for video_range in self.range:
if video_range.name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and video_range != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
self.log.error(f"Video range {video_range.name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
if self.vcodec == Video.Codec.AVC:
for video_range in self.range:
if video_range != Video.Range.SDR:
self.log.error(f"H.264 Video Codec only supports SDR, but {video_range.name} was requested")
sys.exit(1)
self.profiles = self.get_profiles()
self.drm_system = self.get_drm_system()
# Log information about video ranges being processed
if len(self.range) > 1:
range_names = [r.name for r in self.range]
self.log.info(f"Processing multiple video ranges: {', '.join(range_names)}")
self.log.info("Intializing a MSL client")
self.get_esn()
# if self.cdm.security_level == 1:
# scheme = KeyExchangeSchemes.Widevine
scheme = {
DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped,
DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine
}[self.cdm.device_type]
# scheme = KeyExchangeSchemes.AsymmetricWrapped
self.log.info(f"Scheme: {scheme}")
self.msl = MSL.handshake(
scheme=scheme,
session=self.session,
endpoint=self.config["endpoints"]["manifest"],
sender=self.esn.data["esn"],
cache=self.cache.get("MSL"),
cdm=self.cdm,
config=self.config,
)
cookie = self.session.cookies.get_dict()
if self.cdm.device_type == DeviceTypes.CHROME:
self.userauthdata = UserAuthentication.NetflixIDCookies(
netflixid=cookie["NetflixId"],
securenetflixid=cookie["SecureNetflixId"]
)
else:
if not self.credential:
raise click.ClickException("Android sign-in requires credentials.")
self.userauthdata = self.get_android_userauthdata()
def get_android_userauthdata(self) -> UserAuthentication:
    """
    Build user-ID-token auth data for Android, using the cache when possible.

    A cached entry is reused only when it is a dict with non-empty
    "tokendata" and "signature" values; otherwise a fresh token is requested
    and written back to the cache.

    :returns: ``UserAuthentication.UserIDToken`` bound to the current master token.
    """
    token_cache = self.get_android_user_token_cache()
    if token_cache and isinstance(token_cache.data, dict):
        token_data = token_cache.data
    else:
        token_data = None

    cache_usable = token_data and token_data.get("tokendata") and token_data.get("signature")
    if cache_usable:
        self.log.info("Using cached Android useridtoken")
    else:
        self.log.info("Requesting Android useridtoken")
        token_data = self.fetch_android_user_id_token()
        expiration = self.resolve_android_token_expiration(token_data)
        token_cache.set(token_data, expiration=expiration)

    return UserAuthentication.UserIDToken(
        token_data=token_data["tokendata"],
        signature=token_data["signature"],
        master_token=self.msl.keys.mastertoken,
    )
def get_android_user_token_cache(self):
return self.cache.get(f"ANDROID_USER_ID_TOKEN/{self.credential.sha1}/{self.esn.data['esn']}")
def fetch_android_user_id_token(self) -> dict:
    """
    Perform the Android sign-in request and return the issued user ID token.

    The token is read from the decrypted MSL header of the response. When it
    is missing, the payload is inspected for a sign-in error code so the
    failure can be reported precisely.

    :returns: The ``useridtoken`` value of the decrypted response header.
    :raises click.ClickException: If the request fails, the sign-in reports
        an error code, or no token is returned.
    """
    try:
        header, payload_data = self.msl.send_message(
            endpoint=self.ANDROID_CONFIG_ENDPOINT,
            params=self.build_android_sign_in_query(),
            application_data="",
            headers=self.build_android_sign_in_headers(),
            unwrap_result=False,
        )
    except Exception as exc:
        raise click.ClickException(f"Android sign-in request failed: {exc}") from exc

    header_data = self.decrypt_android_header(header["headerdata"])
    tokens = header_data.get("useridtoken")
    if tokens:
        return tokens

    # No token issued: report the most specific reason available.
    self.log.debug(f"Android sign-in header keys: {list(header_data.keys())}")
    error_code = self.extract_android_sign_in_error_code(
        self.extract_android_sign_in_value(payload_data)
    )
    if error_code:
        raise click.ClickException(f"Android sign-in failed: {error_code}")
    raise click.ClickException("Android sign-in did not return a useridtoken.")
@staticmethod
def extract_android_sign_in_value(payload_data: dict) -> Optional[dict]:
if not isinstance(payload_data, dict):
return None
json_graph = payload_data.get("jsonGraph")
if not isinstance(json_graph, dict):
return None
sign_in_verify = json_graph.get("signInVerify")
if not isinstance(sign_in_verify, dict):
return None
value = sign_in_verify.get("value")
return value if isinstance(value, dict) else None
@staticmethod
def extract_android_sign_in_error_code(sign_in_value: Optional[dict]) -> Optional[str]:
if not isinstance(sign_in_value, dict):
return None
fields = sign_in_value.get("fields")
if not isinstance(fields, dict):
return None
error_code = fields.get("errorCode")
if not isinstance(error_code, dict):
return None
value = error_code.get("value")
return value if isinstance(value, str) and value else None
def build_android_sign_in_query(self) -> dict:
    """
    Build the parameters of the Android sign-in request.

    Almost every value is a fixed constant describing one particular Android
    client. Only four entries vary per run: the two Netflix ID cookies taken
    from the session, and the username and password of the credential.

    NOTE(review): the password travels as a plain request parameter - make
    sure these params are never logged.

    :returns: Parameter dict passed as ``params`` to the MSL sign-in message.
    :raises KeyError: If the session has no NetflixId / SecureNetflixId cookie.
    """
    cookie = self.session.cookies.get_dict()
    return {
        "api": "33",
        "appType": "samurai",
        "appVer": "62902",
        "appVersion": "9.18.0",
        "chipset": "sm6150",
        "chipsetHardware": "qcom",
        "clientAppState": "FOREGROUND",
        "clientAppVersionState": "NORMAL",
        "countryCode": "+385",
        "countryIsoCode": "HR",
        "ctgr": "phone",
        "dbg": "false",
        "deviceLocale": "hr",
        "devmod": "samsung_SM-A705FN",
        "ffbc": "phone",
        "flwssn": "c3100219-d002-40c5-80a7-055c00407246",
        "installType": "regular",
        "isAutomation": "false",
        "isConsumptionOnly": "true",
        "isNetflixPreloaded": "false",
        "isPlayBillingEnabled": "true",
        "isStubInSystemPartition": "false",
        "lackLocale": "false",
        "landingOrigin": "https://www.netflix.com",
        "mId": "SAMSUSM-A705FNS",
        "memLevel": "HIGH",
        "method": "get",
        "mnf": "samsung",
        "model": "SM-A705FN",
        "netflixClientPlatform": "androidNative",
        # Session cookie of the logged-in browser session
        "netflixId": cookie["NetflixId"],
        "networkType": "wifi",
        "osBoard": "sm6150",
        "osDevice": "a70q",
        "osDisplay": "TQ1A.230205.002",
        # Account password from the configured credential
        "password": self.credential.password,
        # Requested path; the response is read back under jsonGraph.signInVerify
        "path": "[\"signInVerify\"]",
        "pathFormat": "hierarchical",
        "platform": "android",
        "preloadSignupRoValue": "",
        "progressive": "false",
        "qlty": "hd",
        "recaptchaResponseTime": 244,
        "recaptchaResponseToken": "",
        "responseFormat": "json",
        "roBspVer": "Q6150-17263-1",
        # Secure session cookie of the logged-in browser session
        "secureNetflixId": cookie["SecureNetflixId"],
        "sid": "7176",
        "store": "google",
        # Account login from the configured credential
        "userLoginId": self.credential.username
    }
def build_android_sign_in_headers(self) -> dict:
return {
"X-Netflix.Request.NqTracking": "VerifyLoginMslRequest",
"X-Netflix.Client.Request.Name": "VerifyLoginMslRequest",
"X-Netflix.Request.Client.Context": "{\"appState\":\"foreground\"}",
"X-Netflix-Esn": self.esn.data["esn"],
"X-Netflix.EsnPrefix": "NFANDROID1-PRV-P-",
"X-Netflix.msl-header-friendly-client": "true",
"content-encoding": "msl_v1"
}
def decrypt_android_header(self, encrypted_header_b64: str) -> dict:
    """
    Decrypt the header data of an MSL response.

    The input is base64-encoded JSON carrying base64 "iv" and "ciphertext"
    fields; the ciphertext is decrypted with AES-CBC using the MSL encryption
    key, unpadded, and parsed as UTF-8 JSON.

    :param encrypted_header_b64: Base64 text of the encrypted header envelope.
    :returns: The decrypted header as a dict.
    """
    envelope = json.loads(base64.b64decode(encrypted_header_b64))
    iv, ciphertext = (base64.b64decode(envelope[field]) for field in ("iv", "ciphertext"))
    padded_plaintext = AES.new(self.msl.keys.encryption, AES.MODE_CBC, iv).decrypt(ciphertext)
    plaintext = unpad(padded_plaintext, AES.block_size)
    return json.loads(plaintext.decode("utf-8"))
def resolve_android_token_expiration(self, token_data: dict):
for source in (token_data, self.msl.keys.mastertoken):
if not isinstance(source, dict):
continue
tokendata = source.get("tokendata")
if not tokendata:
continue
try:
parsed = json.loads(base64.b64decode(tokendata).decode("utf-8"))
except (TypeError, ValueError, json.JSONDecodeError):
continue
if parsed.get("expiration"):
return parsed["expiration"]
return None
def get_profiles(self):
    """
    Resolve the profile list for the selected codec and video ranges.

    * H.264: the flattened profiles of every requested profile group.
    * VP9 with a first range other than HDR10: every configured VP9 profile
      group (not flattened here).
    * Otherwise: the profiles configured for each requested range, with
      HYBRID mapped to the HDR10 profiles.

    :returns: List of profiles (possibly containing nested lists/duplicates).
    """
    def codec_table():
        # Looked up lazily so the config is only touched on the paths that use it.
        return self.config["profiles"]["video"][self.vcodec.extension.upper()]

    result_profiles = []

    if self.vcodec == Video.Codec.AVC:
        if self.requested_profiles is None:
            result_profiles.extend(flatten(list(codec_table().values())))
            return result_profiles
        for group_name in self.requested_profiles:
            result_profiles.extend(flatten(list(codec_table()[group_name])))
        return result_profiles

    # Handle case for codec VP9
    if self.vcodec == Video.Codec.VP9 and self.range[0] != Video.Range.HDR10:
        result_profiles.extend(codec_table().values())
        return result_profiles

    # NOTE(review): the range name is matched as a substring of each config
    # key, and the HYBRID branch runs once per config key, so entries can be
    # appended more than once.
    for config_key in codec_table():
        for video_range in self.range:
            if video_range.name in config_key:
                result_profiles.extend(codec_table()[video_range.name])
            elif video_range == Video.Range.HYBRID:
                result_profiles.extend(codec_table()["HDR10"])
    self.log.debug(f"Result_profiles: {result_profiles}")
    return result_profiles
def get_profiles_for_range(self, video_range: Video.Range) -> List[str]:
    """
    Resolve the profiles configured for one video range.

    VP9 with any range other than HDR10 returns every configured VP9 profile
    group. Otherwise the range's own entry is used, HYBRID falls back to the
    HDR10 entry, and an unknown range yields an empty list with a warning.

    Args:
        video_range: The video range to get profiles for

    Returns:
        List of profile strings for the specified range
    """
    # Handle case for codec VP9
    if self.vcodec == Video.Codec.VP9 and video_range != Video.Range.HDR10:
        return list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())

    result_profiles = []
    codec_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]

    if video_range.name in codec_profiles:
        result_profiles.extend(codec_profiles[video_range.name])
    elif video_range != Video.Range.HYBRID:
        self.log.warning(f"Range {video_range.name} not found in codec {self.vcodec.extension.upper()} profiles")
    elif "HDR10" in codec_profiles:
        # For hybrid, use HDR10 profiles
        result_profiles.extend(codec_profiles["HDR10"])
    else:
        self.log.warning(f"No HDR10 profiles found for HYBRID range in codec {self.vcodec.extension.upper()}")

    self.log.debug(f"Profiles for range {video_range.name}: {result_profiles}")
    return result_profiles
def get_esn(self):
    """
    Make sure the cached ESN entry matches the current CDM device type.

    Android CDMs take their ESN from the ``esn_map`` config keyed by the CDM
    system ID; the cache is rewritten when the ESN or type differs or the
    entry expired. Any other device type gets a randomly generated
    ``NFCDIE-03-`` ESN unless a non-expired Chrome entry is already cached.
    Entries are stored with a one hour expiration.

    :raises Exception: If no ESN is mapped for the Android CDM's system ID.
    """
    one_hour = 1 * 60 * 60  # seconds

    if self.cdm.device_type == DeviceTypes.ANDROID:
        try:
            # Use ESN map from config.yaml instead of generating a new one
            esn = self.config["esn_map"][self.cdm.system_id]
        except KeyError:
            self.log.error(f"ESN mapping not found for system_id: {self.cdm.system_id}")
            raise Exception(f"ESN mapping not found for system_id: {self.cdm.system_id}")

        esn_value = {
            'esn': esn,
            'type': self.cdm.device_type,
        }
        if isinstance(self.esn.data, dict):
            cached_esn = self.esn.data.get("esn")
        else:
            cached_esn = self.esn.data
        if isinstance(self.esn.data, dict):
            cached_type = self.esn.data.get("type")
        else:
            cached_type = None

        needs_refresh = (
            cached_esn != esn
            or cached_type != DeviceTypes.ANDROID
            or (hasattr(self.esn, "expired") and self.esn.expired)
        )
        if needs_refresh:
            self.esn.set(esn_value, expiration=one_hour)
    else:
        ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30))
        generated_esn = f"NFCDIE-03-{ESN_GEN}"

        # A cached entry is reused only if it is a non-empty dict, has not
        # expired and was stored for a Chrome device.
        cache_reusable = (
            isinstance(self.esn.data, dict)
            and self.esn.data != {}
            and not (hasattr(self.esn, 'expired') and self.esn.expired)
            and self.esn.data.get("type") == DeviceTypes.CHROME
        )
        if cache_reusable:
            self.log.info(f"Using cached ESN.")
        else:
            # Store the new ESN with a 1-hour expiration
            esn_value = {
                'esn': generated_esn,
                'type': DeviceTypes.CHROME,
            }
            self.esn.set(esn_value, expiration=one_hour)
            self.log.info(f"Generated new ESN with 1-hour expiration")

    final_esn = self.esn.data.get("esn") if isinstance(self.esn.data, dict) else self.esn.data
    self.log.info(f"ESN: {final_esn}")
def get_metadata(self, title_id: str):
    """
    Fetch the metadata document of a title by its ID.

    The process exits with status 1 when the response is not JSON or when it
    reports ``status == "error"``.

    :param title_id: Title's ID.
    :returns: Title Metadata.
    :raises Exception: If the request raised ``requests.HTTPError``.
    """
    endpoint = self.config["endpoints"]["metadata"].format(build_id="release")
    query = {
        "movieid": title_id,
        "drmSystem": self.config["configuration"]["drm_system"],
        "isWatchlistEnabled": False,
        "isShortformEnabled": False,
        "languages": self.meta_lang,
    }
    try:
        metadata = self.session.get(endpoint, params=query).json()
    except requests.HTTPError as e:
        if e.response.status_code == 500:
            # NOTE(review): the message mentions deleting cached data, but no
            # deletion happens here - the error is simply re-raised below.
            self.log.warning(
                " - Recieved a HTTP 500 error while getting metadata, deleting cached reactContext data"
            )
        raise Exception(f"Error getting metadata: {e}")
    except json.JSONDecodeError:
        self.log.error(" - Failed to get metadata, title might not be available in your region.")
        sys.exit(1)
    else:
        reported_error = "status" in metadata and metadata["status"] == "error"
        if reported_error:
            self.log.error(
                f" - Failed to get metadata, cookies might be expired. ({metadata['message']})"
            )
            sys.exit(1)
        return metadata
def _get_empty_manifest(self):
"""Return an empty manifest structure to prevent crashes when manifest retrieval fails"""
return {
"video_tracks": [{
"streams": [],
"drmHeader": {"bytes": b""}
}],
"audio_tracks": [],
"timedtexttracks": [],
"links": {
"license": {"href": ""}
}
}
def get_manifest(self, title: Title_T, video_profiles: List[str], required_text_track_id: Optional[str] = None, required_audio_track_id: Optional[str] = None):
    """
    Request a playback manifest for a title through the MSL client.

    The given video profiles are merged with all configured audio and
    subtitle profiles (plus the H.264 "BPL" profiles when the codec is
    H.264), de-duplicated and sorted before being sent. This method never
    raises: if the MSL client is missing, the response contains
    "errorDetails", or any exception occurs, the failure is logged and the
    structure from ``_get_empty_manifest()`` is returned instead.

    :param title: Title to request; its ``data["episodeId"]`` or
        ``data["id"]`` is used as the viewable ID.
    :param video_profiles: Video profiles to request.
    :param required_text_track_id: Sent as ``requiredTextTrackId``.
    :param required_audio_track_id: Sent as ``requiredAudioTrackId``.
    :returns: The manifest payload, or the empty manifest structure on failure.
    """
    try:
        # Log context information for debugging
        title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
        self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}")
        audio_profiles = self.config["profiles"]["audio"].values()
        # From here on `video_profiles` is the full, flattened, de-duplicated
        # and sorted profile list (video + audio + subtitles), not just video.
        video_profiles = sorted(set(flatten(as_list(
            video_profiles,
            audio_profiles,
            self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [],
            self.config["profiles"]["subtitles"],
        ))))
        # self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles))
        if not self.msl:
            self.log.error(f"MSL Client is not initialized for title_id: {title_id}")
            return self._get_empty_manifest()
        params = {
            "reqAttempt": 1,
            "reqPriority": 10,
            "reqName": "manifest",
        }
        # session_id = self.cdm.open()
        # self.cdm.set_service_certificate(session_id, self.config["certificate"])
        # challenge = self.cdm.get_license_challenge(session_id, PSSH("AAAANHBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAABQIARIQAAAAAAPSZ0kAAAAAAAAAAA=="))
        _, payload_chunks = self.msl.send_message(
            endpoint=self.config["endpoints"]["manifest"],
            params=params,
            application_data={
                "version": 2,
                "url": "manifest",
                "id": int(time.time()),
                "esn": self.esn.data["esn"],
                "languages": ["en-US"],
                "clientVersion": "9999999",
                "params": {
                    "clientVersion": "9999999",
                    # The configured challenge is only included for Widevine
                    # requests made with a Chrome CDM.
                    **({
                        "challenge": self.config["payload_challenge"]
                    } if self.drm_system == "widevine" and self.cdm.device_type == DeviceTypes.CHROME else {}),
                    # "challanges": {
                    # # "default": base64.b64encode(challenge).decode()
                    # "default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"]
                    # },
                    "contentPlaygraph": ["v2"],
                    "drmVersion": 25,
                    "desiredVmaf": "plus_lts",
                    "desiredSegmentVmaf": "plus_lts",
                    "flavor": "STANDARD", # ? PRE_FETCH, SUPPLEMENTAL
                    "drmType": self.drm_system,
                    "imageSubtitleHeight": 1080,
                    "isBranching": False,
                    "isNonMember": False,
                    "isUIAutoPlay": False,
                    "licenseType": "standard",
                    "liveAdsCapability": "replace",
                    "liveMetadataFormat": "INDEXED_SEGMENT_TEMPLATE",
                    "profilesGroups": [{
                        "name": "default",
                        "profiles": video_profiles
                    }],
                    "profiles": video_profiles,
                    "preferAssistiveAudio": False,
                    "requestSegmentVmaf": False,
                    "requiredAudioTrackId": required_audio_track_id, # Used to fetch a missing audio track (value taken from `new_track_id`)
                    "requiredTextTrackId": required_text_track_id, # Used to fetch a missing subtitle track (value taken from `new_track_id`)
                    "supportsAdBreakHydration": False,
                    "supportsNetflixMediaEvents": True,
                    "supportsPartialHydration": True, # Needed to list all available tracks; each track without a URL must then be fetched with "requiredAudioTrackId" or "requiredTextTrackId"
                    "supportsPreReleasePin": True,
                    "supportsUnequalizedDownloadables": True,
                    "supportsWatermark": True,
                    "titleSpecificData": {
                        (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"): {"unletterboxed": False}
                    },
                    "type": "standard", # ? PREPARE
                    "uiPlatform": "SHAKTI",
                    "uiVersion": "shakti-v49577320",
                    "useBetterTextUrls": True,
                    "useHttpsStreams": True,
                    "usePsshBox": True,
                    "videoOutputInfo": [{
                        # todo ; make this return valid, but "secure" values, maybe it helps
                        "type": "DigitalVideoOutputDescriptor",
                        "outputType": "unknown",
                        "supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"],
                        "isHdcpEngaged": self.config["configuration"]["is_hdcp_engaged"]
                    }],
                    "viewableId": (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"),
                    "xid": str(int((int(time.time()) + 0.1612) * 1000)),
                    "showAllSubDubTracks": True,
                }
            },
            userauthdata=self.userauthdata
        )
        # self.cdm.close(session_id)
        if "errorDetails" in payload_chunks:
            self.log.error(f"Manifest call failed for title_id: {title_id}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}, error: {payload_chunks['errorDetails']}")
            return self._get_empty_manifest()
        # with open(f"./manifest_{"+".join(video_profiles)}.json", mode='w') as r:
        # r.write(jsonpickle.encode(payload_chunks, indent=4))
        return payload_chunks
    except Exception as e:
        # Any failure is logged with as much context as is available and
        # converted into an empty manifest so that callers keep running.
        title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
        # NOTE(review): `video_profiles` is a parameter, so it is always in locals().
        profile_count = len(video_profiles) if 'video_profiles' in locals() else 0
        self.log.error(f"Exception in get_manifest: {e}")
        self.log.error(f"Context - title_id: {title_id}, video_profiles_count: {profile_count}, required_audio_track_id: {required_audio_track_id or 'None'}, required_text_track_id: {required_text_track_id or 'None'}")
        if 'video_profiles' in locals() and video_profiles:
            self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}")
        return self._get_empty_manifest()
@staticmethod
def get_original_language(manifest) -> Language:
    """
    Work out the original language of a title from its manifest.

    Sources are consulted in this order and the first hit wins:

    1. An audio track whose ``languageDescription`` ends with " [Original]".
    2. The third ``;``-separated field of the ``mediaId`` of the first
       ``defaultTrackOrderList`` entry.
    3. The first audio track that carries a non-empty ``language``.
    4. English.

    Any unexpected error raised while inspecting the manifest also results
    in English being returned.
    """
    default_tag = "en"
    try:
        audio_tracks = manifest["audio_tracks"] if "audio_tracks" in manifest else None

        # 1) Prefer the audio track explicitly flagged as the original.
        if audio_tracks:
            flagged = next(
                (
                    track for track in audio_tracks
                    if "languageDescription" in track
                    and track["languageDescription"].endswith(" [Original]")
                ),
                None,
            )
            if flagged is not None:
                return Language.get(flagged["language"])

        # 2) Otherwise read the language field out of the default track order.
        if "defaultTrackOrderList" in manifest and manifest["defaultTrackOrderList"]:
            try:
                first_entry = manifest["defaultTrackOrderList"][0]
                fields = first_entry["mediaId"].split(";")
                lang_code = fields[2]
                if lang_code:
                    return Language.get(lang_code)
            except (IndexError, KeyError, AttributeError):
                # Malformed entry: fall through to the next source.
                pass

        # 3) Otherwise take whichever audio language is listed first.
        if audio_tracks:
            first_tag = next(
                (track["language"] for track in audio_tracks if "language" in track and track["language"]),
                None,
            )
            if first_tag is not None:
                return Language.get(first_tag)

        # 4) Nothing usable in the manifest.
        return Language.get(default_tag)
    except Exception:
        # Best effort only: never let language detection abort track parsing.
        return Language.get(default_tag)
def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str:
    """
    Return the Widevine service certificate for this service.

    The value is read from the ``certificate`` key of the service config.
    ``challenge``, ``title`` and ``track`` are accepted but not consulted.
    """
    certificate = self.config["certificate"]
    return certificate
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks=None) -> Tracks:
    """
    Convert a Netflix manifest dictionary into a ``Tracks`` collection.

    Video streams are taken from the first entry of ``video_tracks``, audio
    streams from ``audio_tracks`` and subtitles from ``timedtexttracks``.
    Audio tracks without streams and subtitles flagged ``hydrated == False``
    are collected and, when hydration is enabled, requested through
    ``hydrate_all_tracks`` and merged into the result.

    A failure on a single stream or track is logged as a warning and that
    item is skipped. A failure outside those per-item handlers is logged as
    an error and whatever was collected so far is returned.

    Args:
        manifest: Manifest dictionary to convert.
        title: Title the manifest belongs to; forwarded to hydration and
            used in the error log.
        hydrate_tracks: Whether missing tracks should be hydrated. ``None``
            falls back to ``self.hydrate_track``.

    Returns:
        Tracks: The tracks that could be built. Empty when the manifest is
        empty, not a dict, has no video tracks, or has no license link.
    """
    if hydrate_tracks is None:
        hydrate_tracks = self.hydrate_track

    tracks = Tracks()
    try:
        # --- Structural validation ---------------------------------------
        if not manifest or not isinstance(manifest, dict):
            self.log.warning("Empty or invalid manifest received, returning empty tracks")
            return tracks
        if "video_tracks" not in manifest or not manifest["video_tracks"]:
            self.log.warning("No video tracks in manifest, returning empty tracks")
            return tracks
        if "links" not in manifest or "license" not in manifest["links"]:
            self.log.warning("No license URL in manifest, cannot process tracks")
            return tracks

        original_language = self.get_original_language(manifest)
        self.log.debug(f"Original language: {original_language}")
        license_url = manifest["links"]["license"]["href"]

        # --- Video ---------------------------------------------------------
        video_track = manifest["video_tracks"][0]
        if "streams" in video_track and video_track["streams"]:
            for video_index, video in enumerate(reversed(video_track["streams"])):
                try:
                    # The DRM object is built only through create_drm(), and only
                    # when the manifest actually carries a DRM header. A stream
                    # without a header is still added, with an empty DRM list.
                    drm_header = video_track.get("drmHeader", {}).get("bytes")
                    tracks.add(
                        Video(
                            id_=video["downloadable_id"],
                            url=video["urls"][0]["url"],
                            codec=Video.Codec.from_netflix_profile(video["content_profile"]),
                            bitrate=video["bitrate"] * 1000,
                            width=video["res_w"],
                            height=video["res_h"],
                            fps=(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None,
                            language=Language.get(original_language),
                            edition=video["content_profile"],
                            range_=self.parse_video_range_from_profile(video["content_profile"]),
                            is_original_lang=True,
                            drm=[self.create_drm(drm_header, video["drmHeaderId"])] if drm_header else [],
                            data={
                                'license_url': license_url
                            }
                        )
                    )
                except Exception as e:
                    video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown"
                    self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}")
                    continue

        # --- Audio ---------------------------------------------------------
        # (new_track_id, id) pairs of audio tracks listed without any stream.
        unavailable_audio_tracks: List[Tuple[str, str]] = []
        # (new_track_id, id) pairs of audio tracks that do have streams.
        primary_audio_tracks: List[Tuple[str, str]] = []
        if "audio_tracks" in manifest:
            for audio_index, audio in enumerate(manifest["audio_tracks"]):
                try:
                    audio_id = audio.get("id", "unknown")
                    audio_lang = audio.get("language", "unknown")
                    has_track_ids = "new_track_id" in audio and "id" in audio

                    if len(audio.get("streams", [])) < 1:
                        # Listed but not delivered: remember it so it can be requested later.
                        if has_track_ids:
                            unavailable_audio_tracks.append((audio["new_track_id"], audio["id"]))
                            if hydrate_tracks:
                                self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available")
                        continue

                    if has_track_ids:
                        primary_audio_tracks.append((audio["new_track_id"], audio["id"]))

                    is_original_lang = audio.get("language") == original_language.language
                    for stream_index, stream in enumerate(audio["streams"]):
                        try:
                            tracks.add(
                                Audio(
                                    id_=stream["downloadable_id"],
                                    url=stream["urls"][0]["url"],
                                    codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
                                    language=Language.get(self.NF_LANG_MAP.get(audio["language"]) or audio["language"]),
                                    is_original_lang=is_original_lang,
                                    bitrate=stream["bitrate"] * 1000,
                                    channels=stream["channels"],
                                    descriptive=audio.get("rawTrackType", "").lower() == "assistive",
                                    name="[Original]" if Language.get(audio["language"]).language == original_language.language else None,
                                    joc=16 if "atmos" in stream["content_profile"] else None
                                )
                            )
                        except Exception as e:
                            stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
                            self.log.warning(f"Failed to process audio stream at audio_index {audio_index}, stream_index {stream_index}, audio_id: {audio_id}, stream_id: {stream_id}, language: {audio_lang}, error: {e}")
                            continue
                except Exception as e:
                    audio_id = audio.get("id", "unknown") if isinstance(audio, dict) else "unknown"
                    audio_lang = audio.get("language", "unknown") if isinstance(audio, dict) else "unknown"
                    self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}")
                    continue

        # --- Subtitles -----------------------------------------------------
        # (new_track_id, id) pairs of subtitles that must be requested separately.
        unavailable_subtitle: List[Tuple[str, str]] = []
        if "timedtexttracks" in manifest:
            for subtitle_index, subtitle in enumerate(manifest["timedtexttracks"]):
                try:
                    subtitle_id = subtitle.get("id", "unknown")
                    subtitle_lang = subtitle.get("language", "unknown")

                    if subtitle.get("isNoneTrack") == True:
                        continue

                    # Only an explicit False counts: a missing "hydrated" key is
                    # treated as an already hydrated track.
                    if subtitle.get("hydrated") == False:
                        if "new_track_id" in subtitle and "id" in subtitle:
                            unavailable_subtitle.append((subtitle["new_track_id"], subtitle["id"]))
                            if hydrate_tracks:
                                self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
                        continue

                    # Tracks described as 'Off' are dropped unless descriptive subtitles were requested.
                    if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False:
                        continue

                    if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]:
                        self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds")
                        continue
                    downloadable_ids = list(subtitle["downloadableIds"].values())
                    if not downloadable_ids:
                        self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds")
                        continue

                    language = Language.get(subtitle["language"])
                    if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]:
                        self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables")
                        continue

                    # Only the first listed profile / downloadable pair is used.
                    profile = next(iter(subtitle["ttDownloadables"].keys()))
                    tt_downloadables = next(iter(subtitle["ttDownloadables"].values()))
                    is_original_lang = subtitle.get("language") == original_language.language
                    track_variant = subtitle.get("trackVariant")

                    # "[Original]" takes precedence over "[Dubbing]".
                    if language.language == original_language.language:
                        subtitle_name = "[Original]"
                    elif track_variant == "DUBTITLE":
                        subtitle_name = "[Dubbing]"
                    else:
                        subtitle_name = None

                    tracks.add(
                        Subtitle(
                            id_=downloadable_ids[0],
                            url=tt_downloadables["urls"][0]["url"],
                            codec=Subtitle.Codec.from_netflix_profile(profile),
                            language=language,
                            forced=subtitle.get("isForcedNarrative", False),
                            cc=subtitle.get("rawTrackType") == "closedcaptions",
                            sdh=track_variant == 'STRIPPED_SDH',
                            is_original_lang=is_original_lang,
                            name=subtitle_name,
                        )
                    )
                except Exception as e:
                    subtitle_id = subtitle.get("id", "unknown") if isinstance(subtitle, dict) else "unknown"
                    subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown"
                    self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}")
                    continue

        # --- Hydration -----------------------------------------------------
        # Compared against False on purpose so that only an explicit opt-out
        # skips hydration.
        if hydrate_tracks == False:
            return tracks

        if unavailable_audio_tracks or unavailable_subtitle:
            hydrated_tracks = self.hydrate_all_tracks(
                title=title,
                unavailable_audio_tracks=unavailable_audio_tracks,
                unavailable_subtitle=unavailable_subtitle,
                primary_audio_tracks=primary_audio_tracks,
                original_language=original_language
            )
            tracks.add(hydrated_tracks)
        else:
            self.log.info("No tracks need hydration")
    except Exception as e:
        self.log.error(f"Exception in manifest_as_tracks: {e}")
        self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}")

    # On a critical error this returns whatever was collected before it happened.
    return tracks
def parse_video_range_from_profile(self, profile: str) -> Video.Range:
    """
    Map a Netflix content profile onto the video range it is configured under.

    The ``profiles`` -> ``video`` section of the service config is walked as
    ``{codec: {range_name: profiles}}``. The first ``range_name`` whose
    ``profiles`` contains ``profile`` decides the result.

    Args:
        profile (str): Netflix profile string, e.g. "hevc-main10-L30-dash-cenc".

    Returns:
        Video.Range: ``Video.Range(range_name)`` for the matching entry.
        ``Video.Range.SDR`` when the profile is not configured anywhere or
        when the matching ``range_name`` is not a valid ``Video.Range`` value.
    """
    configured = self.config.get("profiles", {}).get("video", {})

    for ranges_for_codec in configured.values():
        for range_name, known_profiles in ranges_for_codec.items():
            if profile not in known_profiles:
                continue
            try:
                return Video.Range(range_name)
            except ValueError:
                # The config names a range the enum does not know: settle for SDR.
                self.log.debug(f"Video range is not valid {range_name}")
                return Video.Range.SDR

    # Profile is not listed in the config at all.
    return Video.Range.SDR
def _is_valid_track_for_hydration(self, track_data: tuple) -> bool:
    """Tell whether both of the first two elements of ``track_data`` are set (not None)."""
    # Short-circuits on the first element, so the second is only read when the first is set.
    return not (track_data[0] is None or track_data[1] is None)
def _get_empty_track_tuple(self) -> tuple:
    """Build the two-element placeholder tuple used when there is no track: both slots are None."""
    first_slot = second_slot = None
    return first_slot, second_slot
def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]],
                       unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]],
                       original_language: Language) -> Tracks:
    """
    Hydrate all missing audio and subtitle tracks.

    One manifest request is made per index, pairing the audio and subtitle
    entries found at that index. When one list is shorter than the other,
    its last entry is re-sent purely as request context and is not added to
    the result again. When there is no missing audio at all, the first
    primary audio track is used as the audio context instead.

    An index is skipped without a request when no usable audio pair or no
    subtitle track id is available for it.

    Args:
        title: The title object for which to hydrate tracks.
        unavailable_audio_tracks: Audio track tuples (new_track_id, id) that need hydration.
        unavailable_subtitle: Subtitle track tuples (new_track_id, id) that need hydration.
        primary_audio_tracks: Primary audio track tuples, used as context for subtitle-only hydration.
        original_language: The original language of the content.

    Returns:
        Tracks: All audio and subtitle tracks that could be hydrated. Failures
        are logged as warnings and the affected track is left out.
    """
    hydrated_tracks = Tracks()
    empty_pair = self._get_empty_track_tuple()

    # Announce once what is about to be requested.
    audio_count = len(unavailable_audio_tracks)
    subtitle_count = len(unavailable_subtitle)
    hydration_parts = [
        f"{label} ({count})"
        for label, count in (("audio", audio_count), ("subtitle", subtitle_count))
        if count > 0
    ]
    hydration_info = " and ".join(hydration_parts)
    self.log.info(f"Hydrating {hydration_info} tracks. Total: {audio_count + subtitle_count}")

    # Context entries re-sent once the shorter list has been exhausted.
    context_subtitle = unavailable_subtitle[-1] if unavailable_subtitle else empty_pair
    context_audio = unavailable_audio_tracks[-1] if unavailable_audio_tracks else empty_pair

    # Subtitle-only hydration borrows the first primary audio track as context.
    if primary_audio_tracks and not unavailable_audio_tracks and unavailable_subtitle:
        primary_audio_for_subtitle_hydration = primary_audio_tracks[0]
    else:
        primary_audio_for_subtitle_hydration = empty_pair

    for hydration_index in range(max(audio_count, subtitle_count)):
        # Pick the audio pair for this request. Only a pair taken straight
        # from the list at this index is added to the result.
        if hydration_index < audio_count:
            audio_hydration = unavailable_audio_tracks[hydration_index]
            is_real_audio_request = True
        elif unavailable_audio_tracks:
            audio_hydration = context_audio
            is_real_audio_request = False
        elif primary_audio_for_subtitle_hydration[0] is not None:
            audio_hydration = primary_audio_for_subtitle_hydration
            is_real_audio_request = False
            self.log.debug(f"Using primary audio track for subtitle hydration: {audio_hydration[1]}")
        else:
            audio_hydration = empty_pair
            is_real_audio_request = False

        # Same selection for the subtitle pair.
        if hydration_index < subtitle_count:
            subtitle_hydration = unavailable_subtitle[hydration_index]
            is_real_subtitle_request = True
        elif unavailable_subtitle:
            subtitle_hydration = context_subtitle
            is_real_subtitle_request = False
        else:
            subtitle_hydration = empty_pair
            is_real_subtitle_request = False

        try:
            audio_track_id = audio_hydration[0]
            subtitle_track_id = subtitle_hydration[0]

            self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration,
                                        is_real_audio_request, is_real_subtitle_request)

            # A request is only made with a complete audio pair...
            if not self._is_valid_track_for_hydration(audio_hydration):
                self.log.debug(f"Skipping hydration at index {hydration_index} - no audio tracks to hydrate")
                continue
            # ...and a subtitle track id, to avoid an API error.
            if subtitle_track_id is None:
                self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context")
                continue

            hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_id, audio_track_id)

            # Audio: only pairs that were genuinely requested are added.
            if is_real_audio_request and "audio_tracks" in hydrated_manifest:
                try:
                    audios = next((item for item in hydrated_manifest["audio_tracks"] if 'id' in item and item["id"] == audio_hydration[1]), None)
                    if audios and "streams" in audios:
                        audio_lang = audios.get("language", "unknown")
                        self.log.debug(f"Processing hydrated audio track_id: {audio_hydration[1]}, language: {audio_lang}, streams_count: {len(audios['streams'])}")
                        for stream_index, stream in enumerate(audios["streams"]):
                            try:
                                hydrated_tracks.add(
                                    Audio(
                                        id_=stream["downloadable_id"],
                                        url=stream["urls"][0]["url"],
                                        codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
                                        language=Language.get(self.NF_LANG_MAP.get(audios["language"]) or audios["language"]),
                                        is_original_lang=audios["language"] == original_language.language,
                                        bitrate=stream["bitrate"] * 1000,
                                        channels=stream["channels"],
                                        descriptive=audios.get("rawTrackType", "").lower() == "assistive",
                                        name="[Original]" if Language.get(audios["language"]).language == original_language.language else None,
                                        joc=16 if "atmos" in stream["content_profile"] else None
                                    )
                                )
                            except Exception as e:
                                stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
                                self.log.warning(f"Failed to process hydrated audio stream at hydration_index {hydration_index}, stream_index {stream_index}, audio_track_id: {audio_hydration[1]}, stream_id: {stream_id}, error: {e}")
                                continue
                    else:
                        self.log.warning(f"No audio streams found for hydrated audio_track_id: {audio_hydration[1]} at hydration_index {hydration_index}")
                except Exception as e:
                    self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}")
            elif not is_real_audio_request and audio_hydration[1] is not None:
                self.log.debug(f"Used audio track context for API request at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]} (not adding to tracks)")

            # Subtitles: only pairs that were genuinely requested are added.
            if is_real_subtitle_request and self._is_valid_track_for_hydration(subtitle_hydration) and "timedtexttracks" in hydrated_manifest:
                try:
                    subtitles = next((item for item in hydrated_manifest["timedtexttracks"] if 'id' in item and item["id"] == subtitle_hydration[1]), None)
                    if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles:
                        subtitle_lang = subtitles.get("language", "unknown")
                        self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}")
                        downloadable_ids = list(subtitles["downloadableIds"].values())
                        if downloadable_ids:
                            language = Language.get(subtitles["language"])
                            # Only the first listed profile / downloadable pair is used.
                            profile = next(iter(subtitles["ttDownloadables"].keys()))
                            tt_downloadables = next(iter(subtitles["ttDownloadables"].values()))
                            track_variant = subtitles.get("trackVariant")

                            # "[Original]" takes precedence over "[Dubbing]".
                            if language.language == original_language.language:
                                subtitle_name = "[Original]"
                            elif track_variant == "DUBTITLE":
                                subtitle_name = "[Dubbing]"
                            else:
                                subtitle_name = None

                            hydrated_tracks.add(
                                Subtitle(
                                    id_=downloadable_ids[0],
                                    url=tt_downloadables["urls"][0]["url"],
                                    codec=Subtitle.Codec.from_netflix_profile(profile),
                                    language=language,
                                    forced=subtitles.get("isForcedNarrative", False),
                                    cc=subtitles.get("rawTrackType") == "closedcaptions",
                                    sdh=track_variant == 'STRIPPED_SDH',
                                    is_original_lang=subtitles.get("language") == original_language.language,
                                    name=subtitle_name,
                                )
                            )
                        else:
                            self.log.warning(f"No downloadable IDs found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
                    else:
                        self.log.warning(f"No subtitle data found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
                except Exception as e:
                    self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}")
            elif not is_real_subtitle_request and subtitle_hydration[1] is not None:
                self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)")

        except Exception as e:
            self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] or 'None'}, subtitle_track_id: {subtitle_hydration[1] or 'None'}, error: {e}")
            continue

    return hydrated_tracks
def create_drm(self, pssh: str, kid: str) -> DRM_T:
    """
    Build the DRM object matching ``self.drm_system``.

    Args:
        pssh: DRM header passed to the PSSH constructor of the selected system.
        kid: Key ID handed to the DRM object.

    Returns:
        DRM_T: A ``Widevine`` object for "widevine", a ``PlayReady`` object
        for "playready".

    Raises:
        ValueError: If ``self.drm_system`` is neither of the two.
    """
    system = self.drm_system

    if system == "widevine":
        return Widevine(PSSH(pssh), kid)

    if system == "playready":
        # PlayReady additionally receives the raw header as its third argument.
        return PlayReady(PlayReadyPSSH(pssh), kid, pssh)

    raise ValueError("Unknown DRM system while creating DRM")
def get_drm_system(self) -> Literal["widevine", "playready"]:
    """
    Identify which DRM system the loaded CDM (``self.cdm``) speaks.

    Returns:
        "widevine" for a pywidevine CDM, "playready" for a pyplayready CDM.
        For a ``DecryptLabsRemoteCDM`` the answer follows its
        ``is_playready`` flag.

    Raises:
        ValueError: If ``self.cdm`` is none of the recognised CDM types.
    """
    if isinstance(self.cdm, WidevineCDM):
        return "widevine"

    # Imported here because only this check needs the PlayReady CDM class.
    from pyplayready.cdm import Cdm as PlayReadyCDM

    # `PlayReady` is the DRM wrapper from unshackle.core.drm (the class
    # create_drm() instantiates), not a CDM, so testing against it alone never
    # matches a local PlayReady CDM. It is kept in the check only so that
    # anything that matched before still does.
    if isinstance(self.cdm, (PlayReadyCDM, PlayReady)):
        return "playready"

    # Maybe this is DecryptLabsRemoteCDM
    from unshackle.core.cdm import DecryptLabsRemoteCDM
    if isinstance(self.cdm, DecryptLabsRemoteCDM):
        # The remote CDM reports which system it is serving.
        return "playready" if self.cdm.is_playready else "widevine"

    raise ValueError("Unknown DRM system")
def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple,
                           is_real_audio: bool, is_real_subtitle: bool) -> None:
    """
    Write one debug line describing a hydration request.

    Only the second element of ``audio_data`` and ``subtitle_data`` is
    reported; an element that is None is shown as the text 'None'.
    """
    audio_id = 'None' if audio_data[1] is None else audio_data[1]
    subtitle_id = 'None' if subtitle_data[1] is None else subtitle_data[1]

    summary = (
        f"Hydrating tracks at index {hydration_index}, "
        f"audio_track_id: {audio_id}, subtitle_track_id: {subtitle_id}, "
        f"is_real_audio: {is_real_audio}, is_real_subtitle: {is_real_subtitle}"
    )
    self.log.debug(summary)