feat(core): add TrackRequest system for multi-codec/multi-range support

Add TrackRequest dataclass to Service base class that centralizes CLI vcodec/range/best_available params. Services read from self.track_request instead of accessing ctx.parent.params directly.

- Add TrackRequest dataclass with codecs, ranges, best_available fields
- Set self.track_request in Service.__init__() from CLI params
- Add _get_tracks_for_variants() helper for per-codec/range API calls
- Update dl.py to detect migrated vs legacy services automatically
- Handle HYBRID+other ranges (e.g. -r HYBRID,SDR) correctly in dl.py
- Support --best-available with multi-range/codec (skip unavailable)
This commit is contained in:
Andy
2026-02-23 15:47:27 -07:00
parent 983fd18d53
commit d0341f6844
3 changed files with 319 additions and 38 deletions

View File

@@ -64,7 +64,8 @@ from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger
is_close_match, suggest_font_packages, time_elapsed_since) is_close_match, suggest_font_packages, time_elapsed_since)
from unshackle.core.utils import tags from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE, from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
ContextData, MultipleChoice, SubtitleCodecChoice, VideoCodecChoice) ContextData, MultipleChoice, MultipleVideoCodecChoice,
SubtitleCodecChoice)
from unshackle.core.utils.collections import merge_dict from unshackle.core.utils.collections import merge_dict
from unshackle.core.utils.selector import select_multiple from unshackle.core.utils.selector import select_multiple
from unshackle.core.utils.subprocess import ffprobe from unshackle.core.utils.subprocess import ffprobe
@@ -288,9 +289,9 @@ class dl:
@click.option( @click.option(
"-v", "-v",
"--vcodec", "--vcodec",
type=VideoCodecChoice(Video.Codec), type=MultipleVideoCodecChoice(Video.Codec),
default=None, default=[],
help="Video Codec to download, defaults to any codec.", help="Video Codec(s) to download, defaults to any codec.",
) )
@click.option( @click.option(
"-a", "-a",
@@ -913,7 +914,7 @@ class dl:
self, self,
service: Service, service: Service,
quality: list[int], quality: list[int],
vcodec: Optional[Video.Codec], vcodec: list[Video.Codec],
acodec: list[Audio.Codec], acodec: list[Audio.Codec],
vbitrate: int, vbitrate: int,
abitrate: int, abitrate: int,
@@ -1273,8 +1274,78 @@ class dl:
with console.status("Getting tracks...", spinner="dots"): with console.status("Getting tracks...", spinner="dots"):
try: try:
title.tracks.add(service.get_tracks(title), warn_only=True) # Migrated services use track_request and handle multi-codec/range
title.tracks.chapters = service.get_chapters(title) # internally via _get_tracks_for_variants(). Unmigrated services
# still expose self.vcodec/self.range as strings and need the
# multi-fetch loop here.
is_single_range_service = isinstance(getattr(service, "range", None), str)
is_single_vcodec_service = isinstance(getattr(service, "vcodec", None), str)
uses_legacy_pattern = is_single_range_service or is_single_vcodec_service
if uses_legacy_pattern:
# Legacy path for unmigrated services
non_hybrid_ranges = [r for r in range_ if r != Video.Range.HYBRID]
hybrid_requested = Video.Range.HYBRID in range_
needs_multi_range = (
is_single_range_service
and not no_video
and (len(non_hybrid_ranges) > 1 or (hybrid_requested and non_hybrid_ranges))
)
needs_multi_vcodec = is_single_vcodec_service and not no_video and len(vcodec) > 1
if needs_multi_range or needs_multi_vcodec:
original_range = getattr(service, "range", None)
original_vcodec = getattr(service, "vcodec", None)
range_iterations = non_hybrid_ranges if needs_multi_range else [None]
vcodec_iterations = vcodec if needs_multi_vcodec else [None]
first = True
if needs_multi_range and hybrid_requested:
service.range = "HYBRID"
self.log.info("Fetching HYBRID tracks...")
title.tracks.add(service.get_tracks(title), warn_only=True)
title.tracks.chapters = service.get_chapters(title)
first = False
for r_val, vc_val in product(range_iterations, vcodec_iterations):
label_parts = []
if r_val:
service.range = r_val.name
label_parts.append(r_val.name)
if vc_val and is_single_vcodec_service and original_vcodec is not None:
if "." in str(original_vcodec):
service.vcodec = vc_val.value
elif str(original_vcodec) == str(original_vcodec).lower():
service.vcodec = vc_val.value.lower()
else:
service.vcodec = vc_val.value.replace(".", "").replace("-", "")
label_parts.append(vc_val.name)
if label_parts:
self.log.info(f"Fetching {' '.join(label_parts)} tracks...")
fetch_tracks = service.get_tracks(title)
if first:
title.tracks.add(fetch_tracks, warn_only=True)
title.tracks.chapters = service.get_chapters(title)
first = False
else:
for video in fetch_tracks.videos:
title.tracks.add(video, warn_only=True)
if original_range is not None:
service.range = original_range
if original_vcodec is not None:
service.vcodec = original_vcodec
else:
title.tracks.add(service.get_tracks(title), warn_only=True)
title.tracks.chapters = service.get_chapters(title)
else:
# Migrated services handle multi-codec/range via track_request
title.tracks.add(service.get_tracks(title), warn_only=True)
title.tracks.chapters = service.get_chapters(title)
except Exception as e: except Exception as e:
if self.debug_logger: if self.debug_logger:
self.debug_logger.log_error( self.debug_logger.log_error(
@@ -1384,9 +1455,12 @@ class dl:
if isinstance(title, (Movie, Episode)): if isinstance(title, (Movie, Episode)):
# filter video tracks # filter video tracks
if vcodec: if vcodec:
title.tracks.select_video(lambda x: x.codec == vcodec) title.tracks.select_video(lambda x: x.codec in vcodec)
missing_codecs = [c for c in vcodec if not any(x.codec == c for x in title.tracks.videos)]
for codec in missing_codecs:
self.log.warning(f"Skipping {codec.name} video tracks as none are available.")
if not title.tracks.videos: if not title.tracks.videos:
self.log.error(f"There's no {vcodec.name} Video Track...") self.log.error(f"There's no {', '.join(c.name for c in vcodec)} Video Track...")
sys.exit(1) sys.exit(1)
if range_: if range_:
@@ -1438,10 +1512,38 @@ class dl:
self.log.error(f"There's no {processed_video_lang} Video Track...") self.log.error(f"There's no {processed_video_lang} Video Track...")
sys.exit(1) sys.exit(1)
has_hybrid = any(r == Video.Range.HYBRID for r in range_)
non_hybrid_ranges = [r for r in range_ if r != Video.Range.HYBRID]
if quality: if quality:
missing_resolutions = [] missing_resolutions = []
if any(r == Video.Range.HYBRID for r in range_): if has_hybrid:
title.tracks.select_video(title.tracks.select_hybrid(title.tracks.videos, quality)) # Split tracks: hybrid candidates vs non-hybrid
hybrid_candidate_tracks = [
v for v in title.tracks.videos
if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
non_hybrid_tracks = [
v for v in title.tracks.videos
if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
# Apply hybrid selection to HDR10+DV tracks
hybrid_filter = title.tracks.select_hybrid(hybrid_candidate_tracks, quality)
hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks))
if non_hybrid_ranges and non_hybrid_tracks:
# Also filter non-hybrid tracks by resolution
non_hybrid_selected = [
v for v in non_hybrid_tracks
if any(
v.height == res or int(v.width * (9 / 16)) == res
for res in quality
)
]
title.tracks.videos = hybrid_selected + non_hybrid_selected
else:
title.tracks.videos = hybrid_selected
else: else:
title.tracks.by_resolutions(quality) title.tracks.by_resolutions(quality)
@@ -1468,21 +1570,63 @@ class dl:
sys.exit(1) sys.exit(1)
# choose best track by range and quality # choose best track by range and quality
if any(r == Video.Range.HYBRID for r in range_): if has_hybrid:
# For hybrid mode, always apply hybrid selection # Apply hybrid selection for HYBRID tracks
# If no quality specified, use only the best (highest) resolution hybrid_candidate_tracks = [
v for v in title.tracks.videos
if v.range in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
non_hybrid_tracks = [
v for v in title.tracks.videos
if v.range not in (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
]
if not quality: if not quality:
# Get the highest resolution available best_resolution = max(
best_resolution = max((v.height for v in title.tracks.videos), default=None) (v.height for v in hybrid_candidate_tracks), default=None
)
if best_resolution: if best_resolution:
# Use the hybrid selection logic with only the best resolution hybrid_filter = title.tracks.select_hybrid(
title.tracks.select_video( hybrid_candidate_tracks, [best_resolution]
title.tracks.select_hybrid(title.tracks.videos, [best_resolution])
) )
# If quality was specified, hybrid selection was already applied above hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks))
else:
hybrid_selected = []
else:
hybrid_filter = title.tracks.select_hybrid(
hybrid_candidate_tracks, quality
)
hybrid_selected = list(filter(hybrid_filter, hybrid_candidate_tracks))
# For non-hybrid ranges, apply Cartesian product selection
non_hybrid_selected: list[Video] = []
if non_hybrid_ranges and non_hybrid_tracks:
for resolution, color_range, codec in product(
quality or [None], non_hybrid_ranges, vcodec or [None]
):
match = next(
(
t
for t in non_hybrid_tracks
if (
not resolution
or t.height == resolution
or int(t.width * (9 / 16)) == resolution
)
and (not color_range or t.range == color_range)
and (not codec or t.codec == codec)
),
None,
)
if match and match not in non_hybrid_selected:
non_hybrid_selected.append(match)
title.tracks.videos = hybrid_selected + non_hybrid_selected
else: else:
selected_videos: list[Video] = [] selected_videos: list[Video] = []
for resolution, color_range in product(quality or [None], range_ or [None]): for resolution, color_range, codec in product(
quality or [None], range_ or [None], vcodec or [None]
):
match = next( match = next(
( (
t t
@@ -1493,6 +1637,7 @@ class dl:
or int(t.width * (9 / 16)) == resolution or int(t.width * (9 / 16)) == resolution
) )
and (not color_range or t.range == color_range) and (not color_range or t.range == color_range)
and (not codec or t.codec == codec)
), ),
None, None,
) )
@@ -1508,29 +1653,38 @@ class dl:
] ]
dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV] dv_tracks = [v for v in title.tracks.videos if v.range == Video.Range.DV]
hybrid_failed = False
if not base_tracks and not dv_tracks: if not base_tracks and not dv_tracks:
available_ranges = sorted(set(v.range.name for v in title.tracks.videos)) available_ranges = sorted(set(v.range.name for v in title.tracks.videos))
self.log.error( msg = "HYBRID mode requires both HDR10/HDR10+ and DV tracks, but neither is available"
"HYBRID mode requires both HDR10/HDR10+ and DV tracks, but neither is available" msg_detail = (
)
self.log.error(
f"Available ranges: {', '.join(available_ranges) if available_ranges else 'none'}" f"Available ranges: {', '.join(available_ranges) if available_ranges else 'none'}"
) )
sys.exit(1) hybrid_failed = True
elif not base_tracks: elif not base_tracks:
available_ranges = sorted(set(v.range.name for v in title.tracks.videos)) available_ranges = sorted(set(v.range.name for v in title.tracks.videos))
self.log.error( msg = "HYBRID mode requires both HDR10/HDR10+ and DV tracks, but only DV is available"
"HYBRID mode requires both HDR10/HDR10+ and DV tracks, but only DV is available" msg_detail = f"Available ranges: {', '.join(available_ranges)}"
) hybrid_failed = True
self.log.error(f"Available ranges: {', '.join(available_ranges)}")
sys.exit(1)
elif not dv_tracks: elif not dv_tracks:
available_ranges = sorted(set(v.range.name for v in title.tracks.videos)) available_ranges = sorted(set(v.range.name for v in title.tracks.videos))
self.log.error( msg = "HYBRID mode requires both HDR10/HDR10+ and DV tracks, but only HDR10 is available"
"HYBRID mode requires both HDR10/HDR10+ and DV tracks, but only HDR10 is available" msg_detail = f"Available ranges: {', '.join(available_ranges)}"
) hybrid_failed = True
self.log.error(f"Available ranges: {', '.join(available_ranges)}")
sys.exit(1) if hybrid_failed:
other_ranges = [r for r in range_ if r != Video.Range.HYBRID]
if best_available and other_ranges:
self.log.warning(msg)
self.log.warning(
f"Continuing with remaining range(s): "
f"{', '.join(r.name for r in other_ranges)}"
)
range_ = other_ranges
else:
self.log.error(msg)
self.log.error(msg_detail)
sys.exit(1)
# filter subtitle tracks # filter subtitle tracks
if require_subs: if require_subs:
@@ -2122,6 +2276,8 @@ class dl:
task_description += f" {video_track.height}p" task_description += f" {video_track.height}p"
if len(range_) > 1: if len(range_) > 1:
task_description += f" {video_track.range.name}" task_description += f" {video_track.range.name}"
if len(vcodec) > 1:
task_description += f" {video_track.codec.name}"
task_tracks = Tracks(title.tracks) + title.tracks.chapters + title.tracks.attachments task_tracks = Tracks(title.tracks) + title.tracks.chapters + title.tracks.attachments
if video_track: if video_track:

