From b61135175d1393fb42093a8e14132f098cb22860 Mon Sep 17 00:00:00 2001 From: kenzuya Date: Fri, 13 Mar 2026 06:29:50 +0700 Subject: [PATCH] Update Netflix service --- ...sung_sm-g975f_16.0.0_1e7c5ba2_22589_l3.wvd | Bin 0 -> 3016 bytes unshackle/services/Netflix/__init__.py | 667 +++++++++++------- unshackle/services/Netflix/config.yaml | 2 + unshackle/unshackle.yaml | 2 +- 4 files changed, 431 insertions(+), 240 deletions(-) create mode 100644 unshackle/WVDs/samsung_sm-g975f_16.0.0_1e7c5ba2_22589_l3.wvd diff --git a/unshackle/WVDs/samsung_sm-g975f_16.0.0_1e7c5ba2_22589_l3.wvd b/unshackle/WVDs/samsung_sm-g975f_16.0.0_1e7c5ba2_22589_l3.wvd new file mode 100644 index 0000000000000000000000000000000000000000..0be6b77e31635cc6f5270ed2300faf9ae838c26a GIT binary patch literal 3016 zcmeH|`9IYA7sqEtmWCKZn8{YwFlMon5@X9UzUF378Dr)nlUbOBrtC_}Qno0m%PkbL zhpwejA&FAy)g8Owuy1Hjm55GEoGkwA z+kP+QV4r!=qH%5>tu0pYzM3;3tmjs}mQG(1?sr)|zPa#ax`A4p1&lGKd44m^ebBDh zUX+9QZ5(}#CnT6Q?sT@Z?*(<(+-gY}5RQ5=3VevekvW?1U1Rk{PbUd!K?(+mllVuc zT|B5CvQahamVz9J#;Z#Lsr$zif94yBK4_L4Ti7zuB&nW8L_AybzQPBuPX_C-vUU6< z!`F^|1qJ6WwZnE?wSKQ-l^o{%Li)fgMB`@M_~-pfL*m+so%yu`C*HD^&gmM{6ONc~ zJq!tlNPt1$4JSY@3tzi_UbUPIB>ds~HMpnMkAzIt%S_Rz-2BXe0j8XKtX|U#;P)<+nO16+n%DI$E{n!; zdCeXtDt3J|ktcnOR{zx0eA&!q9n+!sK2B6Iax8#1im|Svdyoo-TqE|It_! znBic%`l{C*xBB)#nXzi-jB~nNfLGxh3#%H1e1_N53|h;NOaP#n9oOO+L@Q0jc( zl96)-zhMC^T_5%8sIf#%V+cK{Nj^EfxT(jn=dS(5j;aivpZzz6ky4dd;U4i!b5F~- z_u^M-jHgGn6ltswbh<}zw!p#MX+e7cl&0nV)U8G)Wpv;dB``pk6nr6x`99B%c zW+6Twm48dC+=1B*NlYN7`EIq74nOjIXd(YgmoQ)9s)Y9v^tgOo`;yv}$yePna!EnU z9DT5^eR^-GsZqDK!B$7P1}6j%RxupOk_|DwQU)pBwm#!I?B(~Sa9_(3;z(CG1qYo` zZYNg4W>dt!XS8g(;IazxI6_pyZ6jucKDasJWst@w?elvQQ5dj+aFASne+jD>NQK4J z7#hjux0LrY*Dbo!YhT=UlB^P$XsOki*Xz1TfL2i^u&hXanLc}KB)30V7LMrj>0(vh zmy)XmYn(>5WIpRvsrGyk!?gkRC-`wB`=fY6s=KKHLys06=Mu$lZsm+lW@Hp?u1p)U zw;;LC{5H+kR(mu3trO>ZIVqX#!^h*j@h2;^x{4gAhf`0ZnQ!L{UF`S13+z;Fxmq=} zLM^c3E4#|sDkCo+MQ;}t&B|39?(JxfAG?=#@<_`fpV2dBFn20ojkc)r;zs{XiyanI zWQVhpvmXiVPT@2!#qEG7z}7 z!RAs_(o86S{G7r-HG>g!o%4R*V-`U@p>hsyt6P3otuue9PHIOc1d{}Y{{L(H=W8=; zs{y-#;Rz7DvEuQ_l!nVH-V@BAu4h}0>g^vI`H-mWt+3hPRH=sQ+UwAF(S~v6kHof_ z$*F*DgI z=NlIh^C$9!Hj!$o9G^S!ugU45Ro#jcJNR`(Uos2p zPuq%^#Pq@fOIinli&&}FHr8}b{F5iH+-wEsd?ZioGJ@an*{S>PPFQ$O(y2{dtN=G*ju}Gc#NG*u%nkPdgWcqex6fo zV`N#V#_R7(@t?_&piX9k6{M<*6&q6Y zYCTkG_h)vPp^(lyYk)JpZ_B&IGxR-5EXYGYLy>JslE!VhMp3E@25ugk4gaq6|a zbKZBnbaYz>7SUmQZF4Tph9YcWe@c?aq>tnfxyj#k`*oAOU-XR}1EEV=v@ z%#_{43(^4v@z_4gma6(gSLKg~jAopDmwZQ8WjBZ9r#v?-Kb<~(FD5IF@bFDkxr3eH z=`)iH>+N!Ce%sE8oYbAD@RiABpWrj6iGTi7(@cDnm)ExmM>==8ZGaR`dTFpw@bVCB zMc+?_a1v+$u6#bfh$d{QQS^K^MIKLuU9hN$s^YiTol;UYtT>T}F4OUlKwQuzcJ)K} z%LT7+k#|EX4MKFPZ`6D|DoX2p8G1XWs6ZwZvD~o6nXA06?0RU}BoyXv@I@;|f^^R# z;`rq=ZtGZacoV4JrG8%7T-yFojv(ZeFd4RB^zlF+9y@wql_sAPJ1R8jN+VVhbYByk zqwmKfc!ZfH7*Zb!qeMh7=u`@i&SsHW6ea+NQ`t;Y3X8^J(`lyEFb+JDTKOZWG*G>ulv3T4B6{-!|xE9sw;Ef$9+`e65(;;\d+)" - ALIASES= ("NF", "Netflix", "netflix", "nf") + ALIASES = ("NF", "Netflix", "netflix", "nf") NF_LANG_MAP = { "es": "es-419", "pt": "pt-PT", @@ -67,21 +68,39 @@ class Netflix(Service): @staticmethod @click.command(name="Netflix", short_help="https://netflix.com") @click.argument("title", type=str) - @click.option("-drm", "--drm-system", type=click.Choice(["widevine", "playready"], case_sensitive=False), - default="widevine", - help="which drm system to use") - @click.option("-p", "--profile", type=click.Choice(["MPL", "HPL", "QC", "MPL+HPL", "MPL+HPL+QC", "MPL+QC"], case_sensitive=False), - default=None, - help="H.264 profile to use. Default is best available.") + @click.option( + "-drm", + "--drm-system", + type=click.Choice(["widevine", "playready"], case_sensitive=False), + default="widevine", + help="which drm system to use", + ) + @click.option( + "-p", + "--profile", + type=click.Choice(["MPL", "HPL", "QC", "MPL+HPL", "MPL+HPL+QC", "MPL+QC"], case_sensitive=False), + default=None, + help="H.264 profile to use. Default is best available.", + ) @click.option("--meta-lang", type=str, help="Language to use for metadata") - @click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.") + @click.option("-ht", "--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.") @click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate") @click.option("-ds", "--descriptive-subtitles", is_flag=True, default=False, help="Get descriptive subtitles") @click.pass_context def cli(ctx, **kwargs): return Netflix(ctx, **kwargs) - def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool, descriptive_subtitles: bool): + def __init__( + self, + ctx: click.Context, + title: str, + drm_system: Literal["widevine", "playready"], + profile: str, + meta_lang: str, + hydrate_track: bool, + high_bitrate: bool, + descriptive_subtitles: bool, + ): super().__init__(ctx) # General self.title = title @@ -101,8 +120,8 @@ class Netflix(Service): # Download options self.range = ctx.parent.params.get("range_") or [Video.Range.SDR] - self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC # Defaults to H264 - self.acodec : Audio.Codec = ctx.parent.params.get("acodec") or Audio.Codec.EC3 + self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC # Defaults to H264 + self.acodec: Audio.Codec = ctx.parent.params.get("acodec") or Audio.Codec.EC3 self.quality: List[int] = ctx.parent.params.get("quality") self.audio_only = ctx.parent.params.get("audio_only") self.subs_only = ctx.parent.params.get("subs_only") @@ -112,7 +131,6 @@ class Netflix(Service): self.cdm: Cdm = ctx.obj.cdm # self.ctx = ctx - def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: # Configure first before download self.log.debug("Authenticating Netflix service") @@ -136,12 +154,10 @@ class Netflix(Service): # language=self.get_original_language(self.get_manifest()), service=self.__class__, data=metadata["video"], - description=metadata["video"]["synopsis"] + description=metadata["video"]["synopsis"], ) movie.language = self.get_original_language(self.get_manifest(movie, self.profiles)) - titles = Movies([ - movie - ]) + titles = Movies([movie]) else: # self.log.warning(f"Metadata: {jsonpickle.encode(metadata, indent=2)}") # print(metadata) @@ -163,31 +179,24 @@ class Netflix(Service): episode.language = self.get_original_language(self.get_manifest(episode, self.profiles)) self.log.debug(f"Episode S{episode.season:02d}E{episode.number:02d}: {episode.language}") except Exception as e: - self.log.warning(f"Failed to get original language for episode S{season['seq']:02d}E{episodes['seq']:02d}: {e}") + self.log.warning( + f"Failed to get original language for episode S{season['seq']:02d}E{episodes['seq']:02d}: {e}" + ) # Fallback: try to get the original language from the first episode that worked # or default to English if none worked - if episode_list and hasattr(episode_list[0], 'language') and episode_list[0].language: + if episode_list and hasattr(episode_list[0], "language") and episode_list[0].language: episode.language = episode_list[0].language else: episode.language = Language.get("en") self.log.info(f"Using fallback language for episode: {episode.language}") - episode_list.append( - episode - ) - + episode_list.append(episode) titles = Series(episode_list) - - return titles - - - def get_tracks(self, title: Title_T) -> Tracks: - tracks = Tracks() # If Video Codec is H.264 is selected but `self.profile is none` profile QC has to be requested seperately @@ -212,7 +221,14 @@ class Netflix(Service): qc_tracks = self.manifest_as_tracks(qc_manifest, title, False) tracks.add(qc_tracks.videos) - mpl_manifest = self.get_manifest(title, [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["MPL"] if "l40" not in x]) + mpl_manifest = self.get_manifest( + title, + [ + x + for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["MPL"] + if "l40" not in x + ], + ) mpl_tracks = self.manifest_as_tracks(mpl_manifest, title, False) tracks.add(mpl_tracks.videos) except Exception as e: @@ -227,9 +243,13 @@ class Netflix(Service): if video_range == Video.Range.HYBRID: # Handle HYBRID mode by getting HDR10 and DV profiles separately # Get HDR10 profiles for the current codec - hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("HDR10", []) + hdr10_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get( + "HDR10", [] + ) if hdr10_profiles: - self.log.info(f"Fetching HDR10 tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})") + self.log.info( + f"Fetching HDR10 tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})" + ) hdr10_manifest = self.get_manifest(title, hdr10_profiles) hdr10_tracks = self.manifest_as_tracks(hdr10_manifest, title, should_hydrate) tracks.add(hdr10_tracks) @@ -239,7 +259,9 @@ class Netflix(Service): # Get DV profiles for the current codec dv_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()].get("DV", []) if dv_profiles: - self.log.info(f"Fetching DV tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})") + self.log.info( + f"Fetching DV tracks for HYBRID processing (range {range_index + 1}/{len(self.range)})" + ) dv_manifest = self.get_manifest(title, dv_profiles) dv_tracks = self.manifest_as_tracks(dv_manifest, title, False) # Don't hydrate DV tracks tracks.add(dv_tracks.videos) @@ -256,15 +278,23 @@ class Netflix(Service): splitted_profiles = self.split_profiles(range_profiles) for profile_index, profile_list in enumerate(splitted_profiles): try: - self.log.debug(f"Range {range_index + 1}/{len(self.range)} ({video_range.name}), Profile Index: {profile_index}. Getting profiles: {profile_list}") + self.log.debug( + f"Range {range_index + 1}/{len(self.range)} ({video_range.name}), Profile Index: {profile_index}. Getting profiles: {profile_list}" + ) manifest = self.get_manifest(title, profile_list) - manifest_tracks = self.manifest_as_tracks(manifest, title, should_hydrate and profile_index == 0) + manifest_tracks = self.manifest_as_tracks( + manifest, title, should_hydrate and profile_index == 0 + ) if should_hydrate and profile_index == 0: - tracks.add(manifest_tracks) # Add all tracks (video, audio, subtitles) on first hydrated profile + tracks.add( + manifest_tracks + ) # Add all tracks (video, audio, subtitles) on first hydrated profile else: tracks.add(manifest_tracks.videos) # Add only videos for additional profiles except Exception: - self.log.error(f"Error getting profile: {profile_list} for range {video_range.name}. Skipping") + self.log.error( + f"Error getting profile: {profile_list} for range {video_range.name}. Skipping" + ) continue else: # Get profiles for the current range @@ -287,23 +317,18 @@ class Netflix(Service): self.log.error(f"Error processing range {video_range.name}: {e}") continue - - # Add Attachments for profile picture if isinstance(title, Movie): if title.data and "boxart" in title.data and title.data["boxart"]: tracks.add( - Attachment.from_url( - url=title.data["boxart"][0]["url"], - name=f"{title.name} ({title.year}) Poster" - ) + Attachment.from_url(url=title.data["boxart"][0]["url"], name=f"{title.name} ({title.year}) Poster") ) else: if title.data and "stills" in title.data and title.data["stills"]: tracks.add( Attachment.from_url( url=title.data["stills"][0]["url"], - name=f"{title.title} S{title.season:02d}E{title.number:02d}{' - ' + title.name if title.name else ''} Poster" + name=f"{title.title} S{title.season:02d}E{title.number:02d}{' - ' + title.name if title.name else ''} Poster", ) ) @@ -334,7 +359,6 @@ class Netflix(Service): return result - def get_chapters(self, title: Title_T) -> Chapters: chapters: Chapters = Chapters() @@ -346,46 +370,54 @@ class Netflix(Service): if "skipMarkers" in title.data and "credit" in title.data["skipMarkers"]: credits = title.data["skipMarkers"]["credit"] if credits.get("start", 0) > 0 and credits.get("end", 0) > 0: - chapters.add(Chapter( - timestamp=credits["start"], # Milliseconds - name="Intro" - )) chapters.add( Chapter( - timestamp=credits["end"], # Milliseconds + timestamp=credits["start"], # Milliseconds + name="Intro", + ) + ) + chapters.add( + Chapter( + timestamp=credits["end"], # Milliseconds ) ) if "creditsOffset" in title.data and title.data["creditsOffset"] is not None: - chapters.add(Chapter( - timestamp=float(title.data["creditsOffset"]), # this is seconds, needed to assign to float - name="Credits" - )) + chapters.add( + Chapter( + timestamp=float(title.data["creditsOffset"]), # this is seconds, needed to assign to float + name="Credits", + ) + ) except Exception as e: self.log.warning(f"Failed to process chapters: {e}") return chapters - def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None: + def get_widevine_license( + self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack + ) -> bytes | str | None: if not self.msl: self.log.error(f"MSL Client is not intialized!") sys.exit(1) application_data = { - "version": 2, - "url": track.data["license_url"], - "id": int(time.time() * 10000), - "esn": self.esn.data["esn"], - "languages": ["en-US"], - # "uiVersion": "shakti-v9dddfde5", - "clientVersion": "6.0026.291.011", - "params": [{ + "version": 2, + "url": track.data["license_url"], + "id": int(time.time() * 10000), + "esn": self.esn.data["esn"], + "languages": ["en-US"], + # "uiVersion": "shakti-v9dddfde5", + "clientVersion": "6.0026.291.011", + "params": [ + { "sessionId": base64.b64encode(get_random_bytes(16)).decode("utf-8"), "clientTime": int(time.time()), "challengeBase64": base64.b64encode(challenge).decode("utf-8"), "xid": str(int((int(time.time()) + 0.1612) * 1000)), - }], - "echo": "sessionId" - } + } + ], + "echo": "sessionId", + } header, payload_data = self.msl.send_message( endpoint=self.config["endpoints"]["license"], params={ @@ -393,7 +425,7 @@ class Netflix(Service): "reqName": "license", }, application_data=application_data, - userauthdata=self.userauthdata + userauthdata=self.userauthdata, ) if not payload_data: self.log.error(f" - Failed to get license: {header['message']} [{header['code']}]") @@ -414,7 +446,9 @@ class Netflix(Service): sys.exit(1) return payload_data[0]["licenseResponseBase64"] - def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None: + def get_playready_license( + self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack + ) -> bytes | str | None: return self.get_widevine_license(challenge=challenge, title=title, track=track) # return super().get_widevine_license(challenge=challenge, title=title, track=track) @@ -424,7 +458,7 @@ class Netflix(Service): self.profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()] if self.profile is not None: - self.requested_profiles = self.profile.split('+') + self.requested_profiles = self.profile.split("+") self.log.info(f"Requested profile: {self.requested_profiles}") else: # self.log.info(f"Video Range: {self.range}") @@ -433,13 +467,23 @@ class Netflix(Service): if self.vcodec.extension.upper() not in self.config["profiles"]["video"]: raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix") - if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.range[0] != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9: + if ( + self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) + and self.range[0] != Video.Range.HYBRID + and self.vcodec != Video.Codec.AVC + and self.vcodec != Video.Codec.VP9 + ): self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}") sys.exit(1) # Validate all ranges are supported for video_range in self.range: - if video_range.name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and video_range != Video.Range.HYBRID and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9: + if ( + video_range.name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) + and video_range != Video.Range.HYBRID + and self.vcodec != Video.Codec.AVC + and self.vcodec != Video.Codec.VP9 + ): self.log.error(f"Video range {video_range.name} is not supported by Video Codec: {self.vcodec}") sys.exit(1) @@ -462,12 +506,11 @@ class Netflix(Service): # scheme = KeyExchangeSchemes.Widevine scheme = { DeviceTypes.CHROME: KeyExchangeSchemes.AsymmetricWrapped, - DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine + DeviceTypes.ANDROID: KeyExchangeSchemes.Widevine, }[self.cdm.device_type] # scheme = KeyExchangeSchemes.AsymmetricWrapped self.log.info(f"Scheme: {scheme}") - self.msl = MSL.handshake( scheme=scheme, session=self.session, @@ -480,8 +523,7 @@ class Netflix(Service): cookie = self.session.cookies.get_dict() if self.cdm.device_type == DeviceTypes.CHROME: self.userauthdata = UserAuthentication.NetflixIDCookies( - netflixid=cookie["NetflixId"], - securenetflixid=cookie["SecureNetflixId"] + netflixid=cookie["NetflixId"], securenetflixid=cookie["SecureNetflixId"] ) else: if not self.credential: @@ -502,7 +544,7 @@ class Netflix(Service): return UserAuthentication.UserIDToken( token_data=token_data["tokendata"], signature=token_data["signature"], - master_token=self.msl.keys.mastertoken + master_token=self.msl.keys.mastertoken, ) def get_android_user_token_cache(self): @@ -515,7 +557,7 @@ class Netflix(Service): params=self.build_android_sign_in_query(), application_data="", headers=self.build_android_sign_in_headers(), - unwrap_result=False + unwrap_result=False, ) except Exception as exc: raise click.ClickException(f"Android sign-in request failed: {exc}") from exc @@ -595,7 +637,7 @@ class Netflix(Service): "osDevice": "a70q", "osDisplay": "TQ1A.230205.002", "password": self.credential.password, - "path": "[\"signInVerify\"]", + "path": '["signInVerify"]', "pathFormat": "hierarchical", "platform": "android", "preloadSignupRoValue": "", @@ -608,18 +650,18 @@ class Netflix(Service): "secureNetflixId": cookie["SecureNetflixId"], "sid": "7176", "store": "google", - "userLoginId": self.credential.username + "userLoginId": self.credential.username, } def build_android_sign_in_headers(self) -> dict: return { "X-Netflix.Request.NqTracking": "VerifyLoginMslRequest", "X-Netflix.Client.Request.Name": "VerifyLoginMslRequest", - "X-Netflix.Request.Client.Context": "{\"appState\":\"foreground\"}", + "X-Netflix.Request.Client.Context": '{"appState":"foreground"}', "X-Netflix-Esn": self.esn.data["esn"], "X-Netflix.EsnPrefix": "NFANDROID1-PRV-P-", "X-Netflix.msl-header-friendly-client": "true", - "content-encoding": "msl_v1" + "content-encoding": "msl_v1", } def decrypt_android_header(self, encrypted_header_b64: str) -> dict: @@ -645,17 +687,22 @@ class Netflix(Service): return parsed["expiration"] return None - def get_profiles(self): result_profiles = [] if self.vcodec == Video.Codec.AVC: if self.requested_profiles is not None: for requested_profiles in self.requested_profiles: - result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles]))) + result_profiles.extend( + flatten( + list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles]) + ) + ) return result_profiles - result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values()))) + result_profiles.extend( + flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())) + ) return result_profiles # Handle case for codec VP9 @@ -719,42 +766,52 @@ class Netflix(Service): randomchar_pattern = r"\{randomchar_(\d+)\}" if re.search(randomchar_pattern, esn_template): - esn_regex = "^" + re.sub( - r"\\\{randomchar_(\d+)\\\}", - lambda match: rf"[A-Z0-9]{{{match.group(1)}}}", - re.escape(esn_template) - ) + "$" + esn_regex = ( + "^" + + re.sub( + r"\\\{randomchar_(\d+)\\\}", + lambda match: rf"[A-Z0-9]{{{match.group(1)}}}", + re.escape(esn_template), + ) + + "$" + ) - if cached_type == DeviceTypes.ANDROID and isinstance(cached_esn, str) and re.match(esn_regex, cached_esn) and not cache_expired: + if ( + cached_type == DeviceTypes.ANDROID + and isinstance(cached_esn, str) + and re.match(esn_regex, cached_esn) + and not cache_expired + ): esn = cached_esn else: self.log.info("Generating Android ESN from configured randomchar template") esn = re.sub( randomchar_pattern, - lambda match: "".join(random.choice("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ") for _ in range(int(match.group(1)))), - esn_template + lambda match: "".join( + random.choice("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ") for _ in range(int(match.group(1))) + ), + esn_template, ) - self.esn.set({ - 'esn': esn, - 'type': self.cdm.device_type - }, expiration=1 * 60 * 60) + self.esn.set({"esn": esn, "type": self.cdm.device_type}, expiration=1 * 60 * 60) else: esn = esn_template - esn_value = { - 'esn': esn, - 'type': self.cdm.device_type - } + esn_value = {"esn": esn, "type": self.cdm.device_type} if cached_esn != esn or cached_type != DeviceTypes.ANDROID or cache_expired: self.esn.set(esn_value, expiration=1 * 60 * 60) else: ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30)) generated_esn = f"NFCDIE-03-{ESN_GEN}" # Check if ESN is expired or doesn't exist - if not isinstance(self.esn.data, dict) or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired) or (self.esn.data.get("type") != DeviceTypes.CHROME): + if ( + not isinstance(self.esn.data, dict) + or self.esn.data == {} + or (hasattr(self.esn, "expired") and self.esn.expired) + or (self.esn.data.get("type") != DeviceTypes.CHROME) + ): # Set new ESN with 6-hour expiration esn_value = { - 'esn': generated_esn, - 'type': DeviceTypes.CHROME, + "esn": generated_esn, + "type": DeviceTypes.CHROME, } self.esn.set(esn_value, expiration=1 * 60 * 60) # 1 hours in seconds self.log.info(f"Generated new ESN with 1-hour expiration") @@ -763,7 +820,6 @@ class Netflix(Service): final_esn = self.esn.data.get("esn") if isinstance(self.esn.data, dict) else self.esn.data self.log.info(f"ESN: {final_esn}") - def get_metadata(self, title_id: str): """ Obtain Metadata information about a title by it's ID. @@ -779,8 +835,8 @@ class Netflix(Service): "drmSystem": self.config["configuration"]["drm_system"], "isWatchlistEnabled": False, "isShortformEnabled": False, - "languages": self.meta_lang - } + "languages": self.meta_lang, + }, ).json() except requests.HTTPError as e: if e.response.status_code == 500: @@ -796,41 +852,46 @@ class Netflix(Service): sys.exit(1) else: if "status" in metadata and metadata["status"] == "error": - self.log.error( - f" - Failed to get metadata, cookies might be expired. ({metadata['message']})" - ) + self.log.error(f" - Failed to get metadata, cookies might be expired. ({metadata['message']})") sys.exit(1) return metadata def _get_empty_manifest(self): """Return an empty manifest structure to prevent crashes when manifest retrieval fails""" return { - "video_tracks": [{ - "streams": [], - "drmHeader": {"bytes": b""} - }], + "video_tracks": [{"streams": [], "drmHeader": {"bytes": b""}}], "audio_tracks": [], "timedtexttracks": [], - "links": { - "license": {"href": ""} - } + "links": {"license": {"href": ""}}, } - def get_manifest(self, title: Title_T, video_profiles: List[str], required_text_track_id: Optional[str] = None, required_audio_track_id: Optional[str] = None): + def get_manifest( + self, + title: Title_T, + video_profiles: List[str], + required_text_track_id: Optional[str] = None, + required_audio_track_id: Optional[str] = None, + ): try: # Log context information for debugging - title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown' - self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}") + title_id = title.data.get("episodeId", title.data.get("id", "unknown")) if title.data else "unknown" + self.log.debug( + f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}" + ) audio_profiles = self.config["profiles"]["audio"].values() - video_profiles = sorted(set(flatten(as_list( - video_profiles, - audio_profiles, - self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [], - self.config["profiles"]["subtitles"], - )))) - - + video_profiles = sorted( + set( + flatten( + as_list( + video_profiles, + audio_profiles, + self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [], + self.config["profiles"]["subtitles"], + ) + ) + ) + ) # self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles)) @@ -858,9 +919,11 @@ class Netflix(Service): "clientVersion": "9999999", "params": { "clientVersion": "9999999", - **({ - "challenge": self.config["payload_challenge"] - } if self.drm_system == "widevine" and self.cdm.device_type == DeviceTypes.CHROME else {}), + **( + {"challenge": self.config["payload_challenge"]} + if self.drm_system == "widevine" and self.cdm.device_type == DeviceTypes.CHROME + else {} + ), # "challanges": { # # "default": base64.b64encode(challenge).decode() # "default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"] @@ -878,23 +941,21 @@ class Netflix(Service): "licenseType": "standard", "liveAdsCapability": "replace", "liveMetadataFormat": "INDEXED_SEGMENT_TEMPLATE", - "profilesGroups": [{ - "name": "default", - "profiles": video_profiles - }], + "profilesGroups": [{"name": "default", "profiles": video_profiles}], "profiles": video_profiles, "preferAssistiveAudio": False, "requestSegmentVmaf": False, - "requiredAudioTrackId": required_audio_track_id, # This is for getting missing audio tracks (value get from `new_track_id``) - "requiredTextTrackId": required_text_track_id, # This is for getting missing subtitle. (value get from `new_track_id``) + "requiredAudioTrackId": required_audio_track_id, # This is for getting missing audio tracks (value get from `new_track_id``) + "requiredTextTrackId": required_text_track_id, # This is for getting missing subtitle. (value get from `new_track_id``) "supportsAdBreakHydration": False, "supportsNetflixMediaEvents": True, - "supportsPartialHydration": True, # This is important if you want get available all tracks. but you must fetch each missing url tracks with "requiredAudioTracksId" or "requiredTextTrackId" + "supportsPartialHydration": True, # This is important if you want get available all tracks. but you must fetch each missing url tracks with "requiredAudioTracksId" or "requiredTextTrackId" "supportsPreReleasePin": True, "supportsUnequalizedDownloadables": True, "supportsWatermark": True, "titleSpecificData": { - (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"): {"unletterboxed": False} + (title.data.get("episodeId") if title.data else None) + or (title.data.get("id") if title.data else "unknown"): {"unletterboxed": False} }, "type": "standard", # ? PREPARE "uiPlatform": "SHAKTI", @@ -902,34 +963,43 @@ class Netflix(Service): "useBetterTextUrls": True, "useHttpsStreams": True, "usePsshBox": True, - "videoOutputInfo": [{ - # todo ; make this return valid, but "secure" values, maybe it helps - "type": "DigitalVideoOutputDescriptor", - "outputType": "unknown", - "supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"], - "isHdcpEngaged": self.config["configuration"]["is_hdcp_engaged"] - }], - "viewableId": (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"), + "videoOutputInfo": [ + { + # todo ; make this return valid, but "secure" values, maybe it helps + "type": "DigitalVideoOutputDescriptor", + "outputType": "unknown", + "supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"], + "isHdcpEngaged": self.config["configuration"]["is_hdcp_engaged"], + } + ], + "viewableId": (title.data.get("episodeId") if title.data else None) + or (title.data.get("id") if title.data else "unknown"), "xid": str(int((int(time.time()) + 0.1612) * 1000)), "showAllSubDubTracks": True, - } + }, }, - userauthdata=self.userauthdata + userauthdata=self.userauthdata, ) # self.cdm.close(session_id) if "errorDetails" in payload_chunks: - self.log.error(f"Manifest call failed for title_id: {title_id}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}, error: {payload_chunks['errorDetails']}") + self.log.error( + f"Manifest call failed for title_id: {title_id}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}, error: {payload_chunks['errorDetails']}" + ) return self._get_empty_manifest() # with open(f"./manifest_{"+".join(video_profiles)}.json", mode='w') as r: # r.write(jsonpickle.encode(payload_chunks, indent=4)) return payload_chunks except Exception as e: - title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown' - profile_count = len(video_profiles) if 'video_profiles' in locals() else 0 + title_id = title.data.get("episodeId", title.data.get("id", "unknown")) if title.data else "unknown" + profile_count = len(video_profiles) if "video_profiles" in locals() else 0 self.log.error(f"Exception in get_manifest: {e}") - self.log.error(f"Context - title_id: {title_id}, video_profiles_count: {profile_count}, required_audio_track_id: {required_audio_track_id or 'None'}, required_text_track_id: {required_text_track_id or 'None'}") - if 'video_profiles' in locals() and video_profiles: - self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}") + self.log.error( + f"Context - title_id: {title_id}, video_profiles_count: {profile_count}, required_audio_track_id: {required_audio_track_id or 'None'}, required_text_track_id: {required_text_track_id or 'None'}" + ) + if "video_profiles" in locals() and video_profiles: + self.log.error( + f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}" + ) return self._get_empty_manifest() @staticmethod @@ -964,11 +1034,12 @@ class Netflix(Service): # If anything goes wrong, default to English return Language.get("en") - def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str: + def get_widevine_service_certificate( + self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack + ) -> bytes | str: return self.config["certificate"] - def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = None) -> Tracks: - + def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks=None) -> Tracks: # If hydrate_tracks is not specified, derive from self.hydrate_track if hydrate_tracks is None: hydrate_tracks = self.hydrate_track @@ -1015,20 +1086,28 @@ class Netflix(Service): bitrate=video["bitrate"] * 1000, width=video["res_w"], height=video["res_h"], - fps=(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None, + fps=(float(video["framerate_value"]) / video["framerate_scale"]) + if "framerate_value" in video + else None, language=Language.get(original_language), edition=video["content_profile"], range_=self.parse_video_range_from_profile(video["content_profile"]), is_original_lang=True, - drm=[self.create_drm(manifest["video_tracks"][0]["drmHeader"]["bytes"], video["drmHeaderId"])] if manifest["video_tracks"][0].get("drmHeader", {}).get("bytes") else [], - data={ - 'license_url': license_url - } + drm=[ + self.create_drm( + manifest["video_tracks"][0]["drmHeader"]["bytes"], video["drmHeaderId"] + ) + ] + if manifest["video_tracks"][0].get("drmHeader", {}).get("bytes") + else [], + data={"license_url": license_url}, ) ) except Exception as e: video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown" - self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}") + self.log.warning( + f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}" + ) continue # Process audio tracks @@ -1043,9 +1122,13 @@ class Netflix(Service): # This # self.log.debug(f"Audio lang {audio["languageDescription"]} is available but no stream available.") if "new_track_id" in audio and "id" in audio: - unavailable_audio_tracks.append((audio["new_track_id"], audio["id"])) # Assign to `unavailable_subtitle` for request missing audio tracks later + unavailable_audio_tracks.append( + (audio["new_track_id"], audio["id"]) + ) # Assign to `unavailable_subtitle` for request missing audio tracks later if hydrate_tracks: - self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available") + self.log.debug( + f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available" + ) continue # Store primary audio track info (new_track_id, id) for potential use in hydration @@ -1063,28 +1146,35 @@ class Netflix(Service): id_=stream["downloadable_id"], url=stream["urls"][0]["url"], codec=Audio.Codec.from_netflix_profile(stream["content_profile"]), - language=Language.get(self.NF_LANG_MAP.get(audio["language"]) or audio["language"]), + language=Language.get( + self.NF_LANG_MAP.get(audio["language"]) or audio["language"] + ), is_original_lang=is_original_lang, bitrate=stream["bitrate"] * 1000, channels=stream["channels"], descriptive=audio.get("rawTrackType", "").lower() == "assistive", - name="[Original]" if Language.get(audio["language"]).language == original_language.language else None, - joc=16 if "atmos" in stream["content_profile"] else None + name="[Original]" + if Language.get(audio["language"]).language == original_language.language + else None, + joc=16 if "atmos" in stream["content_profile"] else None, ) ) except Exception as e: - stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown" - self.log.warning(f"Failed to process audio stream at audio_index {audio_index}, stream_index {stream_index}, audio_id: {audio_id}, stream_id: {stream_id}, language: {audio_lang}, error: {e}") + stream_id = ( + stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown" + ) + self.log.warning( + f"Failed to process audio stream at audio_index {audio_index}, stream_index {stream_index}, audio_id: {audio_id}, stream_id: {stream_id}, language: {audio_lang}, error: {e}" + ) continue except Exception as e: audio_id = audio.get("id", "unknown") if isinstance(audio, dict) else "unknown" audio_lang = audio.get("language", "unknown") if isinstance(audio, dict) else "unknown" - self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}") + self.log.warning( + f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}" + ) continue - - - # Process subtitle tracks unavailable_subtitle: List[Tuple[str, str]] = [] if "timedtexttracks" in manifest: @@ -1097,29 +1187,39 @@ class Netflix(Service): if subtitle.get("hydrated") == False: # This subtitles is there but has to request stream first if "new_track_id" in subtitle and "id" in subtitle: - unavailable_subtitle.append((subtitle["new_track_id"], subtitle["id"])) # Assign to `unavailable_subtitle` for request missing subtitles later + unavailable_subtitle.append( + (subtitle["new_track_id"], subtitle["id"]) + ) # Assign to `unavailable_subtitle` for request missing subtitles later if hydrate_tracks: - self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated") + self.log.debug( + f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated" + ) continue - if subtitle.get("languageDescription") == 'Off' and self.descriptive_subtitles == False: + if subtitle.get("languageDescription") == "Off" and self.descriptive_subtitles == False: # Skip Descriptive subtitles continue # pass if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]: - self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds") + self.log.debug( + f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds" + ) continue id = list(subtitle["downloadableIds"].values()) if not id: - self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds") + self.log.debug( + f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds" + ) continue language = Language.get(subtitle["language"]) if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]: - self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables") + self.log.debug( + f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables" + ) continue profile = next(iter(subtitle["ttDownloadables"].keys())) @@ -1135,15 +1235,25 @@ class Netflix(Service): language=language, forced=subtitle.get("isForcedNarrative", False), cc=subtitle.get("rawTrackType") == "closedcaptions", - sdh=subtitle.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitle else False, + sdh=subtitle.get("trackVariant") == "STRIPPED_SDH" + if "trackVariant" in subtitle + else False, is_original_lang=is_original_lang, - name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitle and subtitle["trackVariant"] == "DUBTITLE" else None), + name=( + "[Original]" + if language.language == original_language.language + else None or "[Dubbing]" + if "trackVariant" in subtitle and subtitle["trackVariant"] == "DUBTITLE" + else None + ), ) ) except Exception as e: subtitle_id = subtitle.get("id", "unknown") if isinstance(subtitle, dict) else "unknown" subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown" - self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}") + self.log.warning( + f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}" + ) continue if hydrate_tracks == False: @@ -1156,7 +1266,7 @@ class Netflix(Service): unavailable_audio_tracks=unavailable_audio_tracks, unavailable_subtitle=unavailable_subtitle, primary_audio_tracks=primary_audio_tracks, - original_language=original_language + original_language=original_language, ) tracks.add(hydrated_tracks) else: @@ -1164,12 +1274,13 @@ class Netflix(Service): except Exception as e: self.log.error(f"Exception in manifest_as_tracks: {e}") - self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}") + self.log.debug( + f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}" + ) # Return empty tracks on any critical error return tracks - def parse_video_range_from_profile(self, profile: str) -> Video.Range: """ Parse the video range from a Netflix profile string. @@ -1217,9 +1328,14 @@ class Netflix(Service): """Return an empty track tuple with None values.""" return (None, None) - def hydrate_all_tracks(self, title: Title_T, unavailable_audio_tracks: List[Tuple[str, str]], - unavailable_subtitle: List[Tuple[str, str]], primary_audio_tracks: List[Tuple[str, str]], - original_language: Language) -> Tracks: + def hydrate_all_tracks( + self, + title: Title_T, + unavailable_audio_tracks: List[Tuple[str, str]], + unavailable_subtitle: List[Tuple[str, str]], + primary_audio_tracks: List[Tuple[str, str]], + original_language: Language, + ) -> Tracks: """ Hydrate all missing audio and subtitle tracks. @@ -1249,11 +1365,19 @@ class Netflix(Service): self.log.info(f"Hydrating {hydration_info} tracks. Total: {audio_count + subtitle_count}") # Handle mismatched lengths - use last successful tracks when needed - last_successful_subtitle = self._get_empty_track_tuple() if not unavailable_subtitle else unavailable_subtitle[-1] - last_successful_audio = self._get_empty_track_tuple() if not unavailable_audio_tracks else unavailable_audio_tracks[-1] + last_successful_subtitle = ( + self._get_empty_track_tuple() if not unavailable_subtitle else unavailable_subtitle[-1] + ) + last_successful_audio = ( + self._get_empty_track_tuple() if not unavailable_audio_tracks else unavailable_audio_tracks[-1] + ) # For subtitle-only hydration, use primary audio track if available - primary_audio_for_subtitle_hydration = primary_audio_tracks[0] if primary_audio_tracks and not unavailable_audio_tracks and unavailable_subtitle else self._get_empty_track_tuple() + primary_audio_for_subtitle_hydration = ( + primary_audio_tracks[0] + if primary_audio_tracks and not unavailable_audio_tracks and unavailable_subtitle + else self._get_empty_track_tuple() + ) # Process audio tracks first, then handle subtitles separately if needed max_length = max(len(unavailable_audio_tracks), len(unavailable_subtitle)) @@ -1291,8 +1415,13 @@ class Netflix(Service): subtitle_track_id = subtitle_hydration[0] if subtitle_hydration[0] is not None else None # Log what we're trying to hydrate - self._log_hydration_attempt(hydration_index, audio_hydration, subtitle_hydration, - is_real_audio_request, is_real_subtitle_request) + self._log_hydration_attempt( + hydration_index, + audio_hydration, + subtitle_hydration, + is_real_audio_request, + is_real_subtitle_request, + ) # Only call get_manifest if we have valid tracks to hydrate should_hydrate_audio = self._is_valid_track_for_hydration(audio_hydration) @@ -1303,7 +1432,9 @@ class Netflix(Service): # If we still don't have a subtitle track ID, skip this hydration to avoid API error if subtitle_track_id is None: - self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context") + self.log.warning( + f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context" + ) continue hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_id, audio_track_id) @@ -1311,10 +1442,19 @@ class Netflix(Service): # Handle hydrated audio tracks (only if it's a real audio request, not reused) if is_real_audio_request and should_hydrate_audio and "audio_tracks" in hydrated_manifest: try: - audios = next((item for item in hydrated_manifest["audio_tracks"] if 'id' in item and item["id"] == audio_hydration[1]), None) + audios = next( + ( + item + for item in hydrated_manifest["audio_tracks"] + if "id" in item and item["id"] == audio_hydration[1] + ), + None, + ) if audios and "streams" in audios: audio_lang = audios.get("language", "unknown") - self.log.debug(f"Processing hydrated audio track_id: {audio_hydration[1]}, language: {audio_lang}, streams_count: {len(audios['streams'])}") + self.log.debug( + f"Processing hydrated audio track_id: {audio_hydration[1]}, language: {audio_lang}, streams_count: {len(audios['streams'])}" + ) for stream_index, stream in enumerate(audios["streams"]): try: stream_id = stream.get("downloadable_id", "unknown") @@ -1323,33 +1463,62 @@ class Netflix(Service): id_=stream["downloadable_id"], url=stream["urls"][0]["url"], codec=Audio.Codec.from_netflix_profile(stream["content_profile"]), - language=Language.get(self.NF_LANG_MAP.get(audios["language"]) or audios["language"]), + language=Language.get( + self.NF_LANG_MAP.get(audios["language"]) or audios["language"] + ), is_original_lang=audios["language"] == original_language.language, bitrate=stream["bitrate"] * 1000, channels=stream["channels"], descriptive=audios.get("rawTrackType", "").lower() == "assistive", - name="[Original]" if Language.get(audios["language"]).language == original_language.language else None, - joc=16 if "atmos" in stream["content_profile"] else None + name="[Original]" + if Language.get(audios["language"]).language == original_language.language + else None, + joc=16 if "atmos" in stream["content_profile"] else None, ) ) except Exception as e: - stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown" - self.log.warning(f"Failed to process hydrated audio stream at hydration_index {hydration_index}, stream_index {stream_index}, audio_track_id: {audio_hydration[1]}, stream_id: {stream_id}, error: {e}") + stream_id = ( + stream.get("downloadable_id", "unknown") + if isinstance(stream, dict) + else "unknown" + ) + self.log.warning( + f"Failed to process hydrated audio stream at hydration_index {hydration_index}, stream_index {stream_index}, audio_track_id: {audio_hydration[1]}, stream_id: {stream_id}, error: {e}" + ) continue else: - self.log.warning(f"No audio streams found for hydrated audio_track_id: {audio_hydration[1]} at hydration_index {hydration_index}") + self.log.warning( + f"No audio streams found for hydrated audio_track_id: {audio_hydration[1]} at hydration_index {hydration_index}" + ) except Exception as e: - self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}") + self.log.warning( + f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}" + ) elif not is_real_audio_request and audio_hydration[1] is not None: - self.log.debug(f"Used audio track context for API request at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]} (not adding to tracks)") + self.log.debug( + f"Used audio track context for API request at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]} (not adding to tracks)" + ) # Handle hydrated subtitle tracks (only if it's a real subtitle request, not reused) - if is_real_subtitle_request and self._is_valid_track_for_hydration(subtitle_hydration) and "timedtexttracks" in hydrated_manifest: + if ( + is_real_subtitle_request + and self._is_valid_track_for_hydration(subtitle_hydration) + and "timedtexttracks" in hydrated_manifest + ): try: - subtitles = next((item for item in hydrated_manifest["timedtexttracks"] if 'id' in item and item["id"] == subtitle_hydration[1]), None) + subtitles = next( + ( + item + for item in hydrated_manifest["timedtexttracks"] + if "id" in item and item["id"] == subtitle_hydration[1] + ), + None, + ) if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles: subtitle_lang = subtitles.get("language", "unknown") - self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}") + self.log.debug( + f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}" + ) id = list(subtitles["downloadableIds"].values()) if id: @@ -1364,22 +1533,40 @@ class Netflix(Service): language=language, forced=subtitles.get("isForcedNarrative", False), cc=subtitles.get("rawTrackType") == "closedcaptions", - sdh=subtitles.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitles else False, + sdh=subtitles.get("trackVariant") == "STRIPPED_SDH" + if "trackVariant" in subtitles + else False, is_original_lang=subtitles.get("language") == original_language.language, - name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitles and subtitles["trackVariant"] == "DUBTITLE" else None), + name=( + "[Original]" + if language.language == original_language.language + else None or "[Dubbing]" + if "trackVariant" in subtitles and subtitles["trackVariant"] == "DUBTITLE" + else None + ), ) ) else: - self.log.warning(f"No downloadable IDs found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}") + self.log.warning( + f"No downloadable IDs found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}" + ) else: - self.log.warning(f"No subtitle data found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}") + self.log.warning( + f"No subtitle data found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}" + ) except Exception as e: - self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}") + self.log.warning( + f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}" + ) elif not is_real_subtitle_request and subtitle_hydration[1] is not None: - self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)") + self.log.debug( + f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)" + ) except Exception as e: - self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] or 'None'}, subtitle_track_id: {subtitle_hydration[1] or 'None'}, error: {e}") + self.log.warning( + f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] or 'None'}, subtitle_track_id: {subtitle_hydration[1] or 'None'}, error: {e}" + ) continue return hydrated_tracks @@ -1401,7 +1588,8 @@ class Netflix(Service): else: # Maybe this is DecryptLabsRemoteCDM from unshackle.core.cdm import DecryptLabsRemoteCDM - if (isinstance(self.cdm, DecryptLabsRemoteCDM)): + + if isinstance(self.cdm, DecryptLabsRemoteCDM): # Is Decrypt Labs using PlayReady? if self.cdm.is_playready: return "playready" @@ -1409,11 +1597,12 @@ class Netflix(Service): return "widevine" raise ValueError("Unknown DRM system") - def _log_hydration_attempt(self, hydration_index: int, audio_data: tuple, subtitle_data: tuple, - is_real_audio: bool, is_real_subtitle: bool) -> None: + def _log_hydration_attempt( + self, hydration_index: int, audio_data: tuple, subtitle_data: tuple, is_real_audio: bool, is_real_subtitle: bool + ) -> None: """Log hydration attempt details.""" - audio_id = audio_data[1] if audio_data[1] is not None else 'None' - subtitle_id = subtitle_data[1] if subtitle_data[1] is not None else 'None' + audio_id = audio_data[1] if audio_data[1] is not None else "None" + subtitle_id = subtitle_data[1] if subtitle_data[1] is not None else "None" self.log.debug( f"Hydrating tracks at index {hydration_index}, " f"audio_track_id: {audio_id}, subtitle_track_id: {subtitle_id}, " diff --git a/unshackle/services/Netflix/config.yaml b/unshackle/services/Netflix/config.yaml index 204ccfd..0f39f8a 100644 --- a/unshackle/services/Netflix/config.yaml +++ b/unshackle/services/Netflix/config.yaml @@ -20,10 +20,12 @@ esn_map: # key map of CDM WVD `SystemID = 'ESN you want to use for that CDM WVD'` 8159: "NFANDROID1-PRV-P-GOOGLEPIXEL" 8131: "HISETVK84500000000000000000000000007401422" + 22589: "NFANDROID1-PRV-P-SAMSUNG-SM-G975F-22589-RYFPGCV4K7QNZJFT5EK3YZ25TYOC1B1MSFEXR4L8JM99YOFXKZLFLOBTATE2AA2UJ" 22590: "NFANDROID1-PXA-P-L3-XIAOMM2102J20SG-22590-020NTB086HJPGG70MDDMR0306MR0NNO5G3DJGFCKS9HJF58ER9QA21VFG4I0246JRN6TF16L9I627EPK708SH42UUMG1ASFVG20F3" 12063: "NFANDROID1-PRV-P-SHENZHENKTC-49B1U-12063-2PAENERYJWY35H7F24163TMUCBBA4VRHQ2XZX4OBU4MUTKYFW50BMFBVGTUMN6IM0" 7110: "NFANDROID1-PRV-P-MSD6886602GUHDANDROIDTV-HISENHISMARTTV-A4-7110-{randomchar_64}" 16401: "NFANDROID1-PRV-P-MSD6886602GUHDANDROIDTV-HISENHISMARTTV-A4-16401-FA2CF15C2E3A00BDDC3B6811C210893F0CD2C062471A62C2A0DD8C28BAE8DF42" + endpoints: website: "https://www.netflix.com/nq/website/memberapi/{build_id}/pathEvaluator" manifest: "https://www.netflix.com/msl/playapi/cadmium/licensedmanifest/1" diff --git a/unshackle/unshackle.yaml b/unshackle/unshackle.yaml index d87d05c..00004de 100644 --- a/unshackle/unshackle.yaml +++ b/unshackle/unshackle.yaml @@ -60,7 +60,7 @@ directories: logs: Logs temp: /tmp/unshackle cdm: - default: hisense_msd6a648_4.10.2891.0_2a621b99_7110_l1 + default: samsung_sm-g975f_16.0.0_1e7c5ba2_22589_l3 remote_cdm: - name: "chromecdm"