refactor(hybrid): extract dovi_tool and run_step helpers

Centralise dovi_tool subcommand invocations behind a thin wrapper module so Hybrid no longer re-implements argv construction, status spinners, and stderr handling per call site. Adds a generic `run_step` helper for subprocess steps that must produce a non-empty output file.
This commit is contained in:
imSp4rky
2026-05-16 13:50:55 -06:00
parent cda8120b6d
commit b4d422459c
3 changed files with 241 additions and 182 deletions

View File

@@ -10,10 +10,12 @@ from pathlib import Path
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
from unshackle.core.binaries import FFMPEG, FFProbe, HDR10PlusTool
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.utilities import get_debug_logger
from unshackle.core.utils import dovi
from unshackle.core.utils.subprocess import run_step
class Hybrid:
@@ -138,51 +140,27 @@ class Hybrid:
Path.unlink(config.directories.temp / "dv.mkv")
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
for rpu_name in ("RPU.bin", "RPU_UNT.bin", "RPU_L5.bin", "RPU_L6.bin"):
Path.unlink(config.directories.temp / rpu_name, missing_ok=True)
Path.unlink(config.directories.temp / "L5.json", missing_ok=True)
Path.unlink(config.directories.temp / "L6.json", missing_ok=True)
def ffmpeg_simple(self, save_path, output):
"""Simple ffmpeg execution without progress tracking"""
p = subprocess.run(
[
str(FFMPEG) if FFMPEG else "ffmpeg",
"-nostdin",
"-i",
str(save_path),
"-c:v",
"copy",
str(output),
"-y", # overwrite output
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return p
def extract_stream(self, save_path, type_):
output = Path(config.directories.temp / f"{type_}.hevc")
with console.status(f"Extracting {type_} stream...", spinner="dots"):
result = self.ffmpeg_simple(save_path, output)
if result.returncode:
output.unlink(missing_ok=True)
try:
run_step(
[FFMPEG or "ffmpeg", "-nostdin", "-y", "-i", save_path, "-c:v", "copy", output],
status=f"Extracting {type_} stream...",
output=output,
label=f"ffmpeg extract {type_}",
)
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_stream",
message=f"Failed extracting {type_} stream",
context={
"type": type_,
"input": str(save_path),
"output": str(output),
"returncode": result.returncode,
"stderr": (result.stderr or b"").decode(errors="replace"),
"stdout": (result.stdout or b"").decode(errors="replace"),
},
context={"type": type_, "input": str(save_path), "output": str(output), "error": str(e)},
)
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
@@ -202,48 +180,30 @@ class Hybrid:
):
return
with console.status(
f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream...", spinner="dots"
):
extraction_args = [str(DoviTool)]
if not untouched:
extraction_args += ["-m", "3"]
extraction_args += [
"extract-rpu",
config.directories.temp / "DV.hevc",
"-o",
config.directories.temp / f"{'RPU' if not untouched else 'RPU_UNT'}.bin",
]
rpu_name = "RPU_UNT" if untouched else "RPU"
rpu_path = config.directories.temp / f"{rpu_name}.bin"
dv_stream = config.directories.temp / "DV.hevc"
spinner = f"Extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream..."
rpu_extraction = subprocess.run(
extraction_args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
rpu_name = "RPU" if not untouched else "RPU_UNT"
if rpu_extraction.returncode:
Path.unlink(config.directories.temp / f"{rpu_name}.bin")
stderr_text = rpu_extraction.stderr.decode(errors="replace") if rpu_extraction.stderr else ""
try:
dovi.extract_rpu(dv_stream, rpu_path, mode=None if untouched else 3, status=spinner)
except RuntimeError as e:
stderr_text = str(e)
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_rpu",
message=f"Failed extracting{' untouched ' if untouched else ' '}RPU",
context={
"untouched": untouched,
"returncode": rpu_extraction.returncode,
"stderr": stderr_text,
"args": [str(a) for a in extraction_args],
},
context={"untouched": untouched, "error": stderr_text},
)
if b"MAX_PQ_LUMINANCE" in rpu_extraction.stderr:
if "MAX_PQ_LUMINANCE" in stderr_text:
self.extract_rpu(video, untouched=True)
elif b"Invalid PPS index" in rpu_extraction.stderr:
return
if "Invalid PPS index" in stderr_text:
raise ValueError("Dolby Vision VideoTrack seems to be corrupt")
else:
raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
elif self.debug_logger:
raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_extract_rpu",
@@ -399,34 +359,22 @@ class Hybrid:
with open(l5_path, "w") as f:
json.dump(l5_json, f, indent=4)
with console.status("Editing RPU Level 5 active area...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
str(config.directories.temp / self.rpu_file),
"-j",
str(l5_path),
"-o",
str(config.directories.temp / "RPU_L5.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
try:
dovi.editor(
config.directories.temp / self.rpu_file,
l5_path,
config.directories.temp / "RPU_L5.bin",
status="Editing RPU Level 5 active area...",
label="dovi_tool editor (L5)",
)
if result.returncode:
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level5",
message="Failed editing RPU Level 5 values",
context={
"returncode": result.returncode,
"stderr": (result.stderr or b"").decode(errors="replace"),
},
context={"error": str(e)},
)
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 5 values")
if self.debug_logger:
@@ -447,20 +395,16 @@ class Hybrid:
if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
return
with console.status("Reading RPU luminance metadata...", spinner="dots"):
result = subprocess.run(
[str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
capture_output=True,
text=True,
)
if result.returncode != 0:
try:
with console.status("Reading RPU luminance metadata...", spinner="dots"):
info_text = dovi.info_summary(config.directories.temp / self.rpu_file)
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level6",
message="Failed reading RPU metadata for Level 6 values",
context={"returncode": result.returncode, "stderr": (result.stderr or "")},
context={"error": str(e)},
)
raise ValueError("Failed reading RPU metadata for Level 6 values")
@@ -469,7 +413,7 @@ class Hybrid:
max_mdl = None
min_mdl = None
for line in result.stdout.splitlines():
for line in info_text.splitlines():
if "RPU content light level (L1):" in line:
parts = line.split("MaxCLL:")[1].split(",")
max_cll = int(float(parts[0].strip().split()[0]))
@@ -506,34 +450,22 @@ class Hybrid:
with open(l6_path, "w") as f:
json.dump(level6_data, f, indent=4)
with console.status("Editing RPU Level 6 values...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
str(config.directories.temp / self.rpu_file),
"-j",
str(l6_path),
"-o",
str(config.directories.temp / "RPU_L6.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
try:
dovi.editor(
config.directories.temp / self.rpu_file,
l6_path,
config.directories.temp / "RPU_L6.bin",
status="Editing RPU Level 6 values...",
label="dovi_tool editor (L6)",
)
if result.returncode:
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level6",
message="Failed editing RPU Level 6 values",
context={
"returncode": result.returncode,
"stderr": (result.stderr or b"").decode(errors="replace"),
},
context={"error": str(e)},
)
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 6 values")
if self.debug_logger:
@@ -555,38 +487,22 @@ class Hybrid:
if os.path.isfile(config.directories.temp / self.hevc_file):
return
with console.status(f"Injecting Dolby Vision metadata into {self.hdr_type} stream...", spinner="dots"):
inject_cmd = [
str(DoviTool),
"inject-rpu",
"-i",
try:
dovi.inject_rpu(
config.directories.temp / "HDR10.hevc",
"--rpu-in",
config.directories.temp / self.rpu_file,
]
inject_cmd.extend(["-o", config.directories.temp / self.hevc_file])
inject = subprocess.run(
inject_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
config.directories.temp / self.hevc_file,
status=f"Injecting Dolby Vision metadata into {self.hdr_type} stream...",
label="dovi_tool inject-rpu",
)
if inject.returncode:
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_inject_rpu",
message="Failed injecting Dolby Vision metadata into HDR10 stream",
context={
"returncode": inject.returncode,
"stderr": (inject.stderr or b"").decode(errors="replace"),
"stdout": (inject.stdout or b"").decode(errors="replace"),
"cmd": [str(a) for a in inject_cmd],
},
context={"error": str(e)},
)
Path.unlink(config.directories.temp / self.hevc_file)
raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream")
if self.debug_logger:
@@ -611,35 +527,29 @@ class Hybrid:
if not HDR10PlusTool:
raise ValueError("HDR10Plus_tool not found. Please install it to use HDR10+ to DV conversion.")
with console.status("Extracting HDR10+ metadata...", spinner="dots"):
# HDR10Plus_tool needs raw HEVC stream
extraction = subprocess.run(
try:
run_step(
[
str(HDR10PlusTool),
HDR10PlusTool,
"extract",
str(config.directories.temp / "HDR10.hevc"),
config.directories.temp / "HDR10.hevc",
"-o",
str(config.directories.temp / self.hdr10plus_file),
config.directories.temp / self.hdr10plus_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
status="Extracting HDR10+ metadata...",
output=config.directories.temp / self.hdr10plus_file,
label="hdr10plus_tool extract",
)
if extraction.returncode:
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_hdr10plus",
message="Failed extracting HDR10+ metadata",
context={
"returncode": extraction.returncode,
"stderr": (extraction.stderr or b"").decode(errors="replace"),
"stdout": (extraction.stdout or b"").decode(errors="replace"),
},
context={"error": str(e)},
)
raise ValueError("Failed extracting HDR10+ metadata")
# Check if the extracted file has content
file_size = os.path.getsize(config.directories.temp / self.hdr10plus_file)
if file_size == 0:
if self.debug_logger:
@@ -744,33 +654,20 @@ class Hybrid:
with open(config.directories.temp / "extra.json", "w") as f:
json.dump(extra_metadata, f, indent=2)
# Generate DV RPU from HDR10+ metadata
conversion = subprocess.run(
[
str(DoviTool),
"generate",
"-j",
str(config.directories.temp / "extra.json"),
"--hdr10plus-json",
str(config.directories.temp / self.hdr10plus_file),
"-o",
str(config.directories.temp / "RPU.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
try:
dovi.generate_from_hdr10plus(
config.directories.temp / "extra.json",
config.directories.temp / self.hdr10plus_file,
config.directories.temp / "RPU.bin",
label="dovi_tool generate",
)
if conversion.returncode:
except RuntimeError as e:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_convert_hdr10plus",
message="Failed converting HDR10+ to Dolby Vision",
context={
"returncode": conversion.returncode,
"stderr": (conversion.stderr or b"").decode(errors="replace"),
"stdout": (conversion.stdout or b"").decode(errors="replace"),
},
context={"error": str(e)},
)
raise ValueError("Failed converting HDR10+ to Dolby Vision")

View File

@@ -0,0 +1,130 @@
"""Thin wrappers around dovi_tool subcommands used by DVFixup and Hybrid.
Centralises argv construction, status spinners, and error handling so callers do not
re-implement subprocess plumbing per call site. Each wrapper:
- Resolves `binaries.DoviTool` and raises EnvironmentError if missing.
- Delegates to `core.utils.subprocess.run_step` for execution, output validation, and
stderr-tail RuntimeError on failure.
- Returns captured stderr so callers can inspect specific failure modes (e.g. the
MAX_PQ_LUMINANCE retry path in extract_rpu).
"""
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import Optional
from unshackle.core import binaries
from unshackle.core.utils.subprocess import run_step
def _require_dovi_tool() -> str:
if not binaries.DoviTool:
raise EnvironmentError("dovi_tool executable was not found but is required.")
return str(binaries.DoviTool)
def extract_rpu(
source: Path,
output: Path,
*,
mode: Optional[int] = 3,
status: Optional[str] = "Extracting DV RPU...",
label: str = "dovi_tool extract-rpu",
) -> bytes:
"""Extract DV RPU NALs from a raw HEVC stream. `mode=None` skips the -m flag (untouched)."""
tool = _require_dovi_tool()
args: list = [tool]
if mode is not None:
args += ["-m", str(mode)]
args += ["extract-rpu", source, "-o", output]
return run_step(args, status=status, output=output, label=label)
def inject_rpu(
source: Path,
rpu: Path,
output: Path,
*,
status: Optional[str] = "Re-injecting DV RPU...",
label: str = "dovi_tool inject-rpu",
) -> bytes:
"""Inject a DV RPU back into a raw HEVC stream, producing DV-signaled output."""
tool = _require_dovi_tool()
return run_step(
[tool, "inject-rpu", "-i", source, "--rpu-in", rpu, "-o", output],
status=status,
output=output,
label=label,
)
def editor(
source: Path,
json_spec: Path,
output: Path,
*,
status: Optional[str] = "Editing DV RPU...",
label: str = "dovi_tool editor",
) -> bytes:
"""Apply a JSON edit spec to an RPU file."""
tool = _require_dovi_tool()
return run_step(
[tool, "editor", "-i", source, "-j", json_spec, "-o", output],
status=status,
output=output,
label=label,
)
def info_summary(rpu: Path) -> str:
"""Return the textual summary (`dovi_tool info -i ... -s`) for an RPU file."""
tool = _require_dovi_tool()
p = subprocess.run([tool, "info", "-i", str(rpu), "-s"], capture_output=True, text=True)
if p.returncode != 0:
raise RuntimeError(f"dovi_tool info failed: {(p.stderr or '')[-400:]}")
return p.stdout
def generate_from_hdr10plus(
extra_json: Path,
hdr10plus_json: Path,
output: Path,
*,
status: Optional[str] = "Generating DV RPU from HDR10+ metadata...",
label: str = "dovi_tool generate",
) -> bytes:
"""Build a DV RPU from extracted HDR10+ metadata + an extra JSON descriptor."""
tool = _require_dovi_tool()
return run_step(
[tool, "generate", "-j", extra_json, "--hdr10plus-json", hdr10plus_json, "-o", output],
status=status,
output=output,
label=label,
)
def extract_rpu_with_fallback(source: Path, output: Path, *, label: str = "dovi_tool extract-rpu") -> bytes:
"""Try `-m 3` first; on MAX_PQ_LUMINANCE error, retry untouched (no -m). Returns stderr.
Used when the caller wants automatic normalization but cannot abort if the source
rejects mode-3 conversion.
"""
try:
return extract_rpu(source, output, mode=3, label=label)
except RuntimeError as e:
if "MAX_PQ_LUMINANCE" not in str(e):
raise
return extract_rpu(source, output, mode=None, status="Extracting DV RPU (untouched)...", label=label)
__all__ = (
"extract_rpu",
"extract_rpu_with_fallback",
"inject_rpu",
"editor",
"info_summary",
"generate_from_hdr10plus",
)

View File

@@ -1,9 +1,10 @@
import json
import subprocess
from pathlib import Path
from typing import Union
from typing import Optional, Sequence, Union
from unshackle.core import binaries
from unshackle.core.console import console
def ffprobe(uri: Union[bytes, Path]) -> dict:
@@ -23,3 +24,34 @@ def ffprobe(uri: Union[bytes, Path]) -> dict:
except subprocess.CalledProcessError:
return {}
return json.loads(ff.stdout.decode("utf8"))
def run_step(
args: Sequence[Union[str, Path]],
*,
status: Optional[str] = None,
output: Optional[Path] = None,
label: str = "subprocess step",
) -> bytes:
"""Run a CLI step that writes to `output` (when provided). Returns stderr bytes.
Raises RuntimeError with the stderr tail when the process exits non-zero, or when
`output` is given and does not exist / is empty after the run.
"""
if output is not None:
output.unlink(missing_ok=True)
str_args = [str(a) for a in args]
if status:
with console.status(status, spinner="dots"):
p = subprocess.run(str_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
else:
p = subprocess.run(str_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderr = p.stderr or b""
bad_output = output is not None and (not output.exists() or output.stat().st_size == 0)
if p.returncode or bad_output:
if output is not None:
output.unlink(missing_ok=True)
raise RuntimeError(f"{label} failed: {stderr.decode(errors='replace')[-400:]}")
return stderr