View File

@@ -1,7 +1,8 @@
import base64 import base64
import logging import logging
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from collections.abc import Generator from collections.abc import Callable, Generator
from dataclasses import dataclass, field
from http.cookiejar import CookieJar from http.cookiejar import CookieJar
from pathlib import Path from pathlib import Path
from typing import Optional, Union from typing import Optional, Union
@@ -24,9 +25,26 @@ from unshackle.core.search_result import SearchResult
from unshackle.core.title_cacher import TitleCacher, get_account_hash, get_region_from_proxy from unshackle.core.title_cacher import TitleCacher, get_account_hash, get_region_from_proxy
from unshackle.core.titles import Title_T, Titles_T from unshackle.core.titles import Title_T, Titles_T
from unshackle.core.tracks import Chapters, Tracks from unshackle.core.tracks import Chapters, Tracks
from unshackle.core.tracks.video import Video
from unshackle.core.utilities import get_cached_ip_info, get_ip_info from unshackle.core.utilities import get_cached_ip_info, get_ip_info
@dataclass
class TrackRequest:
"""Holds what the user requested for video codec and range selection.
Services read from this instead of ctx.parent.params for vcodec/range.
Attributes:
codecs: Requested codecs from CLI. Empty list means no filter (accept any).
ranges: Requested ranges from CLI. Defaults to [SDR].
"""
codecs: list[Video.Codec] = field(default_factory=list)
ranges: list[Video.Range] = field(default_factory=lambda: [Video.Range.SDR])
best_available: bool = False
def sanitize_proxy_for_log(uri: Optional[str]) -> Optional[str]: def sanitize_proxy_for_log(uri: Optional[str]) -> Optional[str]:
""" """
Sanitize a proxy URI for logs by redacting any embedded userinfo (username/password). Sanitize a proxy URI for logs by redacting any embedded userinfo (username/password).
@@ -89,6 +107,16 @@ class Service(metaclass=ABCMeta):
self.credential = None # Will be set in authenticate() self.credential = None # Will be set in authenticate()
self.current_region = None # Will be set based on proxy/geolocation self.current_region = None # Will be set based on proxy/geolocation
# Set track request from CLI params - services can read/override in their __init__
vcodec = ctx.parent.params.get("vcodec") if ctx.parent else None
range_ = ctx.parent.params.get("range_") if ctx.parent else None
best_available = ctx.parent.params.get("best_available", False) if ctx.parent else False
self.track_request = TrackRequest(
codecs=list(vcodec) if vcodec else [],
ranges=list(range_) if range_ else [Video.Range.SDR],
best_available=bool(best_available),
)
if not ctx.parent or not ctx.parent.params.get("no_proxy"): if not ctx.parent or not ctx.parent.params.get("no_proxy"):
if ctx.parent: if ctx.parent:
proxy = ctx.parent.params["proxy"] proxy = ctx.parent.params["proxy"]
@@ -205,6 +233,76 @@ class Service(metaclass=ABCMeta):
self.log.debug(f"Failed to get cached IP info: {e}") self.log.debug(f"Failed to get cached IP info: {e}")
self.current_region = None self.current_region = None
def _get_tracks_for_variants(
self,
title: Title_T,
fetch_fn: Callable[..., Tracks],
) -> Tracks:
"""Call fetch_fn for each codec/range combo in track_request, merge results.
Services that need separate API calls per codec/range combo can use this
helper from their get_tracks() implementation.
The fetch_fn signature should be: (title, codec, range_) -> Tracks
For HYBRID range, fetch_fn is called with HDR10 and DV separately and
the DV video tracks are merged into the HDR10 result.
Args:
title: The title being processed.
fetch_fn: A callable that fetches tracks for a specific codec/range.
"""
all_tracks = Tracks()
first = True
codecs = self.track_request.codecs or [None]
ranges = self.track_request.ranges or [Video.Range.SDR]
for range_val in ranges:
if range_val == Video.Range.HYBRID:
# HYBRID: fetch HDR10 first (full tracks), then DV (video only)
for codec_val in codecs:
try:
hdr_tracks = fetch_fn(title, codec=codec_val, range_=Video.Range.HDR10)
except (ValueError, SystemExit) as e:
if self.track_request.best_available:
self.log.warning(f" - HDR10 not available for HYBRID, skipping ({e})")
continue
raise
if first:
all_tracks.add(hdr_tracks, warn_only=True)
first = False
else:
for video in hdr_tracks.videos:
all_tracks.add(video, warn_only=True)
try:
dv_tracks = fetch_fn(title, codec=codec_val, range_=Video.Range.DV)
for video in dv_tracks.videos:
all_tracks.add(video, warn_only=True)
except (ValueError, SystemExit):
self.log.info(" - No DolbyVision manifest available for HYBRID")
else:
for codec_val in codecs:
try:
tracks = fetch_fn(title, codec=codec_val, range_=range_val)
except (ValueError, SystemExit) as e:
if self.track_request.best_available:
codec_name = codec_val.name if codec_val else "default"
self.log.warning(
f" - {range_val.name}/{codec_name} not available, skipping ({e})"
)
continue
raise
if first:
all_tracks.add(tracks, warn_only=True)
first = False
else:
for video in tracks.videos:
all_tracks.add(video, warn_only=True)
return all_tracks
# Optional Abstract functions # Optional Abstract functions
# The following functions may be implemented by the Service. # The following functions may be implemented by the Service.
# Otherwise, the base service code (if any) of the function will be executed on call. # Otherwise, the base service code (if any) of the function will be executed on call.
@@ -461,4 +559,4 @@ class Service(metaclass=ABCMeta):
""" """
__all__ = ("Service",) __all__ = ("Service", "TrackRequest")

View File

@@ -44,6 +44,33 @@ class VideoCodecChoice(click.Choice):
self.fail(f"'{value}' is not a valid video codec", param, ctx) self.fail(f"'{value}' is not a valid video codec", param, ctx)
class MultipleVideoCodecChoice(VideoCodecChoice):
"""
A multiple-value variant of VideoCodecChoice that accepts comma-separated codecs.
Accepts both enum names and values, e.g.: ``-v hevc,avc`` or ``-v H.264,H.265``
"""
name = "multiple_video_codec_choice"
def convert(
self, value: Any, param: Optional[click.Parameter] = None, ctx: Optional[click.Context] = None
) -> list[Any]:
if not value:
return []
if isinstance(value, list):
values = value
elif isinstance(value, str):
values = value.split(",")
else:
self.fail(f"{value!r} is not a supported value.", param, ctx)
chosen_values: list[Any] = []
for v in values:
chosen_values.append(super().convert(v.strip(), param, ctx))
return chosen_values
class SubtitleCodecChoice(click.Choice): class SubtitleCodecChoice(click.Choice):
""" """
A custom Choice type for subtitle codecs that accepts both enum names, values, and common aliases. A custom Choice type for subtitle codecs that accepts both enum names, values, and common aliases.