From d3fb0b6b24c4a9a230acf39603a610758c6038cc Mon Sep 17 00:00:00 2001 From: Andy Date: Tue, 24 Feb 2026 13:29:10 -0700 Subject: [PATCH] refactor(example): migrate EXAMPLE service to track_request pattern --- unshackle/services/EXAMPLE/__init__.py | 145 ++++++++++++++----------- 1 file changed, 80 insertions(+), 65 deletions(-) diff --git a/unshackle/services/EXAMPLE/__init__.py b/unshackle/services/EXAMPLE/__init__.py index 2590e87..c296cd0 100644 --- a/unshackle/services/EXAMPLE/__init__.py +++ b/unshackle/services/EXAMPLE/__init__.py @@ -13,6 +13,7 @@ from langcodes import Language from unshackle.core.constants import AnyTrack from unshackle.core.credential import Credential from unshackle.core.manifests import DASH +# from unshackle.core.manifests import HLS from unshackle.core.search_result import SearchResult from unshackle.core.service import Service from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T @@ -35,6 +36,12 @@ class EXAMPLE(Service): GEOFENCE = ("US", "UK") NO_SUBTITLES = True + VIDEO_RANGE_MAP = { + "SDR": "sdr", + "HDR10": "hdr10", + "DV": "dolby_vision", + } + @staticmethod @click.command(name="EXAMPLE", short_help="https://domain.com") @click.argument("title", type=str) @@ -52,17 +59,23 @@ class EXAMPLE(Service): self.device = device self.cdm = ctx.obj.cdm - # Get range parameter for HDR support - range_param = ctx.parent.params.get("range_") - self.range = range_param[0].name if range_param else "SDR" + # self.track_request is set by Service.__init__() from CLI params + # Contains: codecs (list[Video.Codec]), ranges (list[Video.Range]), best_available (bool) + + # Override codec for HDR ranges (HDR requires HEVC) + if any(r != Video.Range.SDR for r in self.track_request.ranges): + self.track_request.codecs = [Video.Codec.HEVC] + + # Override for L3 CDM limitations + if self.cdm and self.cdm.security_level == 3: + self.track_request.codecs = [Video.Codec.AVC] + self.track_request.ranges = [Video.Range.SDR] if self.config is None: raise Exception("Config is missing!") - else: - profile_name = ctx.parent.params.get("profile") - if profile_name is None: - profile_name = "default" - self.profile = profile_name + + profile_name = ctx.parent.params.get("profile") + self.profile = profile_name or "default" def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None: super().authenticate(cookies, credential) @@ -165,78 +178,83 @@ class EXAMPLE(Service): ) return Series(episodes) + # DASH Example: Service requires separate API calls per codec/range. + # Uses _get_tracks_for_variants() which iterates codecs x ranges, + # handles HYBRID (HDR10+DV), and best_available fallback. + def get_tracks(self, title: Title_T) -> Tracks: - # Handle HYBRID mode by fetching both HDR10 and DV tracks separately - if self.range == "HYBRID" and self.cdm.security_level != 3: - tracks = Tracks() + def _fetch_variant( + title: Title_T, + codec: Optional[Video.Codec], + range_: Video.Range, + ) -> Tracks: + vcodec_str = "H265" if codec == Video.Codec.HEVC else "H264" + range_str = range_.name + video_format = self.VIDEO_RANGE_MAP.get(range_str, "sdr") - # Get HDR10 tracks - hdr10_tracks = self._get_tracks_for_range(title, "HDR10") - tracks.add(hdr10_tracks, warn_only=True) + self.log.info(f" + Fetching {vcodec_str} {range_str} manifest") + tracks = self._fetch_dash_manifest(title, vcodec=vcodec_str, video_format=video_format) - # Get DV tracks - dv_tracks = self._get_tracks_for_range(title, "DV") - tracks.add(dv_tracks, warn_only=True) + expected_range = { + "HDR10": Video.Range.HDR10, + "DV": Video.Range.DV, + }.get(range_str) + if expected_range and not any(v.range == expected_range for v in tracks.videos): + raise ValueError(f"{range_str} requested but no {range_str} tracks available") return tracks - else: - # Normal single-range behavior - return self._get_tracks_for_range(title, self.range) - def _get_tracks_for_range(self, title: Title_T, range_override: str = None) -> Tracks: - # Use range_override if provided, otherwise use self.range - current_range = range_override if range_override else self.range + return self._get_tracks_for_variants(title, _fetch_variant) - # Build API request parameters - params = { - "token": self.token, - "guid": title.id, - } - - data = { - "type": self.config["client"][self.device]["type"], - } - - # Add range-specific parameters - if current_range == "HDR10": - data["video_format"] = "hdr10" - elif current_range == "DV": - data["video_format"] = "dolby_vision" - else: - data["video_format"] = "sdr" - - # Only request high-quality HDR content with L1 CDM - if current_range in ("HDR10", "DV") and self.cdm.security_level == 3: - # L3 CDM - skip HDR content - return Tracks() + # HLS Example: Service returns all codecs/ranges in one master playlist. + # No need for _get_tracks_for_variants, dl.py filters by user selection. + # + # def get_tracks(self, title: Title_T) -> Tracks: + # playback = self.session.get( + # url=self.config["endpoints"]["playback"].format(title_id=title.id), + # params={"token": self.token}, + # ).json() + # return HLS.from_url( + # url=playback["manifest_url"], + # session=self.session, + # ).to_tracks(title.language) + def _fetch_dash_manifest( + self, + title: Title_T, + vcodec: str = "H264", + video_format: str = "sdr", + ) -> Tracks: streams = self.session.post( url=self.config["endpoints"]["streams"], - params=params, - data=data, + params={ + "token": self.token, + "guid": title.id, + }, + data={ + "type": self.config["client"][self.device]["type"], + "video_format": video_format, + "video_codec": vcodec, + }, ).json()["media"] - self.license = { + self.license_data = { "url": streams["drm"]["url"], "data": streams["drm"]["data"], "session": streams["drm"]["session"], } manifest_url = streams["url"].split("?")[0] - self.log.debug(f"Manifest URL: {manifest_url}") tracks = DASH.from_url(url=manifest_url, session=self.session).to_tracks(language=title.language) - # Set range attributes on video tracks + range_enum = { + "hdr10": Video.Range.HDR10, + "dolby_vision": Video.Range.DV, + }.get(video_format, Video.Range.SDR) for video in tracks.videos: - if current_range == "HDR10": - video.range = Video.Range.HDR10 - elif current_range == "DV": - video.range = Video.Range.DV - else: - video.range = Video.Range.SDR + video.range = range_enum - # Remove DRM-free ("clear") audio tracks tracks.audio = [ track for track in tracks.audio if "clear" not in track.data["dash"]["representation"].get("id") ] @@ -257,14 +275,14 @@ class EXAMPLE(Service): url=subtitle["url"], codec=Subtitle.Codec.from_mime("vtt"), language=Language.get(subtitle["language"]), - # cc=True if '(cc)' in subtitle['name'] else False, sdh=True, ) ) if not self.movie: title.data["chapters"] = self.session.get( - url=self.config["endpoints"]["metadata"].format(title_id=title.id), params={"token": self.token} + url=self.config["endpoints"]["metadata"].format(title_id=title.id), + params={"token": self.token}, ).json()["chapters"] return tracks @@ -283,12 +301,9 @@ class EXAMPLE(Service): return chapters def get_widevine_service_certificate(self, **_: any) -> str: - """Return the Widevine service certificate from config, if available.""" return self.config.get("certificate") def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]: - """Retrieve a PlayReady license for a given track.""" - license_url = self.config["endpoints"].get("playready_license") if not license_url: raise ValueError("PlayReady license endpoint not configured") @@ -304,7 +319,7 @@ class EXAMPLE(Service): return response.content def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]: - license_url = self.license.get("url") or self.config["endpoints"].get("widevine_license") + license_url = self.license_data.get("url") or self.config["endpoints"].get("widevine_license") if not license_url: raise ValueError("Widevine license endpoint not configured") @@ -312,11 +327,11 @@ class EXAMPLE(Service): url=license_url, data=challenge, params={ - "session": self.license.get("session"), + "session": self.license_data.get("session"), "userId": self.user_id, }, headers={ - "dt-custom-data": self.license.get("data"), + "dt-custom-data": self.license_data.get("data"), "user-agent": self.config["client"][self.device]["license_user_agent"], }, )