Files
unshackle/unshackle/core/tracks/tracks.py
imSp4rky 4a543c59e6 fix(dl): mux hybrid ingredients standalone only when range explicitly requested
-r HYBRID alone muxed the HDR10/HDR10+ base layer as a standalone output because only the ingredient DV track was flagged hybrid_base_only. The inverse was also broken: HDR10/HDR10+/DV tracks never entered the standalone-deliverable pool, so -r HYBRID,HDR10P only delivered the standalone HDR10+ by accident of the first bug.

- Add Tracks.partition_hybrid_videos: ingredient ranges (HDR10/HDR10+/DV) enter the deliverable pool only when their range is explicitly requested alongside HYBRID; replaces the duplicated filter in dl.py.
- Add Tracks.flag_hybrid_ingredients: any track in the hybrid selection but not in the deliverable selection is flagged hybrid_base_only; replaces and generalises the DV-only dv_is_deliverable special case.
2026-06-06 16:01:54 -06:00

742 lines
31 KiB
Python

from __future__ import annotations
import logging
import subprocess
from functools import partial
from pathlib import Path
from typing import Callable, Iterator, Optional, Sequence, Union
from langcodes import Language, closest_supported_match
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn, TimeRemainingColumn
from rich.table import Table
from rich.tree import Tree
from unshackle.core import binaries
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import LANGUAGE_EXACT_DISTANCE, LANGUAGE_MAX_DISTANCE, AnyTrack, TrackT
from unshackle.core.events import events
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.audio import Audio
from unshackle.core.tracks.chapters import Chapter, Chapters
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.tracks.video import Video
from unshackle.core.utilities import get_debug_logger, is_close_match, sanitize_filename
from unshackle.core.utils.collections import as_list, flatten
class Tracks:
"""
Video, Audio, Subtitle, Chapter, and Attachment Track Store.
It provides convenience functions for listing, sorting, and selecting tracks.
"""
TRACK_ORDER_MAP = {Video: 0, Audio: 1, Subtitle: 2, Chapter: 3, Attachment: 4}
def __init__(
self,
*args: Union[
Tracks, Sequence[Union[AnyTrack, Chapter, Chapters, Attachment]], Track, Chapter, Chapters, Attachment
],
manifest_url: Optional[str] = None,
):
self.videos: list[Video] = []
self.audio: list[Audio] = []
self.subtitles: list[Subtitle] = []
self.chapters = Chapters()
self.attachments: list[Attachment] = []
self.manifest_url: Optional[str] = manifest_url
if args:
self.add(args)
def __iter__(self) -> Iterator[AnyTrack]:
return iter(as_list(self.videos, self.audio, self.subtitles))
def __len__(self) -> int:
return len(self.videos) + len(self.audio) + len(self.subtitles)
def __add__(
self,
other: Union[
Tracks, Sequence[Union[AnyTrack, Chapter, Chapters, Attachment]], Track, Chapter, Chapters, Attachment
],
) -> Tracks:
self.add(other)
return self
def __repr__(self) -> str:
return "{name}({items})".format(
name=self.__class__.__name__, items=", ".join([f"{k}={repr(v)}" for k, v in self.__dict__.items()])
)
def __str__(self) -> str:
rep = {Video: [], Audio: [], Subtitle: [], Chapter: [], Attachment: []}
tracks = [*list(self), *self.chapters]
for track in sorted(tracks, key=lambda t: self.TRACK_ORDER_MAP[type(t)]):
if not rep[type(track)]:
count = sum(type(x) is type(track) for x in tracks)
rep[type(track)].append(
"{count} {type} Track{plural}{colon}".format(
count=count,
type=track.__class__.__name__,
plural="s" if count != 1 else "",
colon=":" if count > 0 else "",
)
)
rep[type(track)].append(str(track))
for type_ in list(rep):
if not rep[type_]:
del rep[type_]
continue
rep[type_] = "\n".join([rep[type_][0]] + [f"├─ {x}" for x in rep[type_][1:-1]] + [f"└─ {rep[type_][-1]}"])
rep = "\n".join(list(rep.values()))
return rep
def tree(self, add_progress: bool = False) -> tuple[Tree, list[Callable[..., None]]]:
all_tracks = [*list(self), *self.chapters, *self.attachments]
progress_callables = []
tree = Tree("", hide_root=True)
for track_type in self.TRACK_ORDER_MAP:
tracks = list(x for x in all_tracks if isinstance(x, track_type))
if tracks:
num_tracks = len(tracks)
track_type_plural = track_type.__name__ + ("s" if track_type != Audio and num_tracks != 1 else "")
tracks_tree = tree.add(f"[repr.number]{num_tracks}[/] {track_type_plural}")
for track in tracks:
if add_progress and track_type not in (Chapter, Attachment):
progress = Progress(
SpinnerColumn(finished_text=""),
BarColumn(),
"",
TimeRemainingColumn(compact=True, elapsed_when_finished=True),
"",
TextColumn("[progress.data.speed]{task.fields[downloaded]}"),
console=console,
speed_estimate_period=10,
)
task = progress.add_task("", downloaded="-")
state = {"total": 100.0}
def update_track_progress(
task_id: int = task,
_state: dict[str, float] = state,
_progress: Progress = progress,
**kwargs,
) -> None:
"""
Ensure terminal status states render as a fully completed bar.
Some downloaders can report completed slightly below total
before emitting the final "Downloaded" state.
"""
if "total" in kwargs and kwargs["total"] is not None:
_state["total"] = kwargs["total"]
downloaded_state = kwargs.get("downloaded")
if downloaded_state in {"Downloaded", "Decrypted", "[yellow]SKIPPED"}:
kwargs["completed"] = _state["total"]
_progress.update(task_id=task_id, **kwargs)
progress_callables.append(update_track_progress)
track_table = Table.grid()
track_table.add_row(str(track)[6:], style="text2")
track_table.add_row(progress)
tracks_tree.add(track_table)
else:
tracks_tree.add(str(track)[6:], style="text2")
# Show Closed Captions right after Subtitles (even if no subtitle tracks exist)
if track_type is Subtitle:
seen_cc: set[str] = set()
unique_cc: list[str] = []
for video in (x for x in all_tracks if isinstance(x, Video)):
for cc in getattr(video, "closed_captions", []):
lang = cc.get("language", "und")
name = cc.get("name", "")
instream_id = cc.get("instream_id", "")
key = f"{lang}|{instream_id}"
if key in seen_cc:
continue
seen_cc.add(key)
parts = [f"[CC] | {lang}"]
if name:
parts.append(name)
if instream_id:
parts.append(instream_id)
unique_cc.append(" | ".join(parts))
if unique_cc:
cc_tree = tree.add(
f"[repr.number]{len(unique_cc)}[/] Closed Caption{'s' if len(unique_cc) != 1 else ''}"
)
for cc_str in unique_cc:
cc_tree.add(cc_str, style="text2")
return tree, progress_callables
def exists(self, by_id: Optional[str] = None, by_url: Optional[Union[str, list[str]]] = None) -> bool:
"""Check if a track already exists by various methods."""
if by_id: # recommended
return any(x.id == by_id for x in self)
if by_url:
return any(x.url == by_url for x in self)
return False
def add(
self,
tracks: Union[
Tracks, Sequence[Union[AnyTrack, Chapter, Chapters, Attachment]], Track, Chapter, Chapters, Attachment
],
warn_only: bool = False,
) -> None:
"""Add a provided track to its appropriate array and ensuring it's not a duplicate."""
if isinstance(tracks, Tracks):
if tracks.manifest_url and not self.manifest_url:
self.manifest_url = tracks.manifest_url
tracks = [*list(tracks), *tracks.chapters, *tracks.attachments]
duplicates = 0
for track in flatten(tracks):
if self.exists(by_id=track.id):
if not warn_only:
raise ValueError(
"One or more of the provided Tracks is a duplicate. "
"Track IDs must be unique but accurate using static values. The "
"value should stay the same no matter when you request the same "
"content. Use a value that has relation to the track content "
"itself and is static or permanent and not random/RNG data that "
"wont change each refresh or conflict in edge cases."
)
duplicates += 1
continue
if isinstance(track, Video):
self.videos.append(track)
elif isinstance(track, Audio):
self.audio.append(track)
elif isinstance(track, Subtitle):
self.subtitles.append(track)
elif isinstance(track, Chapter):
self.chapters.add(track)
elif isinstance(track, Attachment):
self.attachments.append(track)
else:
raise ValueError("Track type was not set or is invalid.")
log = logging.getLogger("Tracks")
if duplicates:
log.debug(f" - Found and skipped {duplicates} duplicate tracks...")
def sort_videos(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None:
"""Sort video tracks by bitrate, and optionally language."""
if not self.videos:
return
# bitrate
self.videos.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True)
# language
for language in reversed(by_language or []):
if str(language) in ("all", "best"):
language = next((x.language for x in self.videos if x.is_original_lang), "")
if not language:
continue
self.videos.sort(key=lambda x: str(x.language))
self.videos.sort(key=lambda x: not is_close_match(language, [x.language]))
def sort_audio(
self,
by_language: Optional[Sequence[Union[str, Language]]] = None,
codec_priority: Optional[Sequence[str]] = None,
) -> None:
"""Sort audio tracks by bitrate, codec priority, Atmos, descriptive, and optionally language."""
if not self.audio:
return
# bitrate (highest first)
self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True)
# codec priority (listed codecs ranked in order; unlisted fall to end with bitrate order preserved)
if codec_priority:
rank = {str(c).upper(): i for i, c in enumerate(codec_priority)}
default_rank = len(rank)
self.audio.sort(key=lambda x: rank.get(x.codec.name if x.codec else "", default_rank))
# Atmos tracks first (prioritize over higher bitrate non-Atmos)
self.audio.sort(key=lambda x: not x.atmos)
# descriptive tracks last
self.audio.sort(key=lambda x: x.descriptive)
# language
for language in reversed(by_language or []):
if str(language) in ("all", "best"):
language = next((x.language for x in self.audio if x.is_original_lang), "")
if not language:
continue
self.audio.sort(key=lambda x: not is_close_match(language, [x.language]))
def sort_subtitles(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None:
"""
Sort subtitle tracks by various track attributes to a common P2P standard.
You may optionally provide a sequence of languages to prioritize to the top.
Section Order:
- by_language groups prioritized to top, and ascending alphabetically
- then rest ascending alphabetically after the prioritized groups
(Each section ascending alphabetically, but separated)
Language Group Order:
- Forced
- Normal
- Hard of Hearing (SDH/CC)
(Least to most captions expected in the subtitle)
"""
if not self.subtitles:
return
# language groups
self.subtitles.sort(key=lambda x: str(x.language))
self.subtitles.sort(key=lambda x: x.sdh or x.cc)
self.subtitles.sort(key=lambda x: x.forced, reverse=True)
# sections
for language in reversed(by_language or []):
if str(language) == "all":
language = next((x.language for x in self.subtitles if x.is_original_lang), "")
if not language:
continue
self.subtitles.sort(key=lambda x: is_close_match(language, [x.language]), reverse=True)
def select_video(self, x: Callable[[Video], bool]) -> None:
self.videos = list(filter(x, self.videos))
def select_audio(self, x: Callable[[Audio], bool]) -> None:
self.audio = list(filter(x, self.audio))
def select_subtitles(self, x: Callable[[Subtitle], bool]) -> None:
self.subtitles = list(filter(x, self.subtitles))
def filter(self, predicate: Callable[[AnyTrack], bool]) -> Tracks:
"""Return a new Tracks with tracks filtered by predicate, preserving metadata."""
new_tracks = Tracks(manifest_url=self.manifest_url)
new_tracks.videos = [t for t in self.videos if predicate(t)]
new_tracks.audio = [t for t in self.audio if predicate(t)]
new_tracks.subtitles = [t for t in self.subtitles if predicate(t)]
new_tracks.chapters = self.chapters
new_tracks.attachments = list(self.attachments)
return new_tracks
@staticmethod
def merge_video_selections(*groups: list[Video]) -> list[Video]:
"""Concatenate video selections, dropping duplicates (by track id, order-preserving).
A DV track can be chosen as both the hybrid ingredient (lowest) and an explicit
deliverable; without dedup the same track would be muxed/downloaded twice.
"""
merged: list[Video] = []
for group in groups:
for video in group:
if video not in merged:
merged.append(video)
return merged
@staticmethod
def partition_hybrid_videos(
videos: list[Video], non_hybrid_ranges: list[Video.Range]
) -> tuple[list[Video], list[Video]]:
"""Split videos into hybrid-ingredient candidates and the standalone-deliverable pool.
HDR10/HDR10+/DV tracks are hybrid ingredients; they only enter the standalone
pool when their range was explicitly requested alongside HYBRID, so e.g.
`-r HYBRID` muxes only the hybrid while `-r HYBRID,HDR10P` also delivers HDR10+.
"""
ingredient_ranges = (Video.Range.HDR10, Video.Range.HDR10P, Video.Range.DV)
hybrid_candidates = [v for v in videos if v.range in ingredient_ranges]
non_hybrid = [v for v in videos if v.range not in ingredient_ranges or v.range in non_hybrid_ranges]
return hybrid_candidates, non_hybrid
@staticmethod
def flag_hybrid_ingredients(hybrid_selected: list[Video], non_hybrid_selected: list[Video]) -> None:
"""Mark tracks selected only as hybrid ingredients so the standalone mux loop skips them.
A track that was also picked as an explicit deliverable (same track in both
selections) stays unflagged and is muxed standalone alongside the hybrid.
"""
for video in hybrid_selected:
if video not in non_hybrid_selected:
video.hybrid_base_only = True
def select_hybrid(self, tracks, quality, worst: bool = False):
# Prefer HDR10+ over HDR10 as the base layer (preserves dynamic metadata)
base_ranges = (Video.Range.HDR10P, Video.Range.HDR10)
base_tracks = []
for range_type in base_ranges:
base_tracks = [
v for v in tracks if v.range == range_type and (v.height in quality or int(v.width * 9 / 16) in quality)
]
if base_tracks:
break
pick = min if worst else max
base_selected = []
for res in quality:
candidates = [v for v in base_tracks if v.height == res or int(v.width * 9 / 16) == res]
if candidates:
chosen = pick(candidates, key=lambda v: v.bitrate)
base_selected.append(chosen)
dv_tracks = [v for v in tracks if v.range == Video.Range.DV]
lowest_dv = min(dv_tracks, key=lambda v: v.height) if dv_tracks else None
def select(x):
if x in base_selected:
return True
if lowest_dv and x is lowest_dv:
return True
return False
return select
def by_resolutions(self, resolutions: list[int], per_resolution: int = 0) -> None:
# Note: Do not merge these list comprehensions. They must be done separately so the results
# from the 16:9 canvas check is only used if there's no exact height resolution match.
selected = []
for resolution in resolutions:
matches = [ # exact matches
x for x in self.videos if x.height == resolution
]
if not matches:
matches = [ # 16:9 canvas matches
x for x in self.videos if int(x.width * (9 / 16)) == resolution
]
selected.extend(matches[: per_resolution or None])
self.videos = selected
@staticmethod
def by_language(
tracks: list[TrackT], languages: list[str], per_language: int = 0, exact_match: bool = False
) -> list[TrackT]:
distance = LANGUAGE_EXACT_DISTANCE if exact_match else LANGUAGE_MAX_DISTANCE
selected = []
for language in languages:
selected.extend(
[x for x in tracks if closest_supported_match(str(x.language), [language], distance)][
: per_language or None
]
)
return selected
def mux(
self,
title: str,
delete: bool = True,
progress: Optional[partial] = None,
audio_expected: bool = True,
title_language: Optional[Language] = None,
skip_subtitles: bool = False,
) -> tuple[Path, int, list[str]]:
"""
Multiplex all the Tracks into a Matroska Container file.
Parameters:
title: Set the Matroska Container file title. Usually displayed in players
instead of the filename if set.
delete: Delete all track files after multiplexing.
progress: Update a rich progress bar via `completed=...`. This must be the
progress object's update() func, pre-set with task id via functools.partial.
audio_expected: Whether audio is expected in the output. Used to determine
if embedded audio metadata should be added.
title_language: The title's intended language. Used to select the best video track
for audio metadata when multiple video tracks exist.
skip_subtitles: Skip muxing subtitle tracks into the container.
"""
if self.videos and not self.audio and audio_expected:
video_track = None
if title_language:
video_track = next((v for v in self.videos if v.language == title_language), None)
if not video_track:
video_track = next((v for v in self.videos if v.is_original_lang), None)
video_track = video_track or self.videos[0]
if video_track.language.is_valid():
lang_code = str(video_track.language)
lang_name = video_track.language.display_name()
for video in self.videos:
video.needs_repack = True
video.data["audio_language"] = lang_code
video.data["audio_language_name"] = lang_name
if not binaries.MKVToolNix:
raise RuntimeError("MKVToolNix (mkvmerge) is required for muxing but was not found")
cl = [
str(binaries.MKVToolNix),
"--no-date", # remove dates from the output for security
]
if config.muxing.get("set_title", True):
cl.extend(["--title", title])
default_language = config.muxing.get("default_language") or {}
preferred_video_lang = default_language.get("video")
preferred_audio_lang = default_language.get("audio")
preferred_subtitle_lang = default_language.get("subtitle")
preferred_video_idx: Optional[int] = None
if preferred_video_lang:
preferred_video_idx = next(
(idx for idx, v in enumerate(self.videos) if is_close_match(v.language, [preferred_video_lang])),
None,
)
preferred_audio_idx: Optional[int] = None
if preferred_audio_lang:
preferred_audio_idx = next(
(idx for idx, a in enumerate(self.audio) if is_close_match(a.language, [preferred_audio_lang])),
None,
)
preferred_subtitle_idx: Optional[int] = None
if preferred_subtitle_lang and not skip_subtitles:
preferred_subtitle_idx = next(
(idx for idx, s in enumerate(self.subtitles) if is_close_match(s.language, [preferred_subtitle_lang])),
None,
)
for i, vt in enumerate(self.videos):
if not vt.path or not vt.path.exists():
raise ValueError("Video Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=vt)
if preferred_video_idx is not None:
is_default = i == preferred_video_idx
elif title_language:
is_default = vt.language == title_language
if not any(v.language == title_language for v in self.videos):
is_default = vt.is_original_lang or i == 0
else:
is_default = i == 0
# Prepare base arguments
video_args = [
"--language",
f"0:{vt.language}",
"--default-track",
f"0:{is_default}",
"--original-flag",
f"0:{vt.is_original_lang}",
"--compression",
"0:none", # disable extra compression
]
# Add FPS fix if needed (typically for hybrid mode to prevent sync issues)
if hasattr(vt, "needs_duration_fix") and vt.needs_duration_fix and vt.fps:
video_args.extend(
[
"--default-duration",
f"0:{vt.fps}fps" if isinstance(vt.fps, str) else f"0:{vt.fps:.3f}fps",
"--fix-bitstream-timing-information",
"0:1",
]
)
if hasattr(vt, "range") and vt.range == Video.Range.HLG:
video_args.extend(
[
"--color-transfer-characteristics",
"0:18", # ARIB STD-B67 (HLG)
]
)
if hasattr(vt, "data") and vt.data.get("audio_language"):
audio_lang = vt.data["audio_language"]
audio_name = vt.data.get("audio_language_name", audio_lang)
video_args.extend(
[
"--language",
f"1:{audio_lang}",
"--track-name",
f"1:{audio_name}",
]
)
cl.extend(video_args + ["(", str(vt.path), ")"])
for i, at in enumerate(self.audio):
if not at.path or not at.path.exists():
raise ValueError("Audio Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=at)
if preferred_audio_idx is not None:
audio_default = i == preferred_audio_idx
else:
audio_default = at.is_original_lang
cl.extend(
[
"--track-name",
f"0:{at.get_track_name() or ''}",
"--language",
f"0:{at.language}",
"--default-track",
f"0:{audio_default}",
"--visual-impaired-flag",
f"0:{at.descriptive}",
"--original-flag",
f"0:{at.is_original_lang}",
"--compression",
"0:none", # disable extra compression
"(",
str(at.path),
")",
]
)
if not skip_subtitles:
for i, st in enumerate(self.subtitles):
if not st.path or not st.path.exists():
raise ValueError("Text Track must be downloaded before muxing...")
events.emit(events.Types.TRACK_MULTIPLEX, track=st)
if preferred_subtitle_idx is not None:
default = i == preferred_subtitle_idx
else:
default = bool(self.audio and is_close_match(st.language, [self.audio[0].language]) and st.forced)
cl.extend(
[
"--track-name",
f"0:{st.get_track_name() or ''}",
"--language",
f"0:{st.language}",
"--sub-charset",
"0:UTF-8",
"--forced-track",
f"0:{st.forced}",
"--default-track",
f"0:{default}",
"--hearing-impaired-flag",
f"0:{st.sdh}",
"--original-flag",
f"0:{st.is_original_lang}",
"--compression",
"0:none", # disable extra compression (probably zlib)
"(",
str(st.path),
")",
]
)
if self.chapters:
chapters_path = config.directories.temp / config.filenames.chapters.format(
title=sanitize_filename(title), random=self.chapters.id
)
self.chapters.dump(chapters_path, fallback_name=config.chapter_fallback_name)
cl.extend(["--chapter-charset", "UTF-8", "--chapters", str(chapters_path)])
else:
chapters_path = None
for attachment in self.attachments:
if not attachment.path or not attachment.path.exists():
raise ValueError("Attachment File was not found...")
cl.extend(
[
"--attachment-description",
attachment.description or "",
"--attachment-mime-type",
attachment.mime_type,
"--attachment-name",
attachment.name,
"--attach-file",
str(attachment.path.resolve()),
]
)
output_path = (
self.videos[0].path.with_suffix(".muxed.mkv")
if self.videos
else self.audio[0].path.with_suffix(".muxed.mka")
if self.audio
else self.subtitles[0].path.with_suffix(".muxed.mks")
if self.subtitles
else chapters_path.with_suffix(".muxed.mkv")
if self.chapters
else None
)
if not output_path:
raise ValueError("No tracks provided, at least one track must be provided.")
debug_logger = get_debug_logger()
if debug_logger:
debug_logger.log(
level="DEBUG",
operation="mux_start",
message="Starting mkvmerge muxing",
context={
"title": title,
"output_path": str(output_path),
"video_count": len(self.videos),
"audio_count": len(self.audio),
"subtitle_count": len(self.subtitles),
"attachment_count": len(self.attachments),
"has_chapters": bool(self.chapters),
"video_tracks": [
{"id": v.id, "codec": getattr(v, "codec", None), "language": str(v.language)}
for v in self.videos
],
"audio_tracks": [
{"id": a.id, "codec": getattr(a, "codec", None), "language": str(a.language)}
for a in self.audio
],
"subtitle_tracks": [
{"id": s.id, "codec": getattr(s, "codec", None), "language": str(s.language)}
for s in self.subtitles
],
},
)
# let potential failures go to caller, caller should handle
try:
errors = []
p = subprocess.Popen([*cl, "--output", str(output_path), "--gui-mode"], text=True, stdout=subprocess.PIPE)
for line in iter(p.stdout.readline, ""):
if line.startswith("#GUI#error") or line.startswith("#GUI#warning"):
errors.append(line)
if "progress" in line:
progress(total=100, completed=int(line.strip()[14:-1]))
returncode = p.wait()
if debug_logger:
if returncode != 0 or errors:
debug_logger.log(
level="ERROR",
operation="mux_failed",
message=f"mkvmerge exited with code {returncode}",
context={
"returncode": returncode,
"output_path": str(output_path),
"errors": errors,
},
)
else:
debug_logger.log(
level="DEBUG",
operation="mux_complete",
message="mkvmerge muxing completed successfully",
context={
"output_path": str(output_path),
"output_exists": output_path.exists() if output_path else False,
},
)
return output_path, returncode, errors
finally:
if chapters_path:
chapters_path.unlink()
if delete:
for track in self:
track.delete()
for attachment in self.attachments:
if attachment.path and attachment.path.exists():
attachment.path.unlink()
__all__ = ("Tracks",)