4 Commits

13 changed files with 168 additions and 1701 deletions

91
.github/workflows/release.yml vendored Normal file
View File

@@ -0,0 +1,91 @@
name: Release
on:
workflow_dispatch:
push:
branches: [main]
paths:
- "pyproject.toml"
permissions:
contents: write
jobs:
check-version:
runs-on: ubuntu-latest
outputs:
should_release: ${{ steps.version_check.outputs.should_release }}
new_version: ${{ steps.version_check.outputs.new_version }}
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Check for major/minor version bump
id: version_check
run: |
NEW_VERSION=$(grep -m1 '^version' pyproject.toml | sed 's/version = "\(.*\)"/\1/')
echo "Detected version in pyproject.toml: $NEW_VERSION"
LATEST_TAG=$(git tag --list | grep -E '^[0-9]+\.[0-9]+\.[0-9]+$' | sort -V | tail -1)
echo "Latest git tag: $LATEST_TAG"
if [ -z "$LATEST_TAG" ]; then
echo "No previous tag found, treating as new release"
echo "should_release=true" >> "$GITHUB_OUTPUT"
echo "new_version=$NEW_VERSION" >> "$GITHUB_OUTPUT"
exit 0
fi
OLD_MAJOR=$(echo "$LATEST_TAG" | cut -d. -f1)
OLD_MINOR=$(echo "$LATEST_TAG" | cut -d. -f2)
NEW_MAJOR=$(echo "$NEW_VERSION" | cut -d. -f1)
NEW_MINOR=$(echo "$NEW_VERSION" | cut -d. -f2)
if [ "$NEW_MAJOR" -gt "$OLD_MAJOR" ] || [ "$NEW_MINOR" -gt "$OLD_MINOR" ]; then
echo "Major or minor version bump detected: $LATEST_TAG -> $NEW_VERSION"
echo "should_release=true" >> "$GITHUB_OUTPUT"
else
echo "Patch-only change ($LATEST_TAG -> $NEW_VERSION), skipping release"
echo "should_release=false" >> "$GITHUB_OUTPUT"
fi
echo "new_version=$NEW_VERSION" >> "$GITHUB_OUTPUT"
release:
needs: check-version
if: needs.check-version.outputs.should_release == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v4
- name: Set up Python
run: uv python install 3.12
- name: Install dependencies and build
run: |
uv sync
uv build
- name: Extract changelog for release
id: changelog
run: |
VERSION=${{ needs.check-version.outputs.new_version }}
# Extract the section for this version from CHANGELOG.md
awk "/^## \[$VERSION\]/{found=1; next} /^## \[/{if(found) exit} found{print}" CHANGELOG.md > release_notes.md
echo "Extracted release notes:"
cat release_notes.md
- name: Create GitHub Release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
VERSION=${{ needs.check-version.outputs.new_version }}
gh release create "$VERSION" \
--title "$VERSION" \
--notes-file release_notes.md \
dist/unshackle-${VERSION}-py3-none-any.whl \
dist/unshackle-${VERSION}.tar.gz

View File

