chore(api): remove remote services

This commit is contained in:
Andy
2026-02-06 10:46:39 -07:00
parent 939ca25c5b
commit 2308644374
4 changed files with 0 additions and 2598 deletions

View File

@@ -1,145 +0,0 @@
"""API key tier management for remote services."""
import logging
from typing import Any, Dict, List, Optional
from aiohttp import web
log = logging.getLogger("api.keys")
def get_api_key_from_request(request: web.Request) -> Optional[str]:
"""
Extract API key from request headers.
Args:
request: aiohttp request object
Returns:
API key string or None
"""
api_key = request.headers.get("X-API-Key")
if api_key:
return api_key
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
return auth_header[7:] # len("Bearer ") == 7
return None
def get_api_key_config(app: web.Application, api_key: str) -> Optional[Dict[str, Any]]:
"""
Get configuration for a specific API key.
Args:
app: aiohttp application
api_key: API key to look up
Returns:
API key configuration dict or None if not found
"""
config = app.get("config", {})
# Check new-style tiered API keys
api_keys = config.get("api_keys", [])
for key_config in api_keys:
if isinstance(key_config, dict) and key_config.get("key") == api_key:
return key_config
# Check legacy users list (backward compatibility)
users = config.get("users", [])
if api_key in users:
return {
"key": api_key,
"tier": "basic",
"allowed_cdms": []
}
return None
def is_premium_user(app: web.Application, api_key: str) -> bool:
"""
Check if an API key belongs to a premium user.
Premium users can use server-side CDM for decryption.
Args:
app: aiohttp application
api_key: API key to check
Returns:
True if premium user, False otherwise
"""
key_config = get_api_key_config(app, api_key)
if not key_config:
return False
tier = key_config.get("tier", "basic")
return tier == "premium"
def get_allowed_cdms(app: web.Application, api_key: str) -> List[str]:
"""
Get list of CDMs that an API key is allowed to use.
Args:
app: aiohttp application
api_key: API key to check
Returns:
List of allowed CDM names, or empty list if not premium
"""
key_config = get_api_key_config(app, api_key)
if not key_config:
return []
allowed_cdms = key_config.get("allowed_cdms", [])
# Handle wildcard
if allowed_cdms == "*" or allowed_cdms == ["*"]:
return ["*"]
return allowed_cdms if isinstance(allowed_cdms, list) else []
def get_default_cdm(app: web.Application, api_key: str) -> Optional[str]:
"""
Get default CDM for an API key.
Args:
app: aiohttp application
api_key: API key to check
Returns:
Default CDM name or None
"""
key_config = get_api_key_config(app, api_key)
if not key_config:
return None
return key_config.get("default_cdm")
def can_use_cdm(app: web.Application, api_key: str, cdm_name: str) -> bool:
"""
Check if an API key can use a specific CDM.
Args:
app: aiohttp application
api_key: API key to check
cdm_name: CDM name to check access for
Returns:
True if allowed, False otherwise
"""
allowed_cdms = get_allowed_cdms(app, api_key)
# Wildcard access
if "*" in allowed_cdms:
return True
# Specific CDM access
return cdm_name in allowed_cdms

File diff suppressed because it is too large Load Diff

View File

@@ -8,9 +8,6 @@ from unshackle.core import __version__
from unshackle.core.api.errors import APIError, APIErrorCode, build_error_response, handle_api_exception
from unshackle.core.api.handlers import (cancel_download_job_handler, download_handler, get_download_job_handler,
list_download_jobs_handler, list_titles_handler, list_tracks_handler)
from unshackle.core.api.remote_handlers import (remote_decrypt, remote_get_chapters, remote_get_license,
remote_get_manifest, remote_get_titles, remote_get_tracks,
remote_list_services, remote_search)
from unshackle.core.services import Services
from unshackle.core.update_checker import UpdateChecker
@@ -733,16 +730,6 @@ def setup_routes(app: web.Application) -> None:
app.router.add_get("/api/download/jobs/{job_id}", download_job_detail)
app.router.add_delete("/api/download/jobs/{job_id}", cancel_download_job)
# Remote service endpoints
app.router.add_get("/api/remote/services", remote_list_services)
app.router.add_post("/api/remote/{service}/search", remote_search)
app.router.add_post("/api/remote/{service}/titles", remote_get_titles)
app.router.add_post("/api/remote/{service}/tracks", remote_get_tracks)
app.router.add_post("/api/remote/{service}/manifest", remote_get_manifest)
app.router.add_post("/api/remote/{service}/chapters", remote_get_chapters)
app.router.add_post("/api/remote/{service}/license", remote_get_license)
app.router.add_post("/api/remote/{service}/decrypt", remote_decrypt)
def setup_swagger(app: web.Application) -> None:
"""Setup Swagger UI documentation."""
@@ -767,14 +754,5 @@ def setup_swagger(app: web.Application) -> None:
web.get("/api/download/jobs", download_jobs),
web.get("/api/download/jobs/{job_id}", download_job_detail),
web.delete("/api/download/jobs/{job_id}", cancel_download_job),
# Remote service routes
web.get("/api/remote/services", remote_list_services),
web.post("/api/remote/{service}/search", remote_search),
web.post("/api/remote/{service}/titles", remote_get_titles),
web.post("/api/remote/{service}/tracks", remote_get_tracks),
web.post("/api/remote/{service}/manifest", remote_get_manifest),
web.post("/api/remote/{service}/chapters", remote_get_chapters),
web.post("/api/remote/{service}/license", remote_get_license),
web.post("/api/remote/{service}/decrypt", remote_decrypt),
]
)

View File

