2 Commits

Author SHA1 Message Date
Andy
e0dbd0b046 feat(dl): add cross-mux support for combining tracks from multiple services
Add --cross-video, --cross-audio, --cross-subtitles, and --cross-chapters options that allow sourcing specific track types from different streaming services. Each cross-service runs its full authentication and track fetching pipeline independently.
Also adds --cross-audio-offset and --cross-subtitle-offset with human-friendly time formats (e.g. '10s', '500ms', '-5.5s') for timing adjustment via mkvmerge --sync, --cross-profile for per-service credentials, and --cross-wanted for manual episode mapping override.
2026-03-13 12:49:21 -06:00
Andy
e02aa66843 feat(dl): add --worst flag and SHIELD OkHttp fingerprint preset
Add --worst CLI flag to select the lowest bitrate video track within a specified resolution (e.g. --worst -q 720). Requires -q/--quality.
Add shield_okhttp TLS fingerprint preset for NVIDIA SHIELD Android TV with OkHttp 4.11 JA3 signature.
2026-03-11 13:59:07 -06:00
4 changed files with 440 additions and 79 deletions

View File

@@ -64,7 +64,7 @@ from unshackle.core.tracks.hybrid import Hybrid
from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger, from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger,
is_close_match, suggest_font_packages, time_elapsed_since) is_close_match, suggest_font_packages, time_elapsed_since)
from unshackle.core.utils import tags from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE, from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, OFFSET, QUALITY_LIST, SEASON_RANGE,
ContextData, MultipleChoice, MultipleVideoCodecChoice, ContextData, MultipleChoice, MultipleVideoCodecChoice,
SubtitleCodecChoice) SubtitleCodecChoice)
from unshackle.core.utils.collections import merge_dict from unshackle.core.utils.collections import merge_dict
@@ -506,6 +506,12 @@ class dl:
@click.option( @click.option(
"--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching." "--reset-cache", "reset_cache", is_flag=True, default=False, help="Clear title cache before fetching."
) )
@click.option(
"--worst",
is_flag=True,
default=False,
help="Select the lowest bitrate track within the specified quality. Requires -q/--quality.",
)
@click.option( @click.option(
"--best-available", "--best-available",
"best_available", "best_available",
@@ -513,6 +519,58 @@ class dl:
default=False, default=False,
help="Continue with best available quality if requested resolutions are not available.", help="Continue with best available quality if requested resolutions are not available.",
) )
@click.option(
"--cross-video",
nargs=2,
type=(str, str),
default=None,
help="Cross-mux: use video from another service. Format: SERVICE URL.",
)
@click.option(
"--cross-audio",
nargs=2,
type=(str, str),
default=None,
help="Cross-mux: use audio from another service. Format: SERVICE URL.",
)
@click.option(
"--cross-subtitles",
nargs=2,
type=(str, str),
default=None,
help="Cross-mux: use subtitles from another service. Format: SERVICE URL.",
)
@click.option(
"--cross-chapters",
nargs=2,
type=(str, str),
default=None,
help="Cross-mux: use chapters from another service. Format: SERVICE URL.",
)
@click.option(
"--cross-audio-offset",
type=OFFSET,
default=None,
help="Timing offset for cross-sourced audio, e.g. '10s', '500ms', '-5.5s'.",
)
@click.option(
"--cross-subtitle-offset",
type=OFFSET,
default=None,
help="Timing offset for cross-sourced subtitles, e.g. '10s', '500ms', '-5.5s'.",
)
@click.option(
"--cross-profile",
type=str,
default=None,
help="Profile to use for cross-service credentials. Defaults to --profile.",
)
@click.option(
"--cross-wanted",
type=str,
default=None,
help="Override episode mapping for cross-services, e.g. 'S01E02'.",
)
@click.pass_context @click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> dl: def cli(ctx: click.Context, **kwargs: Any) -> dl:
return dl(ctx, **kwargs) return dl(ctx, **kwargs)
@@ -532,6 +590,14 @@ class dl:
animeapi_id: Optional[str] = None, animeapi_id: Optional[str] = None,
enrich: bool = False, enrich: bool = False,
output_dir: Optional[Path] = None, output_dir: Optional[Path] = None,
cross_video: Optional[tuple[str, str]] = None,
cross_audio: Optional[tuple[str, str]] = None,
cross_subtitles: Optional[tuple[str, str]] = None,
cross_chapters: Optional[tuple[str, str]] = None,
cross_audio_offset: Optional[int] = None,
cross_subtitle_offset: Optional[int] = None,
cross_profile: Optional[str] = None,
cross_wanted: Optional[str] = None,
*_: Any, *_: Any,
**__: Any, **__: Any,
): ):
@@ -580,6 +646,16 @@ class dl:
self.animeapi_title: Optional[str] = None self.animeapi_title: Optional[str] = None
self.output_dir = output_dir self.output_dir = output_dir
# Cross-mux settings
self.cross_video = cross_video
self.cross_audio = cross_audio
self.cross_subtitles = cross_subtitles
self.cross_chapters = cross_chapters
self.cross_audio_offset = cross_audio_offset
self.cross_subtitle_offset = cross_subtitle_offset
self.cross_profile = cross_profile or profile
self.cross_wanted = cross_wanted
if animeapi_id: if animeapi_id:
from unshackle.core.utils.animeapi import resolve_animeapi from unshackle.core.utils.animeapi import resolve_animeapi
@@ -948,6 +1024,218 @@ class dl:
# able to keep `self` as the first positional # able to keep `self` as the first positional
self.cli._result_callback = self.result self.cli._result_callback = self.result
def _instantiate_cross_service(self, tag: str, url: str) -> tuple[Service, str]:
"""
Instantiate a cross-service for cross-mux by tag and URL.
Returns (service_instance, title_url) after authentication.
"""
tag = Services.get_tag(tag)
service_cls = Services.load(tag)
# Build service config for the cross-service
service_config_path = Services.get_path(tag) / config.filenames.config
if service_config_path.exists():
cross_service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
else:
cross_service_config = {}
# Load CDM for the cross-service
cross_cdm = self.get_cdm(tag, self.cross_profile)
# Build a synthetic Click context for the cross-service
cross_ctx_obj = ContextData(
config=cross_service_config,
cdm=cross_cdm,
proxy_providers=self.proxy_providers,
profile=self.cross_profile,
)
# Extract the title argument from the URL using TITLE_RE
title_id = url
if hasattr(service_cls, "TITLE_RE"):
m = re.match(service_cls.TITLE_RE, url)
if m:
# Try named group 'title_id' first, then 'id', then group(1)
title_id = m.group("title_id") if "title_id" in m.groupdict() else (
m.group("id") if "id" in m.groupdict() else m.group(1)
)
# Build kwargs from the service cli command's params with defaults
cli_cmd = service_cls.cli
kwargs = {"title": title_id}
for param in cli_cmd.params:
if param.name and param.name != "title" and param.name not in kwargs:
kwargs[param.name] = param.default
# Create a parent context that mimics what dl.__init__ sets up.
# Services access ctx.parent.params for various dl-level options,
# so we provide a complete set of defaults to avoid KeyError.
parent_ctx = click.Context(self.cli, info_name="dl")
parent_ctx.params = {
"no_proxy": True,
"proxy": None,
"proxy_query": None,
"proxy_provider": None,
"vcodec": [],
"acodec": [],
"range_": [Video.Range.SDR],
"best_available": False,
"profile": self.cross_profile,
"quality": [],
"wanted": None,
"video_only": False,
"audio_only": False,
"subs_only": False,
"chapters_only": False,
"list_": False,
"skip_dl": False,
"no_cache": False,
"reset_cache": False,
}
ctx = click.Context(cli_cmd, parent=parent_ctx, info_name=tag)
ctx.obj = cross_ctx_obj
# Instantiate the service
cross_service = service_cls(ctx, **kwargs)
# Authenticate
cookies = self.get_cookie_jar(tag, self.cross_profile)
credential = self.get_credentials(tag, self.cross_profile)
cross_service.authenticate(cookies, credential)
return cross_service
def _match_cross_title(
self, primary_title: Title_T, cross_titles: Any
) -> Optional[Title_T]:
"""Match a primary title to its counterpart in cross-service titles."""
if isinstance(primary_title, Movie):
# For movies, the cross URL should resolve to the same movie
if hasattr(cross_titles, "__iter__"):
for t in cross_titles:
if isinstance(t, Movie):
return t
return None
if isinstance(primary_title, Episode):
if self.cross_wanted:
# Manual override: parse S01E02 format
m = re.match(r"S(\d+)E(\d+)", self.cross_wanted, re.IGNORECASE)
if m:
wanted_season = int(m.group(1))
wanted_episode = int(m.group(2))
for t in cross_titles:
if isinstance(t, Episode) and t.season == wanted_season and t.number == wanted_episode:
return t
self.log.warning(
f"Cross-wanted S{wanted_season:02d}E{wanted_episode:02d} not found in cross-service"
)
return None
# Auto-match by season + episode number
for t in cross_titles:
if isinstance(t, Episode) and t.season == primary_title.season and t.number == primary_title.number:
return t
self.log.warning(
f"No cross-service match for S{primary_title.season:02d}E{primary_title.number:02d}"
)
return None
return None
def _process_cross_services(self, title: Title_T) -> dict[str, tuple[Service, Title_T, Tracks]]:
"""
Process all cross-service specs and return fetched tracks per track type.
Returns dict like {"video": (service, matched_title, tracks), ...}
"""
cross_specs: list[tuple[str, Optional[tuple[str, str]]]] = [
("video", self.cross_video),
("audio", self.cross_audio),
("subtitles", self.cross_subtitles),
("chapters", self.cross_chapters),
]
# Cache instantiated services by (tag, url) to avoid duplicate auth
service_cache: dict[tuple[str, str], Service] = {}
result: dict[str, tuple[Service, Title_T, Tracks]] = {}
for track_type, spec in cross_specs:
if not spec:
continue
tag, url = spec
cache_key = (Services.get_tag(tag), url)
if cache_key not in service_cache:
self.log.info(f"Cross-mux: loading {track_type} from {tag}")
cross_service = self._instantiate_cross_service(tag, url)
service_cache[cache_key] = cross_service
else:
cross_service = service_cache[cache_key]
# Get titles from cross-service
cross_titles = cross_service.get_titles()
# Match the primary title to a cross-service title
cross_title = self._match_cross_title(title, cross_titles)
if not cross_title:
self.log.warning(f"Cross-mux: could not match title for {track_type} from {tag}, skipping")
continue
# Get tracks from cross-service
cross_tracks = cross_service.get_tracks(cross_title)
cross_chapters = cross_service.get_chapters(cross_title)
cross_tracks.chapters = cross_chapters
result[track_type] = (cross_service, cross_title, cross_tracks)
return result
def _apply_cross_tracks(
self,
title: Title_T,
cross_results: dict[str, tuple[Service, Title_T, Tracks]],
) -> None:
"""Replace primary tracks with cross-service tracks and mark with metadata."""
if "video" in cross_results:
cross_service, cross_title, cross_tracks = cross_results["video"]
title.tracks.videos = cross_tracks.videos
for track in title.tracks.videos:
track.data["_cross_service"] = cross_service
track.data["_cross_title"] = cross_title
track.data["cross_source"] = cross_service.__class__.__name__
if "audio" in cross_results:
cross_service, cross_title, cross_tracks = cross_results["audio"]
title.tracks.audio = cross_tracks.audio
for track in title.tracks.audio:
track.data["_cross_service"] = cross_service
track.data["_cross_title"] = cross_title
track.data["cross_source"] = cross_service.__class__.__name__
if self.cross_audio_offset:
track.data["cross_offset_ms"] = self.cross_audio_offset
if "subtitles" in cross_results:
cross_service, cross_title, cross_tracks = cross_results["subtitles"]
title.tracks.subtitles = cross_tracks.subtitles
for track in title.tracks.subtitles:
track.data["_cross_service"] = cross_service
track.data["_cross_title"] = cross_title
track.data["cross_source"] = cross_service.__class__.__name__
if self.cross_subtitle_offset:
track.data["cross_offset_ms"] = self.cross_subtitle_offset
if "chapters" in cross_results:
_, _, cross_tracks = cross_results["chapters"]
title.tracks.chapters = cross_tracks.chapters
@property
def has_cross_mux(self) -> bool:
return any([self.cross_video, self.cross_audio, self.cross_subtitles, self.cross_chapters])
def result( def result(
self, self,
service: Service, service: Service,
@@ -991,6 +1279,7 @@ class dl:
no_mux: bool, no_mux: bool,
workers: Optional[int], workers: Optional[int],
downloads: int, downloads: int,
worst: bool,
best_available: bool, best_available: bool,
split_audio: Optional[bool] = None, split_audio: Optional[bool] = None,
*_: Any, *_: Any,
@@ -1016,6 +1305,10 @@ class dl:
self.log.error("--require-subs and --s-lang cannot be used together") self.log.error("--require-subs and --s-lang cannot be used together")
sys.exit(1) sys.exit(1)
if worst and not quality:
self.log.error("--worst requires -q/--quality to be specified")
sys.exit(1)
if select_titles and wanted: if select_titles and wanted:
self.log.error("--select-titles and -w/--wanted cannot be used together") self.log.error("--select-titles and -w/--wanted cannot be used together")
sys.exit(1) sys.exit(1)
@@ -1405,6 +1698,25 @@ class dl:
level="INFO", operation="get_tracks", service=self.service, context=tracks_info level="INFO", operation="get_tracks", service=self.service, context=tracks_info
) )
# Cross-mux: replace tracks from cross-services if configured
if self.has_cross_mux:
with console.status("Cross-mux: fetching tracks from cross-services...", spinner="dots"):
try:
cross_results = self._process_cross_services(title)
if cross_results:
self._apply_cross_tracks(title, cross_results)
cross_sources = ", ".join(
f"{k}={v[0].__class__.__name__}" for k, v in cross_results.items()
)
self.log.info(f"Cross-mux: applied tracks from {cross_sources}")
except Exception as e:
self.log.error(f"Cross-mux failed: {e}")
if self.debug_logger:
self.debug_logger.log_error(
"cross_mux", e, service=self.service, context={"title": str(title)}
)
raise
# strip SDH subs to non-SDH if no equivalent same-lang non-SDH is available # strip SDH subs to non-SDH if no equivalent same-lang non-SDH is available
# uses a loose check, e.g, wont strip en-US SDH sub if a non-SDH en-GB is available # uses a loose check, e.g, wont strip en-US SDH sub if a non-SDH en-GB is available
# Check if automatic SDH stripping is enabled in config # Check if automatic SDH stripping is enabled in config
@@ -1609,20 +1921,18 @@ class dl:
for resolution, color_range, codec in product( for resolution, color_range, codec in product(
quality or [None], non_hybrid_ranges, vcodec or [None] quality or [None], non_hybrid_ranges, vcodec or [None]
): ):
match = next( candidates = [
( t
t for t in non_hybrid_tracks
for t in non_hybrid_tracks if (
if ( not resolution
not resolution or t.height == resolution
or t.height == resolution or int(t.width * (9 / 16)) == resolution
or int(t.width * (9 / 16)) == resolution )
) and (not color_range or t.range == color_range)
and (not color_range or t.range == color_range) and (not codec or t.codec == codec)
and (not codec or t.codec == codec) ]
), match = candidates[-1] if worst and candidates else next(iter(candidates), None)
None,
)
if match and match not in non_hybrid_selected: if match and match not in non_hybrid_selected:
non_hybrid_selected.append(match) non_hybrid_selected.append(match)
@@ -1632,20 +1942,18 @@ class dl:
for resolution, color_range, codec in product( for resolution, color_range, codec in product(
quality or [None], range_ or [None], vcodec or [None] quality or [None], range_ or [None], vcodec or [None]
): ):
match = next( candidates = [
( t
t for t in title.tracks.videos
for t in title.tracks.videos if (
if ( not resolution
not resolution or t.height == resolution
or t.height == resolution or int(t.width * (9 / 16)) == resolution
or int(t.width * (9 / 16)) == resolution )
) and (not color_range or t.range == color_range)
and (not color_range or t.range == color_range) and (not codec or t.codec == codec)
and (not codec or t.codec == codec) ]
), match = candidates[-1] if worst and candidates else next(iter(candidates), None)
None,
)
if match and match not in selected_videos: if match and match not in selected_videos:
selected_videos.append(match) selected_videos.append(match)
title.tracks.videos = selected_videos title.tracks.videos = selected_videos
@@ -1943,21 +2251,35 @@ class dl:
( (
pool.submit( pool.submit(
track.download, track.download,
session=service.session, session=(
track.data["_cross_service"].session
if track.data.get("_cross_service")
else service.session
),
prepare_drm=partial( prepare_drm=partial(
partial(self.prepare_drm, table=download_table), partial(self.prepare_drm, table=download_table),
track=track, track=track,
title=title, title=track.data.get("_cross_title", title),
certificate=partial( certificate=partial(
service.get_widevine_service_certificate, (
title=title, track.data["_cross_service"].get_widevine_service_certificate
if track.data.get("_cross_service")
else service.get_widevine_service_certificate
),
title=track.data.get("_cross_title", title),
track=track, track=track,
), ),
licence=partial( licence=partial(
service.get_playready_license (
if is_playready_cdm(self.cdm) track.data["_cross_service"].get_playready_license
else service.get_widevine_license, if track.data.get("_cross_service") and is_playready_cdm(self.cdm)
title=title, else track.data["_cross_service"].get_widevine_license
if track.data.get("_cross_service")
else service.get_playready_license
if is_playready_cdm(self.cdm)
else service.get_widevine_license
),
title=track.data.get("_cross_title", title),
track=track, track=track,
), ),
cdm_only=cdm_only, cdm_only=cdm_only,

View File

@@ -44,6 +44,17 @@ FINGERPRINT_PRESETS = {
"akamai": "4:16777216|16711681|0|m,p,a,s", "akamai": "4:16777216|16711681|0|m,p,a,s",
"description": "OkHttp 5.x (BoringSSL TLS stack)", "description": "OkHttp 5.x (BoringSSL TLS stack)",
}, },
"shield_okhttp": {
"ja3": (
"771," # TLS 1.2
"4865-4866-4867-49195-49199-49196-49200-52393-52392-49171-49172-156-157-47-53," # Ciphers (OkHttp 4.11)
"0-23-65281-10-11-35-16-5-13-51-45-43-21," # Extensions (incl padding ext 21)
"29-23-24," # Named groups (x25519, secp256r1, secp384r1)
"0" # EC point formats
),
"akamai": "4:16777216|16711681|0|m,p,a,s",
"description": "NVIDIA SHIELD Android TV OkHttp 4.11 (captured JA3)",
},
} }

View File

@@ -477,25 +477,25 @@ class Tracks:
if not at.path or not at.path.exists(): if not at.path or not at.path.exists():
raise ValueError("Audio Track must be downloaded before muxing...") raise ValueError("Audio Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=at) events.emit(events.Types.TRACK_MULTIPLEX, track=at)
cl.extend( audio_args = [
[ "--track-name",
"--track-name", f"0:{at.get_track_name() or ''}",
f"0:{at.get_track_name() or ''}", "--language",
"--language", f"0:{at.language}",
f"0:{at.language}", "--default-track",
"--default-track", f"0:{at.is_original_lang}",
f"0:{at.is_original_lang}", "--visual-impaired-flag",
"--visual-impaired-flag", f"0:{at.descriptive}",
f"0:{at.descriptive}", "--original-flag",
"--original-flag", f"0:{at.is_original_lang}",
f"0:{at.is_original_lang}", "--compression",
"--compression", "0:none", # disable extra compression
"0:none", # disable extra compression ]
"(",
str(at.path), if at.data.get("cross_offset_ms"):
")", audio_args.extend(["--sync", f"0:{at.data['cross_offset_ms']}"])
]
) cl.extend(audio_args + ["(", str(at.path), ")"])
if not skip_subtitles: if not skip_subtitles:
for st in self.subtitles: for st in self.subtitles:
@@ -503,29 +503,29 @@ class Tracks:
raise ValueError("Text Track must be downloaded before muxing...") raise ValueError("Text Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=st) events.emit(events.Types.TRACK_MULTIPLEX, track=st)
default = bool(self.audio and is_close_match(st.language, [self.audio[0].language]) and st.forced) default = bool(self.audio and is_close_match(st.language, [self.audio[0].language]) and st.forced)
cl.extend( sub_args = [
[ "--track-name",
"--track-name", f"0:{st.get_track_name() or ''}",
f"0:{st.get_track_name() or ''}", "--language",
"--language", f"0:{st.language}",
f"0:{st.language}", "--sub-charset",
"--sub-charset", "0:UTF-8",
"0:UTF-8", "--forced-track",
"--forced-track", f"0:{st.forced}",
f"0:{st.forced}", "--default-track",
"--default-track", f"0:{default}",
f"0:{default}", "--hearing-impaired-flag",
"--hearing-impaired-flag", f"0:{st.sdh}",
f"0:{st.sdh}", "--original-flag",
"--original-flag", f"0:{st.is_original_lang}",
f"0:{st.is_original_lang}", "--compression",
"--compression", "0:none", # disable extra compression (probably zlib)
"0:none", # disable extra compression (probably zlib) ]
"(",
str(st.path), if st.data.get("cross_offset_ms"):
")", sub_args.extend(["--sync", f"0:{st.data['cross_offset_ms']}"])
]
) cl.extend(sub_args + ["(", str(st.path), ")"])
if self.chapters: if self.chapters:
chapters_path = config.directories.temp / config.filenames.chapters.format( chapters_path = config.directories.temp / config.filenames.chapters.format(

View File

@@ -360,9 +360,37 @@ class MultipleChoice(click.Choice):
return super(self).shell_complete(ctx, param, incomplete) return super(self).shell_complete(ctx, param, incomplete)
class OffsetType(click.ParamType):
"""
Parses human-friendly time offset strings into milliseconds.
Accepts: '10s', '500ms', '-5.5s', '200' (bare number = ms).
"""
name = "offset"
_PATTERN = re.compile(r"^(-?\d+(?:\.\d+)?)\s*(s|ms)?$")
def convert(
self, value: Any, param: Optional[click.Parameter] = None, ctx: Optional[click.Context] = None
) -> int:
if isinstance(value, int):
return value
value = str(value).strip()
m = self._PATTERN.match(value)
if not m:
self.fail(f"'{value}' is not a valid offset. Use e.g. '10s', '500ms', '-5.5s'.", param, ctx)
number = float(m.group(1))
unit = m.group(2) or "ms"
if unit == "s":
return int(number * 1000)
return int(number)
SEASON_RANGE = SeasonRange() SEASON_RANGE = SeasonRange()
LANGUAGE_RANGE = LanguageRange() LANGUAGE_RANGE = LanguageRange()
QUALITY_LIST = QualityList() QUALITY_LIST = QualityList()
AUDIO_CODEC_LIST = AudioCodecList(Audio.Codec) AUDIO_CODEC_LIST = AudioCodecList(Audio.Codec)
OFFSET = OffsetType()
# VIDEO_CODEC_CHOICE will be created dynamically when imported # VIDEO_CODEC_CHOICE will be created dynamically when imported