@@ -6,7 +6,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This changelog is automatically generated using [git-cliff](https://git-cliff.org).
## [Unreleased]
## [4.0.0] - 2026-03-17
### Features
@@ -19,6 +19,11 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- *tracks*: Add edition tags to output filenames
- *templates*: [**breaking**] Add customizable output filename templates
- *templates*: Add configurable language tagging rule engine
- Update unshackle version to 4.0.0
- *dl*: Add --animeapi and --enrich options for anime metadata and tagging
- *dl*: Add skip messages for --no-audio and --no-chapters flags
- *dl*: Extract closed captions from HLS manifests and improve CC extraction
- *dl*: Add --worst flag and SHIELD OkHttp fingerprint preset
### Bug Fixes
@@ -33,6 +38,13 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- *n_m3u8dl_re*: Disable segment count validation for duration-based DASH
- Correct formatting and add missing newlines in selector and EXAMPLE service
- *dependencies*: Update pyplayready version to 0.8.3 and adjust dependencies
- *drm*: Update PlayReady KID extraction for pyplayready 0.8.3 compatibility
- *api*: Resolve Sentinel serialization, missing params, and add search endpoint
- *dash*: Pass period_filter to n_m3u8dl_re via filtered MPD file
- *title*: Add HDR Vivid Format HDR Tag
- *ism*: Prevent duplicate track IDs for audio tracks with same lang/codec/bitrate
- *aria2c*: Correct progress bar tracking for HLS downloads
- *dl*: Filter CC subtitle languages with --s-lang and extract all manifest CCs
### Documentation
@@ -45,10 +57,6 @@ This changelog is automatically generated using [git-cliff](https://git-cliff.or
- *example*: Migrate EXAMPLE service to track_request pattern
- *providers*: Extract metadata providers into modular system
### Maintenance
- *changelog*: Update changelog for upcoming release and reorganize sections
## [3.0.0] - 2026-02-15
### Features

View File

@@ -62,7 +62,7 @@ from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.hybrid import Hybrid
from unshackle.core.utilities import (find_font_with_fallbacks, get_debug_logger, get_system_fonts, init_debug_logger,
is_close_match, suggest_font_packages, time_elapsed_since)
is_close_match, is_exact_match, suggest_font_packages, time_elapsed_since)
from unshackle.core.utils import tags
from unshackle.core.utils.click_types import (AUDIO_CODEC_LIST, LANGUAGE_RANGE, QUALITY_LIST, SEASON_RANGE,
ContextData, MultipleChoice, MultipleVideoCodecChoice,
@@ -519,18 +519,6 @@ class dl:
default=False,
help="Continue with best available quality if requested resolutions are not available.",
)
@click.option(
"--remote",
is_flag=True,
default=False,
help="Use a remote unshackle server instead of local service code.",
)
@click.option(
"--server",
type=str,
default=None,
help="Name of the remote server from remote_services config (if multiple configured).",
)
@click.pass_context
def cli(ctx: click.Context, **kwargs: Any) -> dl:
return dl(ctx, **kwargs)
@@ -708,22 +696,19 @@ class dl:
self.log.info(f"Using profile: '{self.profile}'")
with console.status("Loading Service Config...", spinner="dots"):
self.service_config = {}
if not ctx.params.get("remote"):
try:
service_config_path = Services.get_path(self.service) / config.filenames.config
if service_config_path.exists():
self.service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
self.log.info("Service Config loaded")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="load_service_config",
service=self.service,
context={"config_path": str(service_config_path), "config": self.service_config},
)
except KeyError:
pass
service_config_path = Services.get_path(self.service) / config.filenames.config
if service_config_path.exists():
self.service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
self.log.info("Service Config loaded")
if self.debug_logger:
self.debug_logger.log(
level="DEBUG",
operation="load_service_config",
service=self.service,
context={"config_path": str(service_config_path), "config": self.service_config},
)
else:
self.service_config = {}
merge_dict(config.services.get(self.service), self.service_config)
if getattr(config, "downloader_map", None):
@@ -1727,8 +1712,6 @@ class dl:
f"Required languages found ({', '.join(require_subs)}), downloading all available subtitles"
)
elif s_lang and "all" not in s_lang:
from unshackle.core.utilities import is_exact_match
match_func = is_exact_match if exact_lang else is_close_match
missing_langs = [
@@ -2120,6 +2103,7 @@ class dl:
and not video_only
and not no_video
):
match_func = is_exact_match if exact_lang else is_close_match
for video_track_n, video_track in enumerate(title.tracks.videos):
has_manifest_cc = bool(getattr(video_track, "closed_captions", None))
has_eia_cc = (
@@ -2133,31 +2117,48 @@ class dl:
if not has_manifest_cc and not has_eia_cc:
continue
# Build list of CC entries to extract
if has_manifest_cc:
cc_entries = video_track.closed_captions
# Filter CC languages against --s-lang if specified
if s_lang and "all" not in s_lang:
cc_entries = [
entry for entry in cc_entries
if entry.get("language")
and match_func(Language.get(entry["language"]), s_lang)
]
if not cc_entries:
continue
else:
# EIA fallback: single entry with unknown language
cc_entries = [{}]
with console.status(f"Checking Video track {video_track_n + 1} for Closed Captions..."):
try:
cc_lang = (
Language.get(video_track.closed_captions[0]["language"])
if has_manifest_cc and video_track.closed_captions[0].get("language")
else title.language or video_track.language
)
track_id = f"ccextractor-{video_track.id}"
cc = video_track.ccextractor(
track_id=track_id,
out_path=config.directories.temp
/ config.filenames.subtitle.format(id=track_id, language=cc_lang),
language=cc_lang,
original=False,
)
if cc:
cc.cc = True
title.tracks.add(cc)
self.log.info(
f"Extracted a Closed Caption from Video track {video_track_n + 1}"
for cc_idx, cc_entry in enumerate(cc_entries):
cc_lang = (
Language.get(cc_entry["language"])
if cc_entry.get("language")
else title.language or video_track.language
)
else:
self.log.info(
f"No Closed Captions were found in Video track {video_track_n + 1}"
track_id = f"ccextractor-{video_track.id}-{cc_idx}"
cc = video_track.ccextractor(
track_id=track_id,
out_path=config.directories.temp
/ config.filenames.subtitle.format(id=track_id, language=cc_lang),
language=cc_lang,
original=False,
)
if cc:
cc.cc = True
title.tracks.add(cc)
self.log.info(
f"Extracted a Closed Caption ({cc_lang}) from Video track {video_track_n + 1}"
)
else:
self.log.info(
f"No Closed Captions were found in Video track {video_track_n + 1}"
)
except EnvironmentError:
self.log.error(
"Cannot extract Closed Captions as the ccextractor executable was not found..."
@@ -2500,13 +2501,10 @@ class dl:
Padding(f":tada: Title downloaded in [progress.elapsed]{title_dl_time}[/]!", (0, 5, 1, 5))
)
if not hasattr(service, "close"):
cookie_file = self.get_cookie_path(self.service, self.profile)
if cookie_file:
self.save_cookies(cookie_file, service.session.cookies)
if hasattr(service, "close"):
service.close()
# update cookies
cookie_file = self.get_cookie_path(self.service, self.profile)
if cookie_file:
self.save_cookies(cookie_file, service.session.cookies)
dl_time = time_elapsed_since(start_time)

View File

@@ -119,39 +119,15 @@ def serve(
config.serve["playready_devices"] = []
config.serve["playready_devices"].extend(list(config.directories.prds.glob("*.prd")))
@web.middleware
async def api_key_authentication(request: web.Request, handler) -> web.Response:
"""Authenticate API requests using X-Secret-Key header."""
secret_key = request.headers.get("X-Secret-Key")
if not secret_key:
return web.json_response({"status": 401, "message": "Secret Key is Empty."}, status=401)
if secret_key not in request.app["config"]["users"]:
return web.json_response({"status": 401, "message": "Secret Key is Invalid."}, status=401)
return await handler(request)
if api_only:
log.info("Starting REST API server (pywidevine/pyplayready CDM disabled)")
if no_key:
app = web.Application(middlewares=[cors_middleware])
app["config"] = {"users": {}}
else:
app = web.Application(middlewares=[cors_middleware, api_key_authentication])
app = web.Application(middlewares=[cors_middleware, pywidevine_serve.authentication])
app["config"] = {"users": {api_secret: {"devices": [], "username": "api_user"}}}
app["debug_api"] = debug_api
# Start session cleanup loop for remote-dl sessions
from unshackle.core.api.session_store import get_session_store
session_store = get_session_store()
async def start_session_cleanup(_app: web.Application) -> None:
await session_store.start_cleanup_loop()
async def stop_session_cleanup(_app: web.Application) -> None:
await session_store.stop_cleanup_loop()
app.on_startup.append(start_session_cleanup)
app.on_cleanup.append(stop_session_cleanup)
setup_routes(app)
setup_swagger(app)
log.info(f"REST API endpoints available at http://{host}:{port}/api/")
@@ -219,19 +195,6 @@ def serve(
app["config"] = serve_config
app["debug_api"] = debug_api
# Start session cleanup loop for remote-dl sessions
from unshackle.core.api.session_store import get_session_store
session_store = get_session_store()
async def start_session_cleanup(_app: web.Application) -> None:
await session_store.start_cleanup_loop()
async def stop_session_cleanup(_app: web.Application) -> None:
await session_store.stop_cleanup_loop()
app.on_startup.append(start_session_cleanup)
app.on_cleanup.append(stop_session_cleanup)
if serve_widevine:
app.on_startup.append(pywidevine_serve._startup)
app.on_cleanup.append(pywidevine_serve._cleanup)

View File

@@ -35,8 +35,6 @@ class APIErrorCode(str, Enum):
NOT_FOUND = "NOT_FOUND" # Resource not found (title, job, etc.)
NO_CONTENT = "NO_CONTENT" # No titles/tracks/episodes found
JOB_NOT_FOUND = "JOB_NOT_FOUND" # Download job doesn't exist
SESSION_NOT_FOUND = "SESSION_NOT_FOUND" # Remote-dl session doesn't exist or expired
TRACK_NOT_FOUND = "TRACK_NOT_FOUND" # Track ID not found in session
RATE_LIMITED = "RATE_LIMITED" # Service rate limiting
@@ -99,8 +97,6 @@ class APIError(Exception):
APIErrorCode.NOT_FOUND: 404,
APIErrorCode.NO_CONTENT: 404,
APIErrorCode.JOB_NOT_FOUND: 404,
APIErrorCode.SESSION_NOT_FOUND: 404,
APIErrorCode.TRACK_NOT_FOUND: 404,
# 429 Too Many Requests
APIErrorCode.RATE_LIMITED: 429,
# 500 Internal Server Error

View File

@@ -171,8 +171,6 @@ def validate_service(service_tag: str) -> Optional[str]:
def serialize_title(title: Title_T) -> Dict[str, Any]:
"""Convert a title object to JSON-serializable dict."""
title_language = str(title.language) if hasattr(title, "language") and title.language else None
if isinstance(title, Episode):
episode_name = title.name if title.name else f"Episode {title.number:02d}"
result = {
@@ -183,7 +181,6 @@ def serialize_title(title: Title_T) -> Dict[str, Any]:
"number": title.number,
"year": title.year,
"id": str(title.id) if hasattr(title, "id") else None,
"language": title_language,
}
elif isinstance(title, Movie):
result = {
@@ -191,14 +188,12 @@ def serialize_title(title: Title_T) -> Dict[str, Any]:
"name": str(title.name) if hasattr(title, "name") else str(title),
"year": title.year,
"id": str(title.id) if hasattr(title, "id") else None,
"language": title_language,
}
else:
result = {
"type": "other",
"name": str(title.name) if hasattr(title, "name") else str(title),
"id": str(title.id) if hasattr(title, "id") else None,
"language": title_language,
}
return result
@@ -1197,626 +1192,3 @@ async def cancel_download_job_handler(job_id: str, request: Optional[web.Request
context={"operation": "cancel_download_job", "job_id": job_id},
debug_mode=debug_mode,
)
# ---------------------------------------------------------------------------
# Remote-DL Session Handlers
# ---------------------------------------------------------------------------
def _create_service_instance(
normalized_service: str,
title_id: str,
data: Dict[str, Any],
proxy_param: Optional[str],
proxy_providers: list,
profile: Optional[str],
) -> Any:
"""Create and authenticate a service instance.
Supports client-sent credentials/cookies (for remote-dl) with fallback
to server-local config (for backward compatibility).
"""
import inspect
import click
import yaml
from unshackle.commands.dl import dl
from unshackle.core.config import config
from unshackle.core.credential import Credential
from unshackle.core.utils.click_types import ContextData
from unshackle.core.utils.collections import merge_dict
service_config_path = Services.get_path(normalized_service) / config.filenames.config
if service_config_path.exists():
service_config = yaml.safe_load(service_config_path.read_text(encoding="utf8"))
else:
service_config = {}
merge_dict(config.services.get(normalized_service), service_config)
@click.command()
@click.pass_context
def dummy_service(ctx: click.Context) -> None:
pass
ctx = click.Context(dummy_service)
ctx.obj = ContextData(config=service_config, cdm=None, proxy_providers=proxy_providers, profile=profile)
ctx.params = {"proxy": proxy_param, "no_proxy": data.get("no_proxy", False)}
service_module = Services.load(normalized_service)
dummy_service.name = normalized_service
dummy_service.params = [click.Argument([title_id], type=str)]
ctx.invoked_subcommand = normalized_service
service_ctx = click.Context(dummy_service, parent=ctx)
service_ctx.obj = ctx.obj
service_kwargs: Dict[str, Any] = {"title": title_id}
for key, value in data.items():
if key not in [
"service", "title_id", "profile", "season", "episode", "wanted",
"proxy", "no_proxy", "credentials", "cookies",
]:
service_kwargs[key] = value
service_init_params = inspect.signature(service_module.__init__).parameters
if hasattr(service_module, "cli") and hasattr(service_module.cli, "params"):
for param in service_module.cli.params:
if hasattr(param, "name") and param.name not in service_kwargs:
if hasattr(param, "default") and param.default is not None and not isinstance(param.default, enum.Enum):
service_kwargs[param.name] = param.default
for param_name, param_info in service_init_params.items():
if param_name not in service_kwargs and param_name not in ["self", "ctx"]:
if param_info.default is inspect.Parameter.empty:
if param_name == "meta_lang":
service_kwargs[param_name] = None
elif param_name == "movie":
service_kwargs[param_name] = False
else:
log.warning(f"Unknown required parameter '{param_name}' for service {normalized_service}")
filtered_kwargs = {k: v for k, v in service_kwargs.items() if k in service_init_params}
service_instance = service_module(service_ctx, **filtered_kwargs)
# Resolve credentials: client-sent > server-local
cred_data = data.get("credentials")
if cred_data and isinstance(cred_data, dict):
credential = Credential(
username=cred_data["username"],
password=cred_data["password"],
extra=cred_data.get("extra"),
)
else:
credential = dl.get_credentials(normalized_service, profile)
# Resolve cookies: client-sent > server-local
cookie_text = data.get("cookies")
if cookie_text and isinstance(cookie_text, str):
import tempfile
from http.cookiejar import MozillaCookieJar
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f:
f.write(cookie_text)
tmp_path = f.name
try:
cookies = MozillaCookieJar(tmp_path)
cookies.load(ignore_discard=True, ignore_expires=True)
finally:
import os
os.unlink(tmp_path)
else:
cookies = dl.get_cookie_jar(normalized_service, profile)
return service_instance, cookies, credential
async def session_create_handler(data: Dict[str, Any], request: Optional[web.Request] = None) -> web.Response:
"""Handle session creation: authenticate + get titles + get tracks + get chapters.
This is the main entry point for remote-dl clients. It creates a persistent
session on the server with the authenticated service instance, fetches all
titles and tracks, and returns everything the client needs for track selection.
"""
from unshackle.core.api.session_store import get_session_store
service_tag = data.get("service")
title_id = data.get("title_id")
profile = data.get("profile")
if not service_tag:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: service")
if not title_id:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: title_id")
normalized_service = validate_service(service_tag)
if not normalized_service:
raise APIError(
APIErrorCode.INVALID_SERVICE,
f"Invalid or unavailable service: {service_tag}",
details={"service": service_tag},
)
try:
# Resolve proxy
proxy_param = data.get("proxy")
no_proxy = data.get("no_proxy", False)
proxy_providers: list = []
if not no_proxy:
proxy_providers = initialize_proxy_providers()
if proxy_param and not no_proxy:
try:
proxy_param = resolve_proxy(proxy_param, proxy_providers)
except ValueError as e:
raise APIError(
APIErrorCode.INVALID_PROXY,
f"Proxy error: {e}",
details={"proxy": data.get("proxy"), "service": normalized_service},
)
import hashlib
import uuid as uuid_mod
from unshackle.core.cacher import Cacher
from unshackle.core.config import config as app_config
session_id = str(uuid_mod.uuid4())
api_key = request.headers.get("X-Secret-Key", "anonymous") if request else "anonymous"
api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:12]
session_cache_tag = f"_sessions/{api_key_hash}/{session_id}/{normalized_service}"
service_instance, cookies, credential = _create_service_instance(
normalized_service, title_id, data, proxy_param, proxy_providers, profile,
)
service_instance.cache = Cacher(session_cache_tag)
cache_data = data.get("cache", {})
if cache_data:
cache_dir = app_config.directories.cache / session_cache_tag
cache_dir.mkdir(parents=True, exist_ok=True)
for key, content in cache_data.items():
(cache_dir / key).with_suffix(".json").write_text(content, encoding="utf-8")
service_instance.authenticate(cookies, credential)
store = get_session_store()
session = await store.create(
normalized_service, service_instance, session_id=session_id,
)
session.cache_tag = session_cache_tag
return web.json_response({
"session_id": session.session_id,
"service": normalized_service,
})
except APIError:
raise
except Exception as e:
log.exception("Error creating session")
debug_mode = request.app.get("debug_api", False) if request else False
return handle_api_exception(
e,
context={"operation": "session_create", "service": service_tag, "title_id": title_id},
debug_mode=debug_mode,
)
async def session_titles_handler(session_id: str,
request: Optional[web.Request] = None) -> web.Response:
"""Get titles for the authenticated session.
Called after session/create. This is separate from auth so that
interactive auth flows (OTP, captcha) can complete before titles
are fetched.
"""
from unshackle.core.api.session_store import get_session_store
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found: {session_id}",
)
try:
service_instance = session.service_instance
titles = service_instance.get_titles()
session.titles = titles
# Serialize titles and build title map
if hasattr(titles, "__iter__") and not isinstance(titles, str):
titles_list = list(titles)
else:
titles_list = [titles]
serialized_titles = []
for t in titles_list:
tid = str(t.id) if hasattr(t, "id") else str(id(t))
session.title_map[tid] = t
serialized_titles.append(serialize_title(t))
return web.json_response({
"session_id": session_id,
"titles": serialized_titles,
})
except Exception as e:
log.exception("Error getting titles")
debug_mode = request.app.get("debug_api", False) if request else False
return handle_api_exception(
e,
context={"operation": "session_titles", "session_id": session_id},
debug_mode=debug_mode,
)
async def session_tracks_handler(data: Dict[str, Any], session_id: str,
request: Optional[web.Request] = None) -> web.Response:
"""Get tracks and chapters for a specific title in the session.
Called per-title by the client after session/create returns titles.
This keeps auth separate from track fetching, allowing interactive
auth flows (OTP, captcha) before any tracks are requested.
"""
from unshackle.core.api.session_store import get_session_store
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found: {session_id}",
)
title_id = data.get("title_id")
if not title_id:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: title_id")
title = session.title_map.get(str(title_id))
if not title:
raise APIError(
APIErrorCode.INVALID_INPUT,
f"Title not found in session: {title_id}",
details={"available_titles": list(session.title_map.keys())},
)
try:
service_instance = session.service_instance
tracks = service_instance.get_tracks(title)
title_tracks: Dict[str, Any] = {}
for track in tracks.videos:
title_tracks[str(track.id)] = track
session.tracks[str(track.id)] = track
for track in tracks.audio:
title_tracks[str(track.id)] = track
session.tracks[str(track.id)] = track
for track in tracks.subtitles:
title_tracks[str(track.id)] = track
session.tracks[str(track.id)] = track
session.tracks_by_title[str(title_id)] = title_tracks
try:
chapters = service_instance.get_chapters(title)
session.chapters_by_title[str(title_id)] = chapters if chapters else []
except (NotImplementedError, Exception):
session.chapters_by_title[str(title_id)] = []
video_tracks = sorted(tracks.videos, key=lambda t: t.bitrate or 0, reverse=True)
audio_tracks = sorted(tracks.audio, key=lambda t: t.bitrate or 0, reverse=True)
return web.json_response({
"title": serialize_title(title),
"video": [serialize_video_track(t, include_url=True) for t in video_tracks],
"audio": [serialize_audio_track(t, include_url=True) for t in audio_tracks],
"subtitles": [serialize_subtitle_track(t, include_url=True) for t in tracks.subtitles],
"chapters": [
{"timestamp": ch.timestamp, "name": ch.name}
for ch in session.chapters_by_title.get(str(title_id), [])
],
"attachments": [
{"url": a.url, "name": a.name, "mime_type": a.mime_type, "description": a.description}
for a in tracks.attachments
if hasattr(a, "url") and a.url
],
})
except Exception as e:
log.exception(f"Error getting tracks for title {title_id}")
debug_mode = request.app.get("debug_api", False) if request else False
return handle_api_exception(
e,
context={"operation": "session_tracks", "session_id": session_id, "title_id": title_id},
debug_mode=debug_mode,
)
async def session_segments_handler(data: Dict[str, Any], session_id: str,
request: Optional[web.Request] = None) -> web.Response:
"""Resolve segment URLs for selected tracks.
The client calls this after selecting which tracks to download.
Returns segment URLs, init data, DRM info, and any headers/cookies
needed for CDN download.
"""
from unshackle.core.api.session_store import get_session_store
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found or expired: {session_id}",
details={"session_id": session_id},
)
track_ids = data.get("track_ids", [])
if not track_ids:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: track_ids")
try:
result: Dict[str, Any] = {}
for track_id in track_ids:
track = session.tracks.get(track_id)
if not track:
raise APIError(
APIErrorCode.TRACK_NOT_FOUND,
f"Track not found in session: {track_id}",
details={"track_id": track_id, "session_id": session_id},
)
descriptor_name = track.descriptor.name if hasattr(track.descriptor, "name") else str(track.descriptor)
track_info: Dict[str, Any] = {
"descriptor": descriptor_name,
"url": str(track.url) if track.url else None,
"drm": serialize_drm(track.drm) if hasattr(track, "drm") and track.drm else None,
}
# Extract session headers/cookies for CDN access
service_session = session.service_instance.session
if hasattr(service_session, "headers"):
# Only include relevant headers, not all session headers
headers = dict(service_session.headers) if service_session.headers else {}
track_info["headers"] = headers
else:
track_info["headers"] = {}
if hasattr(service_session, "cookies"):
cookie_dict = {}
for cookie in service_session.cookies:
if hasattr(cookie, "name") and hasattr(cookie, "value"):
cookie_dict[cookie.name] = cookie.value
elif isinstance(cookie, str):
pass # Skip non-standard cookie objects
track_info["cookies"] = cookie_dict
else:
track_info["cookies"] = {}
# Include manifest-specific data for segment resolution
if hasattr(track, "data") and track.data:
track_data = {}
for key, val in track.data.items():
if isinstance(val, dict):
# Convert non-serializable values
serializable = {}
for k, v in val.items():
try:
import json
json.dumps(v)
serializable[k] = v
except (TypeError, ValueError):
serializable[k] = str(v)
track_data[key] = serializable
else:
try:
import json
json.dumps(val)
track_data[key] = val
except (TypeError, ValueError):
track_data[key] = str(val)
track_info["data"] = track_data
else:
track_info["data"] = {}
result[track_id] = track_info
return web.json_response({"tracks": result})
except APIError:
raise
except Exception as e:
log.exception("Error resolving segments")
debug_mode = request.app.get("debug_api", False) if request else False
return handle_api_exception(
e,
context={"operation": "session_segments", "session_id": session_id},
debug_mode=debug_mode,
)
async def session_license_handler(data: Dict[str, Any], session_id: str,
request: Optional[web.Request] = None) -> web.Response:
"""Proxy a DRM license challenge through the authenticated service.
The client generates a CDM challenge locally, sends it here, and the server
calls the service's get_widevine_license/get_playready_license method using
the authenticated session. Returns the raw license response for the client
to process with their local CDM.
"""
import base64
from unshackle.core.api.session_store import get_session_store
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found or expired: {session_id}",
details={"session_id": session_id},
)
track_id = data.get("track_id")
challenge_b64 = data.get("challenge")
drm_type = data.get("drm_type", "widevine")
if not track_id:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: track_id")
if not challenge_b64:
raise APIError(APIErrorCode.INVALID_INPUT, "Missing required parameter: challenge")
track = session.tracks.get(track_id)
if not track:
raise APIError(
APIErrorCode.TRACK_NOT_FOUND,
f"Track not found in session: {track_id}",
details={"track_id": track_id, "session_id": session_id},
)
try:
challenge_bytes = base64.b64decode(challenge_b64)
title = None
for tid, tracks_dict in session.tracks_by_title.items():
if track_id in tracks_dict:
title = session.title_map.get(tid)
break
if title is None:
if session.title_map:
title = next(iter(session.title_map.values()))
service = session.service_instance
pssh_b64 = data.get("pssh")
if pssh_b64:
if not track.drm:
track.drm = []
if drm_type == "playready":
track.pr_pssh = pssh_b64
from pyplayready.system.pssh import PSSH as PlayReadyPSSH
from unshackle.core.drm import PlayReady
pr_pssh = PlayReadyPSSH(base64.b64decode(pssh_b64))
pr_drm = PlayReady(pssh=pr_pssh, pssh_b64=pssh_b64)
track.drm.append(pr_drm)
elif drm_type == "widevine":
from pywidevine.pssh import PSSH as WidevinePSSH
from unshackle.core.drm import Widevine
wv_pssh = WidevinePSSH(pssh_b64)
wv_drm = Widevine(pssh=wv_pssh)
track.drm.append(wv_drm)
if drm_type == "widevine":
license_response = service.get_widevine_license(
challenge=challenge_bytes, title=title, track=track,
)
elif drm_type == "playready":
license_response = service.get_playready_license(
challenge=challenge_bytes, title=title, track=track,
)
else:
raise APIError(
APIErrorCode.INVALID_PARAMETERS,
f"Unsupported DRM type: {drm_type}",
details={"drm_type": drm_type, "supported": ["widevine", "playready"]},
)
# Ensure response is bytes for base64 encoding
if isinstance(license_response, str):
license_response = license_response.encode("utf-8")
return web.json_response({
"license": base64.b64encode(license_response).decode("ascii"),
})
except APIError:
raise
except Exception as e:
log.exception(f"Error proxying license for track {track_id}")
debug_mode = request.app.get("debug_api", False) if request else False
return handle_api_exception(
e,
context={
"operation": "session_license",
"session_id": session_id,
"track_id": track_id,
"drm_type": drm_type,
},
debug_mode=debug_mode,
)
async def session_info_handler(session_id: str, request: Optional[web.Request] = None) -> web.Response:
"""Check session validity and get session info."""
from datetime import timezone
from unshackle.core.api.session_store import get_session_store
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found or expired: {session_id}",
details={"session_id": session_id},
)
from datetime import datetime
now = datetime.now(timezone.utc)
elapsed = (now - session.last_accessed).total_seconds()
expires_in = max(0, store._ttl - int(elapsed))
return web.json_response({
"session_id": session.session_id,
"service": session.service_tag,
"valid": True,
"expires_in": expires_in,
"track_count": len(session.tracks),
"title_count": len(session.title_map),
})
async def session_delete_handler(session_id: str, request: Optional[web.Request] = None) -> web.Response:
"""Delete a session and clean up client-sent data from the server."""
import shutil
from unshackle.core.api.session_store import get_session_store
from unshackle.core.config import config as app_config
store = get_session_store()
session = await store.get(session_id)
if not session:
raise APIError(
APIErrorCode.SESSION_NOT_FOUND,
f"Session not found: {session_id}",
details={"session_id": session_id},
)
cache_tag = session.cache_tag
await store.delete(session_id)
if cache_tag:
cache_dir = app_config.directories.cache / cache_tag
if cache_dir.is_dir():
shutil.rmtree(cache_dir, ignore_errors=True)
# Clean up empty parent directories (session_id, api_key_hash, _sessions)
for parent in cache_dir.parents:
if parent == app_config.directories.cache:
break
if parent.is_dir() and not any(parent.iterdir()):
parent.rmdir()
return web.json_response({"status": "ok"})