@@ -1,236 +0,0 @@
"""Session serialization helpers for remote services."""
from http.cookiejar import CookieJar
from typing import Any, Dict, Optional
import requests
from unshackle.core.credential import Credential
def serialize_session(session: requests.Session) -> Dict[str, Any]:
"""
Serialize a requests.Session into a JSON-serializable dictionary.
Extracts cookies, headers, and other session data that can be
transferred to a remote client for downloading.
Args:
session: The requests.Session to serialize
Returns:
Dictionary containing serialized session data
"""
session_data = {
"cookies": {},
"headers": {},
"proxies": session.proxies.copy() if session.proxies else {},
}
# Serialize cookies
if session.cookies:
for cookie in session.cookies:
session_data["cookies"][cookie.name] = {
"value": cookie.value,
"domain": cookie.domain,
"path": cookie.path,
"secure": cookie.secure,
"expires": cookie.expires,
}
# Serialize headers (exclude proxy-authorization for security)
if session.headers:
for key, value in session.headers.items():
# Skip proxy-related headers as they're server-specific
if key.lower() not in ["proxy-authorization"]:
session_data["headers"][key] = value
return session_data
def deserialize_session(
session_data: Dict[str, Any], target_session: Optional[requests.Session] = None
) -> requests.Session:
"""
Deserialize session data into a requests.Session.
Applies cookies, headers, and other session data from a remote server
to a local session for downloading.
Args:
session_data: Dictionary containing serialized session data
target_session: Optional existing session to update (creates new if None)
Returns:
requests.Session with applied session data
"""
if target_session is None:
target_session = requests.Session()
# Apply cookies
if "cookies" in session_data:
for cookie_name, cookie_data in session_data["cookies"].items():
target_session.cookies.set(
name=cookie_name,
value=cookie_data["value"],
domain=cookie_data.get("domain"),
path=cookie_data.get("path", "/"),
secure=cookie_data.get("secure", False),
expires=cookie_data.get("expires"),
)
# Apply headers
if "headers" in session_data:
target_session.headers.update(session_data["headers"])
# Note: We don't apply proxies from remote as the local client
# should use its own proxy configuration
return target_session
def extract_session_tokens(session: requests.Session) -> Dict[str, Any]:
"""
Extract authentication tokens and similar data from a session.
Looks for common authentication patterns like Bearer tokens,
API keys in headers, etc.
Args:
session: The requests.Session to extract tokens from
Returns:
Dictionary containing extracted tokens
"""
tokens = {}
# Check for Authorization header
if "Authorization" in session.headers:
tokens["authorization"] = session.headers["Authorization"]
# Check for common API key headers
for key in ["X-API-Key", "Api-Key", "X-Auth-Token"]:
if key in session.headers:
tokens[key.lower().replace("-", "_")] = session.headers[key]
return tokens
def apply_session_tokens(tokens: Dict[str, Any], target_session: requests.Session) -> None:
"""
Apply authentication tokens to a session.
Args:
tokens: Dictionary containing tokens to apply
target_session: Session to apply tokens to
"""
# Apply Authorization header
if "authorization" in tokens:
target_session.headers["Authorization"] = tokens["authorization"]
# Apply other token headers
token_header_map = {
"x_api_key": "X-API-Key",
"api_key": "Api-Key",
"x_auth_token": "X-Auth-Token",
}
for token_key, header_name in token_header_map.items():
if token_key in tokens:
target_session.headers[header_name] = tokens[token_key]
def serialize_cookies(cookie_jar: Optional[CookieJar]) -> Dict[str, Any]:
"""
Serialize a CookieJar into a JSON-serializable dictionary.
Args:
cookie_jar: The CookieJar to serialize
Returns:
Dictionary containing serialized cookies
"""
if not cookie_jar:
return {}
cookies = {}
for cookie in cookie_jar:
cookies[cookie.name] = {
"value": cookie.value,
"domain": cookie.domain,
"path": cookie.path,
"secure": cookie.secure,
"expires": cookie.expires,
}
return cookies
def deserialize_cookies(cookies_data: Dict[str, Any]) -> CookieJar:
"""
Deserialize cookies into a CookieJar.
Args:
cookies_data: Dictionary containing serialized cookies
Returns:
CookieJar with cookies
"""
import http.cookiejar
cookie_jar = http.cookiejar.CookieJar()
for cookie_name, cookie_data in cookies_data.items():
cookie = http.cookiejar.Cookie(
version=0,
name=cookie_name,
value=cookie_data["value"],
port=None,
port_specified=False,
domain=cookie_data.get("domain", ""),
domain_specified=bool(cookie_data.get("domain")),
domain_initial_dot=cookie_data.get("domain", "").startswith("."),
path=cookie_data.get("path", "/"),
path_specified=True,
secure=cookie_data.get("secure", False),
expires=cookie_data.get("expires"),
discard=False,
comment=None,
comment_url=None,
rest={},
)
cookie_jar.set_cookie(cookie)
return cookie_jar
def serialize_credential(credential: Optional[Credential]) -> Optional[Dict[str, str]]:
"""
Serialize a Credential into a JSON-serializable dictionary.
Args:
credential: The Credential to serialize
Returns:
Dictionary containing username and password, or None
"""
if not credential:
return None
return {"username": credential.username, "password": credential.password}
def deserialize_credential(credential_data: Optional[Dict[str, str]]) -> Optional[Credential]:
"""
Deserialize credential data into a Credential object.
Args:
credential_data: Dictionary containing username and password
Returns:
Credential object or None
"""
if not credential_data:
return None
return Credential(username=credential_data["username"], password=credential_data["password"])