From ead88fe066355eb42ee27740af8108d3c5cca110 Mon Sep 17 00:00:00 2001 From: imSp4rky Date: Sat, 16 May 2026 13:52:57 -0600 Subject: [PATCH] feat(hls): detect DV-composite tracks and restore signaling post-mux Parse HLS SUPPLEMENTAL-CODECS to identify tracks that ship Dolby Vision RPU NALs in a stream whose primary codec is plain hvc1 (e.g. fMP4 ladders signalled as dvh1.08.x only via SUPPLEMENTAL-CODECS). Tag such tracks with the new `dv_compatible_bitstream` flag on Video. Add DVFixup helper that runs `dovi_tool extract-rpu | inject-rpu` on flagged tracks before mux so the muxed MKV is recognised as Dolby Vision. Soft-fails to the source bitstream if dovi_tool is missing or any step errors. Range stays whatever VIDEO-RANGE signalled. HDR10+ presence is a bitstream feature, not a codec-string feature, so services that know their encoder embeds HDR10+ SEI must override Video.range themselves. --- unshackle/commands/dl.py | 4 + unshackle/core/manifests/dash.py | 4 + unshackle/core/manifests/hls.py | 69 +++++++++++++++--- unshackle/core/tracks/dv_fixup.py | 117 ++++++++++++++++++++++++++++++ unshackle/core/tracks/video.py | 2 + 5 files changed, 187 insertions(+), 9 deletions(-) create mode 100644 unshackle/core/tracks/dv_fixup.py diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py index 554cf40..e336c0c 100644 --- a/unshackle/commands/dl.py +++ b/unshackle/commands/dl.py @@ -53,6 +53,7 @@ from unshackle.core.titles import Movie, Movies, Series, Song, Title_T from unshackle.core.titles.episode import Episode from unshackle.core.tracks import Audio, Subtitle, Tracks, Video from unshackle.core.tracks.attachment import Attachment +from unshackle.core.tracks.dv_fixup import apply_dv_fixup from unshackle.core.tracks.hybrid import Hybrid from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger, is_close_match, suggest_font_packages, time_elapsed_since) @@ -2433,6 +2434,9 @@ class dl: else: # Normal mode: process each video track separately for video_track in title.tracks.videos or [None]: + if video_track and getattr(video_track, "dv_compatible_bitstream", False): + apply_dv_fixup(video_track) + task_description = "Multiplexing" if video_track: if len(quality) > 1: diff --git a/unshackle/core/manifests/dash.py b/unshackle/core/manifests/dash.py index 21ece1b..ca8b35f 100644 --- a/unshackle/core/manifests/dash.py +++ b/unshackle/core/manifests/dash.py @@ -853,6 +853,10 @@ class DASH: def get_video_range( codecs: str, all_supplemental_props: list[Element], all_essential_props: list[Element] ) -> Video.Range: + # TODO: Detect Dolby Vision composite streams in DASH manifests (DV RPU embedded but + # primary codec is plain hvc1, signaled via a separate AdaptationSet/Representation + # with DV codec strings or DolbyVisionConfigurationBox). When found, mark the track + # with dv_compatible_bitstream=True so DVFixup runs pre-mux. No DASH samples seen yet. if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")): return Video.Range.DV diff --git a/unshackle/core/manifests/hls.py b/unshackle/core/manifests/hls.py index 80b5c9a..b26185a 100644 --- a/unshackle/core/manifests/hls.py +++ b/unshackle/core/manifests/hls.py @@ -5,6 +5,7 @@ import html import json import logging import os +import re import shutil import subprocess import sys @@ -36,8 +37,14 @@ from unshackle.core.utilities import get_debug_logger, get_extension, is_close_m class HLS: + SUPP_CODECS_RE = re.compile(r'SUPPLEMENTAL-CODECS="([^"]+)"', re.IGNORECASE) + def __init__( - self, manifest: M3U8, session: Optional[Union[Session, RnetSession]] = None, url: Optional[str] = None + self, + manifest: M3U8, + session: Optional[Union[Session, RnetSession]] = None, + url: Optional[str] = None, + raw_text: Optional[str] = None, ): if not manifest: raise ValueError("HLS manifest must be provided.") @@ -49,6 +56,7 @@ class HLS: self.manifest = manifest self.session = session or Session() self.url = url + self.raw_text = raw_text @classmethod def from_url(cls, url: str, session: Optional[Union[Session, RnetSession]] = None, **args: Any) -> HLS: @@ -78,7 +86,7 @@ class HLS: master = m3u8.loads(content, uri=url) - return cls(master, session, url=url) + return cls(master, session, url=url, raw_text=content) @classmethod def from_text(cls, text: str, url: str) -> HLS: @@ -94,7 +102,31 @@ class HLS: master = m3u8.loads(text, uri=url) - return cls(master) + return cls(master, raw_text=text) + + def supplemental_codecs_by_uri(self) -> dict[str, str]: + """Map each variant URI to its SUPPLEMENTAL-CODECS value. + + python-m3u8 drops this attribute, so we re-parse the raw text to recover it for + Dolby Vision composite detection (dvh1.08.x advertised only in SUPPLEMENTAL-CODECS + while primary CODECS stays plain hvc1). + """ + if not self.raw_text: + return {} + out: dict[str, str] = {} + lines = self.raw_text.splitlines() + for i, line in enumerate(lines): + if not line.startswith("#EXT-X-STREAM-INF"): + continue + supp_match = self.SUPP_CODECS_RE.search(line) + if not supp_match: + continue + for j in range(i + 1, len(lines)): + uri = lines[j].strip() + if uri and not uri.startswith("#"): + out[uri] = supp_match.group(1) + break + return out def to_tracks(self, language: Union[str, Language]) -> Tracks: """ @@ -126,6 +158,9 @@ class HLS: ) tracks = Tracks() + supplemental_codecs = self.supplemental_codecs_by_uri() + dv_supp_prefixes = ("dva1", "dvav", "dvhe", "dvh1") + for playlist in self.manifest.playlists: audio_group = playlist.stream_info.audio audio_codec: Optional[Audio.Codec] = None @@ -146,6 +181,26 @@ class HLS: else: primary_track_type = Video + primary_codecs = (playlist.stream_info.codecs or "").lower() + primary_has_dv = any(codec.split(".")[0] in dv_supp_prefixes for codec in primary_codecs.split(",")) + + supp_codecs_str = supplemental_codecs.get(playlist.uri, "") + supp_dv_codec: Optional[str] = None + for codec in supp_codecs_str.lower().split(","): + token = codec.strip().split("/")[0] + if token.split(".")[0] in dv_supp_prefixes: + supp_dv_codec = token + break + + video_range = ( + Video.Range.DV if primary_has_dv else Video.Range.from_m3u_range_tag(playlist.stream_info.video_range) + ) + # DV-composite track: primary codec is plain HEVC but SUPPLEMENTAL-CODECS advertises + # a DV codec. Range stays whatever VIDEO-RANGE signaled (HDR10/HLG/SDR); DVFixup will + # restore DV signaling post-download. Services that know their encoder embeds HDR10+ + # SEI must override `range` themselves (see services/ATV). + dv_compatible_bitstream = primary_track_type is Video and not primary_has_dv and supp_dv_codec is not None + tracks.add( primary_track_type( id_=hex(crc32(str(playlist).encode()))[2:], @@ -164,18 +219,14 @@ class HLS: # video track args **( dict( - range_=Video.Range.DV - if any( - codec.split(".")[0] in ("dva1", "dvav", "dvhe", "dvh1") - for codec in (playlist.stream_info.codecs or "").lower().split(",") - ) - else Video.Range.from_m3u_range_tag(playlist.stream_info.video_range), + range_=video_range, width=playlist.stream_info.resolution[0] if playlist.stream_info.resolution else None, height=playlist.stream_info.resolution[1] if playlist.stream_info.resolution else None, fps=playlist.stream_info.frame_rate, closed_captions=cc_by_group_id.get( (playlist.stream_info.closed_captions or "").strip('"'), [] ), + dv_compatible_bitstream=dv_compatible_bitstream, ) if primary_track_type is Video else {} diff --git a/unshackle/core/tracks/dv_fixup.py b/unshackle/core/tracks/dv_fixup.py new file mode 100644 index 0000000..3e8f9ff --- /dev/null +++ b/unshackle/core/tracks/dv_fixup.py @@ -0,0 +1,117 @@ +""" +DV fixup for HLS composite HEVC streams. + +Some services deliver DV Profile 8.1 in a stream whose primary CODECS is plain +hvc1, with DV advertised only via SUPPLEMENTAL-CODECS. The fMP4 carries DV RPU NALs but +the container does not signal DV, so muxing produces an MKV that mediainfo and DV-capable +TVs see as plain HDR10/HDR10+. + +A dovi_tool extract-rpu / inject-rpu round-trip rewrites the bitstream so it is recognised +as DV after muxing. HDR10+ SEI NALs and HDR10 base layer signaling survive untouched. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +from rich.padding import Padding +from rich.rule import Rule + +from unshackle.core.binaries import FFMPEG, DoviTool +from unshackle.core.config import config +from unshackle.core.console import console +from unshackle.core.utilities import get_debug_logger +from unshackle.core.utils import dovi +from unshackle.core.utils.subprocess import run_step + +if TYPE_CHECKING: + from unshackle.core.tracks import Video + + +class DVFixup: + """Round-trip a DV-composite HEVC track through dovi_tool to restore DV signaling.""" + + def __init__(self, video: "Video") -> None: + self.log = logging.getLogger("dv-fixup") + self.debug_logger = get_debug_logger() + self.video = video + + if not DoviTool: + raise EnvironmentError("dovi_tool is required for DV-composite fixup but was not found.") + if not FFMPEG: + raise EnvironmentError("ffmpeg is required for DV-composite fixup but was not found.") + if not video.path or not Path(video.path).exists(): + raise ValueError(f"Video track {video.id} was not downloaded before DV fixup.") + + def run(self) -> Path: + """Execute the fixup. Returns the DV-signaled HEVC path, or the original + source path on any failure so muxing can proceed with the as-downloaded file.""" + source = Path(self.video.path) + height = self.video.height or 0 + console.print(Padding(Rule(f"[rule.text]DV Composite Fixup ({height}p)"), (1, 2))) + + fixed_hevc = source.with_name(f"{self.video.id}.dv.hevc") + if fixed_hevc.exists() and fixed_hevc.stat().st_size > 0: + self.log.info("✓ DV signaling already restored (reusing existing fixup)") + return fixed_hevc + + tmp = config.directories.temp + tmp.mkdir(parents=True, exist_ok=True) + suffix = f"{self.video.id}_{height or 'na'}" + raw_hevc = tmp / f"dvfix_{suffix}.hevc" + rpu = tmp / f"dvfix_{suffix}_rpu.bin" + + try: + run_step( + [FFMPEG, "-nostdin", "-y", "-i", source, "-c:v", "copy", "-f", "hevc", raw_hevc], + status="Demuxing HEVC bitstream...", + output=raw_hevc, + label="ffmpeg demux", + ) + dovi.extract_rpu_with_fallback(raw_hevc, rpu) + dovi.inject_rpu(raw_hevc, rpu, fixed_hevc, status="Re-injecting DV RPU with proper signaling...") + except Exception as e: + self.log.warning(f"DV fixup failed ({e}); muxing source as-is.") + if self.debug_logger: + self.debug_logger.log( + level="WARNING", + operation="dv_fixup", + message="DV fixup failed; falling back to source", + context={"error": str(e), "source": str(source)}, + ) + for leftover in (raw_hevc, rpu, fixed_hevc): + leftover.unlink(missing_ok=True) + return source + + for leftover in (raw_hevc, rpu): + leftover.unlink(missing_ok=True) + + self.log.info("✓ DV signaling restored") + if self.debug_logger: + self.debug_logger.log( + level="INFO", + operation="dv_fixup", + message="DV fixup complete", + context={"source": str(source), "output": str(fixed_hevc)}, + success=True, + ) + return fixed_hevc + + +def apply_dv_fixup(video: "Video") -> None: + """Run DV fixup on `video` if flagged as DV-composite. Updates `video.path` in place + and deletes the original source file so the standard mux cleanup handles the new path.""" + if not getattr(video, "dv_compatible_bitstream", False): + return + if not video.path or not Path(video.path).exists(): + return + original = Path(video.path) + fixed = DVFixup(video).run() + if fixed != original: + video.path = fixed + original.unlink(missing_ok=True) + + +__all__ = ("DVFixup", "apply_dv_fixup") diff --git a/unshackle/core/tracks/video.py b/unshackle/core/tracks/video.py index ed2bdb6..19f184b 100644 --- a/unshackle/core/tracks/video.py +++ b/unshackle/core/tracks/video.py @@ -228,6 +228,7 @@ class Video(Track): fps: Optional[Union[str, int, float]] = None, scan_type: Optional[Video.ScanType] = None, closed_captions: Optional[list[dict[str, Any]]] = None, + dv_compatible_bitstream: bool = False, **kwargs: Any, ) -> None: """ @@ -294,6 +295,7 @@ class Video(Track): self.scan_type = scan_type self.closed_captions: list[dict[str, Any]] = closed_captions or [] self.needs_duration_fix = False + self.dv_compatible_bitstream = dv_compatible_bitstream def __str__(self) -> str: return " | ".join(