17 Commits

Author SHA1 Message Date
Andy
e7120bd063 fix(attachment): sanitize filenames with illegal Windows characters 2026-02-17 16:02:59 -07:00
Andy
42ee9d67a3 fix(hybrid): skip bitrate filter for DV tracks in HYBRID mode 2026-02-17 15:38:55 -07:00
Andy
b0f5b11820 feat(debug): log binary tool versions at session start 2026-02-17 14:39:28 -07:00
Andy
c10257b8dc Revert "feat(debug): add JSONL debug logging to decryption, muxing, and all downloaders"
This reverts commit cc89f4ca93.
2026-02-17 14:37:50 -07:00
Andy
cc89f4ca93 feat(debug): add JSONL debug logging to decryption, muxing, and all downloaders
Expand debug logging coverage for better diagnostics when investigating download/decryption issues like QUICKTIME/cbcs problem.
2026-02-17 13:58:36 -07:00
Andy
0217086abf style: fix ruff E721, E701, and E722 lint errors 2026-02-16 13:37:23 -07:00
Andy
df92f9e4b6 refactor(hybrid): replace log.info with console status and add JSONL debug logging 2026-02-16 13:33:11 -07:00
Andy
9ed56709cd Merge branch 'dev' of https://github.com/unshackle-dl/unshackle into dev 2026-02-15 17:38:52 -07:00
Andy
f96f1f9a95 feat(hybrid): add L5 active area and dynamic L6 luminance metadata
Add crop detection via ffmpeg to generate Level 5 active area metadata, resolving DV/HDR10 black bar mismatches. Update Level 6 to extract actual luminance values from the RPU instead of hardcoding defaults.
2026-02-15 17:38:43 -07:00
Sp5rky
9f9a609d71 Merge pull request #77 from CodeName393/Select-Titles
Add Select titles option
2026-02-15 16:19:42 -07:00
Andy
cee7d9a75f fix(n_m3u8dl_re): pass all content keys for DualKey DRM decryption 2026-02-15 13:37:49 -07:00
Andy
bf9087a1ce chore(release): bump version to 3.0.0
BREAKING CHANGE: PlayReady users without explicit playready_devices no longer get access to all devices by default.

Key changes:
- feat(drm): add MonaLisa DRM support to core infrastructure
- feat(cdm): add remote PlayReady CDM support via pyplayready RemoteCdm
- feat(serve): add PlayReady CDM support alongside Widevine
- feat(gluetun): Gluetun VPN integration and Windscribe support
- feat(audio): codec lists and split muxing
- feat(tracks): prioritize Atmos audio tracks over higher bitrate non-Atmos
- feat(video): detect interlaced scan type from MPD manifests
- feat(cdm): normalize CDM detection for local and remote implementations
- fix(serve)!: make PlayReady users config consistently a mapping
- 50+ additional bug fixes across HLS/DASH, proxies, subtitles, and more
2026-02-15 13:04:42 -07:00
Andy
23cc351f77 feat(tracks): prioritize Atmos audio tracks over higher bitrate non-Atmos 2026-02-15 12:08:27 -07:00
Andy
132d3549f9 fix(main): update copyright year dynamically in version display 2026-02-11 16:01:33 -07:00
Andy
3ee554401a feat(HLS): improve audio codec handling with error handling for codec extraction 2026-02-10 08:34:54 -07:00
CodeName393
dd19f405a4 Add selector 2026-02-09 02:21:04 +09:00
CodeName393
dbebf68f18 Add select-titles 2026-02-09 02:20:26 +09:00
12 changed files with 964 additions and 79 deletions

View File

