6 Commits

Author SHA1 Message Date
Andy
9ed56709cd Merge branch 'dev' of https://github.com/unshackle-dl/unshackle into dev 2026-02-15 17:38:52 -07:00
Andy
f96f1f9a95 feat(hybrid): add L5 active area and dynamic L6 luminance metadata
Add crop detection via ffmpeg to generate Level 5 active area metadata, resolving DV/HDR10 black bar mismatches. Update Level 6 to extract actual luminance values from the RPU instead of hardcoding defaults.
2026-02-15 17:38:43 -07:00
Sp5rky
9f9a609d71 Merge pull request #77 from CodeName393/Select-Titles
Add Select titles option
2026-02-15 16:19:42 -07:00
Andy
cee7d9a75f fix(n_m3u8dl_re): pass all content keys for DualKey DRM decryption 2026-02-15 13:37:49 -07:00
CodeName393
dd19f405a4 Add selector 2026-02-09 02:21:04 +09:00
CodeName393
dbebf68f18 Add select-titles 2026-02-09 02:20:26 +09:00
4 changed files with 583 additions and 33 deletions

View File

@@ -65,6 +65,7 @@ from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
ContextData, MultipleChoice, SubtitleCodecChoice, VideoCodecChoice)
from unshackle.core.utils.collections import merge_dict
from unshackle.core.utils.selector import select_multiple
from unshackle.core.utils.subprocess import ffprobe
from unshackle.core.vaults import Vaults
@@ -346,6 +347,12 @@ class dl:
default=None,
help="Create separate output files per audio codec instead of merging all audio.",
)
@click.option(
"--select-titles",
is_flag=True,
default=False,
help="Interactively select downloads from a list. Only use with Series to select Episodes",
)
@click.option(
"-w",
"--wanted",
@@ -859,6 +866,7 @@ class dl:
range_: list[Video.Range],
channels: float,
no_atmos: bool,
select_titles: bool,
wanted: list[str],
latest_episode: bool,
lang: list[str],
@@ -1047,6 +1055,84 @@ class dl:
if list_titles:
return
# Enables manual selection for Series when --select-titles is set
if select_titles and type(titles) == Series:
console.print(Padding(Rule(f"[rule.text]Select Titles"), (1, 2)))
selection_titles = []
dependencies = {}
original_indices = {}
current_season = None
current_season_header_idx = -1
unique_seasons = {t.season for t in titles}
multiple_seasons = len(unique_seasons) > 1
# Build selection options
for i, t in enumerate(titles):
# Insert season header only if multiple seasons exist
if multiple_seasons and t.season != current_season:
current_season = t.season
header_text = f"Season {t.season}"
selection_titles.append(header_text)
current_season_header_idx = len(selection_titles) - 1
dependencies[current_season_header_idx] = []
# Note: Headers are not mapped to actual title indices
# Format display name
display_name = ((t.name[:35].rstrip() + "") if len(t.name) > 35 else t.name) if t.name else None
# Apply indentation only for multiple seasons
prefix = " " if multiple_seasons else ""
option_text = (
f"{prefix}{t.number}" + (f". {display_name}" if t.name else "")
)
selection_titles.append(option_text)
current_ui_idx = len(selection_titles) - 1
# Map UI index to actual title index
original_indices[current_ui_idx] = i
# Link episode to season header for group selection
if current_season_header_idx != -1:
dependencies[current_season_header_idx].append(current_ui_idx)
selection_start = time.time()
# Execute selector with dependencies (headers select all children)
selected_ui_idx = select_multiple(
selection_titles,
minimal_count=1,
page_size=8,
return_indices=True,
dependencies=dependencies
)
selection_end = time.time()
start_time += (selection_end - selection_start)
# Map UI indices back to title indices (excluding headers)
selected_idx = []
for idx in selected_ui_idx:
if idx in original_indices:
selected_idx.append(original_indices[idx])
# Ensure indices are unique and ordered
selected_idx = sorted(set(selected_idx))
keep = set(selected_idx)
# In-place filter: remove unselected items (iterate backwards)
for i in range(len(titles) - 1, -1, -1):
if i not in keep:
del titles[i]
# Show selected count
if titles:
count = len(titles)
console.print(Padding(f"[text]Total selected: {count}[/]", (0, 5)))
# Determine the latest episode if --latest-episode is set
latest_episode_id = None
if latest_episode and isinstance(titles, Series) and len(titles) > 0:

View File

@@ -192,8 +192,10 @@ def build_download_args(
if ad_keyword:
args["--ad-keyword"] = ad_keyword
key_args = []
if content_keys:
args["--key"] = next((f"{kid.hex}:{key.lower()}" for kid, key in content_keys.items()), None)
for kid, key in content_keys.items():
key_args.extend(["--key", f"{kid.hex}:{key.lower()}"])
decryption_config = config.decryption.lower()
engine_name = DECRYPTION_ENGINE.get(decryption_config) or "SHAKA_PACKAGER"
@@ -221,6 +223,9 @@ def build_download_args(
elif value is not False and value is not None:
command.extend([flag, str(value)])
# Append all content keys (multiple --key flags supported by N_m3u8DL-RE)
command.extend(key_args)
if headers:
for key, value in headers.items():
if key.lower() not in ("accept-encoding", "cookie"):

View File

@@ -1,6 +1,8 @@
import json
import logging
import os
import random
import re
import subprocess
import sys
from pathlib import Path
@@ -8,7 +10,7 @@ from pathlib import Path
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.binaries import FFMPEG, DoviTool, HDR10PlusTool
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
from unshackle.core.config import config
from unshackle.core.console import console
@@ -89,12 +91,18 @@ class Hybrid:
self.extract_rpu(dv_video)
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
self.rpu_file = "RPU_UNT.bin"
self.level_6()
# Mode 3 conversion already done during extraction when not untouched
elif os.path.isfile(config.directories.temp / "RPU.bin"):
# RPU already extracted with mode 3
pass
# Edit L6 with actual luminance values from RPU, then L5 active area
self.level_6()
hdr10_video = next((v for v in videos if v.range == Video.Range.HDR10), None)
hdr10_input = hdr10_video.path if hdr10_video else None
if hdr10_input:
self.level_5(hdr10_input)
self.injecting()
self.log.info("✓ Injection Completed")
@@ -104,6 +112,10 @@ class Hybrid:
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
Path.unlink(config.directories.temp / "L5.json", missing_ok=True)
Path.unlink(config.directories.temp / "L6.json", missing_ok=True)
def ffmpeg_simple(self, save_path, output):
"""Simple ffmpeg execution without progress tracking"""
@@ -172,46 +184,224 @@ class Hybrid:
self.log.info(f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
def level_6(self):
"""Edit RPU Level 6 values"""
with open(config.directories.temp / "L6.json", "w+") as level6_file:
level6 = {
"cm_version": "V29",
"length": 0,
"level6": {
"max_display_mastering_luminance": 1000,
"min_display_mastering_luminance": 1,
"max_content_light_level": 0,
"max_frame_average_light_level": 0,
},
def level_5(self, input_video):
"""Generate Level 5 active area metadata via crop detection on the HDR10 stream.
This resolves mismatches where DV has no black bars but HDR10 does (or vice versa)
by telling the display the correct active area.
"""
if os.path.isfile(config.directories.temp / "RPU_L5.bin"):
return
ffprobe_bin = str(FFProbe) if FFProbe else "ffprobe"
ffmpeg_bin = str(FFMPEG) if FFMPEG else "ffmpeg"
# Get video duration for random sampling
with console.status("Detecting active area (crop detection)...", spinner="dots"):
result_duration = subprocess.run(
[ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "json", str(input_video)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result_duration.returncode != 0:
self.log.warning("Could not probe video duration, skipping L5 crop detection")
return
duration_info = json.loads(result_duration.stdout)
duration = float(duration_info["format"]["duration"])
# Get video resolution for proper border calculation
result_streams = subprocess.run(
[
ffprobe_bin,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"json",
str(input_video),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result_streams.returncode != 0:
self.log.warning("Could not probe video resolution, skipping L5 crop detection")
return
stream_info = json.loads(result_streams.stdout)
original_width = int(stream_info["streams"][0]["width"])
original_height = int(stream_info["streams"][0]["height"])
# Sample 10 random timestamps and run cropdetect on each
random_times = sorted(random.uniform(0, duration) for _ in range(10))
crop_results = []
for t in random_times:
result_cropdetect = subprocess.run(
[
ffmpeg_bin,
"-y",
"-nostdin",
"-loglevel",
"info",
"-ss",
f"{t:.2f}",
"-i",
str(input_video),
"-vf",
"cropdetect=round=2",
"-vframes",
"10",
"-f",
"null",
"-",
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# cropdetect outputs crop=w:h:x:y
crop_match = re.search(
r"crop=(\d+):(\d+):(\d+):(\d+)",
(result_cropdetect.stdout or "") + (result_cropdetect.stderr or ""),
)
if crop_match:
w, h = int(crop_match.group(1)), int(crop_match.group(2))
x, y = int(crop_match.group(3)), int(crop_match.group(4))
# Calculate actual border sizes from crop geometry
left = x
top = y
right = original_width - w - x
bottom = original_height - h - y
crop_results.append((left, top, right, bottom))
if not crop_results:
self.log.warning("No crop data detected, skipping L5")
return
# Find the most common crop values
crop_counts = {}
for crop in crop_results:
crop_counts[crop] = crop_counts.get(crop, 0) + 1
most_common = max(crop_counts, key=crop_counts.get)
left, top, right, bottom = most_common
# If all borders are 0 there's nothing to correct
if left == 0 and top == 0 and right == 0 and bottom == 0:
return
l5_json = {
"active_area": {
"crop": False,
"presets": [{"id": 0, "left": left, "right": right, "top": top, "bottom": bottom}],
"edits": {"all": 0},
}
}
json.dump(level6, level6_file, indent=3)
l5_path = config.directories.temp / "L5.json"
with open(l5_path, "w") as f:
json.dump(l5_json, f, indent=4)
if not os.path.isfile(config.directories.temp / "RPU_L6.bin"):
with console.status("Editing RPU Level 6 values...", spinner="dots"):
level6 = subprocess.run(
with console.status("Editing RPU Level 5 active area...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
config.directories.temp / self.rpu_file,
str(config.directories.temp / self.rpu_file),
"-j",
config.directories.temp / "L6.json",
str(l5_path),
"-o",
config.directories.temp / "RPU_L6.bin",
str(config.directories.temp / "RPU_L5.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if level6.returncode:
Path.unlink(config.directories.temp / "RPU_L6.bin")
if result.returncode:
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 5 values")
self.rpu_file = "RPU_L5.bin"
def level_6(self):
"""Edit RPU Level 6 values using actual luminance data from the RPU."""
if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
return
with console.status("Reading RPU luminance metadata...", spinner="dots"):
result = subprocess.run(
[str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
capture_output=True,
text=True,
)
if result.returncode != 0:
raise ValueError("Failed reading RPU metadata for Level 6 values")
max_cll = None
max_fall = None
max_mdl = None
min_mdl = None
for line in result.stdout.splitlines():
if "RPU content light level (L1):" in line:
parts = line.split("MaxCLL:")[1].split(",")
max_cll = int(float(parts[0].strip().split()[0]))
if len(parts) > 1 and "MaxFALL:" in parts[1]:
max_fall = int(float(parts[1].split("MaxFALL:")[1].strip().split()[0]))
elif "RPU mastering display:" in line:
mastering = line.split(":", 1)[1].strip()
min_lum, max_lum = mastering.split("/")[0], mastering.split("/")[1].split(" ")[0]
min_mdl = int(float(min_lum) * 10000)
max_mdl = int(float(max_lum))
if any(v is None for v in (max_cll, max_fall, max_mdl, min_mdl)):
raise ValueError("Could not extract Level 6 luminance data from RPU")
level6_data = {
"level6": {
"remove_cmv4": False,
"remove_mapping": False,
"max_display_mastering_luminance": max_mdl,
"min_display_mastering_luminance": min_mdl,
"max_content_light_level": max_cll,
"max_frame_average_light_level": max_fall,
}
}
l6_path = config.directories.temp / "L6.json"
with open(l6_path, "w") as f:
json.dump(level6_data, f, indent=4)
with console.status("Editing RPU Level 6 values...", spinner="dots"):
result = subprocess.run(
[
str(DoviTool),
"editor",
"-i",
str(config.directories.temp / self.rpu_file),
"-j",
str(l6_path),
"-o",
str(config.directories.temp / "RPU_L6.bin"),
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode:
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
raise ValueError("Failed editing RPU Level 6 values")
self.log.info("Edited RPU Level 6 values")
# Update rpu_file to use the edited version
self.rpu_file = "RPU_L6.bin"
def injecting(self):

View File

@@ -0,0 +1,269 @@
import click
import sys
from rich.console import Group
from rich.live import Live
from rich.padding import Padding
from rich.table import Table
from rich.text import Text
from unshackle.core.console import console
IS_WINDOWS = sys.platform == "win32"
if IS_WINDOWS:
import msvcrt
class Selector:
"""
A custom interactive selector class using the Rich library.
Allows for multi-selection of items with pagination.
"""
def __init__(
self,
options: list[str],
cursor_style: str = "pink",
text_style: str = "text",
page_size: int = 8,
minimal_count: int = 0,
dependencies: dict[int, list[int]] = None,
prefixes: list[str] = None
):
"""
Initialize the Selector.
Args:
options: List of strings to select from.
cursor_style: Rich style for the highlighted cursor item.
text_style: Rich style for normal items.
page_size: Number of items to show per page.
minimal_count: Minimum number of items that must be selected.
dependencies: Dictionary mapping parent index to list of child indices.
"""
self.options = options
self.cursor_style = cursor_style
self.text_style = text_style
self.page_size = page_size
self.minimal_count = minimal_count
self.dependencies = dependencies or {}
self.cursor_index = 0
self.selected_indices = set()
self.scroll_offset = 0
def get_renderable(self):
"""
Constructs and returns the renderable object (Table + Info) for the current state.
"""
table = Table(show_header=False, show_edge=False, box=None, pad_edge=False, padding=(0, 1, 0, 0))
table.add_column("Indicator", justify="right", no_wrap=True)
table.add_column("Option", overflow="ellipsis", no_wrap=True)
for i in range(self.page_size):
idx = self.scroll_offset + i
if idx < len(self.options):
option = self.options[idx]
is_cursor = (idx == self.cursor_index)
is_selected = (idx in self.selected_indices)
symbol = "[X]" if is_selected else "[ ]"
style = self.cursor_style if is_cursor else self.text_style
indicator_text = Text(f"{symbol}", style=style)
content_text = Text.from_markup(option)
content_text.style = style
table.add_row(indicator_text, content_text)
else:
table.add_row(Text(" "), Text(" "))
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
current_page = (self.scroll_offset // self.page_size) + 1
info_text = Text(
f"\n[Space]: Toggle [a]: All [←/→]: Page [Enter]: Confirm (Page {current_page}/{total_pages})",
style="gray"
)
return Padding(Group(table, info_text), (0, 5))
def move_cursor(self, delta: int):
"""
Moves the cursor up or down by the specified delta.
Updates the scroll offset if the cursor moves out of the current view.
"""
self.cursor_index = (self.cursor_index + delta) % len(self.options)
new_page_idx = self.cursor_index // self.page_size
self.scroll_offset = new_page_idx * self.page_size
def change_page(self, delta: int):
"""
Changes the current page view by the specified delta (previous/next page).
Also moves the cursor to the first item of the new page.
"""
current_page = self.scroll_offset // self.page_size
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
new_page = current_page + delta
if 0 <= new_page < total_pages:
self.scroll_offset = new_page * self.page_size
first_idx_of_page = self.scroll_offset
if first_idx_of_page < len(self.options):
self.cursor_index = first_idx_of_page
else:
self.cursor_index = len(self.options) - 1
def toggle_selection(self):
"""
Toggles the selection state of the item currently under the cursor.
Propagates selection to children if defined in dependencies.
"""
target_indices = {self.cursor_index}
if self.cursor_index in self.dependencies:
target_indices.update(self.dependencies[self.cursor_index])
should_select = self.cursor_index not in self.selected_indices
if should_select:
self.selected_indices.update(target_indices)
else:
self.selected_indices.difference_update(target_indices)
def toggle_all(self):
"""
Toggles the selection of all items.
If all are selected, clears selection. Otherwise, selects all.
"""
if len(self.selected_indices) == len(self.options):
self.selected_indices.clear()
else:
self.selected_indices = set(range(len(self.options)))
def get_input_windows(self):
"""
Captures and parses keyboard input on Windows systems using msvcrt.
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
"""
key = msvcrt.getch()
if key == b'\x03' or key == b'\x1b':
return 'CANCEL'
if key == b'\xe0' or key == b'\x00':
try:
key = msvcrt.getch()
if key == b'H': return 'UP'
if key == b'P': return 'DOWN'
if key == b'K': return 'LEFT'
if key == b'M': return 'RIGHT'
except: pass
try: char = key.decode('utf-8', errors='ignore')
except: return None
if char in ('\r', '\n'): return 'ENTER'
if char == ' ': return 'SPACE'
if char in ('q', 'Q'): return 'QUIT'
if char in ('a', 'A'): return 'ALL'
if char in ('w', 'W', 'k', 'K'): return 'UP'
if char in ('s', 'S', 'j', 'J'): return 'DOWN'
if char in ('h', 'H'): return 'LEFT'
if char in ('d', 'D', 'l', 'L'): return 'RIGHT'
return None
def get_input_unix(self):
"""
Captures and parses keyboard input on Unix/Linux systems using click.getchar().
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
"""
char = click.getchar()
if char == '\x03':
return 'CANCEL'
mapping = {
'\x1b[A': 'UP',
'\x1b[B': 'DOWN',
'\x1b[C': 'RIGHT',
'\x1b[D': 'LEFT',
}
if char in mapping:
return mapping[char]
if char == '\x1b':
try:
next1 = click.getchar()
if next1 in ('[', 'O'):
next2 = click.getchar()
if next2 == 'A': return 'UP'
if next2 == 'B': return 'DOWN'
if next2 == 'C': return 'RIGHT'
if next2 == 'D': return 'LEFT'
return 'CANCEL'
except:
return 'CANCEL'
if char in ('\r', '\n'): return 'ENTER'
if char == ' ': return 'SPACE'
if char in ('q', 'Q'): return 'QUIT'
if char in ('a', 'A'): return 'ALL'
if char in ('w', 'W', 'k', 'K'): return 'UP'
if char in ('s', 'S', 'j', 'J'): return 'DOWN'
if char in ('h', 'H'): return 'LEFT'
if char in ('d', 'D', 'l', 'L'): return 'RIGHT'
return None
def run(self) -> list[int]:
"""
Starts the main event loop for the selector.
Renders the UI and processes input until confirmed or cancelled.
Returns:
list[int]: A sorted list of selected indices.
"""
try:
with Live(self.get_renderable(), console=console, auto_refresh=False, transient=True) as live:
while True:
live.update(self.get_renderable(), refresh=True)
if IS_WINDOWS: action = self.get_input_windows()
else: action = self.get_input_unix()
if action == 'UP': self.move_cursor(-1)
elif action == 'DOWN': self.move_cursor(1)
elif action == 'LEFT': self.change_page(-1)
elif action == 'RIGHT': self.change_page(1)
elif action == 'SPACE': self.toggle_selection()
elif action == 'ALL': self.toggle_all()
elif action in ('ENTER', 'QUIT'):
if len(self.selected_indices) >= self.minimal_count:
return sorted(list(self.selected_indices))
elif action == 'CANCEL': raise KeyboardInterrupt
except KeyboardInterrupt:
return []
def select_multiple(
options: list[str],
minimal_count: int = 1,
page_size: int = 8,
return_indices: bool = True,
cursor_style: str = "pink",
**kwargs
) -> list[int]:
"""
Drop-in replacement using custom Selector with global console.
Args:
options: List of options to display.
minimal_count: Minimum number of selections required.
page_size: Number of items per page.
return_indices: If True, returns indices; otherwise returns the option strings.
cursor_style: Style color for the cursor.
"""
selector = Selector(
options=options,
cursor_style=cursor_style,
text_style="text",
page_size=page_size,
minimal_count=minimal_count,
**kwargs
)
selected_indices = selector.run()
if return_indices:
return selected_indices
return [options[i] for i in selected_indices]