feat(hls): detect DV-composite tracks and restore signaling post-mux

Parse HLS SUPPLEMENTAL-CODECS to identify tracks that ship Dolby Vision RPU NALs in a stream whose primary codec is plain hvc1 (e.g. fMP4 ladders signalled as dvh1.08.x only via SUPPLEMENTAL-CODECS). Tag such tracks with the new `dv_compatible_bitstream` flag on Video.

Add DVFixup helper that runs `dovi_tool extract-rpu | inject-rpu` on flagged tracks before mux so the muxed MKV is recognised as Dolby Vision. Soft-fails to the source bitstream if dovi_tool is missing or any step errors.

Range stays whatever VIDEO-RANGE signalled. HDR10+ presence is a bitstream feature, not a codec-string feature, so services that know their encoder embeds HDR10+ SEI must override Video.range themselves.
This commit is contained in:
imSp4rky
2026-05-16 13:52:57 -06:00
parent b4d422459c
commit ead88fe066
5 changed files with 187 additions and 9 deletions

View File

@@ -53,6 +53,7 @@ from unshackle.core.titles import Movie, Movies, Series, Song, Title_T
from unshackle.core.titles.episode import Episode from unshackle.core.titles.episode import Episode
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.dv_fixup import apply_dv_fixup
from unshackle.core.tracks.hybrid import Hybrid from unshackle.core.tracks.hybrid import Hybrid
from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger, from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger,
is_close_match, suggest_font_packages, time_elapsed_since) is_close_match, suggest_font_packages, time_elapsed_since)
@@ -2433,6 +2434,9 @@ class dl:
else: else:
# Normal mode: process each video track separately # Normal mode: process each video track separately
for video_track in title.tracks.videos or [None]: for video_track in title.tracks.videos or [None]:
if video_track and getattr(video_track, "dv_compatible_bitstream", False):
apply_dv_fixup(video_track)
task_description = "Multiplexing" task_description = "Multiplexing"
if video_track: if video_track:
if len(quality) > 1: if len(quality) > 1:

View File

@@ -853,6 +853,10 @@ class DASH:
def get_video_range( def get_video_range(
codecs: str, all_supplemental_props: list[Element], all_essential_props: list[Element] codecs: str, all_supplemental_props: list[Element], all_essential_props: list[Element]
) -> Video.Range: ) -> Video.Range:
# TODO: Detect Dolby Vision composite streams in DASH manifests (DV RPU embedded but
# primary codec is plain hvc1, signaled via a separate AdaptationSet/Representation
# with DV codec strings or DolbyVisionConfigurationBox). When found, mark the track
# with dv_compatible_bitstream=True so DVFixup runs pre-mux. No DASH samples seen yet.
if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")): if codecs.startswith(("dva1", "dvav", "dvhe", "dvh1")):
return Video.Range.DV return Video.Range.DV

View File

