mirror of
https://github.com/unshackle-dl/unshackle.git
synced 2026-03-10 00:19:01 +00:00
refactor(example): migrate EXAMPLE service to track_request pattern
This commit is contained in:
@@ -13,6 +13,7 @@ from langcodes import Language
|
||||
from unshackle.core.constants import AnyTrack
|
||||
from unshackle.core.credential import Credential
|
||||
from unshackle.core.manifests import DASH
|
||||
# from unshackle.core.manifests import HLS
|
||||
from unshackle.core.search_result import SearchResult
|
||||
from unshackle.core.service import Service
|
||||
from unshackle.core.titles import Episode, Movie, Movies, Series, Title_T, Titles_T
|
||||
@@ -35,6 +36,12 @@ class EXAMPLE(Service):
|
||||
GEOFENCE = ("US", "UK")
|
||||
NO_SUBTITLES = True
|
||||
|
||||
VIDEO_RANGE_MAP = {
|
||||
"SDR": "sdr",
|
||||
"HDR10": "hdr10",
|
||||
"DV": "dolby_vision",
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@click.command(name="EXAMPLE", short_help="https://domain.com")
|
||||
@click.argument("title", type=str)
|
||||
@@ -52,17 +59,23 @@ class EXAMPLE(Service):
|
||||
self.device = device
|
||||
self.cdm = ctx.obj.cdm
|
||||
|
||||
# Get range parameter for HDR support
|
||||
range_param = ctx.parent.params.get("range_")
|
||||
self.range = range_param[0].name if range_param else "SDR"
|
||||
# self.track_request is set by Service.__init__() from CLI params
|
||||
# Contains: codecs (list[Video.Codec]), ranges (list[Video.Range]), best_available (bool)
|
||||
|
||||
# Override codec for HDR ranges (HDR requires HEVC)
|
||||
if any(r != Video.Range.SDR for r in self.track_request.ranges):
|
||||
self.track_request.codecs = [Video.Codec.HEVC]
|
||||
|
||||
# Override for L3 CDM limitations
|
||||
if self.cdm and self.cdm.security_level == 3:
|
||||
self.track_request.codecs = [Video.Codec.AVC]
|
||||
self.track_request.ranges = [Video.Range.SDR]
|
||||
|
||||
if self.config is None:
|
||||
raise Exception("Config is missing!")
|
||||
else:
|
||||
profile_name = ctx.parent.params.get("profile")
|
||||
if profile_name is None:
|
||||
profile_name = "default"
|
||||
self.profile = profile_name
|
||||
|
||||
profile_name = ctx.parent.params.get("profile")
|
||||
self.profile = profile_name or "default"
|
||||
|
||||
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
|
||||
super().authenticate(cookies, credential)
|
||||
@@ -165,78 +178,83 @@ class EXAMPLE(Service):
|
||||
)
|
||||
return Series(episodes)
|
||||
|
||||
# DASH Example: Service requires separate API calls per codec/range.
|
||||
# Uses _get_tracks_for_variants() which iterates codecs x ranges,
|
||||
# handles HYBRID (HDR10+DV), and best_available fallback.
|
||||
|
||||
def get_tracks(self, title: Title_T) -> Tracks:
|
||||
# Handle HYBRID mode by fetching both HDR10 and DV tracks separately
|
||||
if self.range == "HYBRID" and self.cdm.security_level != 3:
|
||||
tracks = Tracks()
|
||||
def _fetch_variant(
|
||||
title: Title_T,
|
||||
codec: Optional[Video.Codec],
|
||||
range_: Video.Range,
|
||||
) -> Tracks:
|
||||
vcodec_str = "H265" if codec == Video.Codec.HEVC else "H264"
|
||||
range_str = range_.name
|
||||
video_format = self.VIDEO_RANGE_MAP.get(range_str, "sdr")
|
||||
|
||||
# Get HDR10 tracks
|
||||
hdr10_tracks = self._get_tracks_for_range(title, "HDR10")
|
||||
tracks.add(hdr10_tracks, warn_only=True)
|
||||
self.log.info(f" + Fetching {vcodec_str} {range_str} manifest")
|
||||
tracks = self._fetch_dash_manifest(title, vcodec=vcodec_str, video_format=video_format)
|
||||
|
||||
# Get DV tracks
|
||||
dv_tracks = self._get_tracks_for_range(title, "DV")
|
||||
tracks.add(dv_tracks, warn_only=True)
|
||||
expected_range = {
|
||||
"HDR10": Video.Range.HDR10,
|
||||
"DV": Video.Range.DV,
|
||||
}.get(range_str)
|
||||
if expected_range and not any(v.range == expected_range for v in tracks.videos):
|
||||
raise ValueError(f"{range_str} requested but no {range_str} tracks available")
|
||||
|
||||
return tracks
|
||||
else:
|
||||
# Normal single-range behavior
|
||||
return self._get_tracks_for_range(title, self.range)
|
||||
|
||||
def _get_tracks_for_range(self, title: Title_T, range_override: str = None) -> Tracks:
|
||||
# Use range_override if provided, otherwise use self.range
|
||||
current_range = range_override if range_override else self.range
|
||||
return self._get_tracks_for_variants(title, _fetch_variant)
|
||||
|
||||
# Build API request parameters
|
||||
params = {
|
||||
"token": self.token,
|
||||
"guid": title.id,
|
||||
}
|
||||
|
||||
data = {
|
||||
"type": self.config["client"][self.device]["type"],
|
||||
}
|
||||
|
||||
# Add range-specific parameters
|
||||
if current_range == "HDR10":
|
||||
data["video_format"] = "hdr10"
|
||||
elif current_range == "DV":
|
||||
data["video_format"] = "dolby_vision"
|
||||
else:
|
||||
data["video_format"] = "sdr"
|
||||
|
||||
# Only request high-quality HDR content with L1 CDM
|
||||
if current_range in ("HDR10", "DV") and self.cdm.security_level == 3:
|
||||
# L3 CDM - skip HDR content
|
||||
return Tracks()
|
||||
# HLS Example: Service returns all codecs/ranges in one master playlist.
|
||||
# No need for _get_tracks_for_variants, dl.py filters by user selection.
|
||||
#
|
||||
# def get_tracks(self, title: Title_T) -> Tracks:
|
||||
# playback = self.session.get(
|
||||
# url=self.config["endpoints"]["playback"].format(title_id=title.id),
|
||||
# params={"token": self.token},
|
||||
# ).json()
|
||||
# return HLS.from_url(
|
||||
# url=playback["manifest_url"],
|
||||
# session=self.session,
|
||||
# ).to_tracks(title.language)
|
||||
|
||||
def _fetch_dash_manifest(
|
||||
self,
|
||||
title: Title_T,
|
||||
vcodec: str = "H264",
|
||||
video_format: str = "sdr",
|
||||
) -> Tracks:
|
||||
streams = self.session.post(
|
||||
url=self.config["endpoints"]["streams"],
|
||||
params=params,
|
||||
data=data,
|
||||
params={
|
||||
"token": self.token,
|
||||
"guid": title.id,
|
||||
},
|
||||
data={
|
||||
"type": self.config["client"][self.device]["type"],
|
||||
"video_format": video_format,
|
||||
"video_codec": vcodec,
|
||||
},
|
||||
).json()["media"]
|
||||
|
||||
self.license = {
|
||||
self.license_data = {
|
||||
"url": streams["drm"]["url"],
|
||||
"data": streams["drm"]["data"],
|
||||
"session": streams["drm"]["session"],
|
||||
}
|
||||
|
||||
manifest_url = streams["url"].split("?")[0]
|
||||
|
||||
self.log.debug(f"Manifest URL: {manifest_url}")
|
||||
tracks = DASH.from_url(url=manifest_url, session=self.session).to_tracks(language=title.language)
|
||||
|
||||
# Set range attributes on video tracks
|
||||
range_enum = {
|
||||
"hdr10": Video.Range.HDR10,
|
||||
"dolby_vision": Video.Range.DV,
|
||||
}.get(video_format, Video.Range.SDR)
|
||||
for video in tracks.videos:
|
||||
if current_range == "HDR10":
|
||||
video.range = Video.Range.HDR10
|
||||
elif current_range == "DV":
|
||||
video.range = Video.Range.DV
|
||||
else:
|
||||
video.range = Video.Range.SDR
|
||||
video.range = range_enum
|
||||
|
||||
# Remove DRM-free ("clear") audio tracks
|
||||
tracks.audio = [
|
||||
track for track in tracks.audio if "clear" not in track.data["dash"]["representation"].get("id")
|
||||
]
|
||||
@@ -257,14 +275,14 @@ class EXAMPLE(Service):
|
||||
url=subtitle["url"],
|
||||
codec=Subtitle.Codec.from_mime("vtt"),
|
||||
language=Language.get(subtitle["language"]),
|
||||
# cc=True if '(cc)' in subtitle['name'] else False,
|
||||
sdh=True,
|
||||
)
|
||||
)
|
||||
|
||||
if not self.movie:
|
||||
title.data["chapters"] = self.session.get(
|
||||
url=self.config["endpoints"]["metadata"].format(title_id=title.id), params={"token": self.token}
|
||||
url=self.config["endpoints"]["metadata"].format(title_id=title.id),
|
||||
params={"token": self.token},
|
||||
).json()["chapters"]
|
||||
|
||||
return tracks
|
||||
@@ -283,12 +301,9 @@ class EXAMPLE(Service):
|
||||
return chapters
|
||||
|
||||
def get_widevine_service_certificate(self, **_: any) -> str:
|
||||
"""Return the Widevine service certificate from config, if available."""
|
||||
return self.config.get("certificate")
|
||||
|
||||
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[bytes]:
|
||||
"""Retrieve a PlayReady license for a given track."""
|
||||
|
||||
license_url = self.config["endpoints"].get("playready_license")
|
||||
if not license_url:
|
||||
raise ValueError("PlayReady license endpoint not configured")
|
||||
@@ -304,7 +319,7 @@ class EXAMPLE(Service):
|
||||
return response.content
|
||||
|
||||
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
|
||||
license_url = self.license.get("url") or self.config["endpoints"].get("widevine_license")
|
||||
license_url = self.license_data.get("url") or self.config["endpoints"].get("widevine_license")
|
||||
if not license_url:
|
||||
raise ValueError("Widevine license endpoint not configured")
|
||||
|
||||
@@ -312,11 +327,11 @@ class EXAMPLE(Service):
|
||||
url=license_url,
|
||||
data=challenge,
|
||||
params={
|
||||
"session": self.license.get("session"),
|
||||
"session": self.license_data.get("session"),
|
||||
"userId": self.user_id,
|
||||
},
|
||||
headers={
|
||||
"dt-custom-data": self.license.get("data"),
|
||||
"dt-custom-data": self.license_data.get("data"),
|
||||
"user-agent": self.config["client"][self.device]["license_user_agent"],
|
||||
},
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user