Files
unshackle/unshackle/services/Netflix/__init__.py
kenzuyaa 0d2237d09a fix(Netflix): correct video range and codec validation logic
- Adjust condition order in video range and codec compatibility check
- Change range membership check from object to name string in profile loop
- Remove redundant error logging and sys.exit call in profile handling
- Improve consistency of video profile retrieval based on codec and range
2025-08-30 11:37:31 +07:00

1066 lines
58 KiB
Python

import base64
from datetime import datetime
import json
from math import e
import random
import sys
import time
import typing
from uuid import UUID
import click
import re
from typing import List, Literal, Optional, Set, Union, Tuple
from http.cookiejar import CookieJar
from itertools import zip_longest
from Crypto.Random import get_random_bytes
import jsonpickle
from pymp4.parser import Box
from pywidevine import PSSH, Cdm, DeviceTypes
import requests
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.drm.widevine import Widevine
from unshackle.core.service import Service
from unshackle.core.titles import Titles_T, Title_T
from unshackle.core.titles.episode import Episode, Series
from unshackle.core.titles.movie import Movie, Movies
from unshackle.core.titles.title import Title
from unshackle.core.tracks import Tracks, Chapters
from unshackle.core.tracks.audio import Audio
from unshackle.core.tracks.chapter import Chapter
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.tracks.video import Video
from unshackle.core.utils.collections import flatten, as_list
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.drm.playready import PlayReady
from unshackle.core.titles.song import Song
from unshackle.utils.base62 import decode
from .MSL import MSL, KeyExchangeSchemes
from .MSL.schemes.UserAuthentication import UserAuthentication
class Netflix(Service):
"""
Service for https://netflix.com
Version: 1.0.0
Authorization: Cookies
Security: UHD@SL3000/L1 FHD@SL3000/L1
"""
TITLE_RE = r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<title_id>\d+)"
ALIASES= ("NF", "Netflix", "netflix", "nf")
NF_LANG_MAP = {
"es": "es-419",
"pt": "pt-PT",
}
@staticmethod
@click.command(name="Netflix", short_help="https://netflix.com")
@click.argument("title", type=str)
@click.option("-drm", "--drm-system", type=click.Choice(["widevine", "playready"], case_sensitive=False),
default="widevine",
help="which drm system to use")
@click.option("-p", "--profile", type=click.Choice(["MPL", "HPL", "QC", "MPL+HPL", "MPL+HPL+QC", "MPL+QC"], case_sensitive=False),
default=None,
help="H.264 profile to use. Default is best available.")
@click.option("--meta-lang", type=str, help="Language to use for metadata")
@click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.")
@click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate")
@click.option("-ds", "--descriptive-subtitles", is_flag=True, default=False, help="Get descriptive subtitles")
@click.pass_context
def cli(ctx, **kwargs):
return Netflix(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool, descriptive_subtitles: bool):
super().__init__(ctx)
# General
self.title = title
self.profile = profile
self.meta_lang = meta_lang
self.hydrate_track = hydrate_track
self.drm_system = drm_system
self.profiles: List[str] = []
self.requested_profiles: List[str] = []
self.high_bitrate = high_bitrate
self.descriptive_subtitles = descriptive_subtitles
# MSL
self.esn = self.cache.get("ESN")
self.msl: Optional[MSL] = None
self.userauthdata = None
# Download options
self.range = ctx.parent.params.get("range_") or [Video.Range.SDR]
self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC # Defaults to H264
self.acodec : Audio.Codec = ctx.parent.params.get("acodec") or Audio.Codec.EC3
self.quality: List[int] = ctx.parent.params.get("quality")
self.audio_only = ctx.parent.params.get("audio_only")
self.subs_only = ctx.parent.params.get("subs_only")
self.chapters_only = ctx.parent.params.get("chapters_only")
# Inherited from unshackle
self.cdm: Cdm = ctx.obj.cdm
# self.ctx = ctx
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
# Configure first before download
self.log.debug("Authenticating Netflix service")
auth = super().authenticate(cookies, credential)
if not cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.configure()
return auth
def get_titles(self) -> Titles_T:
metadata = self.get_metadata(self.title)
if "video" not in metadata:
self.log.error(f"Failed to get metadata: {metadata}")
sys.exit(1)
titles: Titles_T | None = None
if metadata["video"]["type"] == "movie":
movie = Movie(
id_=self.title,
name=metadata["video"]["title"],
year=metadata["video"]["year"],
# language=self.get_original_language(self.get_manifest()),
service=self.__class__,
data=metadata["video"],
description=metadata["video"]["synopsis"]
)
movie.language = self.get_original_language(self.get_manifest(movie, self.profiles))
titles = Movies([
movie
])
else:
# self.log.warning(f"Metadata: {jsonpickle.encode(metadata, indent=2)}")
# print(metadata)
episode_list: List[Episode] = []
for season in metadata["video"]["seasons"]:
for episodes in season["episodes"]:
episode = Episode(
id_=self.title,
title=metadata["video"]["title"],
year=season["year"],
service=self.__class__,
season=season["seq"],
number=episodes["seq"],
name=episodes["title"],
data=episodes,
description=episodes["synopsis"],
)
try:
episode.language = self.get_original_language(self.get_manifest(episode, self.profiles))
self.log.debug(f"Episode S{episode.season:02d}E{episode.number:02d}: {episode.language}")
except Exception as e:
self.log.warning(f"Failed to get original language for episode S{season['seq']:02d}E{episodes['seq']:02d}: {e}")
# Fallback: try to get the original language from the first episode that worked
# or default to English if none worked
if episode_list and hasattr(episode_list[0], 'language') and episode_list[0].language:
episode.language = episode_list[0].language
else:
episode.language = Language.get("en")
self.log.info(f"Using fallback language for episode: {episode.language}")
episode_list.append(
episode
)
titles = Series(episode_list)
return titles
def get_tracks(self, title: Title_T) -> Tracks:
tracks = Tracks()
# If Video Codec is H.264 is selected but `self.profile is none` profile QC has to be requested seperately
if self.vcodec == Video.Codec.AVC:
# self.log.info(f"Profile: {self.profile}")
try:
manifest = self.get_manifest(title, self.profiles)
movie_track = self.manifest_as_tracks(manifest, title, self.hydrate_track)
tracks.add(movie_track)
if self.profile is not None:
self.log.info(f"Requested profiles: {self.profile}")
else:
qc_720_profile = [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["QC"] if "l40" not in x and 720 in self.quality]
qc_manifest = self.get_manifest(title, qc_720_profile if 720 in self.quality else self.config["profiles"]["video"][self.vcodec.extension.upper()]["QC"])
qc_tracks = self.manifest_as_tracks(qc_manifest, title, False)
tracks.add(qc_tracks.videos)
mpl_manifest = self.get_manifest(title, [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["MPL"] if "l40" not in x])
mpl_tracks = self.manifest_as_tracks(mpl_manifest, title, False)
tracks.add(mpl_tracks.videos)
except Exception as e:
self.log.error(e)
else:
if self.range[0] == Video.Range.HYBRID:
# Handle HYBRID mode by getting HDR10 and DV profiles separately
try:
# Get HDR10 profiles for the current codec
hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("HDR10", [])
if hdr10_profiles:
self.log.info("Fetching HDR10 tracks for hybrid processing")
hdr10_manifest = self.get_manifest(title, hdr10_profiles)
hdr10_tracks = self.manifest_as_tracks(hdr10_manifest, title, self.hydrate_track)
tracks.add(hdr10_tracks)
else:
self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}")
# Get DV profiles for the current codec
dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", [])
if dv_profiles:
self.log.info("Fetching DV tracks for hybrid processing")
dv_manifest = self.get_manifest(title, dv_profiles)
dv_tracks = self.manifest_as_tracks(dv_manifest, title, False) # Don't hydrate again
tracks.add(dv_tracks.videos)
else:
self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}")
except Exception as e:
self.log.error(f"Error in HYBRID mode processing: {e}")
elif self.high_bitrate:
splitted_profiles = self.split_profiles(self.profiles)
for index, profile_list in enumerate(splitted_profiles):
try:
self.log.debug(f"Index: {index}. Getting profiles: {profile_list}")
manifest = self.get_manifest(title, profile_list)
manifest_tracks = self.manifest_as_tracks(manifest, title, self.hydrate_track if index == 0 else False)
tracks.add(manifest_tracks if index == 0 else manifest_tracks.videos)
except Exception:
self.log.error(f"Error getting profile: {profile_list}. Skipping")
continue
else:
try:
manifest = self.get_manifest(title, self.profiles)
manifest_tracks = self.manifest_as_tracks(manifest, title, self.hydrate_track)
tracks.add(manifest_tracks)
except Exception as e:
self.log.error(e)
# Add Attachments for profile picture
if isinstance(title, Movie):
if title.data and "boxart" in title.data and title.data["boxart"]:
tracks.add(
Attachment.from_url(
url=title.data["boxart"][0]["url"]
)
)
else:
if title.data and "stills" in title.data and title.data["stills"]:
tracks.add(
Attachment.from_url(title.data["stills"][0]["url"])
)
return tracks
def split_profiles(self, profiles: List[str]) -> List[List[str]]:
"""
Split profiles with names containing specific patterns based on video codec
For H264: uses patterns "l30", "l31", "l40" (lowercase)
For non-H264: uses patterns "L30", "L31", "L40", "L41", "L50", "L51" (uppercase)
Returns List[List[str]] type with profiles grouped by pattern
"""
# Define the profile patterns to match based on video codec
if self.vcodec == Video.Codec.AVC: # H264
patterns = ["l30", "l31", "l40"]
else:
patterns = ["L30", "L31", "L40", "L41", "L50", "L51"]
# Group profiles by pattern
result: List[List[str]] = []
for pattern in patterns:
pattern_group = []
for profile in profiles:
if pattern in profile:
pattern_group.append(profile)
if pattern_group: # Only add non-empty groups
result.append(pattern_group)
return result
def get_chapters(self, title: Title_T) -> Chapters:
chapters: Chapters = Chapters()
if not title.data:
return chapters
try:
# self.log.info(f"Title data: {title.data}")
if "skipMarkers" in title.data and "credit" in title.data["skipMarkers"]:
credits = title.data["skipMarkers"]["credit"]
if credits.get("start", 0) > 0 and credits.get("end", 0) > 0:
chapters.add(Chapter(
timestamp=credits["start"], # Milliseconds
name="Intro"
))
chapters.add(
Chapter(
timestamp=credits["end"], # Milliseconds
)
)
if "creditsOffset" in title.data and title.data["creditsOffset"] is not None:
chapters.add(Chapter(
timestamp=float(title.data["creditsOffset"]), # this is seconds, needed to assign to float
name="Credits"
))
except Exception as e:
self.log.warning(f"Failed to process chapters: {e}")
return chapters
def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
if not self.msl:
self.log.error(f"MSL Client is not intialized!")
sys.exit(1)
application_data = {
"version": 2,
"url": track.data["license_url"],
"id": int(time.time() * 10000),
"esn": self.esn.data["esn"],
"languages": ["en-US"],
# "uiVersion": "shakti-v9dddfde5",
"clientVersion": "6.0026.291.011",
"params": [{
"sessionId": base64.b64encode(get_random_bytes(16)).decode("utf-8"),
"clientTime": int(time.time()),
"challengeBase64": base64.b64encode(challenge).decode("utf-8"),
"xid": str(int((int(time.time()) + 0.1612) * 1000)),
}],
"echo": "sessionId"
}
header, payload_data = self.msl.send_message(
endpoint=self.config["endpoints"]["license"],
params={
"reqAttempt": 1,
"reqName": "license",
},
application_data=application_data,
userauthdata=self.userauthdata
)
if not payload_data:
self.log.error(f" - Failed to get license: {header['message']} [{header['code']}]")
sys.exit(1)
if "error" in payload_data[0]:
error = payload_data[0]["error"]
error_display = error.get("display")
error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", ""))
if error_display:
self.log.critical(f" - {error_display}")
if error_detail:
self.log.critical(f" - {error_detail}")
if not (error_display or error_detail):
self.log.critical(f" - {error}")
sys.exit(1)
return payload_data[0]["licenseResponseBase64"]
def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
return None
# return super().get_widevine_license(challenge=challenge, title=title, track=track)
def configure(self):
# self.log.info(ctx)
# if profile is none from argument let's use them all profile in video codec scope
# self.log.info(f"Requested profiles: {self.profile}")
if self.profile is None:
self.profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
if self.profile is not None:
self.requested_profiles = self.profile.split('+')
self.log.info(f"Requested profile: {self.requested_profiles}")
else:
# self.log.info(f"Video Range: {self.range}")
self.requested_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
# Make sure video codec is supported by Netflix
if self.vcodec.extension.upper() not in self.config["profiles"]["video"]:
raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix")
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.range[0] != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
if len(self.range) > 1:
self.log.error(f"Multiple video range is not supported right now.")
sys.exit(1)
if self.vcodec == Video.Codec.AVC and self.range[0] != Video.Range.SDR:
self.log.error(f"H.264 Video Codec only supports SDR")
sys.exit(1)
self.profiles = self.get_profiles()
self.log.info("Intializing a MSL client")
self.get_esn()
# if self.cdm.security_level == 1:
# scheme = KeyExchangeSchemes.Widevine
scheme = {
DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped,
DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine
}[self.cdm.device_type]
# scheme = KeyExchangeSchemes.AsymmetricWrapped
self.log.info(f"Scheme: {scheme}")
self.msl = MSL.handshake(
scheme=scheme,
session=self.session,
endpoint=self.config["endpoints"]["manifest"],
sender=self.esn.data["esn"],
cache=self.cache.get("MSL"),
cdm=self.cdm,
config=self.config,
)
cookie = self.session.cookies.get_dict()
if self.cdm.device_type == DeviceTypes.CHROME:
self.userauthdata = UserAuthentication.NetflixIDCookies(
netflixid=cookie["NetflixId"],
securenetflixid=cookie["SecureNetflixId"]
)
else:
# Android like way login to Netflix using email and password
if not self.credential:
raise Exception(" - Credentials are required for Android CDMs, and none were provided.")
self.userauthdata = UserAuthentication.EmailPassword(
email=self.credential.username,
password=self.credential.password
)
self.log.info(f"userauthdata: {self.userauthdata}")
def get_profiles(self):
result_profiles = []
if self.vcodec == Video.Codec.AVC:
if self.requested_profiles is not None:
for requested_profiles in self.requested_profiles:
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles])))
return result_profiles
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())))
return result_profiles
# Handle case for codec VP9
if self.vcodec == Video.Codec.VP9 and self.range[0] != Video.Range.HDR10:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())
return result_profiles
for profiles in self.config["profiles"]["video"][self.vcodec.extension.upper()]:
for range in self.range:
if range.name in profiles:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()][range.name])
elif range == Video.Range.HYBRID:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()]["HDR10"])
self.log.debug(f"Result_profiles: {result_profiles}")
return result_profiles
def get_esn(self):
if self.cdm.device_type == DeviceTypes.ANDROID:
try:
# Use ESN map from config.yaml instead of generating a new one
esn = self.config["esn_map"][self.cdm.system_id]
except KeyError:
self.log.error(f"ESN mapping not found for system_id: {self.cdm.system_id}")
raise Exception(f"ESN mapping not found for system_id: {self.cdm.system_id}")
esn_value = {
'esn': esn,
'type': self.cdm.device_type
}
if self.esn.data["esn"] != esn:
self.esn.set(self.config["esn_map"][self.cdm.system_id], 1 * 60 * 60)
else:
ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30))
generated_esn = f"NFCDIE-03-{ESN_GEN}"
# Check if ESN is expired or doesn't exist
if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired) or (self.esn.data["type"] != DeviceTypes.CHROME):
# Set new ESN with 6-hour expiration
esn_value = {
'esn': generated_esn,
'type': DeviceTypes.CHROME,
}
self.esn.set(esn_value, expiration=1 * 60 * 60) # 1 hours in seconds
self.log.info(f"Generated new ESN with 1-hour expiration")
else:
self.log.info(f"Using cached ESN.")
self.log.info(f"ESN: {self.esn.data["esn"]}")
def get_metadata(self, title_id: str):
"""
Obtain Metadata information about a title by it's ID.
:param title_id: Title's ID.
:returns: Title Metadata.
"""
try:
metadata = self.session.get(
self.config["endpoints"]["metadata"].format(build_id="release"),
params={
"movieid": title_id,
"drmSystem": self.config["configuration"]["drm_system"],
"isWatchlistEnabled": False,
"isShortformEnabled": False,
"languages": self.meta_lang
}
).json()
except requests.HTTPError as e:
if e.response.status_code == 500:
self.log.warning(
" - Recieved a HTTP 500 error while getting metadata, deleting cached reactContext data"
)
# self.cache.
# os.unlink(self.get_cache("web_data.json"))
# return self.get_metadata(self, title_id)
raise Exception(f"Error getting metadata: {e}")
except json.JSONDecodeError:
self.log.error(" - Failed to get metadata, title might not be available in your region.")
sys.exit(1)
else:
if "status" in metadata and metadata["status"] == "error":
self.log.error(
f" - Failed to get metadata, cookies might be expired. ({metadata['message']})"
)
sys.exit(1)
return metadata
def _get_empty_manifest(self):
"""Return an empty manifest structure to prevent crashes when manifest retrieval fails"""
return {
"video_tracks": [{
"streams": [],
"drmHeader": {"bytes": b""}
}],
"audio_tracks": [],
"timedtexttracks": [],
"links": {
"license": {"href": ""}
}
}
def get_manifest(self, title: Title_T, video_profiles: List[str], required_text_track_id: Optional[str] = None, required_audio_track_id: Optional[str] = None):
try:
# Log context information for debugging
title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}")
audio_profiles = self.config["profiles"]["audio"].values()
video_profiles = sorted(set(flatten(as_list(
video_profiles,
audio_profiles,
self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [],
self.config["profiles"]["subtitles"],
))))
self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles))
if not self.msl:
self.log.error(f"MSL Client is not initialized for title_id: {title_id}")
return self._get_empty_manifest()
params = {
"reqAttempt": 1,
"reqPriority": 10,
"reqName": "manifest",
}
# session_id = self.cdm.open()
# self.cdm.set_service_certificate(session_id, self.config["certificate"])
# challenge = self.cdm.get_license_challenge(session_id, PSSH("AAAANHBzc2gAAAAA7e+LqXnWSs6jyCfc1R0h7QAAABQIARIQAAAAAAPSZ0kAAAAAAAAAAA=="))
_, payload_chunks = self.msl.send_message(
endpoint=self.config["endpoints"]["manifest"],
params=params,
application_data={
"version": 2,
"url": "manifest",
"id": int(time.time()),
"esn": self.esn.data["esn"],
"languages": ["en-US"],
"clientVersion": "6.0026.291.011",
"params": {
"clientVersion": "6.0051.090.911",
"challenge": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"],
# "challenge": base64.b64encode(challenge).decode(),
"challanges": {
# "default": base64.b64encode(challenge).decode()
"default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"]
},
"contentPlaygraph": ["v2"],
"deviceSecurityLevel": "3000",
"drmVersion": 25,
"desiredVmaf": "plus_lts",
"desiredSegmentVmaf": "plus_lts",
"flavor": "STANDARD", # ? PRE_FETCH, SUPPLEMENTAL
"drmType": self.drm_system,
"imageSubtitleHeight": 1080,
"isBranching": False,
"isNonMember": False,
"isUIAutoPlay": False,
"licenseType": "standard",
"liveAdsCapability": "replace",
"liveMetadataFormat": "INDEXED_SEGMENT_TEMPLATE",
"manifestVersion": "v2",
"osName": "windows",
"osVersion": "10.0",
"platform": "138.0.0.0",
"profilesGroups": [{
"name": "default",
"profiles": video_profiles
}],
"profiles": video_profiles,
"preferAssistiveAudio": False,
"requestSegmentVmaf": False,
"requiredAudioTrackId": required_audio_track_id, # This is for getting missing audio tracks (value get from `new_track_id``)
"requiredTextTrackId": required_text_track_id, # This is for getting missing subtitle. (value get from `new_track_id``)
"supportsAdBreakHydration": False,
"supportsNetflixMediaEvents": True,
"supportsPartialHydration": True, # This is important if you want get available all tracks. but you must fetch each missing url tracks with "requiredAudioTracksId" or "requiredTextTrackId"
"supportsPreReleasePin": True,
"supportsUnequalizedDownloadables": True,
"supportsWatermark": True,
"titleSpecificData": {
(title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"): {"unletterboxed": False}
},
"type": "standard", # ? PREPARE
"uiPlatform": "SHAKTI",
"uiVersion": "shakti-v49577320",
"useBetterTextUrls": True,
"useHttpsStreams": True,
"usePsshBox": True,
"videoOutputInfo": [{
# todo ; make this return valid, but "secure" values, maybe it helps
"type": "DigitalVideoOutputDescriptor",
"outputType": "unknown",
"supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"],
"isHdcpEngaged": self.config["configuration"]["is_hdcp_engaged"]
}],
"viewableId": (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"),
"xid": str(int((int(time.time()) + 0.1612) * 1000)),
"showAllSubDubTracks": True,
}
},
userauthdata=self.userauthdata
)
# self.cdm.close(session_id)
if "errorDetails" in payload_chunks:
self.log.error(f"Manifest call failed for title_id: {title_id}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}, error: {payload_chunks['errorDetails']}")
return self._get_empty_manifest()
# with open(f"./manifest_{"+".join(video_profiles)}.json", mode='w') as r:
# r.write(jsonpickle.encode(payload_chunks, indent=4))
return payload_chunks
except Exception as e:
title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
profile_count = len(video_profiles) if 'video_profiles' in locals() else 0
self.log.error(f"Exception in get_manifest: {e}")
self.log.error(f"Context - title_id: {title_id}, video_profiles_count: {profile_count}, required_audio_track_id: {required_audio_track_id or 'None'}, required_text_track_id: {required_text_track_id or 'None'}")
if 'video_profiles' in locals() and video_profiles:
self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}")
return self._get_empty_manifest()
@staticmethod
def get_original_language(manifest) -> Language:
try:
# First, try to find the original language from audio tracks
if "audio_tracks" in manifest and manifest["audio_tracks"]:
for language in manifest["audio_tracks"]:
if "languageDescription" in language and language["languageDescription"].endswith(" [Original]"):
return Language.get(language["language"])
# Fallback 1: Try to parse from defaultTrackOrderList
if "defaultTrackOrderList" in manifest and manifest["defaultTrackOrderList"]:
try:
media_id = manifest["defaultTrackOrderList"][0]["mediaId"]
lang_code = media_id.split(";")[2]
if lang_code:
return Language.get(lang_code)
except (IndexError, KeyError, AttributeError):
pass
# Fallback 2: Try to get the first available audio track language
if "audio_tracks" in manifest and manifest["audio_tracks"]:
for audio_track in manifest["audio_tracks"]:
if "language" in audio_track and audio_track["language"]:
return Language.get(audio_track["language"])
# Fallback 3: Default to English if all else fails
return Language.get("en")
except Exception as e:
# If anything goes wrong, default to English
return Language.get("en")
def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str:
return self.config["certificate"]
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks:
# If hydrate_tracks is not specified, derive from self.hydrate_track
if hydrate_tracks is None:
hydrate_tracks = self.hydrate_track
tracks = Tracks()
try:
# Handle empty or invalid manifest
if not manifest or not isinstance(manifest, dict):
self.log.warning("Empty or invalid manifest received, returning empty tracks")
return tracks
# Check if manifest has required structure
if "video_tracks" not in manifest or not manifest["video_tracks"]:
self.log.warning("No video tracks in manifest, returning empty tracks")
return tracks
if "links" not in manifest or "license" not in manifest["links"]:
self.log.warning("No license URL in manifest, cannot process tracks")
return tracks
original_language = self.get_original_language(manifest)
self.log.debug(f"Original language: {original_language}")
license_url = manifest["links"]["license"]["href"]
# Process video tracks
if "streams" in manifest["video_tracks"][0] and manifest["video_tracks"][0]["streams"]:
# self.log.info(f"Video: {jsonpickle.encode(manifest["video_tracks"], indent=2)}")
# self.log.info()
for video_index, video in enumerate(reversed(manifest["video_tracks"][0]["streams"])):
try:
# self.log.info(video)
id = video["downloadable_id"]
# self.log.info(f"Adding video {video["res_w"]}x{video["res_h"]}, bitrate: {(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None} with profile {video["content_profile"]}. kid: {video["drmHeaderId"]}")
tracks.add(
Video(
id_=video["downloadable_id"],
url=video["urls"][0]["url"],
codec=Video.Codec.from_netflix_profile(video["content_profile"]),
bitrate=video["bitrate"] * 1000,
width=video["res_w"],
height=video["res_h"],
fps=(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None,
language=Language.get(original_language),
edition=video["content_profile"],
range_=self.parse_video_range_from_profile(video["content_profile"]),
is_original_lang=True,
drm=[Widevine(
pssh=PSSH(
manifest["video_tracks"][0]["drmHeader"]["bytes"]
),
kid=video["drmHeaderId"]
)] if manifest["video_tracks"][0].get("drmHeader", {}).get("bytes") else [],
data={
'license_url': license_url
}
)
)
except Exception as e:
video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown"
self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}")
continue
# Process audio tracks
unavailable_audio_tracks: List[Tuple[str, str]] = []
if "audio_tracks" in manifest:
for audio_index, audio in enumerate(manifest["audio_tracks"]):
try:
audio_id = audio.get("id", "unknown")
audio_lang = audio.get("language", "unknown")
if len(audio.get("streams", [])) < 1:
# This
# self.log.debug(f"Audio lang {audio["languageDescription"]} is available but no stream available.")
if "new_track_id" in audio and "id" in audio:
unavailable_audio_tracks.append((audio["new_track_id"], audio["id"])) # Assign to `unavailable_subtitle` for request missing audio tracks later
self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available")
continue
# self.log.debug(f"Adding audio lang: {audio["language"]} with profile: {audio["content_profile"]}")
is_original_lang = audio.get("language") == original_language.language
# self.log.info(f"is audio {audio["languageDescription"]} original language: {is_original_lang}")
for stream_index, stream in enumerate(audio["streams"]):
try:
stream_id = stream.get("downloadable_id", "unknown")
tracks.add(
Audio(
id_=stream["downloadable_id"],
url=stream["urls"][0]["url"],
codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
language=Language.get(self.NF_LANG_MAP.get(audio["language"]) or audio["language"]),
is_original_lang=is_original_lang,
bitrate=stream["bitrate"] * 1000,
channels=stream["channels"],
descriptive=audio.get("rawTrackType", "").lower() == "assistive",
name="[Original]" if Language.get(audio["language"]).language == original_language.language else None,
joc=16 if "atmos" in stream["content_profile"] else None
)
)
except Exception as e:
stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
self.log.warning(f"Failed to process audio stream at audio_index {audio_index}, stream_index {stream_index}, audio_id: {audio_id}, stream_id: {stream_id}, language: {audio_lang}, error: {e}")
continue
except Exception as e:
audio_id = audio.get("id", "unknown") if isinstance(audio, dict) else "unknown"
audio_lang = audio.get("language", "unknown") if isinstance(audio, dict) else "unknown"
self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}")
continue
# Process subtitle tracks
unavailable_subtitle: List[Tuple[str, str]] = []
if "timedtexttracks" in manifest:
for subtitle_index, subtitle in enumerate(manifest["timedtexttracks"]):
try:
subtitle_id = subtitle.get("id", "unknown")
subtitle_lang = subtitle.get("language", "unknown")
if "isNoneTrack" in subtitle and subtitle["isNoneTrack"] == True:
continue
if subtitle.get("hydrated") == False:
# This subtitles is there but has to request stream first
if "new_track_id" in subtitle and "id" in subtitle:
unavailable_subtitle.append((subtitle["new_track_id"], subtitle["id"])) # Assign to `unavailable_subtitle` for request missing subtitles later
# self.log.debug(f"Audio language: {subtitle["languageDescription"]} id: {subtitle["new_track_id"]} is not hydrated.")
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
continue
if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False:
# Skip Descriptive subtitles
continue
# pass
if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds")
continue
id = list(subtitle["downloadableIds"].values())
if not id:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds")
continue
language = Language.get(subtitle["language"])
if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables")
continue
profile = next(iter(subtitle["ttDownloadables"].keys()))
tt_downloadables = next(iter(subtitle["ttDownloadables"].values()))
is_original_lang = subtitle.get("language") == original_language.language
# self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}")
# self.log.info(f"ddd")
tracks.add(
Subtitle(
id_=id[0],
url=tt_downloadables["urls"][0]["url"],
codec=Subtitle.Codec.from_netflix_profile(profile),
language=language,
forced=subtitle.get("isForcedNarrative", False),
cc=subtitle.get("rawTrackType") == "closedcaptions",
sdh=subtitle.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitle else False,
is_original_lang=is_original_lang,
name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitle and subtitle["trackVariant"] == "DUBTITLE" else None),
)
)
except Exception as e:
subtitle_id = subtitle.get("id", "unknown") if isinstance(subtitle, dict) else "unknown"
subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown"
self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}")
continue
if hydrate_tracks == False:
return tracks
# Hydrate missing tracks
if unavailable_audio_tracks or unavailable_subtitle:
# Show hydration information once
audio_count = len(unavailable_audio_tracks)
subtitle_count = len(unavailable_subtitle)
hydration_parts = []
if audio_count > 0:
hydration_parts.append(f"audio ({audio_count})")
if subtitle_count > 0:
hydration_parts.append(f"subtitle ({subtitle_count})")
hydration_info = " and ".join(hydration_parts)
self.log.info(f"Hydrating {hydration_info} tracks. Total: {audio_count + subtitle_count}")
# Handle mismatched lengths - use last successful subtitle track when needed
last_successful_subtitle = ("N/A", "N/A") if not unavailable_subtitle else unavailable_subtitle[-1]
# Process audio tracks first, then handle subtitles separately if needed
max_length = max(len(unavailable_audio_tracks), len(unavailable_subtitle))
for hydration_index in range(max_length):
# Get audio track info for this index
audio_hydration = unavailable_audio_tracks[hydration_index] if hydration_index < len(unavailable_audio_tracks) else ("N/A", "N/A")
# Get subtitle track info for this index, or use last successful one if available
if hydration_index < len(unavailable_subtitle):
subtitle_hydration = unavailable_subtitle[hydration_index]
is_real_subtitle_request = True # This is a real subtitle to be added to tracks
elif unavailable_subtitle: # Use last successful subtitle track for context only
subtitle_hydration = last_successful_subtitle
is_real_subtitle_request = False # This is just for context, don't add to tracks
else:
subtitle_hydration = ("N/A", "N/A")
is_real_subtitle_request = False
try:
# Log what we're trying to hydrate
self.log.debug(f"Hydrating tracks at index {hydration_index}, audio_track_id: {audio_hydration[1] if audio_hydration[1] != 'N/A' else 'N/A'}, subtitle_track_id: {subtitle_hydration[1] if subtitle_hydration[1] != 'N/A' else 'N/A'}, is_real_subtitle: {is_real_subtitle_request}")
# Only call get_manifest if we have audio to hydrate
should_hydrate_audio = audio_hydration[0] != 'N/A' and audio_hydration[1] != 'N/A'
if not should_hydrate_audio:
self.log.debug(f"Skipping hydration at index {hydration_index} - no audio tracks to hydrate")
continue
# Always use a valid subtitle track ID for the manifest request to avoid API errors
# Use the subtitle track (real or reused) if available, otherwise use N/A
subtitle_track_for_request = subtitle_hydration[0] if subtitle_hydration[0] != 'N/A' else None
# If we still don't have a subtitle track ID, skip this hydration to avoid API error
if subtitle_track_for_request is None:
self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context")
continue
hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_for_request, audio_hydration[0])
# Handle hydrated audio tracks
if should_hydrate_audio and "audio_tracks" in hydrated_manifest:
try:
audios = next((item for item in hydrated_manifest["audio_tracks"] if 'id' in item and item["id"] == audio_hydration[1]), None)
if audios and "streams" in audios:
audio_lang = audios.get("language", "unknown")
self.log.debug(f"Processing hydrated audio track_id: {audio_hydration[1]}, language: {audio_lang}, streams_count: {len(audios['streams'])}")
for stream_index, stream in enumerate(audios["streams"]):
try:
stream_id = stream.get("downloadable_id", "unknown")
tracks.add(
Audio(
id_=stream["downloadable_id"],
url=stream["urls"][0]["url"],
codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
language=Language.get(self.NF_LANG_MAP.get(audios["language"]) or audios["language"]),
is_original_lang=audios["language"] == original_language.language,
bitrate=stream["bitrate"] * 1000,
channels=stream["channels"],
descriptive=audios.get("rawTrackType", "").lower() == "assistive",
name="[Original]" if Language.get(audios["language"]).language == original_language.language else None,
joc=16 if "atmos" in stream["content_profile"] else None
)
)
except Exception as e:
stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
self.log.warning(f"Failed to process hydrated audio stream at hydration_index {hydration_index}, stream_index {stream_index}, audio_track_id: {audio_hydration[1]}, stream_id: {stream_id}, error: {e}")
continue
else:
self.log.warning(f"No audio streams found for hydrated audio_track_id: {audio_hydration[1]} at hydration_index {hydration_index}")
except Exception as e:
self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}")
# Handle hydrated subtitle tracks (only if it's a real subtitle request, not reused)
if is_real_subtitle_request and subtitle_hydration[0] != 'N/A' and subtitle_hydration[1] != 'N/A' and "timedtexttracks" in hydrated_manifest:
try:
subtitles = next((item for item in hydrated_manifest["timedtexttracks"] if 'id' in item and item["id"] == subtitle_hydration[1]), None)
if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles:
subtitle_lang = subtitles.get("language", "unknown")
self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}")
id = list(subtitles["downloadableIds"].values())
if id:
language = Language.get(subtitles["language"])
profile = next(iter(subtitles["ttDownloadables"].keys()))
tt_downloadables = next(iter(subtitles["ttDownloadables"].values()))
tracks.add(
Subtitle(
id_=id[0],
url=tt_downloadables["urls"][0]["url"],
codec=Subtitle.Codec.from_netflix_profile(profile),
language=language,
forced=subtitles.get("isForcedNarrative", False),
cc=subtitles.get("rawTrackType") == "closedcaptions",
sdh=subtitles.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitles else False,
is_original_lang=subtitles.get("language") == original_language.language,
name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitles and subtitles["trackVariant"] == "DUBTITLE" else None),
)
)
else:
self.log.warning(f"No downloadable IDs found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
else:
self.log.warning(f"No subtitle data found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
except Exception as e:
self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}")
elif not is_real_subtitle_request and subtitle_hydration[1] != 'N/A':
self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)")
except Exception as e:
self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] if audio_hydration[1] != 'N/A' else 'N/A'}, subtitle_track_id: {subtitle_hydration[1] if subtitle_hydration[1] != 'N/A' else 'N/A'}, error: {e}")
continue
else:
self.log.info("No tracks need hydration")
except Exception as e:
self.log.error(f"Exception in manifest_as_tracks: {e}")
self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}")
# Return empty tracks on any critical error
return tracks
def parse_video_range_from_profile(self, profile: str) -> Video.Range:
"""
Parse the video range from a Netflix profile string.
Args:
profile (str): The Netflix profile string (e.g., "hevc-main10-L30-dash-cenc")
Returns:
Video.Range: The corresponding Video.Range enum value
Examples:
>>> parse_video_range_from_profile("hevc-main10-L30-dash-cenc")
<Video.Range.SDR: 'SDR'>
>>> parse_video_range_from_profile("hevc-dv5-main10-L30-dash-cenc")
<Video.Range.DV: 'DV'>
"""
# Get video profiles from config
video_profiles = self.config.get("profiles", {}).get("video", {})
# Search through all codecs and ranges to find the profile
for codec, ranges in video_profiles.items():
# if codec == 'H264':
# return Video.Range.SDR # for H264 video always return SDR
for range_name, profiles in ranges.items():
# self.log.info(f"Checking range {range_name}")
if profile in profiles:
# Return the corresponding Video.Range enum value
try:
# self.log.info(f"Found {range_name}")
return Video.Range(range_name)
except ValueError:
# If range_name is not a valid Video.Range, return SDR as default
self.log.debug(f"Video range is not valid {range_name}")
return Video.Range.SDR
# If profile not found, return SDR as default
return Video.Range.SDR