@@ -5,6 +5,7 @@ import html
import json import json
import logging import logging
import os import os
import re
import shutil import shutil
import subprocess import subprocess
import sys import sys
@@ -36,8 +37,14 @@ from unshackle.core.utilities import get_debug_logger, get_extension, is_close_m
class HLS: class HLS:
SUPP_CODECS_RE = re.compile(r'SUPPLEMENTAL-CODECS="([^"]+)"', re.IGNORECASE)
def __init__( def __init__(
self, manifest: M3U8, session: Optional[Union[Session, RnetSession]] = None, url: Optional[str] = None self,
manifest: M3U8,
session: Optional[Union[Session, RnetSession]] = None,
url: Optional[str] = None,
raw_text: Optional[str] = None,
): ):
if not manifest: if not manifest:
raise ValueError("HLS manifest must be provided.") raise ValueError("HLS manifest must be provided.")
@@ -49,6 +56,7 @@ class HLS:
self.manifest = manifest self.manifest = manifest
self.session = session or Session() self.session = session or Session()
self.url = url self.url = url
self.raw_text = raw_text
@classmethod @classmethod
def from_url(cls, url: str, session: Optional[Union[Session, RnetSession]] = None, **args: Any) -> HLS: def from_url(cls, url: str, session: Optional[Union[Session, RnetSession]] = None, **args: Any) -> HLS:
@@ -78,7 +86,7 @@ class HLS:
master = m3u8.loads(content, uri=url) master = m3u8.loads(content, uri=url)
return cls(master, session, url=url) return cls(master, session, url=url, raw_text=content)
@classmethod @classmethod
def from_text(cls, text: str, url: str) -> HLS: def from_text(cls, text: str, url: str) -> HLS:
@@ -94,7 +102,31 @@ class HLS:
master = m3u8.loads(text, uri=url) master = m3u8.loads(text, uri=url)
return cls(master) return cls(master, raw_text=text)
def supplemental_codecs_by_uri(self) -> dict[str, str]:
"""Map each variant URI to its SUPPLEMENTAL-CODECS value.
python-m3u8 drops this attribute, so we re-parse the raw text to recover it for
Dolby Vision composite detection (dvh1.08.x advertised only in SUPPLEMENTAL-CODECS
while primary CODECS stays plain hvc1).
"""
if not self.raw_text:
return {}
out: dict[str, str] = {}
lines = self.raw_text.splitlines()
for i, line in enumerate(lines):
if not line.startswith("#EXT-X-STREAM-INF"):
continue
supp_match = self.SUPP_CODECS_RE.search(line)
if not supp_match:
continue
for j in range(i + 1, len(lines)):
uri = lines[j].strip()
if uri and not uri.startswith("#"):
out[uri] = supp_match.group(1)
break
return out
def to_tracks(self, language: Union[str, Language]) -> Tracks: def to_tracks(self, language: Union[str, Language]) -> Tracks:
""" """
@@ -126,6 +158,9 @@ class HLS:
) )
tracks = Tracks() tracks = Tracks()
supplemental_codecs = self.supplemental_codecs_by_uri()
dv_supp_prefixes = ("dva1", "dvav", "dvhe", "dvh1")
for playlist in self.manifest.playlists: for playlist in self.manifest.playlists:
audio_group = playlist.stream_info.audio audio_group = playlist.stream_info.audio
audio_codec: Optional[Audio.Codec] = None audio_codec: Optional[Audio.Codec] = None
@@ -146,6 +181,26 @@ class HLS:
else: else:
primary_track_type = Video primary_track_type = Video
primary_codecs = (playlist.stream_info.codecs or "").lower()
primary_has_dv = any(codec.split(".")[0] in dv_supp_prefixes for codec in primary_codecs.split(","))
supp_codecs_str = supplemental_codecs.get(playlist.uri, "")
supp_dv_codec: Optional[str] = None
for codec in supp_codecs_str.lower().split(","):
token = codec.strip().split("/")[0]
if token.split(".")[0] in dv_supp_prefixes:
supp_dv_codec = token
break
video_range = (
Video.Range.DV if primary_has_dv else Video.Range.from_m3u_range_tag(playlist.stream_info.video_range)
)
# DV-composite track: primary codec is plain HEVC but SUPPLEMENTAL-CODECS advertises
# a DV codec. Range stays whatever VIDEO-RANGE signaled (HDR10/HLG/SDR); DVFixup will
# restore DV signaling post-download. Services that know their encoder embeds HDR10+
# SEI must override `range` themselves (see services/ATV).
dv_compatible_bitstream = primary_track_type is Video and not primary_has_dv and supp_dv_codec is not None
tracks.add( tracks.add(
primary_track_type( primary_track_type(
id_=hex(crc32(str(playlist).encode()))[2:], id_=hex(crc32(str(playlist).encode()))[2:],
@@ -164,18 +219,14 @@ class HLS:
# video track args # video track args
**( **(
dict( dict(
range_=Video.Range.DV range_=video_range,
if any(
codec.split(".")[0] in ("dva1", "dvav", "dvhe", "dvh1")
for codec in (playlist.stream_info.codecs or "").lower().split(",")
)
else Video.Range.from_m3u_range_tag(playlist.stream_info.video_range),
width=playlist.stream_info.resolution[0] if playlist.stream_info.resolution else None, width=playlist.stream_info.resolution[0] if playlist.stream_info.resolution else None,
height=playlist.stream_info.resolution[1] if playlist.stream_info.resolution else None, height=playlist.stream_info.resolution[1] if playlist.stream_info.resolution else None,
fps=playlist.stream_info.frame_rate, fps=playlist.stream_info.frame_rate,
closed_captions=cc_by_group_id.get( closed_captions=cc_by_group_id.get(
(playlist.stream_info.closed_captions or "").strip('"'), [] (playlist.stream_info.closed_captions or "").strip('"'), []
), ),
dv_compatible_bitstream=dv_compatible_bitstream,
) )
if primary_track_type is Video if primary_track_type is Video
else {} else {}

View File

@@ -0,0 +1,117 @@
"""
DV fixup for HLS composite HEVC streams.
Some services deliver DV Profile 8.1 in a stream whose primary CODECS is plain
hvc1, with DV advertised only via SUPPLEMENTAL-CODECS. The fMP4 carries DV RPU NALs but
the container does not signal DV, so muxing produces an MKV that mediainfo and DV-capable
TVs see as plain HDR10/HDR10+.
A dovi_tool extract-rpu / inject-rpu round-trip rewrites the bitstream so it is recognised
as DV after muxing. HDR10+ SEI NALs and HDR10 base layer signaling survive untouched.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import FFMPEG, DoviTool
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.utilities import get_debug_logger
from unshackle.core.utils import dovi
from unshackle.core.utils.subprocess import run_step
if TYPE_CHECKING:
from unshackle.core.tracks import Video
class DVFixup:
"""Round-trip a DV-composite HEVC track through dovi_tool to restore DV signaling."""
def __init__(self, video: "Video") -> None:
self.log = logging.getLogger("dv-fixup")
self.debug_logger = get_debug_logger()
self.video = video
if not DoviTool:
raise EnvironmentError("dovi_tool is required for DV-composite fixup but was not found.")
if not FFMPEG:
raise EnvironmentError("ffmpeg is required for DV-composite fixup but was not found.")
if not video.path or not Path(video.path).exists():
raise ValueError(f"Video track {video.id} was not downloaded before DV fixup.")
def run(self) -> Path:
"""Execute the fixup. Returns the DV-signaled HEVC path, or the original
source path on any failure so muxing can proceed with the as-downloaded file."""
source = Path(self.video.path)
height = self.video.height or 0
console.print(Padding(Rule(f"[rule.text]DV Composite Fixup ({height}p)"), (1, 2)))
fixed_hevc = source.with_name(f"{self.video.id}.dv.hevc")
if fixed_hevc.exists() and fixed_hevc.stat().st_size > 0:
self.log.info("✓ DV signaling already restored (reusing existing fixup)")
return fixed_hevc
tmp = config.directories.temp
tmp.mkdir(parents=True, exist_ok=True)
suffix = f"{self.video.id}_{height or 'na'}"
raw_hevc = tmp / f"dvfix_{suffix}.hevc"
rpu = tmp / f"dvfix_{suffix}_rpu.bin"
try:
run_step(
[FFMPEG, "-nostdin", "-y", "-i", source, "-c:v", "copy", "-f", "hevc", raw_hevc],
status="Demuxing HEVC bitstream...",
output=raw_hevc,
label="ffmpeg demux",
)
dovi.extract_rpu_with_fallback(raw_hevc, rpu)
dovi.inject_rpu(raw_hevc, rpu, fixed_hevc, status="Re-injecting DV RPU with proper signaling...")
except Exception as e:
self.log.warning(f"DV fixup failed ({e}); muxing source as-is.")
if self.debug_logger:
self.debug_logger.log(
level="WARNING",
operation="dv_fixup",
message="DV fixup failed; falling back to source",
context={"error": str(e), "source": str(source)},
)
for leftover in (raw_hevc, rpu, fixed_hevc):
leftover.unlink(missing_ok=True)
return source
for leftover in (raw_hevc, rpu):
leftover.unlink(missing_ok=True)
self.log.info("✓ DV signaling restored")
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="dv_fixup",
message="DV fixup complete",
context={"source": str(source), "output": str(fixed_hevc)},
success=True,
)
return fixed_hevc
def apply_dv_fixup(video: "Video") -> None:
"""Run DV fixup on `video` if flagged as DV-composite. Updates `video.path` in place
and deletes the original source file so the standard mux cleanup handles the new path."""
if not getattr(video, "dv_compatible_bitstream", False):
return
if not video.path or not Path(video.path).exists():
return
original = Path(video.path)
fixed = DVFixup(video).run()
if fixed != original:
video.path = fixed
original.unlink(missing_ok=True)
__all__ = ("DVFixup", "apply_dv_fixup")

View File

@@ -228,6 +228,7 @@ class Video(Track):
fps: Optional[Union[str, int, float]] = None, fps: Optional[Union[str, int, float]] = None,
scan_type: Optional[Video.ScanType] = None, scan_type: Optional[Video.ScanType] = None,
closed_captions: Optional[list[dict[str, Any]]] = None, closed_captions: Optional[list[dict[str, Any]]] = None,
dv_compatible_bitstream: bool = False,
**kwargs: Any, **kwargs: Any,
) -> None: ) -> None:
""" """
@@ -294,6 +295,7 @@ class Video(Track):
self.scan_type = scan_type self.scan_type = scan_type
self.closed_captions: list[dict[str, Any]] = closed_captions or [] self.closed_captions: list[dict[str, Any]] = closed_captions or []
self.needs_duration_fix = False self.needs_duration_fix = False
self.dv_compatible_bitstream = dv_compatible_bitstream
def __str__(self) -> str: def __str__(self) -> str:
return " | ".join( return " | ".join(