View File

@@ -8,9 +8,7 @@ from unshackle.core import __version__
from unshackle.core.api.errors import APIError, APIErrorCode, build_error_response, handle_api_exception
from unshackle.core.api.handlers import (cancel_download_job_handler, download_handler, get_download_job_handler,
list_download_jobs_handler, list_titles_handler, list_tracks_handler,
search_handler, session_create_handler, session_delete_handler,
session_info_handler, session_license_handler, session_segments_handler,
session_titles_handler, session_tracks_handler)
search_handler)
from unshackle.core.services import Services
from unshackle.core.update_checker import UpdateChecker
@@ -838,294 +836,6 @@ async def cancel_download_job(request: web.Request) -> web.Response:
return build_error_response(e, debug_mode)
async def session_create(request: web.Request) -> web.Response:
"""
Create a remote-dl session.
---
summary: Create session
description: Authenticate with a service, get titles, tracks, and chapters in one call
requestBody:
required: true
content:
application/json:
schema:
type: object
additionalProperties: true
required:
- service
- title_id
properties:
service:
type: string
title_id:
type: string
credentials:
type: object
additionalProperties: true
cookies:
type: string
proxy:
type: string
no_proxy:
type: boolean
profile:
type: string
cache:
type: object
additionalProperties: true
responses:
'200':
description: Session created with titles, tracks, and chapters
'400':
description: Invalid request
'401':
description: Authentication failed
"""
try:
data = await request.json()
except Exception as e:
return build_error_response(
APIError(APIErrorCode.INVALID_INPUT, "Invalid JSON request body", details={"error": str(e)}),
request.app.get("debug_api", False),
)
try:
return await session_create_handler(data, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
except Exception as e:
log.exception("Error in session create")
return handle_api_exception(e, context={"operation": "session_create"}, debug_mode=request.app.get("debug_api", False))
async def session_titles(request: web.Request) -> web.Response:
"""
Get titles for an authenticated session.
---
summary: Get titles
description: Fetch titles from the authenticated service session
parameters:
- name: session_id
in: path
required: true
schema:
type: string
responses:
'200':
description: List of titles
'404':
description: Session not found
"""
session_id = request.match_info["session_id"]
try:
return await session_titles_handler(session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
except Exception as e:
log.exception("Error in session titles")
return handle_api_exception(e, context={"operation": "session_titles"}, debug_mode=request.app.get("debug_api", False))
async def session_tracks(request: web.Request) -> web.Response:
"""
Get tracks and chapters for a specific title.
---
summary: Get tracks
description: Fetch tracks and chapters for a title in the session
parameters:
- name: session_id
in: path
required: true
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- title_id
properties:
title_id:
type: string
description: ID of the title to get tracks for
responses:
'200':
description: Tracks and chapters for the title
'404':
description: Session or title not found
"""
session_id = request.match_info["session_id"]
try:
data = await request.json()
except Exception as e:
return build_error_response(
APIError(APIErrorCode.INVALID_INPUT, "Invalid JSON request body", details={"error": str(e)}),
request.app.get("debug_api", False),
)
try:
return await session_tracks_handler(data, session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
except Exception as e:
log.exception("Error in session tracks")
return handle_api_exception(e, context={"operation": "session_tracks"}, debug_mode=request.app.get("debug_api", False))
async def session_segments(request: web.Request) -> web.Response:
"""
Resolve segment URLs for selected tracks.
---
summary: Resolve segments
description: Get download URLs, DRM info, and headers for selected tracks
parameters:
- name: session_id
in: path
required: true
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- track_ids
properties:
track_ids:
type: array
items:
type: string
description: List of track IDs to resolve
responses:
'200':
description: Segment URLs and DRM info for each track
'404':
description: Session or track not found
"""
session_id = request.match_info["session_id"]
try:
data = await request.json()
except Exception as e:
return build_error_response(
APIError(APIErrorCode.INVALID_INPUT, "Invalid JSON request body", details={"error": str(e)}),
request.app.get("debug_api", False),
)
try:
return await session_segments_handler(data, session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
except Exception as e:
log.exception("Error in session segments")
return handle_api_exception(e, context={"operation": "session_segments"}, debug_mode=request.app.get("debug_api", False))
async def session_license(request: web.Request) -> web.Response:
"""
Proxy DRM license through authenticated service.
---
summary: Proxy license
description: Forward a CDM challenge to the service's license endpoint
parameters:
- name: session_id
in: path
required: true
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- track_id
- challenge
properties:
track_id:
type: string
description: Track ID this license is for
challenge:
type: string
description: Base64-encoded CDM challenge
drm_type:
type: string
enum: [widevine, playready]
description: DRM type (default widevine)
responses:
'200':
description: License response
'404':
description: Session or track not found
"""
session_id = request.match_info["session_id"]
try:
data = await request.json()
except Exception as e:
return build_error_response(
APIError(APIErrorCode.INVALID_INPUT, "Invalid JSON request body", details={"error": str(e)}),
request.app.get("debug_api", False),
)
try:
return await session_license_handler(data, session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
except Exception as e:
log.exception("Error in session license")
return handle_api_exception(e, context={"operation": "session_license"}, debug_mode=request.app.get("debug_api", False))
async def session_info(request: web.Request) -> web.Response:
"""
Get session info.
---
summary: Session info
description: Check session validity and get metadata
parameters:
- name: session_id
in: path
required: true
schema:
type: string
responses:
'200':
description: Session info
'404':
description: Session not found
"""
session_id = request.match_info["session_id"]
try:
return await session_info_handler(session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
async def session_delete(request: web.Request) -> web.Response:
"""
Delete a session.
---
summary: Delete session
description: Clean up a remote-dl session
parameters:
- name: session_id
in: path
required: true
schema:
type: string
responses:
'200':
description: Session deleted
'404':
description: Session not found
"""
session_id = request.match_info["session_id"]
try:
return await session_delete_handler(session_id, request)
except APIError as e:
return build_error_response(e, request.app.get("debug_api", False))
def setup_routes(app: web.Application) -> None:
"""Setup all API routes."""
app.router.add_get("/api/health", health)
@@ -1138,15 +848,6 @@ def setup_routes(app: web.Application) -> None:
app.router.add_get("/api/download/jobs/{job_id}", download_job_detail)
app.router.add_delete("/api/download/jobs/{job_id}", cancel_download_job)
# Remote-DL session endpoints
app.router.add_post("/api/session/create", session_create)
app.router.add_get("/api/session/{session_id}/titles", session_titles)
app.router.add_post("/api/session/{session_id}/tracks", session_tracks)
app.router.add_post("/api/session/{session_id}/segments", session_segments)
app.router.add_post("/api/session/{session_id}/license", session_license)
app.router.add_get("/api/session/{session_id}", session_info)
app.router.add_delete("/api/session/{session_id}", session_delete)
def setup_swagger(app: web.Application) -> None:
"""Setup Swagger UI documentation."""
@@ -1172,13 +873,5 @@ def setup_swagger(app: web.Application) -> None:
web.get("/api/download/jobs", download_jobs),
web.get("/api/download/jobs/{job_id}", download_job_detail),
web.delete("/api/download/jobs/{job_id}", cancel_download_job),
# Remote-DL session endpoints
web.post("/api/session/create", session_create),
web.get("/api/session/{session_id}/titles", session_titles),
web.post("/api/session/{session_id}/tracks", session_tracks),
web.post("/api/session/{session_id}/segments", session_segments),
web.post("/api/session/{session_id}/license", session_license),
web.get("/api/session/{session_id}", session_info),
web.delete("/api/session/{session_id}", session_delete),
]
)

View File

@@ -1,161 +0,0 @@
"""Server-side session store for remote-dl client-server architecture.
Maintains authenticated service instances between API calls so that
a client can authenticate once and then make multiple requests (list tracks,
resolve segments, proxy license) using the same session.
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from unshackle.core.config import config
from unshackle.core.tracks import Track
log = logging.getLogger("api.session")
@dataclass
class SessionEntry:
"""A single authenticated session with a service."""
session_id: str
service_tag: str
service_instance: Any # Service instance (authenticated)
titles: Any = None # Titles_T from get_titles()
title_map: Dict[str, Any] = field(default_factory=dict) # title_id -> Title object
tracks: Dict[str, Track] = field(default_factory=dict) # track_id -> Track object
tracks_by_title: Dict[str, Dict[str, Track]] = field(default_factory=dict) # title_key -> {track_id -> Track}
chapters_by_title: Dict[str, List[Any]] = field(default_factory=dict) # title_key -> [Chapter]
cache_tag: Optional[str] = None # per-session cache directory tag
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
last_accessed: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
def touch(self) -> None:
"""Update last_accessed timestamp."""
self.last_accessed = datetime.now(timezone.utc)
class SessionStore:
"""Thread-safe session store with TTL-based expiration."""
def __init__(self) -> None:
self._sessions: Dict[str, SessionEntry] = {}
self._lock = asyncio.Lock()
self._cleanup_task: Optional[asyncio.Task] = None
@property
def _ttl(self) -> int:
"""Session TTL in seconds from config."""
return config.serve.get("session_ttl", 900) # 15 min default
@property
def _max_sessions(self) -> int:
"""Max concurrent sessions from config."""
return config.serve.get("max_sessions", 100)
async def create(
self,
service_tag: str,
service_instance: Any,
session_id: Optional[str] = None,
) -> SessionEntry:
"""Create a new session with an authenticated service instance."""
async with self._lock:
if len(self._sessions) >= self._max_sessions:
oldest_id = min(self._sessions, key=lambda k: self._sessions[k].last_accessed)
log.warning(f"Max sessions reached ({self._max_sessions}), evicting oldest: {oldest_id}")
del self._sessions[oldest_id]
session_id = session_id or str(uuid.uuid4())
entry = SessionEntry(
session_id=session_id,
service_tag=service_tag,
service_instance=service_instance,
)
self._sessions[session_id] = entry
log.info(f"Created session {session_id} for service {service_tag}")
return entry
async def get(self, session_id: str) -> Optional[SessionEntry]:
"""Get a session by ID, returns None if not found or expired."""
async with self._lock:
entry = self._sessions.get(session_id)
if entry is None:
return None
# Check expiration
elapsed = (datetime.now(timezone.utc) - entry.last_accessed).total_seconds()
if elapsed > self._ttl:
log.info(f"Session {session_id} expired (elapsed={elapsed:.0f}s, ttl={self._ttl}s)")
del self._sessions[session_id]
return None
entry.touch()
return entry
async def delete(self, session_id: str) -> bool:
"""Delete a session. Returns True if it existed."""
async with self._lock:
if session_id in self._sessions:
del self._sessions[session_id]
log.info(f"Deleted session {session_id}")
return True
return False
async def cleanup_expired(self) -> int:
"""Remove all expired sessions. Returns count of removed sessions."""
async with self._lock:
now = datetime.now(timezone.utc)
expired = [
sid for sid, entry in self._sessions.items()
if (now - entry.last_accessed).total_seconds() > self._ttl
]
for sid in expired:
del self._sessions[sid]
if expired:
log.info(f"Cleaned up {len(expired)} expired sessions")
return len(expired)
async def start_cleanup_loop(self) -> None:
"""Start periodic cleanup of expired sessions."""
if self._cleanup_task is not None:
return
async def _loop() -> None:
while True:
await asyncio.sleep(60) # Check every minute
try:
await self.cleanup_expired()
except Exception:
log.exception("Error during session cleanup")
self._cleanup_task = asyncio.create_task(_loop())
log.info("Session cleanup loop started")
async def stop_cleanup_loop(self) -> None:
"""Stop the periodic cleanup task."""
if self._cleanup_task is not None:
self._cleanup_task.cancel()
self._cleanup_task = None
@property
def session_count(self) -> int:
"""Number of active sessions."""
return len(self._sessions)
# Singleton instance
_session_store: Optional[SessionStore] = None
def get_session_store() -> SessionStore:
"""Get or create the global session store singleton."""
global _session_store
if _session_store is None:
_session_store = SessionStore()
return _session_store

View File

@@ -17,11 +17,11 @@ class Commands(click.MultiCommand):
def list_commands(self, ctx: click.Context) -> list[str]:
"""Returns a list of command names from the command filenames."""
return [x.stem.replace("_", "-") for x in _COMMANDS]
return [x.stem for x in _COMMANDS]
def get_command(self, ctx: click.Context, name: str) -> Optional[click.Command]:
"""Load the command code and return the main click command function."""
module = _MODULES.get(name) or _MODULES.get(name.replace("-", "_"))
module = _MODULES.get(name)
if not module:
raise click.ClickException(f"Unable to find command by the name '{name}'")

View File

@@ -76,7 +76,6 @@ class Config:
self.key_vaults: list[dict[str, Any]] = kwargs.get("key_vaults", [])
self.muxing: dict = kwargs.get("muxing") or {}
self.proxy_providers: dict = kwargs.get("proxy_providers") or {}
self.remote_services: dict = kwargs.get("remote_services") or {}
self.serve: dict = kwargs.get("serve") or {}
self.services: dict = kwargs.get("services") or {}
decryption_cfg = kwargs.get("decryption") or {}

View File

@@ -339,7 +339,6 @@ class HLS:
media_drm = HLS.get_drm(media_playlist_key, session)
if isinstance(media_drm, (Widevine, PlayReady)):
track_kid = HLS.get_track_kid_from_init(master, track, session) or media_drm.kid
track.drm = [media_drm]
try:
if not license_widevine:
raise ValueError("license_widevine func must be supplied to use DRM")
@@ -348,6 +347,7 @@ class HLS:
progress(downloaded="[yellow]LICENSED")
initial_drm_licensed = True
initial_drm_key = media_playlist_key
track.drm = [media_drm]
session_drm = media_drm
except Exception: # noqa
DOWNLOAD_CANCELLED.set() # skip pending track downloads

View File

@@ -1,458 +0,0 @@
"""Remote service adapter for unshackle.
Implements the Service interface by proxying authenticate, get_titles,
get_tracks, get_chapters, and license methods to a remote unshackle server.
Everything else (track selection, download, decrypt, mux) runs locally.
"""
from __future__ import annotations
import base64
import logging
import re
from enum import Enum
from http.cookiejar import CookieJar
from typing import Any, Dict, Optional, Union
import click
import requests
from langcodes import Language
from requests.adapters import HTTPAdapter, Retry
from rich.padding import Padding
from rich.rule import Rule
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.titles import Title_T, Titles_T
from unshackle.core.titles.episode import Episode, Series
from unshackle.core.titles.movie import Movie, Movies
from unshackle.core.tracks import Audio, Chapter, Chapters, Subtitle, Tracks, Video
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.tracks.track import Track
log = logging.getLogger("remote_service")
class RemoteClient:
"""HTTP client for the unshackle serve API."""
def __init__(self, server_url: str, api_key: str) -> None:
self.server_url = server_url.rstrip("/")
self.api_key = api_key
self._session: Optional[requests.Session] = None
@property
def session(self) -> requests.Session:
if self._session is None:
self._session = requests.Session()
if self.api_key:
self._session.headers["X-Secret-Key"] = self.api_key
return self._session
def _request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.server_url}{endpoint}"
resp = getattr(self.session, method)(url, json=data, timeout=120 if method == "post" else 30)
result = resp.json()
if resp.status_code >= 400:
error_msg = result.get("message", resp.text)
error_code = result.get("error_code", "UNKNOWN")
raise click.ClickException(f"Server error [{error_code}]: {error_msg}")
return result
def post(self, endpoint: str, data: Dict[str, Any]) -> Dict[str, Any]:
return self._request("post", endpoint, data)
def get(self, endpoint: str) -> Dict[str, Any]:
return self._request("get", endpoint)
def delete(self, endpoint: str) -> Dict[str, Any]:
return self._request("delete", endpoint)
def _enum_get(enum_cls: type[Enum], name: Optional[str], default: Any = None) -> Any:
"""Safely get an enum value by name."""
if not name:
return default
try:
return enum_cls[name]
except KeyError:
return default
def _deserialize_video(data: Dict[str, Any]) -> Video:
return Video(
url=data.get("url") or "https://placeholder",
language=Language.get(data.get("language") or "und"),
descriptor=_enum_get(Track.Descriptor, data.get("descriptor"), Track.Descriptor.URL),
codec=_enum_get(Video.Codec, data.get("codec")),
range_=_enum_get(Video.Range, data.get("range"), Video.Range.SDR),
bitrate=data["bitrate"] * 1000 if data.get("bitrate") else 0,
width=data.get("width") or 0,
height=data.get("height") or 0,
fps=data.get("fps"),
id_=data.get("id"),
)
def _deserialize_audio(data: Dict[str, Any]) -> Audio:
return Audio(
url=data.get("url") or "https://placeholder",
language=Language.get(data.get("language") or "und"),
descriptor=_enum_get(Track.Descriptor, data.get("descriptor"), Track.Descriptor.URL),
codec=_enum_get(Audio.Codec, data.get("codec")),
bitrate=data["bitrate"] * 1000 if data.get("bitrate") else 0,
channels=data.get("channels"),
joc=1 if data.get("atmos") else 0,
descriptive=data.get("descriptive", False),
id_=data.get("id"),
)
def _deserialize_subtitle(data: Dict[str, Any]) -> Subtitle:
return Subtitle(
url=data.get("url") or "https://placeholder",
language=Language.get(data.get("language") or "und"),
descriptor=_enum_get(Track.Descriptor, data.get("descriptor"), Track.Descriptor.URL),
codec=_enum_get(Subtitle.Codec, data.get("codec")),
cc=data.get("cc", False),
sdh=data.get("sdh", False),
forced=data.get("forced", False),
id_=data.get("id"),
)
def _build_tracks(data: Dict[str, Any]) -> Tracks:
tracks = Tracks()
tracks.videos = [_deserialize_video(v) for v in data.get("video", [])]
tracks.audio = [_deserialize_audio(a) for a in data.get("audio", [])]
tracks.subtitles = [_deserialize_subtitle(s) for s in data.get("subtitles", [])]
tracks.attachments = [
Attachment(url=a["url"], name=a.get("name"), mime_type=a.get("mime_type"), description=a.get("description"))
for a in data.get("attachments", [])
]
return tracks
def _build_title(info: Dict[str, Any], service_tag: str, fallback_id: str) -> Union[Episode, Movie]:
svc_class = type(service_tag, (), {})
lang = Language.get(info["language"]) if info.get("language") else None
if info.get("type") == "episode":
return Episode(
id_=info.get("id", fallback_id), service=svc_class,
title=info.get("series_title", "Unknown"),
season=info.get("season", 0), number=info.get("number", 0),
name=info.get("name"), year=info.get("year"), language=lang,
)
return Movie(
id_=info.get("id", fallback_id), service=svc_class,
name=info.get("name", "Unknown"), year=info.get("year"), language=lang,
)
def resolve_server(server_name: Optional[str]) -> tuple[str, str, dict]:
"""Resolve server URL, API key, and per-service config from remote_services."""
remote_services = config.remote_services
if not remote_services:
raise click.ClickException(
"No remote services configured. Add 'remote_services' to your unshackle.yaml:\n\n"
" remote_services:\n"
" my_server:\n"
" url: \"https://server:8080\"\n"
" api_key: \"your-api-key\""
)
if server_name:
svc = remote_services.get(server_name)
if not svc:
available = ", ".join(remote_services.keys())
raise click.ClickException(f"Remote service '{server_name}' not found. Available: {available}")
return svc["url"], svc.get("api_key", ""), svc.get("services", {})
if len(remote_services) == 1:
name, svc = next(iter(remote_services.items()))
log.info(f"Using remote service: {name}")
return svc["url"], svc.get("api_key", ""), svc.get("services", {})
available = ", ".join(remote_services.keys())
raise click.ClickException(f"Multiple remote services configured. Use --server to select one: {available}")
def _load_credentials_for_transport(service_tag: str, profile: Optional[str]) -> Optional[Dict[str, str]]:
from unshackle.commands.dl import dl
credential = dl.get_credentials(service_tag, profile)
if credential:
result: Dict[str, str] = {"username": credential.username, "password": credential.password}
if credential.extra:
result["extra"] = credential.extra
return result
return None
def _load_cookies_for_transport(service_tag: str, profile: Optional[str]) -> Optional[str]:
from unshackle.commands.dl import dl
cookie_path = dl.get_cookie_path(service_tag, profile)
if cookie_path and cookie_path.exists():
return cookie_path.read_text(encoding="utf-8")
return None
def _resolve_proxy(proxy_arg: Optional[str]) -> Optional[str]:
if not proxy_arg:
return None
if re.match(r"^(https?://|socks)", proxy_arg):
return proxy_arg
from unshackle.core.proxies.basic import Basic
from unshackle.core.proxies.nordvpn import NordVPN
from unshackle.core.proxies.surfsharkvpn import SurfsharkVPN
providers: list = []
proxy_config = config.proxy_providers
if proxy_config.get("basic"):
providers.append(Basic(**proxy_config["basic"]))
if proxy_config.get("nordvpn"):
providers.append(NordVPN(**proxy_config["nordvpn"]))
if proxy_config.get("surfsharkvpn"):
providers.append(SurfsharkVPN(**proxy_config["surfsharkvpn"]))
requested_provider = None
query = proxy_arg
if re.match(r"^[a-z]+:.+$", proxy_arg, re.IGNORECASE):
requested_provider, query = proxy_arg.split(":", maxsplit=1)
if requested_provider:
provider = next(
(x for x in providers if x.__class__.__name__.lower() == requested_provider.lower()), None,
)
if not provider:
raise click.ClickException(f"Proxy provider '{requested_provider}' not found.")
proxy_uri = provider.get_proxy(query)
if not proxy_uri:
raise click.ClickException(f"Proxy provider {requested_provider} had no proxy for {query}")
return proxy_uri
for provider in providers:
proxy_uri = provider.get_proxy(query)
if proxy_uri:
return proxy_uri
raise click.ClickException(f"No proxy provider had a proxy for {proxy_arg}")
class RemoteService:
"""Service adapter that proxies to a remote unshackle server.
Implements the same interface dl.py's result() expects without
subclassing Service (avoids proxy/geofence setup in __init__).
"""
ALIASES: tuple[str, ...] = ()
GEOFENCE: tuple[str, ...] = ()
NO_SUBTITLES: bool = False
def __init__(
self, ctx: click.Context, service_tag: str, title_id: str,
server_url: str, api_key: str, services_config: dict,
) -> None:
self.__class__.__name__ = service_tag
console.print(Padding(Rule(f"[rule.text]Service: {service_tag} (Remote)"), (1, 2)))
self.service_tag = service_tag
self.title_id = title_id
self.client = RemoteClient(server_url, api_key)
self.ctx = ctx
self.log = logging.getLogger(service_tag)
self.credential: Optional[Credential] = None
self.current_region: Optional[str] = None
self.title_cache = None
self._titles: Optional[Titles_T] = None
self._tracks_by_title: Dict[str, Tracks] = {}
self._chapters_by_title: Dict[str, list] = {}
self._session_id: Optional[str] = None
self._session = requests.Session()
self._session.headers.update(config.headers)
self._session.mount(
"https://",
HTTPAdapter(
max_retries=Retry(total=5, backoff_factor=0.2, status_forcelist=[429, 500, 502, 503, 504]),
pool_block=True,
),
)
self._session.mount("http://", self._session.adapters["https://"])
self._apply_service_config(services_config.get(service_tag, {}))
def _apply_service_config(self, svc_config: dict) -> None:
if not svc_config:
return
config_maps = {
"cdm": ("cdm", self.service_tag),
"decryption": ("decryption_map", self.service_tag),
"downloader": ("downloader_map", self.service_tag),
}
for key, (attr, tag) in config_maps.items():
if svc_config.get(key):
target = getattr(config, attr, None)
if target is None:
setattr(config, attr, {})
target = getattr(config, attr)
target[tag] = svc_config[key]
extra = {k: v for k, v in svc_config.items() if k not in config_maps}
if extra:
existing = config.services.get(self.service_tag, {})
for key, value in extra.items():
if key in existing and isinstance(existing[key], dict) and isinstance(value, dict):
existing[key].update(value)
else:
existing[key] = value
config.services[self.service_tag] = existing
@property
def session(self) -> requests.Session:
return self._session
@property
def title(self) -> str:
return self.title_id
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
self.credential = credential
profile = self.ctx.parent.params.get("profile") if self.ctx.parent else None
proxy = self.ctx.parent.params.get("proxy") if self.ctx.parent else None
no_proxy = self.ctx.parent.params.get("no_proxy", False) if self.ctx.parent else False
create_data: Dict[str, Any] = {"service": self.service_tag, "title_id": self.title_id}
credentials = _load_credentials_for_transport(self.service_tag, profile)
if credentials:
create_data["credentials"] = credentials
cookies_text = _load_cookies_for_transport(self.service_tag, profile)
if cookies_text:
create_data["cookies"] = cookies_text
if not no_proxy and proxy:
resolved_proxy = _resolve_proxy(proxy)
if resolved_proxy:
create_data["proxy"] = resolved_proxy
if profile:
create_data["profile"] = profile
if no_proxy:
create_data["no_proxy"] = True
cache_data = self._load_cache_files()
if cache_data:
create_data["cache"] = cache_data
result = self.client.post("/api/session/create", create_data)
self._session_id = result["session_id"]
def get_titles(self) -> Titles_T:
if self._titles is not None:
return self._titles
result = self.client.get(f"/api/session/{self._session_id}/titles")
titles_list = [_build_title(t, self.service_tag, self.title_id) for t in result.get("titles", [])]
self._titles = Series(titles_list) if titles_list and isinstance(titles_list[0], Episode) else Movies(titles_list)
return self._titles
def get_titles_cached(self, title_id: str = None) -> Titles_T:
return self.get_titles()
def get_tracks(self, title: Title_T) -> Tracks:
title_id = str(title.id)
if title_id in self._tracks_by_title:
return self._tracks_by_title[title_id]
result = self.client.post(f"/api/session/{self._session_id}/tracks", {"title_id": title_id})
tracks = _build_tracks(result)
self._tracks_by_title[title_id] = tracks
self._chapters_by_title[title_id] = result.get("chapters", [])
return tracks
def get_chapters(self, title: Title_T) -> Chapters:
title_id = str(title.id)
if title_id not in self._chapters_by_title:
self.get_tracks(title)
raw = self._chapters_by_title.get(title_id, [])
return Chapters([Chapter(ch["timestamp"], ch.get("name")) for ch in raw])
def get_widevine_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
return self._proxy_license(challenge, track, "widevine")
def get_playready_license(self, *, challenge: bytes, title: Title_T, track: AnyTrack) -> Optional[Union[bytes, str]]:
return self._proxy_license(challenge, track, "playready")
def get_widevine_service_certificate(
self, *, challenge: bytes, title: Title_T, track: AnyTrack,
) -> Union[bytes, str]:
try:
resp = self.client.post(f"/api/session/{self._session_id}/license", {
"track_id": str(track.id),
"challenge": base64.b64encode(challenge).decode("ascii"),
"drm_type": "widevine", "is_certificate": True,
})
return base64.b64decode(resp["license"])
except Exception:
return None
def _proxy_license(self, challenge: Union[bytes, str], track: AnyTrack, drm_type: str) -> bytes:
if isinstance(challenge, str):
challenge = challenge.encode("utf-8")
payload: Dict[str, Any] = {
"track_id": str(track.id),
"challenge": base64.b64encode(challenge).decode("ascii"),
"drm_type": drm_type,
}
if track.drm:
for drm_obj in track.drm:
drm_class = drm_obj.__class__.__name__
if drm_type == "playready" and drm_class == "PlayReady":
payload["pssh"] = drm_obj.data["pssh_b64"]
break
elif drm_type == "widevine" and drm_class == "Widevine":
payload["pssh"] = drm_obj.pssh.dumps()
break
resp = self.client.post(f"/api/session/{self._session_id}/license", payload)
return base64.b64decode(resp["license"])
def on_segment_downloaded(self, track: AnyTrack, segment: Any) -> None:
pass
def on_track_downloaded(self, track: AnyTrack) -> None:
pass
def on_track_decrypted(self, track: AnyTrack, drm: Any, segment: Any = None) -> None:
pass
def on_track_repacked(self, track: AnyTrack) -> None:
pass
def on_track_multiplex(self, track: AnyTrack) -> None:
pass
def close(self) -> None:
if self._session_id:
try:
self.client.delete(f"/api/session/{self._session_id}")
except Exception as e:
self.log.warning(f"Failed to clean up remote session: {e}")
self._session_id = None
def _load_cache_files(self) -> Dict[str, str]:
cache_dir = config.directories.cache / self.service_tag
if not cache_dir.is_dir():
return {}
return {
f.stem: f.read_text(encoding="utf-8")
for f in cache_dir.glob("*.json")
if not f.stem.startswith("titles_")
}
__all__ = ("RemoteClient", "RemoteService", "resolve_server")

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
from pathlib import Path
import click
@@ -28,28 +26,12 @@ class Services(click.MultiCommand):
# Click-specific methods
def list_commands(self, ctx: click.Context) -> list[str]:
"""Returns a list of all available Services as command names for Click.
In remote mode, also includes service tags from remote_services config
so the user can use services that aren't installed locally.
"""
tags = Services.get_tags()
remote = ctx.params.get("remote") or (ctx.parent and ctx.parent.params.get("remote"))
if remote:
for svc_cfg in config.remote_services.values():
for remote_tag in svc_cfg.get("services", {}).keys():
if remote_tag not in tags:
tags.append(remote_tag)
return tags
"""Returns a list of all available Services as command names for Click."""
return Services.get_tags()
def get_command(self, ctx: click.Context, name: str) -> click.Command:
"""Load the Service and return the Click CLI method."""
tag = Services.get_tag(name)
remote = ctx.params.get("remote") or (ctx.parent and ctx.parent.params.get("remote"))
if remote:
return Services._make_remote_command(tag)
try:
service = Services.load(tag)
except KeyError as e:
@@ -65,22 +47,6 @@ class Services(click.MultiCommand):
raise click.ClickException(f"Service '{tag}' has no 'cli' method configured.")
@staticmethod
def _make_remote_command(tag: str) -> click.Command:
"""Create a synthetic Click command for a remote service."""
@click.command(name=tag)
@click.argument("title", type=str)
@click.pass_context
def remote_cli(ctx: click.Context, title: str) -> object:
from unshackle.core.remote_service import RemoteService, resolve_server
server_name = ctx.parent.params.get("server") if ctx.parent else None
server_url, api_key, services_config = resolve_server(server_name)
return RemoteService(ctx, tag, title, server_url, api_key, services_config)
return remote_cli
# Methods intended to be used anywhere
@staticmethod