From b4d422459c1039c91fa3b8f5403f1aa6cf60abbb Mon Sep 17 00:00:00 2001 From: imSp4rky Date: Sat, 16 May 2026 13:50:55 -0600 Subject: [PATCH] refactor(hybrid): extract dovi_tool and run_step helpers Centralise dovi_tool subcommand invocations behind a thin wrapper module so Hybrid no longer re-implements argv construction, status spinners, and stderr handling per call site. Adds a generic `run_step` helper for subprocess steps that must produce a non-empty output file. --- unshackle/core/tracks/hybrid.py | 259 +++++++++-------------------- unshackle/core/utils/dovi.py | 130 +++++++++++++++ unshackle/core/utils/subprocess.py | 34 +++- 3 files changed, 241 insertions(+), 182 deletions(-) create mode 100644 unshackle/core/utils/dovi.py diff --git a/unshackle/core/tracks/hybrid.py b/unshackle/core/tracks/hybrid.py index 54756d8..8e1f43a 100644 --- a/unshackle/core/tracks/hybrid.py +++ b/unshackle/core/tracks/hybrid.py @@ -10,10 +10,12 @@ from pathlib import Path from rich.padding import Padding from rich.rule import Rule -from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool +from unshackle.core.binaries import FFMPEG, FFProbe, HDR10PlusTool from unshackle.core.config import config from unshackle.core.console import console from unshackle.core.utilities import get_debug_logger +from unshackle.core.utils import dovi +from unshackle.core.utils.subprocess import run_step class Hybrid: @@ -138,51 +140,27 @@ class Hybrid: Path.unlink(config.directories.temp / "dv.mkv") Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True) Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True) - Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True) - Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True) - Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True) + for rpu_name in ("RPU.bin", "RPU_UNT.bin", "RPU_L5.bin", "RPU_L6.bin"): + Path.unlink(config.directories.temp / rpu_name, missing_ok=True) Path.unlink(config.directories.temp / "L5.json", missing_ok=True) Path.unlink(config.directories.temp / "L6.json", missing_ok=True) - def ffmpeg_simple(self, save_path, output): - """Simple ffmpeg execution without progress tracking""" - p = subprocess.run( - [ - str(FFMPEG) if FFMPEG else "ffmpeg", - "-nostdin", - "-i", - str(save_path), - "-c:v", - "copy", - str(output), - "-y", # overwrite output - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - return p - def extract_stream(self, save_path, type_): output = Path(config.directories.temp / f"{type_}.hevc") - - with console.status(f"Extracting {type_} stream...", spinner="dots"): - result = self.ffmpeg_simple(save_path, output) - - if result.returncode: - output.unlink(missing_ok=True) + try: + run_step( + [FFMPEG or "ffmpeg", "-nostdin", "-y", "-i", save_path, "-c:v", "copy", output], + status=f"Extracting {type_} stream...", + output=output, + label=f"ffmpeg extract {type_}", + ) + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_extract_stream", message=f"Failed extracting {type_} stream", - context={ - "type": type_, - "input": str(save_path), - "output": str(output), - "returncode": result.returncode, - "stderr": (result.stderr or b"").decode(errors="replace"), - "stdout": (result.stdout or b"").decode(errors="replace"), - }, + context={"type": type_, "input": str(save_path), "output": str(output), "error": str(e)}, ) self.log.error(f"x Failed extracting {type_} stream") sys.exit(1) @@ -202,48 +180,30 @@ class Hybrid: ): return - with console.status( - f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream...", spinner="dots" - ): - extraction_args = [str(DoviTool)] - if not untouched: - extraction_args += ["-m", "3"] - extraction_args += [ - "extract-rpu", - config.directories.temp / "DV.hevc", - "-o", - config.directories.temp / f"{'RPU' if not untouched else 'RPU_UNT'}.bin", - ] + rpu_name = "RPU_UNT" if untouched else "RPU" + rpu_path = config.directories.temp / f"{rpu_name}.bin" + dv_stream = config.directories.temp / "DV.hevc" + spinner = f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream..." - rpu_extraction = subprocess.run( - extraction_args, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - rpu_name = "RPU" if not untouched else "RPU_UNT" - if rpu_extraction.returncode: - Path.unlink(config.directories.temp / f"{rpu_name}.bin") - stderr_text = rpu_extraction.stderr.decode(errors="replace") if rpu_extraction.stderr else "" + try: + dovi.extract_rpu(dv_stream, rpu_path, mode=None if untouched else 3, status=spinner) + except RuntimeError as e: + stderr_text = str(e) if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_extract_rpu", message=f"Failed extracting{' untouched ' if untouched else ' '}RPU", - context={ - "untouched": untouched, - "returncode": rpu_extraction.returncode, - "stderr": stderr_text, - "args": [str(a) for a in extraction_args], - }, + context={"untouched": untouched, "error": stderr_text}, ) - if b"MAX_PQ_LUMINANCE" in rpu_extraction.stderr: + if "MAX_PQ_LUMINANCE" in stderr_text: self.extract_rpu(video, untouched=True) - elif b"Invalid PPS index" in rpu_extraction.stderr: + return + if "Invalid PPS index" in stderr_text: raise ValueError("Dolby Vision VideoTrack seems to be corrupt") - else: - raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream") - elif self.debug_logger: + raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream") + + if self.debug_logger: self.debug_logger.log( level="DEBUG", operation="hybrid_extract_rpu", @@ -399,34 +359,22 @@ class Hybrid: with open(l5_path, "w") as f: json.dump(l5_json, f, indent=4) - with console.status("Editing RPU Level 5 active area...", spinner="dots"): - result = subprocess.run( - [ - str(DoviTool), - "editor", - "-i", - str(config.directories.temp / self.rpu_file), - "-j", - str(l5_path), - "-o", - str(config.directories.temp / "RPU_L5.bin"), - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + try: + dovi.editor( + config.directories.temp / self.rpu_file, + l5_path, + config.directories.temp / "RPU_L5.bin", + status="Editing RPU Level 5 active area...", + label="dovi_tool editor (L5)", ) - - if result.returncode: + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_level5", message="Failed editing RPU Level 5 values", - context={ - "returncode": result.returncode, - "stderr": (result.stderr or b"").decode(errors="replace"), - }, + context={"error": str(e)}, ) - Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True) raise ValueError("Failed editing RPU Level 5 values") if self.debug_logger: @@ -447,20 +395,16 @@ class Hybrid: if os.path.isfile(config.directories.temp / "RPU_L6.bin"): return - with console.status("Reading RPU luminance metadata...", spinner="dots"): - result = subprocess.run( - [str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"], - capture_output=True, - text=True, - ) - - if result.returncode != 0: + try: + with console.status("Reading RPU luminance metadata...", spinner="dots"): + info_text = dovi.info_summary(config.directories.temp / self.rpu_file) + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_level6", message="Failed reading RPU metadata for Level 6 values", - context={"returncode": result.returncode, "stderr": (result.stderr or "")}, + context={"error": str(e)}, ) raise ValueError("Failed reading RPU metadata for Level 6 values") @@ -469,7 +413,7 @@ class Hybrid: max_mdl = None min_mdl = None - for line in result.stdout.splitlines(): + for line in info_text.splitlines(): if "RPU content light level (L1):" in line: parts = line.split("MaxCLL:")[1].split(",") max_cll = int(float(parts[0].strip().split()[0])) @@ -506,34 +450,22 @@ class Hybrid: with open(l6_path, "w") as f: json.dump(level6_data, f, indent=4) - with console.status("Editing RPU Level 6 values...", spinner="dots"): - result = subprocess.run( - [ - str(DoviTool), - "editor", - "-i", - str(config.directories.temp / self.rpu_file), - "-j", - str(l6_path), - "-o", - str(config.directories.temp / "RPU_L6.bin"), - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + try: + dovi.editor( + config.directories.temp / self.rpu_file, + l6_path, + config.directories.temp / "RPU_L6.bin", + status="Editing RPU Level 6 values...", + label="dovi_tool editor (L6)", ) - - if result.returncode: + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_level6", message="Failed editing RPU Level 6 values", - context={ - "returncode": result.returncode, - "stderr": (result.stderr or b"").decode(errors="replace"), - }, + context={"error": str(e)}, ) - Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True) raise ValueError("Failed editing RPU Level 6 values") if self.debug_logger: @@ -555,38 +487,22 @@ class Hybrid: if os.path.isfile(config.directories.temp / self.hevc_file): return - with console.status(f"Injecting Dolby Vision metadata into {self.hdr_type} stream...", spinner="dots"): - inject_cmd = [ - str(DoviTool), - "inject-rpu", - "-i", + try: + dovi.inject_rpu( config.directories.temp / "HDR10.hevc", - "--rpu-in", config.directories.temp / self.rpu_file, - ] - - inject_cmd.extend(["-o", config.directories.temp / self.hevc_file]) - - inject = subprocess.run( - inject_cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + config.directories.temp / self.hevc_file, + status=f"Injecting Dolby Vision metadata into {self.hdr_type} stream...", + label="dovi_tool inject-rpu", ) - - if inject.returncode: + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_inject_rpu", message="Failed injecting Dolby Vision metadata into HDR10 stream", - context={ - "returncode": inject.returncode, - "stderr": (inject.stderr or b"").decode(errors="replace"), - "stdout": (inject.stdout or b"").decode(errors="replace"), - "cmd": [str(a) for a in inject_cmd], - }, + context={"error": str(e)}, ) - Path.unlink(config.directories.temp / self.hevc_file) raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream") if self.debug_logger: @@ -611,35 +527,29 @@ class Hybrid: if not HDR10PlusTool: raise ValueError("HDR10Plus_tool not found. Please install it to use HDR10+ to DV conversion.") - with console.status("Extracting HDR10+ metadata...", spinner="dots"): - # HDR10Plus_tool needs raw HEVC stream - extraction = subprocess.run( + try: + run_step( [ - str(HDR10PlusTool), + HDR10PlusTool, "extract", - str(config.directories.temp / "HDR10.hevc"), + config.directories.temp / "HDR10.hevc", "-o", - str(config.directories.temp / self.hdr10plus_file), + config.directories.temp / self.hdr10plus_file, ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + status="Extracting HDR10+ metadata...", + output=config.directories.temp / self.hdr10plus_file, + label="hdr10plus_tool extract", ) - - if extraction.returncode: + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_extract_hdr10plus", message="Failed extracting HDR10+ metadata", - context={ - "returncode": extraction.returncode, - "stderr": (extraction.stderr or b"").decode(errors="replace"), - "stdout": (extraction.stdout or b"").decode(errors="replace"), - }, + context={"error": str(e)}, ) raise ValueError("Failed extracting HDR10+ metadata") - # Check if the extracted file has content file_size = os.path.getsize(config.directories.temp / self.hdr10plus_file) if file_size == 0: if self.debug_logger: @@ -744,33 +654,20 @@ class Hybrid: with open(config.directories.temp / "extra.json", "w") as f: json.dump(extra_metadata, f, indent=2) - # Generate DV RPU from HDR10+ metadata - conversion = subprocess.run( - [ - str(DoviTool), - "generate", - "-j", - str(config.directories.temp / "extra.json"), - "--hdr10plus-json", - str(config.directories.temp / self.hdr10plus_file), - "-o", - str(config.directories.temp / "RPU.bin"), - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + try: + dovi.generate_from_hdr10plus( + config.directories.temp / "extra.json", + config.directories.temp / self.hdr10plus_file, + config.directories.temp / "RPU.bin", + label="dovi_tool generate", ) - - if conversion.returncode: + except RuntimeError as e: if self.debug_logger: self.debug_logger.log( level="ERROR", operation="hybrid_convert_hdr10plus", message="Failed converting HDR10+ to Dolby Vision", - context={ - "returncode": conversion.returncode, - "stderr": (conversion.stderr or b"").decode(errors="replace"), - "stdout": (conversion.stdout or b"").decode(errors="replace"), - }, + context={"error": str(e)}, ) raise ValueError("Failed converting HDR10+ to Dolby Vision") diff --git a/unshackle/core/utils/dovi.py b/unshackle/core/utils/dovi.py new file mode 100644 index 0000000..190e599 --- /dev/null +++ b/unshackle/core/utils/dovi.py @@ -0,0 +1,130 @@ +"""Thin wrappers around dovi_tool subcommands used by DVFixup and Hybrid. + +Centralises argv construction, status spinners, and error handling so callers do not +re-implement subprocess plumbing per call site. Each wrapper: + +- Resolves `binaries.DoviTool` and raises EnvironmentError if missing. +- Delegates to `core.utils.subprocess.run_step` for execution, output validation, and + stderr-tail RuntimeError on failure. +- Returns captured stderr so callers can inspect specific failure modes (e.g. the + MAX_PQ_LUMINANCE retry path in extract_rpu). +""" + +from __future__ import annotations + +import subprocess +from pathlib import Path +from typing import Optional + +from unshackle.core import binaries +from unshackle.core.utils.subprocess import run_step + + +def _require_dovi_tool() -> str: + if not binaries.DoviTool: + raise EnvironmentError("dovi_tool executable was not found but is required.") + return str(binaries.DoviTool) + + +def extract_rpu( + source: Path, + output: Path, + *, + mode: Optional[int] = 3, + status: Optional[str] = "Extracting DV RPU...", + label: str = "dovi_tool extract-rpu", +) -> bytes: + """Extract DV RPU NALs from a raw HEVC stream. `mode=None` skips the -m flag (untouched).""" + tool = _require_dovi_tool() + args: list = [tool] + if mode is not None: + args += ["-m", str(mode)] + args += ["extract-rpu", source, "-o", output] + return run_step(args, status=status, output=output, label=label) + + +def inject_rpu( + source: Path, + rpu: Path, + output: Path, + *, + status: Optional[str] = "Re-injecting DV RPU...", + label: str = "dovi_tool inject-rpu", +) -> bytes: + """Inject a DV RPU back into a raw HEVC stream, producing DV-signaled output.""" + tool = _require_dovi_tool() + return run_step( + [tool, "inject-rpu", "-i", source, "--rpu-in", rpu, "-o", output], + status=status, + output=output, + label=label, + ) + + +def editor( + source: Path, + json_spec: Path, + output: Path, + *, + status: Optional[str] = "Editing DV RPU...", + label: str = "dovi_tool editor", +) -> bytes: + """Apply a JSON edit spec to an RPU file.""" + tool = _require_dovi_tool() + return run_step( + [tool, "editor", "-i", source, "-j", json_spec, "-o", output], + status=status, + output=output, + label=label, + ) + + +def info_summary(rpu: Path) -> str: + """Return the textual summary (`dovi_tool info -i ... -s`) for an RPU file.""" + tool = _require_dovi_tool() + p = subprocess.run([tool, "info", "-i", str(rpu), "-s"], capture_output=True, text=True) + if p.returncode != 0: + raise RuntimeError(f"dovi_tool info failed: {(p.stderr or '')[-400:]}") + return p.stdout + + +def generate_from_hdr10plus( + extra_json: Path, + hdr10plus_json: Path, + output: Path, + *, + status: Optional[str] = "Generating DV RPU from HDR10+ metadata...", + label: str = "dovi_tool generate", +) -> bytes: + """Build a DV RPU from extracted HDR10+ metadata + an extra JSON descriptor.""" + tool = _require_dovi_tool() + return run_step( + [tool, "generate", "-j", extra_json, "--hdr10plus-json", hdr10plus_json, "-o", output], + status=status, + output=output, + label=label, + ) + + +def extract_rpu_with_fallback(source: Path, output: Path, *, label: str = "dovi_tool extract-rpu") -> bytes: + """Try `-m 3` first; on MAX_PQ_LUMINANCE error, retry untouched (no -m). Returns stderr. + + Used when the caller wants automatic normalization but cannot abort if the source + rejects mode-3 conversion. + """ + try: + return extract_rpu(source, output, mode=3, label=label) + except RuntimeError as e: + if "MAX_PQ_LUMINANCE" not in str(e): + raise + return extract_rpu(source, output, mode=None, status="Extracting DV RPU (untouched)...", label=label) + + +__all__ = ( + "extract_rpu", + "extract_rpu_with_fallback", + "inject_rpu", + "editor", + "info_summary", + "generate_from_hdr10plus", +) diff --git a/unshackle/core/utils/subprocess.py b/unshackle/core/utils/subprocess.py index 3dce5dd..89cae9f 100644 --- a/unshackle/core/utils/subprocess.py +++ b/unshackle/core/utils/subprocess.py @@ -1,9 +1,10 @@ import json import subprocess from pathlib import Path -from typing import Union +from typing import Optional, Sequence, Union from unshackle.core import binaries +from unshackle.core.console import console def ffprobe(uri: Union[bytes, Path]) -> dict: @@ -23,3 +24,34 @@ def ffprobe(uri: Union[bytes, Path]) -> dict: except subprocess.CalledProcessError: return {} return json.loads(ff.stdout.decode("utf8")) + + +def run_step( + args: Sequence[Union[str, Path]], + *, + status: Optional[str] = None, + output: Optional[Path] = None, + label: str = "subprocess step", +) -> bytes: + """Run a CLI step that writes to `output` (when provided). Returns stderr bytes. + + Raises RuntimeError with the stderr tail when the process exits non-zero, or when + `output` is given and does not exist / is empty after the run. + """ + if output is not None: + output.unlink(missing_ok=True) + + str_args = [str(a) for a in args] + if status: + with console.status(status, spinner="dots"): + p = subprocess.run(str_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + else: + p = subprocess.run(str_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stderr = p.stderr or b"" + bad_output = output is not None and (not output.exists() or output.stat().st_size == 0) + if p.returncode or bad_output: + if output is not None: + output.unlink(missing_ok=True) + raise RuntimeError(f"{label} failed: {stderr.decode(errors='replace')[-400:]}") + return stderr