@@ -6,7 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This changelog is automatically generated using [git-cliff](https://git-cliff.org).
## [Unreleased]
## [3.0.0] - 2026-02-15
### Features
@@ -21,6 +21,9 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- *drm*: Add MonaLisa DRM support to core infrastructure
- *audio*: Codec lists and split muxing
- *proxy*: Add specific server selection for WindscribeVPN
- *cdm*: Normalize CDM detection for local and remote implementations
- *HLS*: Improve audio codec handling with error handling for codec extraction
- *tracks*: Prioritize Atmos audio tracks over higher bitrate non-Atmos
### Bug Fixes
@@ -53,11 +56,39 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- *dl*: Always clean up hybrid temp hevc outputs
- *hls*: Finalize n_m3u8dl_re outputs
- *downloader*: Restore requests progress for single-url downloads
- *dl*: Invert audio codec suffixing when splitting
- *dl*: Support snake_case keys for RemoteCdm
- *aria2c*: Warn on config mismatch and wait for RPC ready
- *serve*: [**breaking**] Make PlayReady users config consistently a mapping
- *dl*: Preserve proxy_query selector (not resolved URI)
- *gluetun*: Stop leaking proxy/vpn secrets to process list
- *monalisa*: Avoid leaking secrets and add worker safety
- *dl*: Avoid selecting all variants when multiple audio codecs requested
- *hls*: Keep range offset numeric and align MonaLisa licensing
- *titles*: Remove trailing space from HDR dynamic range label
- *config*: Normalize playready_remote remote_cdm keys
- *titles*: Avoid None/double spaces in HDR tokens
- *naming*: Keep technical tokens with scene_naming off
- *api*: Log PSSH extraction failures
- *proxies*: Harden surfshark and windscribe selection
- *service*: Redact proxy credentials in logs
- *monalisa*: Harden wasm calls and license handling
- *hls*: Remove no-op encryption_data reassignment
- *serve*: Default PlayReady access to none
- *tracks*: Close temp session and improve path type error
- *main*: Update copyright year dynamically in version display
### Reverts
- *monalisa*: Pass key via argv again
### Documentation
- Add configuration documentation WIP
- *changelog*: Add 2.4.0 release notes
- *changelog*: Update cliff config and regenerate changelog
- *changelog*: Complete 2.4.0 notes
- *config*: Clarify sdh_method uses subtitle-filter
### Performance Improvements
@@ -451,7 +482,7 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- Reorganize Planned Features section in README for clarity
- Improve track selection logic in dl.py
[unreleased]: https://github.com/unshackle-dl/unshackle/compare/2.3.0..HEAD
[3.0.0]: https://github.com/unshackle-dl/unshackle/compare/2.3.0..3.0.0
[2.3.0]: https://github.com/unshackle-dl/unshackle/compare/2.2.0..2.3.0
[2.2.0]: https://github.com/unshackle-dl/unshackle/compare/2.1.0..2.2.0
[2.1.0]: https://github.com/unshackle-dl/unshackle/compare/2.0.0..2.1.0

View File

@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "unshackle"
version = "2.4.0"
version = "3.0.0"
description = "Modular Movie, TV, and Music Archival Software."
authors = [{ name = "unshackle team" }]
requires-python = ">=3.10,<3.13"

View File

@@ -65,6 +65,7 @@ from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
ContextData, MultipleChoice, SubtitleCodecChoice, VideoCodecChoice)
from unshackle.core.utils.collections import merge_dict
from unshackle.core.utils.selector import select_multiple
from unshackle.core.utils.subprocess import ffprobe
from unshackle.core.vaults import Vaults
@@ -194,12 +195,7 @@ class dl:
sdh_suffix = ".sdh" if (subtitle.sdh or subtitle.cc) else ""
extension = (target_codec or subtitle.codec or Subtitle.Codec.SubRip).extension
if (
not target_codec
and not subtitle.codec
and source_path
and source_path.suffix
):
if not target_codec and not subtitle.codec and source_path and source_path.suffix:
extension = source_path.suffix.lstrip(".")
filename = f"{base_filename}.{lang_suffix}{forced_suffix}{sdh_suffix}.{extension}"
@@ -346,6 +342,12 @@ class dl:
default=None,
help="Create separate output files per audio codec instead of merging all audio.",
)
@click.option(
"--select-titles",
is_flag=True,
default=False,
help="Interactively select downloads from a list. Only use with Series to select Episodes",
)
@click.option(
"-w",
"--wanted",
@@ -586,6 +588,59 @@ class dl:
},
},
)
# Log binary versions for diagnostics
binary_versions = {}
for name, binary in [
("shaka_packager", binaries.ShakaPackager),
("mp4decrypt", binaries.Mp4decrypt),
("n_m3u8dl_re", binaries.N_m3u8DL_RE),
("mkvmerge", binaries.MKVToolNix),
("ffmpeg", binaries.FFMPEG),
("ffprobe", binaries.FFProbe),
]:
if binary:
version = None
try:
if name == "shaka_packager":
r = subprocess.run(
[str(binary), "--version"], capture_output=True, text=True, timeout=5
)
version = (r.stdout or r.stderr or "").strip()
elif name in ("ffmpeg", "ffprobe"):
r = subprocess.run(
[str(binary), "-version"], capture_output=True, text=True, timeout=5
)
version = (r.stdout or "").split("\n")[0].strip()
elif name == "mkvmerge":
r = subprocess.run(
[str(binary), "--version"], capture_output=True, text=True, timeout=5
)
version = (r.stdout or "").strip()
elif name == "mp4decrypt":
r = subprocess.run(
[str(binary)], capture_output=True, text=True, timeout=5
)
output = (r.stdout or "") + (r.stderr or "")
lines = [line.strip() for line in output.split("\n") if line.strip()]
version = " | ".join(lines[:2]) if lines else None
elif name == "n_m3u8dl_re":
r = subprocess.run(
[str(binary), "--version"], capture_output=True, text=True, timeout=5
)
version = (r.stdout or r.stderr or "").strip().split("\n")[0]
except Exception:
version = "<error getting version>"
binary_versions[name] = {"path": str(binary), "version": version}
else:
binary_versions[name] = None
self.debug_logger.log(
level="DEBUG",
operation="binary_versions",
message="Binary tool versions",
context=binary_versions,
)
else:
self.debug_logger = None
@@ -859,6 +914,7 @@ class dl:
range_: list[Video.Range],
channels: float,
no_atmos: bool,
select_titles: bool,
wanted: list[str],
latest_episode: bool,
lang: list[str],
@@ -1047,6 +1103,78 @@ class dl:
if list_titles:
return
# Enables manual selection for Series when --select-titles is set
if select_titles and isinstance(titles, Series):
console.print(Padding(Rule("[rule.text]Select Titles"), (1, 2)))
selection_titles = []
dependencies = {}
original_indices = {}
current_season = None
current_season_header_idx = -1
unique_seasons = {t.season for t in titles}
multiple_seasons = len(unique_seasons) > 1
# Build selection options
for i, t in enumerate(titles):
# Insert season header only if multiple seasons exist
if multiple_seasons and t.season != current_season:
current_season = t.season
header_text = f"Season {t.season}"
selection_titles.append(header_text)
current_season_header_idx = len(selection_titles) - 1
dependencies[current_season_header_idx] = []
# Note: Headers are not mapped to actual title indices
# Format display name
display_name = ((t.name[:35].rstrip() + "") if len(t.name) > 35 else t.name) if t.name else None
# Apply indentation only for multiple seasons
prefix = " " if multiple_seasons else ""
option_text = f"{prefix}{t.number}" + (f". {display_name}" if t.name else "")
selection_titles.append(option_text)
current_ui_idx = len(selection_titles) - 1
# Map UI index to actual title index
original_indices[current_ui_idx] = i
# Link episode to season header for group selection
if current_season_header_idx != -1:
dependencies[current_season_header_idx].append(current_ui_idx)
selection_start = time.time()
# Execute selector with dependencies (headers select all children)
selected_ui_idx = select_multiple(
selection_titles, minimal_count=1, page_size=8, return_indices=True, dependencies=dependencies
)
selection_end = time.time()
start_time += selection_end - selection_start
# Map UI indices back to title indices (excluding headers)
selected_idx = []
for idx in selected_ui_idx:
if idx in original_indices:
selected_idx.append(original_indices[idx])
# Ensure indices are unique and ordered
selected_idx = sorted(set(selected_idx))
keep = set(selected_idx)
# In-place filter: remove unselected items (iterate backwards)
for i in range(len(titles) - 1, -1, -1):
if i not in keep:
del titles[i]
# Show selected count
if titles:
count = len(titles)
console.print(Padding(f"[text]Total selected: {count}[/]", (0, 5)))
# Determine the latest episode if --latest-episode is set
latest_episode_id = None
if latest_episode and isinstance(titles, Series) and len(titles) > 0:
@@ -1264,10 +1392,20 @@ class dl:
self.log.warning(f"Skipping {color_range.name} video tracks as none are available.")
if vbitrate:
title.tracks.select_video(lambda x: x.bitrate and x.bitrate // 1000 == vbitrate)
if not title.tracks.videos:
self.log.error(f"There's no {vbitrate}kbps Video Track...")
sys.exit(1)
if any(r == Video.Range.HYBRID for r in range_):
# In HYBRID mode, only apply bitrate filter to non-DV tracks
# DV tracks are kept regardless since they're only used for RPU metadata
title.tracks.select_video(
lambda x: x.range == Video.Range.DV or (x.bitrate and x.bitrate // 1000 == vbitrate)
)
if not any(x.range != Video.Range.DV for x in title.tracks.videos):
self.log.error(f"There's no {vbitrate}kbps Video Track...")
sys.exit(1)
else:
title.tracks.select_video(lambda x: x.bitrate and x.bitrate // 1000 == vbitrate)
if not title.tracks.videos:
self.log.error(f"There's no {vbitrate}kbps Video Track...")
sys.exit(1)
video_languages = [lang for lang in (v_lang or lang) if lang != "best"]
if video_languages and "all" not in video_languages:
@@ -1518,7 +1656,10 @@ class dl:
if audio_description:
standard_audio = [a for a in title.tracks.audio if not a.descriptive]
selected_standards = title.tracks.by_language(
standard_audio, processed_lang, per_language=per_language, exact_match=exact_lang
standard_audio,
processed_lang,
per_language=per_language,
exact_match=exact_lang,
)
desc_audio = [a for a in title.tracks.audio if a.descriptive]
# Include all descriptive tracks for the requested languages.
@@ -1642,9 +1783,7 @@ class dl:
),
licence=partial(
service.get_playready_license
if (
is_playready_cdm(self.cdm)
)
if (is_playready_cdm(self.cdm))
and hasattr(service, "get_playready_license")
else service.get_widevine_license,
title=title,
@@ -1762,9 +1901,7 @@ class dl:
# Subtitle output mode configuration (for sidecar originals)
subtitle_output_mode = config.subtitle.get("output_mode", "mux")
sidecar_format = config.subtitle.get("sidecar_format", "srt")
skip_subtitle_mux = (
subtitle_output_mode == "sidecar" and (title.tracks.videos or title.tracks.audio)
)
skip_subtitle_mux = subtitle_output_mode == "sidecar" and (title.tracks.videos or title.tracks.audio)
sidecar_subtitles: list[Subtitle] = []
sidecar_original_paths: dict[str, Path] = {}
if subtitle_output_mode in ("sidecar", "both") and not no_mux:
@@ -2015,7 +2152,9 @@ class dl:
sidecar_dir = config.directories.downloads
if not no_folder and isinstance(title, (Episode, Song)) and media_info:
sidecar_dir /= title.get_filename(media_info, show_service=not no_source, folder=True)
sidecar_dir /= title.get_filename(
media_info, show_service=not no_source, folder=True
)
sidecar_dir.mkdir(parents=True, exist_ok=True)
with console.status("Saving subtitle sidecar files..."):

View File

@@ -1 +1 @@
__version__ = "2.4.0"
__version__ = "3.0.0"

View File

@@ -1,5 +1,6 @@
import atexit
import logging
from datetime import datetime
import click
import urllib3
@@ -58,7 +59,7 @@ def main(version: bool, debug: bool) -> None:
r" ▀▀▀ ▀▀ █▪ ▀▀▀▀ ▀▀▀ · ▀ ▀ ·▀▀▀ ·▀ ▀.▀▀▀ ▀▀▀ ",
style="ascii.art",
),
f"v [repr.number]{__version__}[/] - © 2025 - github.com/unshackle-dl/unshackle",
f"v [repr.number]{__version__}[/] - © 2025-{datetime.now().year} - github.com/unshackle-dl/unshackle",
),
(1, 11, 1, 10),
expand=True,

View File

@@ -192,8 +192,10 @@ def build_download_args(
if ad_keyword:
args["--ad-keyword"] = ad_keyword
key_args = []
if content_keys:
args["--key"] = next((f"{kid.hex}:{key.lower()}" for kid, key in content_keys.items()), None)
for kid, key in content_keys.items():
key_args.extend(["--key", f"{kid.hex}:{key.lower()}"])
decryption_config = config.decryption.lower()
engine_name = DECRYPTION_ENGINE.get(decryption_config) or "SHAKA_PACKAGER"
@@ -221,6 +223,9 @@ def build_download_args(
elif value is not False and value is not None:
command.extend([flag, str(value)])
# Append all content keys (multiple --key flags supported by N_m3u8DL-RE)
command.extend(key_args)
if headers:
for key, value in headers.items():
if key.lower() not in ("accept-encoding", "cookie"):

View File

@@ -116,9 +116,14 @@ class HLS:
for playlist in self.manifest.playlists:
audio_group = playlist.stream_info.audio
if audio_group:
audio_codec = Audio.Codec.from_codecs(playlist.stream_info.codecs)
audio_codecs_by_group_id[audio_group] = audio_codec
audio_codec: Optional[Audio.Codec] = None
if audio_group and playlist.stream_info.codecs:
try:
audio_codec = Audio.Codec.from_codecs(playlist.stream_info.codecs)
except ValueError:
audio_codec = None
if audio_codec:
audio_codecs_by_group_id[audio_group] = audio_codec
try:
# TODO: Any better way to figure out the primary track type?

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import mimetypes
import os
import re
from pathlib import Path
from typing import Optional, Union
from urllib.parse import urlparse
@@ -56,7 +57,8 @@ class Attachment:
# Use provided name for the file if available
if name:
file_name = f"{name.replace(' ', '_')}{os.path.splitext(file_name)[1]}"
safe_name = re.sub(r'[<>:"/\\|?*]', "", name).replace(" ", "_")
file_name = f"{safe_name}{os.path.splitext(file_name)[1]}"
download_path = config.directories.temp / file_name

View File

@@ -1,6 +1,8 @@
import json
import logging
import os
import random
import re
import subprocess
import sys
from pathlib import Path
@@ -8,14 +10,16 @@ from pathlib import Path
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import FFMPEG, DoviTool, HDR10PlusTool
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.utilities import get_debug_logger
class Hybrid:
def __init__(self, videos, source) -> None:
self.log = logging.getLogger("hybrid")
self.debug_logger = get_debug_logger()
"""
Takes the Dolby Vision and HDR10(+) streams out of the VideoTracks.
@@ -41,6 +45,19 @@ class Hybrid:
console.print(Padding(Rule(f"[rule.text]HDR10+DV Hybrid ({self.resolution})"), (1, 2)))
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_init",
message="Starting HDR10+DV hybrid processing",
context={
"source": source,
"resolution": self.resolution,
"video_count": len(videos),
"video_ranges": [str(v.range) for v in videos],
},
)
for video in self.videos:
if not video.path or not os.path.exists(video.path):
raise ValueError(f"Video track {video.id} was not downloaded before injection.")
@@ -55,13 +72,13 @@ class Hybrid:
# If we have HDR10+ but no DV, we can convert HDR10+ to DV
if not has_dv and has_hdr10p:
self.log.info("No DV track found, but HDR10+ is available. Will convert HDR10+ to DV.")
console.status("No DV track found, but HDR10+ is available. Will convert HDR10+ to DV.")
self.hdr10plus_to_dv = True
elif not has_dv:
raise ValueError("No DV track available and no HDR10+ to convert.")
if os.path.isfile(config.directories.temp / self.hevc_file):
self.log.info("Already Injected")
console.status("Already Injected")
return
for video in videos:
@@ -89,14 +106,33 @@ class Hybrid:
self.extract_rpu(dv_video)
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
self.rpu_file = "RPU_UNT.bin"
self.level_6()
# Mode 3 conversion already done during extraction when not untouched
elif os.path.isfile(config.directories.temp / "RPU.bin"):
# RPU already extracted with mode 3
pass
# Edit L6 with actual luminance values from RPU, then L5 active area
self.level_6()
hdr10_video = next((v for v in videos if v.range == Video.Range.HDR10), None)
hdr10_input = hdr10_video.path if hdr10_video else None
if hdr10_input:
self.level_5(hdr10_input)
self.injecting()
if self.debug_logger:
self.debug_logger.log(
level="INFO",
operation="hybrid_complete",
message="Injection Completed",
context={
"hdr_type": self.hdr_type,
"resolution": self.resolution,
"hdr10plus_to_dv": self.hdr10plus_to_dv,
"rpu_file": self.rpu_file,
"output_file": self.hevc_file,
},
)
self.log.info("✓ Injection Completed")
if self.source == ("itunes" or "appletvplus"):
Path.unlink(config.directories.temp / "hdr10.mkv")
@@ -104,6 +140,10 @@ class Hybrid:
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
Path.unlink(config.directories.temp / "L5.json", missing_ok=True)
Path.unlink(config.directories.temp / "L6.json", missing_ok=True)
def ffmpeg_simple(self, save_path, output):
"""Simple ffmpeg execution without progress tracking"""
@@ -121,20 +161,41 @@ class Hybrid:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return p.returncode
return p
def extract_stream(self, save_path, type_):
output = Path(config.directories.temp / f"{type_}.hevc")
with console.status(f"Extracting {type_} stream...", spinner="dots"):
returncode = self.ffmpeg_simple(save_path, output)
result = self.ffmpeg_simple(save_path, output)
if returncode:
if result.returncode:
output.unlink(missing_ok=True)
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_stream",
message=f"Failed extracting {type_} stream",
context={
"type": type_,
"input": str(save_path),
"output": str(output),
"returncode": result.returncode,
"stderr": (result.stderr or b"").decode(errors="replace"),
"stdout": (result.stdout or b"").decode(errors="replace"),
},
)
self.log.error(f"x Failed extracting {type_} stream")
sys.exit(1)
self.log.info(f"Extracted {type_} stream")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_extract_stream",
message=f"Extracted {type_} stream",
context={"type": type_, "input": str(save_path), "output": str(output)},
success=True,
)
def extract_rpu(self, video, untouched=False):
if os.path.isfile(config.directories.temp / "RPU.bin") or os.path.isfile(
@@ -161,58 +222,326 @@ class Hybrid:
stderr=subprocess.PIPE,
)
rpu_name = "RPU" if not untouched else "RPU_UNT"
if rpu_extraction.returncode:
Path.unlink(config.directories.temp / f"{'RPU' if not untouched else 'RPU_UNT'}.bin")
Path.unlink(config.directories.temp / f"{rpu_name}.bin")
stderr_text = rpu_extraction.stderr.decode(errors="replace") if rpu_extraction.stderr else ""
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_rpu",
message=f"Failed extracting{' untouched ' if untouched else ' '}RPU",
context={
"untouched": untouched,
"returncode": rpu_extraction.returncode,
"stderr": stderr_text,
"args": [str(a) for a in extraction_args],
},
)
if b"MAX_PQ_LUMINANCE" in rpu_extraction.stderr:
self.extract_rpu(video, untouched=True)
elif b"Invalid PPS index" in rpu_extraction.stderr:
raise ValueError("Dolby Vision VideoTrack seems to be corrupt")
else:
raise ValueError(f"Failed extracting{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
elif self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_extract_rpu",
message=f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream",
context={"untouched": untouched, "output": f"{rpu_name}.bin"},
success=True,
)
self.log.info(f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
def level_5(self, input_video):
"""Generate Level 5 active area metadata via crop detection on the HDR10 stream.
def level_6(self):
"""Edit RPU Level 6 values"""
with open(config.directories.temp / "L6.json", "w+") as level6_file:
level6 = {
"cm_version": "V29",
"length": 0,
"level6": {
"max_display_mastering_luminance": 1000,
"min_display_mastering_luminance": 1,
"max_content_light_level": 0,
"max_frame_average_light_level": 0,
},
}
This resolves mismatches where DV has no black bars but HDR10 does (or vice versa)
by telling the display the correct active area.
"""
if os.path.isfile(config.directories.temp / "RPU_L5.bin"):
return
json.dump(level6, level6_file, indent=3)
ffprobe_bin = str(FFProbe) if FFProbe else "ffprobe"
ffmpeg_bin = str(FFMPEG) if FFMPEG else "ffmpeg"
if not os.path.isfile(config.directories.temp / "RPU_L6.bin"):
with console.status("Editing RPU Level 6 values...", spinner="dots"):
level6 = subprocess.run(
# Get video duration for random sampling
with console.status("Detecting active area (crop detection)...", spinner="dots"):
result_duration = subprocess.run(
[ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "json", str(input_video)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result_duration.returncode != 0:
if self.debug_logger:
self.debug_logger.log(
level="WARNING",
operation="hybrid_level5",
message="Could not probe video duration",
context={"returncode": result_duration.returncode, "stderr": (result_duration.stderr or "")},
)
self.log.warning("Could not probe video duration, skipping L5 crop detection")
return
duration_info = json.loads(result_duration.stdout)
duration = float(duration_info["format"]["duration"])
# Get video resolution for proper border calculation
result_streams = subprocess.run(
[
ffprobe_bin,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"json",
str(input_video),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result_streams.returncode != 0:
if self.debug_logger:
self.debug_logger.log(
level="WARNING",
operation="hybrid_level5",
message="Could not probe video resolution",
context={"returncode": result_streams.returncode, "stderr": (result_streams.stderr or "")},
)
self.log.warning("Could not probe video resolution, skipping L5 crop detection")
return
stream_info = json.loads(result_streams.stdout)
original_width = int(stream_info["streams"][0]["width"])
original_height = int(stream_info["streams"][0]["height"])
# Sample 10 random timestamps and run cropdetect on each
random_times = sorted(random.uniform(0, duration) for _ in range(10))
crop_results = []
for t in random_times:
result_cropdetect = subprocess.run(
[
str(DoviTool),
"editor",
ffmpeg_bin,
"-y",
"-nostdin",
"-loglevel",
"info",
"-ss",
f"{t:.2f}",
"-i",
config.directories.temp / self.rpu_file,
"-j",
config.directories.temp / "L6.json",
"-o",
config.directories.temp / "RPU_L6.bin",
str(input_video),
"-vf",
"cropdetect=round=2",
"-vframes",
"10",
"-f",
"null",
"-",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if level6.returncode:
Path.unlink(config.directories.temp / "RPU_L6.bin")
raise ValueError("Failed editing RPU Level 6 values")
# cropdetect outputs crop=w:h:x:y
crop_match = re.search(
r"crop=(\d+):(\d+):(\d+):(\d+)",
(result_cropdetect.stdout or "") + (result_cropdetect.stderr or ""),
)
if crop_match:
w, h = int(crop_match.group(1)), int(crop_match.group(2))
x, y = int(crop_match.group(3)), int(crop_match.group(4))
# Calculate actual border sizes from crop geometry
left = x
top = y
right = original_width - w - x
bottom = original_height - h - y
crop_results.append((left, top, right, bottom))
self.log.info("Edited RPU Level 6 values")
if not crop_results:
if self.debug_logger:
self.debug_logger.log(
level="WARNING",
operation="hybrid_level5",
message="No crop data detected, skipping L5",
context={"samples": len(random_times)},
)
self.log.warning("No crop data detected, skipping L5")
return
# Update rpu_file to use the edited version
self.rpu_file = "RPU_L6.bin"
# Find the most common crop values
crop_counts = {}
for crop in crop_results:
crop_counts[crop] = crop_counts.get(crop, 0) + 1
most_common = max(crop_counts, key=crop_counts.get)
left, top, right, bottom = most_common
# If all borders are 0 there's nothing to correct
if left == 0 and top == 0 and right == 0 and bottom == 0:
return
l5_json = {
"active_area": {
"crop": False,
"presets": [{"id": 0, "left": left, "right": right, "top": top, "bottom": bottom}],
"edits": {"all": 0},
}
}
l5_path = config.directories.temp / "L5.json"
with open(l5_path, "w") as f:
json.dump(l5_json, f, indent=4)
with console.status("Editing RPU Level 5 active area...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
str(config.directories.temp / self.rpu_file),
"-j",
str(l5_path),
"-o",
str(config.directories.temp / "RPU_L5.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level5",
message="Failed editing RPU Level 5 values",
context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
)
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 5 values")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_level5",
message="Edited RPU Level 5 active area",
context={"crop": {"left": left, "right": right, "top": top, "bottom": bottom}, "samples": len(crop_results)},
success=True,
)
self.rpu_file = "RPU_L5.bin"
def level_6(self):
"""Edit RPU Level 6 values using actual luminance data from the RPU."""
if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
return
with console.status("Reading RPU luminance metadata...", spinner="dots"):
result = subprocess.run(
[str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
capture_output=True,
text=True,
)
if result.returncode != 0:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level6",
message="Failed reading RPU metadata for Level 6 values",
context={"returncode": result.returncode, "stderr": (result.stderr or "")},
)
raise ValueError("Failed reading RPU metadata for Level 6 values")
max_cll = None
max_fall = None
max_mdl = None
min_mdl = None
for line in result.stdout.splitlines():
if "RPU content light level (L1):" in line:
parts = line.split("MaxCLL:")[1].split(",")
max_cll = int(float(parts[0].strip().split()[0]))
if len(parts) > 1 and "MaxFALL:" in parts[1]:
max_fall = int(float(parts[1].split("MaxFALL:")[1].strip().split()[0]))
elif "RPU mastering display:" in line:
mastering = line.split(":", 1)[1].strip()
min_lum, max_lum = mastering.split("/")[0], mastering.split("/")[1].split(" ")[0]
min_mdl = int(float(min_lum) * 10000)
max_mdl = int(float(max_lum))
if any(v is None for v in (max_cll, max_fall, max_mdl, min_mdl)):
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level6",
message="Could not extract Level 6 luminance data from RPU",
context={"max_cll": max_cll, "max_fall": max_fall, "max_mdl": max_mdl, "min_mdl": min_mdl},
)
raise ValueError("Could not extract Level 6 luminance data from RPU")
level6_data = {
"level6": {
"remove_cmv4": False,
"remove_mapping": False,
"max_display_mastering_luminance": max_mdl,
"min_display_mastering_luminance": min_mdl,
"max_content_light_level": max_cll,
"max_frame_average_light_level": max_fall,
}
}
l6_path = config.directories.temp / "L6.json"
with open(l6_path, "w") as f:
json.dump(level6_data, f, indent=4)
with console.status("Editing RPU Level 6 values...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
str(config.directories.temp / self.rpu_file),
"-j",
str(l6_path),
"-o",
str(config.directories.temp / "RPU_L6.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_level6",
message="Failed editing RPU Level 6 values",
context={"returncode": result.returncode, "stderr": (result.stderr or b"").decode(errors="replace")},
)
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 6 values")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_level6",
message="Edited RPU Level 6 luminance values",
context={
"max_cll": max_cll,
"max_fall": max_fall,
"max_mdl": max_mdl,
"min_mdl": min_mdl,
},
success=True,
)
self.rpu_file = "RPU_L6.bin"
def injecting(self):
if os.path.isfile(config.directories.temp / self.hevc_file):
@@ -232,7 +561,7 @@ class Hybrid:
# Default to removing HDR10+ metadata since we're converting to DV
if self.hdr10plus_to_dv:
inject_cmd.append("--drop-hdr10plus")
self.log.info(" - Removing HDR10+ metadata during injection")
console.status("Removing HDR10+ metadata during injection")
inject_cmd.extend(["-o", config.directories.temp / self.hevc_file])
@@ -243,10 +572,29 @@ class Hybrid:
)
if inject.returncode:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_inject_rpu",
message="Failed injecting Dolby Vision metadata into HDR10 stream",
context={
"returncode": inject.returncode,
"stderr": (inject.stderr or b"").decode(errors="replace"),
"stdout": (inject.stdout or b"").decode(errors="replace"),
"cmd": [str(a) for a in inject_cmd],
},
)
Path.unlink(config.directories.temp / self.hevc_file)
raise ValueError("Failed injecting Dolby Vision metadata into HDR10 stream")
self.log.info(f"Injected Dolby Vision metadata into {self.hdr_type} stream")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_inject_rpu",
message=f"Injected Dolby Vision metadata into {self.hdr_type} stream",
context={"hdr_type": self.hdr_type, "rpu_file": self.rpu_file, "output": self.hevc_file, "drop_hdr10plus": self.hdr10plus_to_dv},
success=True,
)
def extract_hdr10plus(self, _video):
"""Extract HDR10+ metadata from the video stream"""
@@ -271,13 +619,39 @@ class Hybrid:
)
if extraction.returncode:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_hdr10plus",
message="Failed extracting HDR10+ metadata",
context={
"returncode": extraction.returncode,
"stderr": (extraction.stderr or b"").decode(errors="replace"),
"stdout": (extraction.stdout or b"").decode(errors="replace"),
},
)
raise ValueError("Failed extracting HDR10+ metadata")
# Check if the extracted file has content
if os.path.getsize(config.directories.temp / self.hdr10plus_file) == 0:
file_size = os.path.getsize(config.directories.temp / self.hdr10plus_file)
if file_size == 0:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_extract_hdr10plus",
message="No HDR10+ metadata found in the stream",
context={"file_size": 0},
)
raise ValueError("No HDR10+ metadata found in the stream")
self.log.info("Extracted HDR10+ metadata")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_extract_hdr10plus",
message="Extracted HDR10+ metadata",
context={"output": self.hdr10plus_file, "file_size": file_size},
success=True,
)
def convert_hdr10plus_to_dv(self):
"""Convert HDR10+ metadata to Dolby Vision RPU"""
@@ -317,10 +691,26 @@ class Hybrid:
)
if conversion.returncode:
if self.debug_logger:
self.debug_logger.log(
level="ERROR",
operation="hybrid_convert_hdr10plus",
message="Failed converting HDR10+ to Dolby Vision",
context={
"returncode": conversion.returncode,
"stderr": (conversion.stderr or b"").decode(errors="replace"),
"stdout": (conversion.stdout or b"").decode(errors="replace"),
},
)
raise ValueError("Failed converting HDR10+ to Dolby Vision")
self.log.info("Converted HDR10+ metadata to Dolby Vision")
self.log.info("✓ HDR10+ successfully converted to Dolby Vision Profile 8")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="hybrid_convert_hdr10plus",
message="Converted HDR10+ metadata to Dolby Vision Profile 8",
success=True,
)
# Clean up temporary files
Path.unlink(config.directories.temp / "extra.json")

View File

@@ -221,13 +221,15 @@ class Tracks:
self.videos.sort(key=lambda x: not is_close_match(language, [x.language]))
def sort_audio(self, by_language: Optional[Sequence[Union[str, Language]]] = None) -> None:
"""Sort audio tracks by bitrate, descriptive, and optionally language."""
"""Sort audio tracks by bitrate, Atmos, descriptive, and optionally language."""
if not self.audio:
return
# descriptive
self.audio.sort(key=lambda x: x.descriptive)
# bitrate (within each descriptive group)
# bitrate (highest first)
self.audio.sort(key=lambda x: float(x.bitrate or 0.0), reverse=True)
# Atmos tracks first (prioritize over higher bitrate non-Atmos)
self.audio.sort(key=lambda x: not x.atmos)
# descriptive tracks last
self.audio.sort(key=lambda x: x.descriptive)
# language
for language in reversed(by_language or []):
if str(language) in ("all", "best"):

View File

@@ -0,0 +1,310 @@
import sys
import click
from rich.console import Group
from rich.live import Live
from rich.padding import Padding
from rich.table import Table
from rich.text import Text
from unshackle.core.console import console
IS_WINDOWS = sys.platform == "win32"
if IS_WINDOWS:
import msvcrt
class Selector:
"""
A custom interactive selector class using the Rich library.
Allows for multi-selection of items with pagination.
"""
def __init__(
self,
options: list[str],
cursor_style: str = "pink",
text_style: str = "text",
page_size: int = 8,
minimal_count: int = 0,
dependencies: dict[int, list[int]] = None,
prefixes: list[str] = None,
):
"""
Initialize the Selector.
Args:
options: List of strings to select from.
cursor_style: Rich style for the highlighted cursor item.
text_style: Rich style for normal items.
page_size: Number of items to show per page.
minimal_count: Minimum number of items that must be selected.
dependencies: Dictionary mapping parent index to list of child indices.
"""
self.options = options
self.cursor_style = cursor_style
self.text_style = text_style
self.page_size = page_size
self.minimal_count = minimal_count
self.dependencies = dependencies or {}
self.cursor_index = 0
self.selected_indices = set()
self.scroll_offset = 0
def get_renderable(self):
"""
Constructs and returns the renderable object (Table + Info) for the current state.
"""
table = Table(show_header=False, show_edge=False, box=None, pad_edge=False, padding=(0, 1, 0, 0))
table.add_column("Indicator", justify="right", no_wrap=True)
table.add_column("Option", overflow="ellipsis", no_wrap=True)
for i in range(self.page_size):
idx = self.scroll_offset + i
if idx < len(self.options):
option = self.options[idx]
is_cursor = idx == self.cursor_index
is_selected = idx in self.selected_indices
symbol = "[X]" if is_selected else "[ ]"
style = self.cursor_style if is_cursor else self.text_style
indicator_text = Text(f"{symbol}", style=style)
content_text = Text.from_markup(option)
content_text.style = style
table.add_row(indicator_text, content_text)
else:
table.add_row(Text(" "), Text(" "))
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
current_page = (self.scroll_offset // self.page_size) + 1
info_text = Text(
f"\n[Space]: Toggle [a]: All [←/→]: Page [Enter]: Confirm (Page {current_page}/{total_pages})",
style="gray",
)
return Padding(Group(table, info_text), (0, 5))
def move_cursor(self, delta: int):
"""
Moves the cursor up or down by the specified delta.
Updates the scroll offset if the cursor moves out of the current view.
"""
self.cursor_index = (self.cursor_index + delta) % len(self.options)
new_page_idx = self.cursor_index // self.page_size
self.scroll_offset = new_page_idx * self.page_size
def change_page(self, delta: int):
"""
Changes the current page view by the specified delta (previous/next page).
Also moves the cursor to the first item of the new page.
"""
current_page = self.scroll_offset // self.page_size
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
new_page = current_page + delta
if 0 <= new_page < total_pages:
self.scroll_offset = new_page * self.page_size
first_idx_of_page = self.scroll_offset
if first_idx_of_page < len(self.options):
self.cursor_index = first_idx_of_page
else:
self.cursor_index = len(self.options) - 1
def toggle_selection(self):
"""
Toggles the selection state of the item currently under the cursor.
Propagates selection to children if defined in dependencies.
"""
target_indices = {self.cursor_index}
if self.cursor_index in self.dependencies:
target_indices.update(self.dependencies[self.cursor_index])
should_select = self.cursor_index not in self.selected_indices
if should_select:
self.selected_indices.update(target_indices)
else:
self.selected_indices.difference_update(target_indices)
def toggle_all(self):
"""
Toggles the selection of all items.
If all are selected, clears selection. Otherwise, selects all.
"""
if len(self.selected_indices) == len(self.options):
self.selected_indices.clear()
else:
self.selected_indices = set(range(len(self.options)))
def get_input_windows(self):
"""
Captures and parses keyboard input on Windows systems using msvcrt.
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
"""
key = msvcrt.getch()
if key == b"\x03" or key == b"\x1b":
return "CANCEL"
if key == b"\xe0" or key == b"\x00":
try:
key = msvcrt.getch()
if key == b"H":
return "UP"
if key == b"P":
return "DOWN"
if key == b"K":
return "LEFT"
if key == b"M":
return "RIGHT"
except Exception:
pass
try:
char = key.decode("utf-8", errors="ignore")
except Exception:
return None
if char in ("\r", "\n"):
return "ENTER"
if char == " ":
return "SPACE"
if char in ("q", "Q"):
return "QUIT"
if char in ("a", "A"):
return "ALL"
if char in ("w", "W", "k", "K"):
return "UP"
if char in ("s", "S", "j", "J"):
return "DOWN"
if char in ("h", "H"):
return "LEFT"
if char in ("d", "D", "l", "L"):
return "RIGHT"
return None
def get_input_unix(self):
"""
Captures and parses keyboard input on Unix/Linux systems using click.getchar().
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
"""
char = click.getchar()
if char == "\x03":
return "CANCEL"
mapping = {
"\x1b[A": "UP",
"\x1b[B": "DOWN",
"\x1b[C": "RIGHT",
"\x1b[D": "LEFT",
}
if char in mapping:
return mapping[char]
if char == "\x1b":
try:
next1 = click.getchar()
if next1 in ("[", "O"):
next2 = click.getchar()
if next2 == "A":
return "UP"
if next2 == "B":
return "DOWN"
if next2 == "C":
return "RIGHT"
if next2 == "D":
return "LEFT"
return "CANCEL"
except Exception:
return "CANCEL"
if char in ("\r", "\n"):
return "ENTER"
if char == " ":
return "SPACE"
if char in ("q", "Q"):
return "QUIT"
if char in ("a", "A"):
return "ALL"
if char in ("w", "W", "k", "K"):
return "UP"
if char in ("s", "S", "j", "J"):
return "DOWN"
if char in ("h", "H"):
return "LEFT"
if char in ("d", "D", "l", "L"):
return "RIGHT"
return None
def run(self) -> list[int]:
"""
Starts the main event loop for the selector.
Renders the UI and processes input until confirmed or cancelled.
Returns:
list[int]: A sorted list of selected indices.
"""
try:
with Live(self.get_renderable(), console=console, auto_refresh=False, transient=True) as live:
while True:
live.update(self.get_renderable(), refresh=True)
if IS_WINDOWS:
action = self.get_input_windows()
else:
action = self.get_input_unix()
if action == "UP":
self.move_cursor(-1)
elif action == "DOWN":
self.move_cursor(1)
elif action == "LEFT":
self.change_page(-1)
elif action == "RIGHT":
self.change_page(1)
elif action == "SPACE":
self.toggle_selection()
elif action == "ALL":
self.toggle_all()
elif action in ("ENTER", "QUIT"):
if len(self.selected_indices) >= self.minimal_count:
return sorted(list(self.selected_indices))
elif action == "CANCEL":
raise KeyboardInterrupt
except KeyboardInterrupt:
return []
def select_multiple(
options: list[str],
minimal_count: int = 1,
page_size: int = 8,
return_indices: bool = True,
cursor_style: str = "pink",
**kwargs,
) -> list[int]:
"""
Drop-in replacement using custom Selector with global console.
Args:
options: List of options to display.
minimal_count: Minimum number of selections required.
page_size: Number of items per page.
return_indices: If True, returns indices; otherwise returns the option strings.
cursor_style: Style color for the cursor.
"""
selector = Selector(
options=options,
cursor_style=cursor_style,
text_style="text",
page_size=page_size,
minimal_count=minimal_count,
**kwargs,
)
selected_indices = selector.run()
if return_indices:
return selected_indices
return [options[i] for i in selected_indices]

2
uv.lock generated
View File

@@ -1627,7 +1627,7 @@ wheels = [
[[package]]
name = "unshackle"
version = "2.4.0"
version = "3.0.0"
source = { editable = "." }
dependencies = [
{ name = "aiohttp" },