From fe7a3f019f70f50a4c282d0a459132ba654404ce Mon Sep 17 00:00:00 2001 From: Kenzuya Date: Sat, 25 Oct 2025 16:09:22 +0700 Subject: [PATCH] Use correct name of image attachment --- unshackle/services/Netflix/__init__.py | 204 +++++++++++++------------ 1 file changed, 104 insertions(+), 100 deletions(-) diff --git a/unshackle/services/Netflix/__init__.py b/unshackle/services/Netflix/__init__.py index 994b161..2a1d45a 100644 --- a/unshackle/services/Netflix/__init__.py +++ b/unshackle/services/Netflix/__init__.py @@ -90,7 +90,7 @@ class Netflix(Service): self.requested_profiles: List[str] = [] self.high_bitrate = high_bitrate self.descriptive_subtitles = descriptive_subtitles - + # MSL self.esn = self.cache.get("ESN") self.msl: Optional[MSL] = None @@ -168,7 +168,7 @@ class Netflix(Service): else: episode.language = Language.get("en") self.log.info(f"Using fallback language for episode: {episode.language}") - + episode_list.append( episode ) @@ -181,12 +181,12 @@ class Netflix(Service): return titles - + def get_tracks(self, title: Title_T) -> Tracks: - + tracks = Tracks() - + # If Video Codec is H.264 is selected but `self.profile is none` profile QC has to be requested seperately if self.vcodec == Video.Codec.AVC: # self.log.info(f"Profile: {self.profile}") @@ -214,7 +214,7 @@ class Netflix(Service): try: # Only hydrate tracks on the first range to avoid duplicates should_hydrate = self.hydrate_track and range_index == 0 - + if video_range == Video.Range.HYBRID: # Handle HYBRID mode by getting HDR10 and DV profiles separately # Get HDR10 profiles for the current codec @@ -226,7 +226,7 @@ class Netflix(Service): tracks.add(hdr10_tracks) else: self.log.warning(f"No HDR10 profiles found for codec {self.vcodec.extension.upper()}") - + # Get DV profiles for the current codec dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", []) if dv_profiles: @@ -236,14 +236,14 @@ class Netflix(Service): tracks.add(dv_tracks.videos) else: self.log.warning(f"No DV profiles found for codec {self.vcodec.extension.upper()}") - + elif self.high_bitrate: # Get profiles for the current range range_profiles = self.get_profiles_for_range(video_range) if not range_profiles: self.log.warning(f"No profiles found for range {video_range.name}") continue - + splitted_profiles = self.split_profiles(range_profiles) for profile_index, profile_list in enumerate(splitted_profiles): try: @@ -263,7 +263,7 @@ class Netflix(Service): if not range_profiles: self.log.warning(f"No profiles found for range {video_range.name}") continue - + self.log.info(f"Processing range {range_index + 1}/{len(self.range)}: {video_range.name}") manifest = self.get_manifest(title, range_profiles) manifest_tracks = self.manifest_as_tracks(manifest, title, should_hydrate) @@ -273,29 +273,33 @@ class Netflix(Service): tracks.add(manifest_tracks) # Add all tracks on first range even without hydration else: tracks.add(manifest_tracks.videos) # Add only videos for additional ranges - + except Exception as e: self.log.error(f"Error processing range {video_range.name}: {e}") continue - + # Add Attachments for profile picture if isinstance(title, Movie): if title.data and "boxart" in title.data and title.data["boxart"]: tracks.add( Attachment.from_url( - url=title.data["boxart"][0]["url"] + url=title.data["boxart"][0]["url"], + name=f"{title.name} ({title.year}) Poster" ) ) else: if title.data and "stills" in title.data and title.data["stills"]: tracks.add( - Attachment.from_url(title.data["stills"][0]["url"]) + Attachment.from_url( + url=title.data["stills"][0]["url"], + name=f"{title.title} S{title.season:02d}E{title.number:02d}{' - ' + title.name if title.name else ''} Poster" + ) ) - + return tracks - + def split_profiles(self, profiles: List[str]) -> List[List[str]]: """ Split profiles with names containing specific patterns based on video codec @@ -308,7 +312,7 @@ class Netflix(Service): patterns = ["l30", "l31", "l40"] else: patterns = ["L30", "L31", "L40", "L41", "L50", "L51"] - + # Group profiles by pattern result: List[List[str]] = [] for pattern in patterns: @@ -318,16 +322,16 @@ class Netflix(Service): pattern_group.append(profile) if pattern_group: # Only add non-empty groups result.append(pattern_group) - + return result - - + + def get_chapters(self, title: Title_T) -> Chapters: chapters: Chapters = Chapters() - + if not title.data: return chapters - + try: # self.log.info(f"Title data: {title.data}") if "skipMarkers" in title.data and "credit" in title.data["skipMarkers"]: @@ -400,7 +404,7 @@ class Netflix(Service): sys.exit(1) return payload_data[0]["licenseResponseBase64"] - + def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None: return self.get_widevine_license(challenge=challenge, title=title, track=track) # return super().get_widevine_license(challenge=challenge, title=title, track=track) @@ -429,7 +433,7 @@ class Netflix(Service): if video_range.name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and video_range != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9: self.log.error(f"Video range {video_range.name} is not supported by Video Codec: {self.vcodec}") sys.exit(1) - + if self.vcodec == Video.Codec.AVC: for video_range in self.range: if video_range != Video.Range.SDR: @@ -442,7 +446,7 @@ class Netflix(Service): if len(self.range) > 1: range_names = [r.name for r in self.range] self.log.info(f"Processing multiple video ranges: {', '.join(range_names)}") - + self.log.info("Intializing a MSL client") self.get_esn() # if self.cdm.security_level == 1: @@ -489,7 +493,7 @@ class Netflix(Service): for requested_profiles in self.requested_profiles: result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles]))) return result_profiles - + result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values()))) return result_profiles @@ -505,27 +509,27 @@ class Netflix(Service): result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()]["HDR10"]) self.log.debug(f"Result_profiles: {result_profiles}") return result_profiles - + def get_profiles_for_range(self, video_range: Video.Range) -> List[str]: """ Get profiles for a specific video range. - + Args: video_range: The video range to get profiles for - + Returns: List of profile strings for the specified range """ result_profiles = [] - + # Handle case for codec VP9 if self.vcodec == Video.Codec.VP9 and video_range != Video.Range.HDR10: result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()].values()) return result_profiles - + # Get profiles for the specific range codec_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()] - + if video_range.name in codec_profiles: result_profiles.extend(codec_profiles[video_range.name]) elif video_range == Video.Range.HYBRID: @@ -536,10 +540,10 @@ class Netflix(Service): self.log.warning(f"No HDR10 profiles found for HYBRID range in codec {self.vcodec.extension.upper()}") else: self.log.warning(f"Range {video_range.name} not found in codec {self.vcodec.extension.upper()} profiles") - + self.log.debug(f"Profiles for range {video_range.name}: {result_profiles}") return result_profiles - + def get_esn(self): if self.cdm.device_type == DeviceTypes.ANDROID: try: @@ -629,7 +633,7 @@ class Netflix(Service): # Log context information for debugging title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown' self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}") - + audio_profiles = self.config["profiles"]["audio"].values() video_profiles = sorted(set(flatten(as_list( video_profiles, @@ -637,9 +641,9 @@ class Netflix(Service): self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [], self.config["profiles"]["subtitles"], )))) - - + + # self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles)) if not self.msl: @@ -743,7 +747,7 @@ class Netflix(Service): if 'video_profiles' in locals() and video_profiles: self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}") return self._get_empty_manifest() - + @staticmethod def get_original_language(manifest) -> Language: try: @@ -752,7 +756,7 @@ class Netflix(Service): for language in manifest["audio_tracks"]: if "languageDescription" in language and language["languageDescription"].endswith(" [Original]"): return Language.get(language["language"]) - + # Fallback 1: Try to parse from defaultTrackOrderList if "defaultTrackOrderList" in manifest and manifest["defaultTrackOrderList"]: try: @@ -762,16 +766,16 @@ class Netflix(Service): return Language.get(lang_code) except (IndexError, KeyError, AttributeError): pass - + # Fallback 2: Try to get the first available audio track language if "audio_tracks" in manifest and manifest["audio_tracks"]: for audio_track in manifest["audio_tracks"]: if "language" in audio_track and audio_track["language"]: return Language.get(audio_track["language"]) - + # Fallback 3: Default to English if all else fails return Language.get("en") - + except Exception as e: # If anything goes wrong, default to English return Language.get("en") @@ -780,32 +784,32 @@ class Netflix(Service): return self.config["certificate"] def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks: - + # If hydrate_tracks is not specified, derive from self.hydrate_track if hydrate_tracks is None: hydrate_tracks = self.hydrate_track - + tracks = Tracks() - + try: # Handle empty or invalid manifest if not manifest or not isinstance(manifest, dict): self.log.warning("Empty or invalid manifest received, returning empty tracks") return tracks - + # Check if manifest has required structure if "video_tracks" not in manifest or not manifest["video_tracks"]: self.log.warning("No video tracks in manifest, returning empty tracks") return tracks - + if "links" not in manifest or "license" not in manifest["links"]: self.log.warning("No license URL in manifest, cannot process tracks") return tracks - + original_language = self.get_original_language(manifest) self.log.debug(f"Original language: {original_language}") license_url = manifest["links"]["license"]["href"] - + # Process video tracks if "streams" in manifest["video_tracks"][0] and manifest["video_tracks"][0]["streams"]: # self.log.info(f"Video: {jsonpickle.encode(manifest["video_tracks"], indent=2)}") @@ -842,7 +846,7 @@ class Netflix(Service): video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown" self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}") continue - + # Process audio tracks unavailable_audio_tracks: List[Tuple[str, str]] = [] primary_audio_tracks: List[Tuple[str, str]] = [] # Store primary audio tracks with streams @@ -852,18 +856,18 @@ class Netflix(Service): audio_id = audio.get("id", "unknown") audio_lang = audio.get("language", "unknown") if len(audio.get("streams", [])) < 1: - # This + # This # self.log.debug(f"Audio lang {audio["languageDescription"]} is available but no stream available.") if "new_track_id" in audio and "id" in audio: unavailable_audio_tracks.append((audio["new_track_id"], audio["id"])) # Assign to `unavailable_subtitle` for request missing audio tracks later if hydrate_tracks: self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available") continue - + # Store primary audio track info (new_track_id, id) for potential use in hydration if "new_track_id" in audio and "id" in audio: primary_audio_tracks.append((audio["new_track_id"], audio["id"])) - + # self.log.debug(f"Adding audio lang: {audio["language"]} with profile: {audio["content_profile"]}") is_original_lang = audio.get("language") == original_language.language # self.log.info(f"is audio {audio["languageDescription"]} original language: {is_original_lang}") @@ -894,7 +898,7 @@ class Netflix(Service): self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}") continue - + # Process subtitle tracks @@ -913,7 +917,7 @@ class Netflix(Service): if hydrate_tracks: self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated") continue - + if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False: # Skip Descriptive subtitles continue @@ -922,22 +926,22 @@ class Netflix(Service): if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]: self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds") continue - + id = list(subtitle["downloadableIds"].values()) if not id: self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds") continue - + language = Language.get(subtitle["language"]) - + if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]: self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables") continue - + profile = next(iter(subtitle["ttDownloadables"].keys())) tt_downloadables = next(iter(subtitle["ttDownloadables"].values())) is_original_lang = subtitle.get("language") == original_language.language - # self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}") + # self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}") # self.log.info(f"ddd") tracks.add( Subtitle( @@ -957,10 +961,10 @@ class Netflix(Service): subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown" self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}") continue - + if hydrate_tracks == False: return tracks - + # Hydrate missing tracks if unavailable_audio_tracks or unavailable_subtitle: hydrated_tracks = self.hydrate_all_tracks( @@ -973,35 +977,35 @@ class Netflix(Service): tracks.add(hydrated_tracks) else: self.log.info("No tracks need hydration") - + except Exception as e: self.log.error(f"Exception in manifest_as_tracks: {e}") self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}") # Return empty tracks on any critical error - + return tracks - + def parse_video_range_from_profile(self, profile: str) -> Video.Range: """ Parse the video range from a Netflix profile string. - + Args: profile (str): The Netflix profile string (e.g., "hevc-main10-L30-dash-cenc") - + Returns: Video.Range: The corresponding Video.Range enum value - + Examples: >>> parse_video_range_from_profile("hevc-main10-L30-dash-cenc") >>> parse_video_range_from_profile("hevc-dv5-main10-L30-dash-cenc") """ - + # Get video profiles from config video_profiles = self.config.get("profiles", {}).get("video", {}) - + # Search through all codecs and ranges to find the profile for codec, ranges in video_profiles.items(): # if codec == 'H264': @@ -1017,59 +1021,59 @@ class Netflix(Service): # If range_name is not a valid Video.Range, return SDR as default self.log.debug(f"Video range is not valid {range_name}") return Video.Range.SDR - + # If profile not found, return SDR as default return Video.Range.SDR - + def _is_valid_track_for_hydration(self, track_data: tuple) -> bool: """Check if track data is valid for hydration (not None values).""" return track_data[0] is not None and track_data[1] is not None - + def _get_empty_track_tuple(self) -> tuple: """Return an empty track tuple with None values.""" return (None, None) - - def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]], - unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]], + + def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]], + unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]], original_language: Language) -> Tracks: """ Hydrate all missing audio and subtitle tracks. - + Args: title: The title object for which to hydrate tracks unavailable_audio_tracks: List of audio track tuples (new_track_id, id) that need hydration unavailable_subtitle: List of subtitle track tuples (new_track_id, id) that need hydration primary_audio_tracks: List of primary audio track tuples for context in subtitle hydration original_language: The original language of the content - + Returns: Tracks: A Tracks object containing all hydrated audio and subtitle tracks """ hydrated_tracks = Tracks() - + # Show hydration information once audio_count = len(unavailable_audio_tracks) subtitle_count = len(unavailable_subtitle) - + hydration_parts = [] if audio_count > 0: hydration_parts.append(f"audio ({audio_count})") if subtitle_count > 0: hydration_parts.append(f"subtitle ({subtitle_count})") - + hydration_info = " and ".join(hydration_parts) self.log.info(f"Hydrating {hydration_info} tracks. Total: {audio_count + subtitle_count}") - + # Handle mismatched lengths - use last successful tracks when needed last_successful_subtitle = self._get_empty_track_tuple() if not unavailable_subtitle else unavailable_subtitle[-1] last_successful_audio = self._get_empty_track_tuple() if not unavailable_audio_tracks else unavailable_audio_tracks[-1] - + # For subtitle-only hydration, use primary audio track if available primary_audio_for_subtitle_hydration = primary_audio_tracks[0] if primary_audio_tracks and not unavailable_audio_tracks and unavailable_subtitle else self._get_empty_track_tuple() - + # Process audio tracks first, then handle subtitles separately if needed max_length = max(len(unavailable_audio_tracks), len(unavailable_subtitle)) - + for hydration_index in range(max_length): # Get audio track info for this index, or use last successful one if available if hydration_index < len(unavailable_audio_tracks): @@ -1085,7 +1089,7 @@ class Netflix(Service): else: audio_hydration = self._get_empty_track_tuple() is_real_audio_request = False - + # Get subtitle track info for this index, or use last successful one if available if hydration_index < len(unavailable_subtitle): subtitle_hydration = unavailable_subtitle[hydration_index] @@ -1096,30 +1100,30 @@ class Netflix(Service): else: subtitle_hydration = self._get_empty_track_tuple() is_real_subtitle_request = False - + try: # Prepare track IDs for API request - convert None to None for proper API handling audio_track_id = audio_hydration[0] if audio_hydration[0] is not None else None subtitle_track_id = subtitle_hydration[0] if subtitle_hydration[0] is not None else None - + # Log what we're trying to hydrate - self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration, + self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration, is_real_audio_request, is_real_subtitle_request) - + # Only call get_manifest if we have valid tracks to hydrate should_hydrate_audio = self._is_valid_track_for_hydration(audio_hydration) - + if not should_hydrate_audio: self.log.debug(f"Skipping hydration at index {hydration_index} - no audio tracks to hydrate") continue - + # If we still don't have a subtitle track ID, skip this hydration to avoid API error if subtitle_track_id is None: self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context") continue - + hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_id, audio_track_id) - + # Handle hydrated audio tracks (only if it's a real audio request, not reused) if is_real_audio_request and should_hydrate_audio and "audio_tracks" in hydrated_manifest: try: @@ -1154,7 +1158,7 @@ class Netflix(Service): self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}") elif not is_real_audio_request and audio_hydration[1] is not None: self.log.debug(f"Used audio track context for API request at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]} (not adding to tracks)") - + # Handle hydrated subtitle tracks (only if it's a real subtitle request, not reused) if is_real_subtitle_request and self._is_valid_track_for_hydration(subtitle_hydration) and "timedtexttracks" in hydrated_manifest: try: @@ -1162,7 +1166,7 @@ class Netflix(Service): if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles: subtitle_lang = subtitles.get("language", "unknown") self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}") - + id = list(subtitles["downloadableIds"].values()) if id: language = Language.get(subtitles["language"]) @@ -1189,13 +1193,13 @@ class Netflix(Service): self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}") elif not is_real_subtitle_request and subtitle_hydration[1] is not None: self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)") - + except Exception as e: self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] or 'None'}, subtitle_track_id: {subtitle_hydration[1] or 'None'}, error: {e}") continue - + return hydrated_tracks - + def create_drm(self, pssh: str, kid: str) -> DRM_T: if self.drm_system == "widevine": return Widevine(PSSH(pssh), kid) @@ -1221,7 +1225,7 @@ class Netflix(Service): return "widevine" raise ValueError("Unknown DRM system") - def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple, + def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple, is_real_audio: bool, is_real_subtitle: bool) -> None: """Log hydration attempt details.""" audio_id = audio_data[1] if audio_data[1] is not None else 'None'