Use correct name of image attachment

This commit is contained in:
2025-10-25 16:09:22 +07:00
parent 45f18b046f
commit fe7a3f019f

View File

@@ -90,7 +90,7 @@ class Netflix(Service):
self.requested_profiles: List[str] = []
self.high_bitrate = high_bitrate
self.descriptive_subtitles = descriptive_subtitles
# MSL
self.esn = self.cache.get("ESN")
self.msl: Optional[MSL] = None
@@ -168,7 +168,7 @@ class Netflix(Service):
else:
episode.language = Language.get("en")
self.log.info(f"Using fallback language for episode: {episode.language}")
episode_list.append(
episode
)
@@ -181,12 +181,12 @@ class Netflix(Service):
return titles
def get_tracks(self, title: Title_T) -> Tracks:
tracks = Tracks()
# If Video Codec is H.264 is selected but `self.profile is none` profile QC has to be requested seperately
if self.vcodec == Video.Codec.AVC:
# self.log.info(f"Profile: {self.profile}")
@@ -214,7 +214,7 @@ class Netflix(Service):
try:
# Only hydrate tracks on the first range to avoid duplicates
should_hydrate = self.hydrate_track and range_index == 0
if video_range == Video.Range.HYBRID:
# Handle HYBRID mode by getting HDR10 and DV profiles separately
# Get HDR10 profiles for the current codec
@@ -226,7 +226,7 @@ class Netflix(Service):
tracks.add(hdr10_tracks)
else:
self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}")
# Get DV profiles for the current codec
dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", [])
if dv_profiles:
@@ -236,14 +236,14 @@ class Netflix(Service):
tracks.add(dv_tracks.videos)
else:
self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}")
elif self.high_bitrate:
# Get profiles for the current range
range_profiles = self.get_profiles_for_range(video_range)
if not range_profiles:
self.log.warning(f"No profiles found for range {video_range.name}")
continue
splitted_profiles = self.split_profiles(range_profiles)
for profile_index, profile_list in enumerate(splitted_profiles):
try:
@@ -263,7 +263,7 @@ class Netflix(Service):
if not range_profiles:
self.log.warning(f"No profiles found for range {video_range.name}")
continue
self.log.info(f"Processing range {range_index + 1}/{len(self.range)}: {video_range.name}")
manifest = self.get_manifest(title, range_profiles)
manifest_tracks = self.manifest_as_tracks(manifest, title, should_hydrate)
@@ -273,29 +273,33 @@ class Netflix(Service):
tracks.add(manifest_tracks) # Add all tracks on first range even without hydration
else:
tracks.add(manifest_tracks.videos) # Add only videos for additional ranges
except Exception as e:
self.log.error(f"Error processing range {video_range.name}: {e}")
continue
# Add Attachments for profile picture
if isinstance(title, Movie):
if title.data and "boxart" in title.data and title.data["boxart"]:
tracks.add(
Attachment.from_url(
url=title.data["boxart"][0]["url"]
url=title.data["boxart"][0]["url"],
name=f"{title.name} ({title.year}) Poster"
)
)
else:
if title.data and "stills" in title.data and title.data["stills"]:
tracks.add(
Attachment.from_url(title.data["stills"][0]["url"])
Attachment.from_url(
url=title.data["stills"][0]["url"],
name=f"{title.title} S{title.season:02d}E{title.number:02d}{' - ' + title.name if title.name else ''} Poster"
)
)
return tracks
def split_profiles(self, profiles: List[str]) -> List[List[str]]:
"""
Split profiles with names containing specific patterns based on video codec
@@ -308,7 +312,7 @@ class Netflix(Service):
patterns = ["l30", "l31", "l40"]
else:
patterns = ["L30", "L31", "L40", "L41", "L50", "L51"]
# Group profiles by pattern
result: List[List[str]] = []
for pattern in patterns:
@@ -318,16 +322,16 @@ class Netflix(Service):
pattern_group.append(profile)
if pattern_group: # Only add non-empty groups
result.append(pattern_group)
return result
def get_chapters(self, title: Title_T) -> Chapters:
chapters: Chapters = Chapters()
if not title.data:
return chapters
try:
# self.log.info(f"Title data: {title.data}")
if "skipMarkers" in title.data and "credit" in title.data["skipMarkers"]:
@@ -400,7 +404,7 @@ class Netflix(Service):
sys.exit(1)
return payload_data[0]["licenseResponseBase64"]
def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
return self.get_widevine_license(challenge=challenge, title=title, track=track)
# return super().get_widevine_license(challenge=challenge, title=title, track=track)
@@ -429,7 +433,7 @@ class Netflix(Service):
if video_range.name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and video_range != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
self.log.error(f"Video range {video_range.name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
if self.vcodec == Video.Codec.AVC:
for video_range in self.range:
if video_range != Video.Range.SDR:
@@ -442,7 +446,7 @@ class Netflix(Service):
if len(self.range) > 1:
range_names = [r.name for r in self.range]
self.log.info(f"Processing multiple video ranges: {', '.join(range_names)}")
self.log.info("Intializing a MSL client")
self.get_esn()
# if self.cdm.security_level == 1:
@@ -489,7 +493,7 @@ class Netflix(Service):
for requested_profiles in self.requested_profiles:
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles])))
return result_profiles
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())))
return result_profiles
@@ -505,27 +509,27 @@ class Netflix(Service):
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()]["HDR10"])
self.log.debug(f"Result_profiles: {result_profiles}")
return result_profiles
def get_profiles_for_range(self, video_range: Video.Range) -> List[str]:
"""
Get profiles for a specific video range.
Args:
video_range: The video range to get profiles for
Returns:
List of profile strings for the specified range
"""
result_profiles = []
# Handle case for codec VP9
if self.vcodec == Video.Codec.VP9 and video_range != Video.Range.HDR10:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())
return result_profiles
# Get profiles for the specific range
codec_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
if video_range.name in codec_profiles:
result_profiles.extend(codec_profiles[video_range.name])
elif video_range == Video.Range.HYBRID:
@@ -536,10 +540,10 @@ class Netflix(Service):
self.log.warning(f"No HDR10 profiles found for HYBRID range in codec {self.vcodec.extension.upper()}")
else:
self.log.warning(f"Range {video_range.name} not found in codec {self.vcodec.extension.upper()} profiles")
self.log.debug(f"Profiles for range {video_range.name}: {result_profiles}")
return result_profiles
def get_esn(self):
if self.cdm.device_type == DeviceTypes.ANDROID:
try:
@@ -629,7 +633,7 @@ class Netflix(Service):
# Log context information for debugging
title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}")
audio_profiles = self.config["profiles"]["audio"].values()
video_profiles = sorted(set(flatten(as_list(
video_profiles,
@@ -637,9 +641,9 @@ class Netflix(Service):
self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [],
self.config["profiles"]["subtitles"],
))))
# self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles))
if not self.msl:
@@ -743,7 +747,7 @@ class Netflix(Service):
if 'video_profiles' in locals() and video_profiles:
self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}")
return self._get_empty_manifest()
@staticmethod
def get_original_language(manifest) -> Language:
try:
@@ -752,7 +756,7 @@ class Netflix(Service):
for language in manifest["audio_tracks"]:
if "languageDescription" in language and language["languageDescription"].endswith(" [Original]"):
return Language.get(language["language"])
# Fallback 1: Try to parse from defaultTrackOrderList
if "defaultTrackOrderList" in manifest and manifest["defaultTrackOrderList"]:
try:
@@ -762,16 +766,16 @@ class Netflix(Service):
return Language.get(lang_code)
except (IndexError, KeyError, AttributeError):
pass
# Fallback 2: Try to get the first available audio track language
if "audio_tracks" in manifest and manifest["audio_tracks"]:
for audio_track in manifest["audio_tracks"]:
if "language" in audio_track and audio_track["language"]:
return Language.get(audio_track["language"])
# Fallback 3: Default to English if all else fails
return Language.get("en")
except Exception as e:
# If anything goes wrong, default to English
return Language.get("en")
@@ -780,32 +784,32 @@ class Netflix(Service):
return self.config["certificate"]
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks:
# If hydrate_tracks is not specified, derive from self.hydrate_track
if hydrate_tracks is None:
hydrate_tracks = self.hydrate_track
tracks = Tracks()
try:
# Handle empty or invalid manifest
if not manifest or not isinstance(manifest, dict):
self.log.warning("Empty or invalid manifest received, returning empty tracks")
return tracks
# Check if manifest has required structure
if "video_tracks" not in manifest or not manifest["video_tracks"]:
self.log.warning("No video tracks in manifest, returning empty tracks")
return tracks
if "links" not in manifest or "license" not in manifest["links"]:
self.log.warning("No license URL in manifest, cannot process tracks")
return tracks
original_language = self.get_original_language(manifest)
self.log.debug(f"Original language: {original_language}")
license_url = manifest["links"]["license"]["href"]
# Process video tracks
if "streams" in manifest["video_tracks"][0] and manifest["video_tracks"][0]["streams"]:
# self.log.info(f"Video: {jsonpickle.encode(manifest["video_tracks"], indent=2)}")
@@ -842,7 +846,7 @@ class Netflix(Service):
video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown"
self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}")
continue
# Process audio tracks
unavailable_audio_tracks: List[Tuple[str, str]] = []
primary_audio_tracks: List[Tuple[str, str]] = [] # Store primary audio tracks with streams
@@ -852,18 +856,18 @@ class Netflix(Service):
audio_id = audio.get("id", "unknown")
audio_lang = audio.get("language", "unknown")
if len(audio.get("streams", [])) < 1:
# This
# This
# self.log.debug(f"Audio lang {audio["languageDescription"]} is available but no stream available.")
if "new_track_id" in audio and "id" in audio:
unavailable_audio_tracks.append((audio["new_track_id"], audio["id"])) # Assign to `unavailable_subtitle` for request missing audio tracks later
if hydrate_tracks:
self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available")
continue
# Store primary audio track info (new_track_id, id) for potential use in hydration
if "new_track_id" in audio and "id" in audio:
primary_audio_tracks.append((audio["new_track_id"], audio["id"]))
# self.log.debug(f"Adding audio lang: {audio["language"]} with profile: {audio["content_profile"]}")
is_original_lang = audio.get("language") == original_language.language
# self.log.info(f"is audio {audio["languageDescription"]} original language: {is_original_lang}")
@@ -894,7 +898,7 @@ class Netflix(Service):
self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}")
continue
# Process subtitle tracks
@@ -913,7 +917,7 @@ class Netflix(Service):
if hydrate_tracks:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
continue
if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False:
# Skip Descriptive subtitles
continue
@@ -922,22 +926,22 @@ class Netflix(Service):
if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds")
continue
id = list(subtitle["downloadableIds"].values())
if not id:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds")
continue
language = Language.get(subtitle["language"])
if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables")
continue
profile = next(iter(subtitle["ttDownloadables"].keys()))
tt_downloadables = next(iter(subtitle["ttDownloadables"].values()))
is_original_lang = subtitle.get("language") == original_language.language
# self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}")
# self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}")
# self.log.info(f"ddd")
tracks.add(
Subtitle(
@@ -957,10 +961,10 @@ class Netflix(Service):
subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown"
self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}")
continue
if hydrate_tracks == False:
return tracks
# Hydrate missing tracks
if unavailable_audio_tracks or unavailable_subtitle:
hydrated_tracks = self.hydrate_all_tracks(
@@ -973,35 +977,35 @@ class Netflix(Service):
tracks.add(hydrated_tracks)
else:
self.log.info("No tracks need hydration")
except Exception as e:
self.log.error(f"Exception in manifest_as_tracks: {e}")
self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}")
# Return empty tracks on any critical error
return tracks
def parse_video_range_from_profile(self, profile: str) -> Video.Range:
"""
Parse the video range from a Netflix profile string.
Args:
profile (str): The Netflix profile string (e.g., "hevc-main10-L30-dash-cenc")
Returns:
Video.Range: The corresponding Video.Range enum value
Examples:
>>> parse_video_range_from_profile("hevc-main10-L30-dash-cenc")
<Video.Range.SDR: 'SDR'>
>>> parse_video_range_from_profile("hevc-dv5-main10-L30-dash-cenc")
<Video.Range.DV: 'DV'>
"""
# Get video profiles from config
video_profiles = self.config.get("profiles", {}).get("video", {})
# Search through all codecs and ranges to find the profile
for codec, ranges in video_profiles.items():
# if codec == 'H264':
@@ -1017,59 +1021,59 @@ class Netflix(Service):
# If range_name is not a valid Video.Range, return SDR as default
self.log.debug(f"Video range is not valid {range_name}")
return Video.Range.SDR
# If profile not found, return SDR as default
return Video.Range.SDR
def _is_valid_track_for_hydration(self, track_data: tuple) -> bool:
"""Check if track data is valid for hydration (not None values)."""
return track_data[0] is not None and track_data[1] is not None
def _get_empty_track_tuple(self) -> tuple:
"""Return an empty track tuple with None values."""
return (None, None)
def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]],
unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]],
def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]],
unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]],
original_language: Language) -> Tracks:
"""
Hydrate all missing audio and subtitle tracks.
Args:
title: The title object for which to hydrate tracks
unavailable_audio_tracks: List of audio track tuples (new_track_id, id) that need hydration
unavailable_subtitle: List of subtitle track tuples (new_track_id, id) that need hydration
primary_audio_tracks: List of primary audio track tuples for context in subtitle hydration
original_language: The original language of the content
Returns:
Tracks: A Tracks object containing all hydrated audio and subtitle tracks
"""
hydrated_tracks = Tracks()
# Show hydration information once
audio_count = len(unavailable_audio_tracks)
subtitle_count = len(unavailable_subtitle)
hydration_parts = []
if audio_count > 0:
hydration_parts.append(f"audio ({audio_count})")
if subtitle_count > 0:
hydration_parts.append(f"subtitle ({subtitle_count})")
hydration_info = " and ".join(hydration_parts)
self.log.info(f"Hydrating {hydration_info} tracks. Total: {audio_count + subtitle_count}")
# Handle mismatched lengths - use last successful tracks when needed
last_successful_subtitle = self._get_empty_track_tuple() if not unavailable_subtitle else unavailable_subtitle[-1]
last_successful_audio = self._get_empty_track_tuple() if not unavailable_audio_tracks else unavailable_audio_tracks[-1]
# For subtitle-only hydration, use primary audio track if available
primary_audio_for_subtitle_hydration = primary_audio_tracks[0] if primary_audio_tracks and not unavailable_audio_tracks and unavailable_subtitle else self._get_empty_track_tuple()
# Process audio tracks first, then handle subtitles separately if needed
max_length = max(len(unavailable_audio_tracks), len(unavailable_subtitle))
for hydration_index in range(max_length):
# Get audio track info for this index, or use last successful one if available
if hydration_index < len(unavailable_audio_tracks):
@@ -1085,7 +1089,7 @@ class Netflix(Service):
else:
audio_hydration = self._get_empty_track_tuple()
is_real_audio_request = False
# Get subtitle track info for this index, or use last successful one if available
if hydration_index < len(unavailable_subtitle):
subtitle_hydration = unavailable_subtitle[hydration_index]
@@ -1096,30 +1100,30 @@ class Netflix(Service):
else:
subtitle_hydration = self._get_empty_track_tuple()
is_real_subtitle_request = False
try:
# Prepare track IDs for API request - convert None to None for proper API handling
audio_track_id = audio_hydration[0] if audio_hydration[0] is not None else None
subtitle_track_id = subtitle_hydration[0] if subtitle_hydration[0] is not None else None
# Log what we're trying to hydrate
self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration,
self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration,
is_real_audio_request, is_real_subtitle_request)
# Only call get_manifest if we have valid tracks to hydrate
should_hydrate_audio = self._is_valid_track_for_hydration(audio_hydration)
if not should_hydrate_audio:
self.log.debug(f"Skipping hydration at index {hydration_index} - no audio tracks to hydrate")
continue
# If we still don't have a subtitle track ID, skip this hydration to avoid API error
if subtitle_track_id is None:
self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context")
continue
hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_id, audio_track_id)
# Handle hydrated audio tracks (only if it's a real audio request, not reused)
if is_real_audio_request and should_hydrate_audio and "audio_tracks" in hydrated_manifest:
try:
@@ -1154,7 +1158,7 @@ class Netflix(Service):
self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}")
elif not is_real_audio_request and audio_hydration[1] is not None:
self.log.debug(f"Used audio track context for API request at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]} (not adding to tracks)")
# Handle hydrated subtitle tracks (only if it's a real subtitle request, not reused)
if is_real_subtitle_request and self._is_valid_track_for_hydration(subtitle_hydration) and "timedtexttracks" in hydrated_manifest:
try:
@@ -1162,7 +1166,7 @@ class Netflix(Service):
if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles:
subtitle_lang = subtitles.get("language", "unknown")
self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}")
id = list(subtitles["downloadableIds"].values())
if id:
language = Language.get(subtitles["language"])
@@ -1189,13 +1193,13 @@ class Netflix(Service):
self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}")
elif not is_real_subtitle_request and subtitle_hydration[1] is not None:
self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)")
except Exception as e:
self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] or 'None'}, subtitle_track_id: {subtitle_hydration[1] or 'None'}, error: {e}")
continue
return hydrated_tracks
def create_drm(self, pssh: str, kid: str) -> DRM_T:
if self.drm_system == "widevine":
return Widevine(PSSH(pssh), kid)
@@ -1221,7 +1225,7 @@ class Netflix(Service):
return "widevine"
raise ValueError("Unknown DRM system")
def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple,
def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple,
is_real_audio: bool, is_real_subtitle: bool) -> None:
"""Log hydration attempt details."""
audio_id = audio_data[1] if audio_data[1] is not None else 'None'