mirror of
https://github.com/unshackle-dl/unshackle.git
synced 2026-05-17 22:39:27 +00:00
Compare commits
6 Commits
3.0.0
...
9ed56709cd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9ed56709cd | ||
|
|
f96f1f9a95 | ||
|
|
9f9a609d71 | ||
|
|
cee7d9a75f | ||
|
|
dd19f405a4 | ||
|
|
dbebf68f18 |
@@ -65,6 +65,7 @@ from unshackle.core.utils import tags
|
|||||||
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
|
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
|
||||||
ContextData, MultipleChoice, SubtitleCodecChoice, VideoCodecChoice)
|
ContextData, MultipleChoice, SubtitleCodecChoice, VideoCodecChoice)
|
||||||
from unshackle.core.utils.collections import merge_dict
|
from unshackle.core.utils.collections import merge_dict
|
||||||
|
from unshackle.core.utils.selector import select_multiple
|
||||||
from unshackle.core.utils.subprocess import ffprobe
|
from unshackle.core.utils.subprocess import ffprobe
|
||||||
from unshackle.core.vaults import Vaults
|
from unshackle.core.vaults import Vaults
|
||||||
|
|
||||||
@@ -346,6 +347,12 @@ class dl:
|
|||||||
default=None,
|
default=None,
|
||||||
help="Create separate output files per audio codec instead of merging all audio.",
|
help="Create separate output files per audio codec instead of merging all audio.",
|
||||||
)
|
)
|
||||||
|
@click.option(
|
||||||
|
"--select-titles",
|
||||||
|
is_flag=True,
|
||||||
|
default=False,
|
||||||
|
help="Interactively select downloads from a list. Only use with Series to select Episodes",
|
||||||
|
)
|
||||||
@click.option(
|
@click.option(
|
||||||
"-w",
|
"-w",
|
||||||
"--wanted",
|
"--wanted",
|
||||||
@@ -859,6 +866,7 @@ class dl:
|
|||||||
range_: list[Video.Range],
|
range_: list[Video.Range],
|
||||||
channels: float,
|
channels: float,
|
||||||
no_atmos: bool,
|
no_atmos: bool,
|
||||||
|
select_titles: bool,
|
||||||
wanted: list[str],
|
wanted: list[str],
|
||||||
latest_episode: bool,
|
latest_episode: bool,
|
||||||
lang: list[str],
|
lang: list[str],
|
||||||
@@ -1047,6 +1055,84 @@ class dl:
|
|||||||
if list_titles:
|
if list_titles:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Enables manual selection for Series when --select-titles is set
|
||||||
|
if select_titles and type(titles) == Series:
|
||||||
|
console.print(Padding(Rule(f"[rule.text]Select Titles"), (1, 2)))
|
||||||
|
|
||||||
|
selection_titles = []
|
||||||
|
dependencies = {}
|
||||||
|
original_indices = {}
|
||||||
|
|
||||||
|
current_season = None
|
||||||
|
current_season_header_idx = -1
|
||||||
|
|
||||||
|
unique_seasons = {t.season for t in titles}
|
||||||
|
multiple_seasons = len(unique_seasons) > 1
|
||||||
|
|
||||||
|
# Build selection options
|
||||||
|
for i, t in enumerate(titles):
|
||||||
|
# Insert season header only if multiple seasons exist
|
||||||
|
if multiple_seasons and t.season != current_season:
|
||||||
|
current_season = t.season
|
||||||
|
header_text = f"Season {t.season}"
|
||||||
|
selection_titles.append(header_text)
|
||||||
|
current_season_header_idx = len(selection_titles) - 1
|
||||||
|
dependencies[current_season_header_idx] = []
|
||||||
|
# Note: Headers are not mapped to actual title indices
|
||||||
|
|
||||||
|
# Format display name
|
||||||
|
display_name = ((t.name[:35].rstrip() + "…") if len(t.name) > 35 else t.name) if t.name else None
|
||||||
|
|
||||||
|
# Apply indentation only for multiple seasons
|
||||||
|
prefix = " " if multiple_seasons else ""
|
||||||
|
option_text = (
|
||||||
|
f"{prefix}{t.number}" + (f". {display_name}" if t.name else "")
|
||||||
|
)
|
||||||
|
|
||||||
|
selection_titles.append(option_text)
|
||||||
|
current_ui_idx = len(selection_titles) - 1
|
||||||
|
|
||||||
|
# Map UI index to actual title index
|
||||||
|
original_indices[current_ui_idx] = i
|
||||||
|
|
||||||
|
# Link episode to season header for group selection
|
||||||
|
if current_season_header_idx != -1:
|
||||||
|
dependencies[current_season_header_idx].append(current_ui_idx)
|
||||||
|
|
||||||
|
selection_start = time.time()
|
||||||
|
|
||||||
|
# Execute selector with dependencies (headers select all children)
|
||||||
|
selected_ui_idx = select_multiple(
|
||||||
|
selection_titles,
|
||||||
|
minimal_count=1,
|
||||||
|
page_size=8,
|
||||||
|
return_indices=True,
|
||||||
|
dependencies=dependencies
|
||||||
|
)
|
||||||
|
|
||||||
|
selection_end = time.time()
|
||||||
|
start_time += (selection_end - selection_start)
|
||||||
|
|
||||||
|
# Map UI indices back to title indices (excluding headers)
|
||||||
|
selected_idx = []
|
||||||
|
for idx in selected_ui_idx:
|
||||||
|
if idx in original_indices:
|
||||||
|
selected_idx.append(original_indices[idx])
|
||||||
|
|
||||||
|
# Ensure indices are unique and ordered
|
||||||
|
selected_idx = sorted(set(selected_idx))
|
||||||
|
keep = set(selected_idx)
|
||||||
|
|
||||||
|
# In-place filter: remove unselected items (iterate backwards)
|
||||||
|
for i in range(len(titles) - 1, -1, -1):
|
||||||
|
if i not in keep:
|
||||||
|
del titles[i]
|
||||||
|
|
||||||
|
# Show selected count
|
||||||
|
if titles:
|
||||||
|
count = len(titles)
|
||||||
|
console.print(Padding(f"[text]Total selected: {count}[/]", (0, 5)))
|
||||||
|
|
||||||
# Determine the latest episode if --latest-episode is set
|
# Determine the latest episode if --latest-episode is set
|
||||||
latest_episode_id = None
|
latest_episode_id = None
|
||||||
if latest_episode and isinstance(titles, Series) and len(titles) > 0:
|
if latest_episode and isinstance(titles, Series) and len(titles) > 0:
|
||||||
|
|||||||
@@ -192,8 +192,10 @@ def build_download_args(
|
|||||||
if ad_keyword:
|
if ad_keyword:
|
||||||
args["--ad-keyword"] = ad_keyword
|
args["--ad-keyword"] = ad_keyword
|
||||||
|
|
||||||
|
key_args = []
|
||||||
if content_keys:
|
if content_keys:
|
||||||
args["--key"] = next((f"{kid.hex}:{key.lower()}" for kid, key in content_keys.items()), None)
|
for kid, key in content_keys.items():
|
||||||
|
key_args.extend(["--key", f"{kid.hex}:{key.lower()}"])
|
||||||
|
|
||||||
decryption_config = config.decryption.lower()
|
decryption_config = config.decryption.lower()
|
||||||
engine_name = DECRYPTION_ENGINE.get(decryption_config) or "SHAKA_PACKAGER"
|
engine_name = DECRYPTION_ENGINE.get(decryption_config) or "SHAKA_PACKAGER"
|
||||||
@@ -221,6 +223,9 @@ def build_download_args(
|
|||||||
elif value is not False and value is not None:
|
elif value is not False and value is not None:
|
||||||
command.extend([flag, str(value)])
|
command.extend([flag, str(value)])
|
||||||
|
|
||||||
|
# Append all content keys (multiple --key flags supported by N_m3u8DL-RE)
|
||||||
|
command.extend(key_args)
|
||||||
|
|
||||||
if headers:
|
if headers:
|
||||||
for key, value in headers.items():
|
for key, value in headers.items():
|
||||||
if key.lower() not in ("accept-encoding", "cookie"):
|
if key.lower() not in ("accept-encoding", "cookie"):
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import random
|
||||||
|
import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -8,7 +10,7 @@ from pathlib import Path
|
|||||||
from rich.padding import Padding
|
from rich.padding import Padding
|
||||||
from rich.rule import Rule
|
from rich.rule import Rule
|
||||||
|
|
||||||
from unshackle.core.binaries import FFMPEG, DoviTool, HDR10PlusTool
|
from unshackle.core.binaries import FFMPEG, DoviTool, FFProbe, HDR10PlusTool
|
||||||
from unshackle.core.config import config
|
from unshackle.core.config import config
|
||||||
from unshackle.core.console import console
|
from unshackle.core.console import console
|
||||||
|
|
||||||
@@ -89,12 +91,18 @@ class Hybrid:
|
|||||||
self.extract_rpu(dv_video)
|
self.extract_rpu(dv_video)
|
||||||
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
|
if os.path.isfile(config.directories.temp / "RPU_UNT.bin"):
|
||||||
self.rpu_file = "RPU_UNT.bin"
|
self.rpu_file = "RPU_UNT.bin"
|
||||||
self.level_6()
|
|
||||||
# Mode 3 conversion already done during extraction when not untouched
|
# Mode 3 conversion already done during extraction when not untouched
|
||||||
elif os.path.isfile(config.directories.temp / "RPU.bin"):
|
elif os.path.isfile(config.directories.temp / "RPU.bin"):
|
||||||
# RPU already extracted with mode 3
|
# RPU already extracted with mode 3
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Edit L6 with actual luminance values from RPU, then L5 active area
|
||||||
|
self.level_6()
|
||||||
|
hdr10_video = next((v for v in videos if v.range == Video.Range.HDR10), None)
|
||||||
|
hdr10_input = hdr10_video.path if hdr10_video else None
|
||||||
|
if hdr10_input:
|
||||||
|
self.level_5(hdr10_input)
|
||||||
|
|
||||||
self.injecting()
|
self.injecting()
|
||||||
|
|
||||||
self.log.info("✓ Injection Completed")
|
self.log.info("✓ Injection Completed")
|
||||||
@@ -104,6 +112,10 @@ class Hybrid:
|
|||||||
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
|
Path.unlink(config.directories.temp / "HDR10.hevc", missing_ok=True)
|
||||||
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
|
Path.unlink(config.directories.temp / "DV.hevc", missing_ok=True)
|
||||||
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
|
Path.unlink(config.directories.temp / f"{self.rpu_file}", missing_ok=True)
|
||||||
|
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
|
||||||
|
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
|
||||||
|
Path.unlink(config.directories.temp / "L5.json", missing_ok=True)
|
||||||
|
Path.unlink(config.directories.temp / "L6.json", missing_ok=True)
|
||||||
|
|
||||||
def ffmpeg_simple(self, save_path, output):
|
def ffmpeg_simple(self, save_path, output):
|
||||||
"""Simple ffmpeg execution without progress tracking"""
|
"""Simple ffmpeg execution without progress tracking"""
|
||||||
@@ -172,47 +184,225 @@ class Hybrid:
|
|||||||
|
|
||||||
self.log.info(f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
|
self.log.info(f"Extracted{' untouched ' if untouched else ' '}RPU from Dolby Vision stream")
|
||||||
|
|
||||||
def level_6(self):
|
def level_5(self, input_video):
|
||||||
"""Edit RPU Level 6 values"""
|
"""Generate Level 5 active area metadata via crop detection on the HDR10 stream.
|
||||||
with open(config.directories.temp / "L6.json", "w+") as level6_file:
|
|
||||||
level6 = {
|
|
||||||
"cm_version": "V29",
|
|
||||||
"length": 0,
|
|
||||||
"level6": {
|
|
||||||
"max_display_mastering_luminance": 1000,
|
|
||||||
"min_display_mastering_luminance": 1,
|
|
||||||
"max_content_light_level": 0,
|
|
||||||
"max_frame_average_light_level": 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
json.dump(level6, level6_file, indent=3)
|
This resolves mismatches where DV has no black bars but HDR10 does (or vice versa)
|
||||||
|
by telling the display the correct active area.
|
||||||
|
"""
|
||||||
|
if os.path.isfile(config.directories.temp / "RPU_L5.bin"):
|
||||||
|
return
|
||||||
|
|
||||||
if not os.path.isfile(config.directories.temp / "RPU_L6.bin"):
|
ffprobe_bin = str(FFProbe) if FFProbe else "ffprobe"
|
||||||
with console.status("Editing RPU Level 6 values...", spinner="dots"):
|
ffmpeg_bin = str(FFMPEG) if FFMPEG else "ffmpeg"
|
||||||
level6 = subprocess.run(
|
|
||||||
|
# Get video duration for random sampling
|
||||||
|
with console.status("Detecting active area (crop detection)...", spinner="dots"):
|
||||||
|
result_duration = subprocess.run(
|
||||||
|
[ffprobe_bin, "-v", "error", "-show_entries", "format=duration", "-of", "json", str(input_video)],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result_duration.returncode != 0:
|
||||||
|
self.log.warning("Could not probe video duration, skipping L5 crop detection")
|
||||||
|
return
|
||||||
|
|
||||||
|
duration_info = json.loads(result_duration.stdout)
|
||||||
|
duration = float(duration_info["format"]["duration"])
|
||||||
|
|
||||||
|
# Get video resolution for proper border calculation
|
||||||
|
result_streams = subprocess.run(
|
||||||
|
[
|
||||||
|
ffprobe_bin,
|
||||||
|
"-v",
|
||||||
|
"error",
|
||||||
|
"-select_streams",
|
||||||
|
"v:0",
|
||||||
|
"-show_entries",
|
||||||
|
"stream=width,height",
|
||||||
|
"-of",
|
||||||
|
"json",
|
||||||
|
str(input_video),
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result_streams.returncode != 0:
|
||||||
|
self.log.warning("Could not probe video resolution, skipping L5 crop detection")
|
||||||
|
return
|
||||||
|
|
||||||
|
stream_info = json.loads(result_streams.stdout)
|
||||||
|
original_width = int(stream_info["streams"][0]["width"])
|
||||||
|
original_height = int(stream_info["streams"][0]["height"])
|
||||||
|
|
||||||
|
# Sample 10 random timestamps and run cropdetect on each
|
||||||
|
random_times = sorted(random.uniform(0, duration) for _ in range(10))
|
||||||
|
|
||||||
|
crop_results = []
|
||||||
|
for t in random_times:
|
||||||
|
result_cropdetect = subprocess.run(
|
||||||
[
|
[
|
||||||
str(DoviTool),
|
ffmpeg_bin,
|
||||||
"editor",
|
"-y",
|
||||||
|
"-nostdin",
|
||||||
|
"-loglevel",
|
||||||
|
"info",
|
||||||
|
"-ss",
|
||||||
|
f"{t:.2f}",
|
||||||
"-i",
|
"-i",
|
||||||
config.directories.temp / self.rpu_file,
|
str(input_video),
|
||||||
"-j",
|
"-vf",
|
||||||
config.directories.temp / "L6.json",
|
"cropdetect=round=2",
|
||||||
"-o",
|
"-vframes",
|
||||||
config.directories.temp / "RPU_L6.bin",
|
"10",
|
||||||
|
"-f",
|
||||||
|
"null",
|
||||||
|
"-",
|
||||||
],
|
],
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=subprocess.PIPE,
|
||||||
|
text=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
if level6.returncode:
|
# cropdetect outputs crop=w:h:x:y
|
||||||
Path.unlink(config.directories.temp / "RPU_L6.bin")
|
crop_match = re.search(
|
||||||
raise ValueError("Failed editing RPU Level 6 values")
|
r"crop=(\d+):(\d+):(\d+):(\d+)",
|
||||||
|
(result_cropdetect.stdout or "") + (result_cropdetect.stderr or ""),
|
||||||
|
)
|
||||||
|
if crop_match:
|
||||||
|
w, h = int(crop_match.group(1)), int(crop_match.group(2))
|
||||||
|
x, y = int(crop_match.group(3)), int(crop_match.group(4))
|
||||||
|
# Calculate actual border sizes from crop geometry
|
||||||
|
left = x
|
||||||
|
top = y
|
||||||
|
right = original_width - w - x
|
||||||
|
bottom = original_height - h - y
|
||||||
|
crop_results.append((left, top, right, bottom))
|
||||||
|
|
||||||
self.log.info("Edited RPU Level 6 values")
|
if not crop_results:
|
||||||
|
self.log.warning("No crop data detected, skipping L5")
|
||||||
|
return
|
||||||
|
|
||||||
# Update rpu_file to use the edited version
|
# Find the most common crop values
|
||||||
self.rpu_file = "RPU_L6.bin"
|
crop_counts = {}
|
||||||
|
for crop in crop_results:
|
||||||
|
crop_counts[crop] = crop_counts.get(crop, 0) + 1
|
||||||
|
most_common = max(crop_counts, key=crop_counts.get)
|
||||||
|
left, top, right, bottom = most_common
|
||||||
|
|
||||||
|
# If all borders are 0 there's nothing to correct
|
||||||
|
if left == 0 and top == 0 and right == 0 and bottom == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
l5_json = {
|
||||||
|
"active_area": {
|
||||||
|
"crop": False,
|
||||||
|
"presets": [{"id": 0, "left": left, "right": right, "top": top, "bottom": bottom}],
|
||||||
|
"edits": {"all": 0},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
l5_path = config.directories.temp / "L5.json"
|
||||||
|
with open(l5_path, "w") as f:
|
||||||
|
json.dump(l5_json, f, indent=4)
|
||||||
|
|
||||||
|
with console.status("Editing RPU Level 5 active area...", spinner="dots"):
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
str(DoviTool),
|
||||||
|
"editor",
|
||||||
|
"-i",
|
||||||
|
str(config.directories.temp / self.rpu_file),
|
||||||
|
"-j",
|
||||||
|
str(l5_path),
|
||||||
|
"-o",
|
||||||
|
str(config.directories.temp / "RPU_L5.bin"),
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode:
|
||||||
|
Path.unlink(config.directories.temp / "RPU_L5.bin", missing_ok=True)
|
||||||
|
raise ValueError("Failed editing RPU Level 5 values")
|
||||||
|
|
||||||
|
self.rpu_file = "RPU_L5.bin"
|
||||||
|
|
||||||
|
def level_6(self):
|
||||||
|
"""Edit RPU Level 6 values using actual luminance data from the RPU."""
|
||||||
|
if os.path.isfile(config.directories.temp / "RPU_L6.bin"):
|
||||||
|
return
|
||||||
|
|
||||||
|
with console.status("Reading RPU luminance metadata...", spinner="dots"):
|
||||||
|
result = subprocess.run(
|
||||||
|
[str(DoviTool), "info", "-i", str(config.directories.temp / self.rpu_file), "-s"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
raise ValueError("Failed reading RPU metadata for Level 6 values")
|
||||||
|
|
||||||
|
max_cll = None
|
||||||
|
max_fall = None
|
||||||
|
max_mdl = None
|
||||||
|
min_mdl = None
|
||||||
|
|
||||||
|
for line in result.stdout.splitlines():
|
||||||
|
if "RPU content light level (L1):" in line:
|
||||||
|
parts = line.split("MaxCLL:")[1].split(",")
|
||||||
|
max_cll = int(float(parts[0].strip().split()[0]))
|
||||||
|
if len(parts) > 1 and "MaxFALL:" in parts[1]:
|
||||||
|
max_fall = int(float(parts[1].split("MaxFALL:")[1].strip().split()[0]))
|
||||||
|
elif "RPU mastering display:" in line:
|
||||||
|
mastering = line.split(":", 1)[1].strip()
|
||||||
|
min_lum, max_lum = mastering.split("/")[0], mastering.split("/")[1].split(" ")[0]
|
||||||
|
min_mdl = int(float(min_lum) * 10000)
|
||||||
|
max_mdl = int(float(max_lum))
|
||||||
|
|
||||||
|
if any(v is None for v in (max_cll, max_fall, max_mdl, min_mdl)):
|
||||||
|
raise ValueError("Could not extract Level 6 luminance data from RPU")
|
||||||
|
|
||||||
|
level6_data = {
|
||||||
|
"level6": {
|
||||||
|
"remove_cmv4": False,
|
||||||
|
"remove_mapping": False,
|
||||||
|
"max_display_mastering_luminance": max_mdl,
|
||||||
|
"min_display_mastering_luminance": min_mdl,
|
||||||
|
"max_content_light_level": max_cll,
|
||||||
|
"max_frame_average_light_level": max_fall,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
l6_path = config.directories.temp / "L6.json"
|
||||||
|
with open(l6_path, "w") as f:
|
||||||
|
json.dump(level6_data, f, indent=4)
|
||||||
|
|
||||||
|
with console.status("Editing RPU Level 6 values...", spinner="dots"):
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
str(DoviTool),
|
||||||
|
"editor",
|
||||||
|
"-i",
|
||||||
|
str(config.directories.temp / self.rpu_file),
|
||||||
|
"-j",
|
||||||
|
str(l6_path),
|
||||||
|
"-o",
|
||||||
|
str(config.directories.temp / "RPU_L6.bin"),
|
||||||
|
],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode:
|
||||||
|
Path.unlink(config.directories.temp / "RPU_L6.bin", missing_ok=True)
|
||||||
|
raise ValueError("Failed editing RPU Level 6 values")
|
||||||
|
|
||||||
|
self.rpu_file = "RPU_L6.bin"
|
||||||
|
|
||||||
def injecting(self):
|
def injecting(self):
|
||||||
if os.path.isfile(config.directories.temp / self.hevc_file):
|
if os.path.isfile(config.directories.temp / self.hevc_file):
|
||||||
|
|||||||
269
unshackle/core/utils/selector.py
Normal file
269
unshackle/core/utils/selector.py
Normal file
@@ -0,0 +1,269 @@
|
|||||||
|
import click
|
||||||
|
import sys
|
||||||
|
from rich.console import Group
|
||||||
|
from rich.live import Live
|
||||||
|
from rich.padding import Padding
|
||||||
|
from rich.table import Table
|
||||||
|
from rich.text import Text
|
||||||
|
from unshackle.core.console import console
|
||||||
|
|
||||||
|
IS_WINDOWS = sys.platform == "win32"
|
||||||
|
if IS_WINDOWS:
|
||||||
|
import msvcrt
|
||||||
|
|
||||||
|
class Selector:
|
||||||
|
"""
|
||||||
|
A custom interactive selector class using the Rich library.
|
||||||
|
Allows for multi-selection of items with pagination.
|
||||||
|
"""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
options: list[str],
|
||||||
|
cursor_style: str = "pink",
|
||||||
|
text_style: str = "text",
|
||||||
|
page_size: int = 8,
|
||||||
|
minimal_count: int = 0,
|
||||||
|
dependencies: dict[int, list[int]] = None,
|
||||||
|
prefixes: list[str] = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the Selector.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
options: List of strings to select from.
|
||||||
|
cursor_style: Rich style for the highlighted cursor item.
|
||||||
|
text_style: Rich style for normal items.
|
||||||
|
page_size: Number of items to show per page.
|
||||||
|
minimal_count: Minimum number of items that must be selected.
|
||||||
|
dependencies: Dictionary mapping parent index to list of child indices.
|
||||||
|
"""
|
||||||
|
self.options = options
|
||||||
|
self.cursor_style = cursor_style
|
||||||
|
self.text_style = text_style
|
||||||
|
self.page_size = page_size
|
||||||
|
self.minimal_count = minimal_count
|
||||||
|
self.dependencies = dependencies or {}
|
||||||
|
|
||||||
|
self.cursor_index = 0
|
||||||
|
self.selected_indices = set()
|
||||||
|
self.scroll_offset = 0
|
||||||
|
|
||||||
|
def get_renderable(self):
|
||||||
|
"""
|
||||||
|
Constructs and returns the renderable object (Table + Info) for the current state.
|
||||||
|
"""
|
||||||
|
table = Table(show_header=False, show_edge=False, box=None, pad_edge=False, padding=(0, 1, 0, 0))
|
||||||
|
table.add_column("Indicator", justify="right", no_wrap=True)
|
||||||
|
table.add_column("Option", overflow="ellipsis", no_wrap=True)
|
||||||
|
|
||||||
|
for i in range(self.page_size):
|
||||||
|
idx = self.scroll_offset + i
|
||||||
|
|
||||||
|
if idx < len(self.options):
|
||||||
|
option = self.options[idx]
|
||||||
|
is_cursor = (idx == self.cursor_index)
|
||||||
|
is_selected = (idx in self.selected_indices)
|
||||||
|
|
||||||
|
symbol = "[X]" if is_selected else "[ ]"
|
||||||
|
style = self.cursor_style if is_cursor else self.text_style
|
||||||
|
indicator_text = Text(f"{symbol}", style=style)
|
||||||
|
|
||||||
|
content_text = Text.from_markup(option)
|
||||||
|
content_text.style = style
|
||||||
|
|
||||||
|
table.add_row(indicator_text, content_text)
|
||||||
|
else:
|
||||||
|
table.add_row(Text(" "), Text(" "))
|
||||||
|
|
||||||
|
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
|
||||||
|
current_page = (self.scroll_offset // self.page_size) + 1
|
||||||
|
|
||||||
|
info_text = Text(
|
||||||
|
f"\n[Space]: Toggle [a]: All [←/→]: Page [Enter]: Confirm (Page {current_page}/{total_pages})",
|
||||||
|
style="gray"
|
||||||
|
)
|
||||||
|
|
||||||
|
return Padding(Group(table, info_text), (0, 5))
|
||||||
|
|
||||||
|
def move_cursor(self, delta: int):
|
||||||
|
"""
|
||||||
|
Moves the cursor up or down by the specified delta.
|
||||||
|
Updates the scroll offset if the cursor moves out of the current view.
|
||||||
|
"""
|
||||||
|
self.cursor_index = (self.cursor_index + delta) % len(self.options)
|
||||||
|
new_page_idx = self.cursor_index // self.page_size
|
||||||
|
self.scroll_offset = new_page_idx * self.page_size
|
||||||
|
|
||||||
|
def change_page(self, delta: int):
|
||||||
|
"""
|
||||||
|
Changes the current page view by the specified delta (previous/next page).
|
||||||
|
Also moves the cursor to the first item of the new page.
|
||||||
|
"""
|
||||||
|
current_page = self.scroll_offset // self.page_size
|
||||||
|
total_pages = (len(self.options) + self.page_size - 1) // self.page_size
|
||||||
|
new_page = current_page + delta
|
||||||
|
|
||||||
|
if 0 <= new_page < total_pages:
|
||||||
|
self.scroll_offset = new_page * self.page_size
|
||||||
|
first_idx_of_page = self.scroll_offset
|
||||||
|
if first_idx_of_page < len(self.options):
|
||||||
|
self.cursor_index = first_idx_of_page
|
||||||
|
else:
|
||||||
|
self.cursor_index = len(self.options) - 1
|
||||||
|
|
||||||
|
def toggle_selection(self):
|
||||||
|
"""
|
||||||
|
Toggles the selection state of the item currently under the cursor.
|
||||||
|
Propagates selection to children if defined in dependencies.
|
||||||
|
"""
|
||||||
|
target_indices = {self.cursor_index}
|
||||||
|
|
||||||
|
if self.cursor_index in self.dependencies:
|
||||||
|
target_indices.update(self.dependencies[self.cursor_index])
|
||||||
|
|
||||||
|
should_select = self.cursor_index not in self.selected_indices
|
||||||
|
|
||||||
|
if should_select:
|
||||||
|
self.selected_indices.update(target_indices)
|
||||||
|
else:
|
||||||
|
self.selected_indices.difference_update(target_indices)
|
||||||
|
|
||||||
|
def toggle_all(self):
|
||||||
|
"""
|
||||||
|
Toggles the selection of all items.
|
||||||
|
If all are selected, clears selection. Otherwise, selects all.
|
||||||
|
"""
|
||||||
|
if len(self.selected_indices) == len(self.options):
|
||||||
|
self.selected_indices.clear()
|
||||||
|
else:
|
||||||
|
self.selected_indices = set(range(len(self.options)))
|
||||||
|
|
||||||
|
def get_input_windows(self):
|
||||||
|
"""
|
||||||
|
Captures and parses keyboard input on Windows systems using msvcrt.
|
||||||
|
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
|
||||||
|
"""
|
||||||
|
key = msvcrt.getch()
|
||||||
|
if key == b'\x03' or key == b'\x1b':
|
||||||
|
return 'CANCEL'
|
||||||
|
if key == b'\xe0' or key == b'\x00':
|
||||||
|
try:
|
||||||
|
key = msvcrt.getch()
|
||||||
|
if key == b'H': return 'UP'
|
||||||
|
if key == b'P': return 'DOWN'
|
||||||
|
if key == b'K': return 'LEFT'
|
||||||
|
if key == b'M': return 'RIGHT'
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
try: char = key.decode('utf-8', errors='ignore')
|
||||||
|
except: return None
|
||||||
|
|
||||||
|
if char in ('\r', '\n'): return 'ENTER'
|
||||||
|
if char == ' ': return 'SPACE'
|
||||||
|
if char in ('q', 'Q'): return 'QUIT'
|
||||||
|
if char in ('a', 'A'): return 'ALL'
|
||||||
|
if char in ('w', 'W', 'k', 'K'): return 'UP'
|
||||||
|
if char in ('s', 'S', 'j', 'J'): return 'DOWN'
|
||||||
|
if char in ('h', 'H'): return 'LEFT'
|
||||||
|
if char in ('d', 'D', 'l', 'L'): return 'RIGHT'
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_input_unix(self):
|
||||||
|
"""
|
||||||
|
Captures and parses keyboard input on Unix/Linux systems using click.getchar().
|
||||||
|
Returns command strings like 'UP', 'DOWN', 'ENTER', etc.
|
||||||
|
"""
|
||||||
|
char = click.getchar()
|
||||||
|
if char == '\x03':
|
||||||
|
return 'CANCEL'
|
||||||
|
mapping = {
|
||||||
|
'\x1b[A': 'UP',
|
||||||
|
'\x1b[B': 'DOWN',
|
||||||
|
'\x1b[C': 'RIGHT',
|
||||||
|
'\x1b[D': 'LEFT',
|
||||||
|
}
|
||||||
|
if char in mapping:
|
||||||
|
return mapping[char]
|
||||||
|
if char == '\x1b':
|
||||||
|
try:
|
||||||
|
next1 = click.getchar()
|
||||||
|
if next1 in ('[', 'O'):
|
||||||
|
next2 = click.getchar()
|
||||||
|
if next2 == 'A': return 'UP'
|
||||||
|
if next2 == 'B': return 'DOWN'
|
||||||
|
if next2 == 'C': return 'RIGHT'
|
||||||
|
if next2 == 'D': return 'LEFT'
|
||||||
|
return 'CANCEL'
|
||||||
|
except:
|
||||||
|
return 'CANCEL'
|
||||||
|
|
||||||
|
if char in ('\r', '\n'): return 'ENTER'
|
||||||
|
if char == ' ': return 'SPACE'
|
||||||
|
if char in ('q', 'Q'): return 'QUIT'
|
||||||
|
if char in ('a', 'A'): return 'ALL'
|
||||||
|
if char in ('w', 'W', 'k', 'K'): return 'UP'
|
||||||
|
if char in ('s', 'S', 'j', 'J'): return 'DOWN'
|
||||||
|
if char in ('h', 'H'): return 'LEFT'
|
||||||
|
if char in ('d', 'D', 'l', 'L'): return 'RIGHT'
|
||||||
|
return None
|
||||||
|
|
||||||
|
def run(self) -> list[int]:
|
||||||
|
"""
|
||||||
|
Starts the main event loop for the selector.
|
||||||
|
Renders the UI and processes input until confirmed or cancelled.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[int]: A sorted list of selected indices.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
with Live(self.get_renderable(), console=console, auto_refresh=False, transient=True) as live:
|
||||||
|
while True:
|
||||||
|
live.update(self.get_renderable(), refresh=True)
|
||||||
|
if IS_WINDOWS: action = self.get_input_windows()
|
||||||
|
else: action = self.get_input_unix()
|
||||||
|
|
||||||
|
if action == 'UP': self.move_cursor(-1)
|
||||||
|
elif action == 'DOWN': self.move_cursor(1)
|
||||||
|
elif action == 'LEFT': self.change_page(-1)
|
||||||
|
elif action == 'RIGHT': self.change_page(1)
|
||||||
|
elif action == 'SPACE': self.toggle_selection()
|
||||||
|
elif action == 'ALL': self.toggle_all()
|
||||||
|
elif action in ('ENTER', 'QUIT'):
|
||||||
|
if len(self.selected_indices) >= self.minimal_count:
|
||||||
|
return sorted(list(self.selected_indices))
|
||||||
|
elif action == 'CANCEL': raise KeyboardInterrupt
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def select_multiple(
|
||||||
|
options: list[str],
|
||||||
|
minimal_count: int = 1,
|
||||||
|
page_size: int = 8,
|
||||||
|
return_indices: bool = True,
|
||||||
|
cursor_style: str = "pink",
|
||||||
|
**kwargs
|
||||||
|
) -> list[int]:
|
||||||
|
"""
|
||||||
|
Drop-in replacement using custom Selector with global console.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
options: List of options to display.
|
||||||
|
minimal_count: Minimum number of selections required.
|
||||||
|
page_size: Number of items per page.
|
||||||
|
return_indices: If True, returns indices; otherwise returns the option strings.
|
||||||
|
cursor_style: Style color for the cursor.
|
||||||
|
"""
|
||||||
|
selector = Selector(
|
||||||
|
options=options,
|
||||||
|
cursor_style=cursor_style,
|
||||||
|
text_style="text",
|
||||||
|
page_size=page_size,
|
||||||
|
minimal_count=minimal_count,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
selected_indices = selector.run()
|
||||||
|
|
||||||
|
if return_indices:
|
||||||
|
return selected_indices
|
||||||
|
return [options[i] for i in selected_indices]
|
||||||
Reference in New Issue
Block a user