From 746b573711e6247c3f15a12b497a5fd3d7bc5342 Mon Sep 17 00:00:00 2001 From: imSp4rky Date: Thu, 21 May 2026 10:45:25 -0600 Subject: [PATCH] test(remote): add unit + e2e suite for remote-services subsystem Covers RemoteClient/RemoteService, REST routes, handlers, SessionStore, InputBridge, DownloadQueueManager, errors, compression, and serve CLI. E2e tier opts in via --live and can auto-spawn its own serve. --- .gitignore | 3 + pyproject.toml | 22 ++ tests/__init__.py | 0 tests/remote/README.md | 130 +++++++++ tests/remote/__init__.py | 0 tests/remote/conftest.py | 208 +++++++++++++ tests/remote/e2e/__init__.py | 0 tests/remote/e2e/conftest.py | 40 +++ .../remote/e2e/fixtures/fixtures-example.yaml | 67 +++++ tests/remote/e2e/test_live_download.py | 124 ++++++++ tests/remote/e2e/test_live_health.py | 37 +++ tests/remote/e2e/test_live_input_bridge.py | 46 +++ tests/remote/e2e/test_live_license.py | 120 ++++++++ tests/remote/e2e/test_live_quality.py | 141 +++++++++ tests/remote/e2e/test_live_remote_client.py | 29 ++ tests/remote/e2e/test_live_search.py | 28 ++ .../remote/e2e/test_live_session_lifecycle.py | 116 ++++++++ tests/remote/unit/__init__.py | 0 tests/remote/unit/test_compression.py | 65 +++++ tests/remote/unit/test_download_manager.py | 120 ++++++++ tests/remote/unit/test_errors.py | 132 +++++++++ tests/remote/unit/test_handlers_serialize.py | 273 ++++++++++++++++++ tests/remote/unit/test_input_bridge.py | 100 +++++++ tests/remote/unit/test_remote_client.py | 111 +++++++ tests/remote/unit/test_remote_resolve.py | 117 ++++++++ .../unit/test_remote_service_helpers.py | 157 ++++++++++ tests/remote/unit/test_routes.py | 159 ++++++++++ tests/remote/unit/test_serve_cli.py | 79 +++++ tests/remote/unit/test_session_store.py | 117 ++++++++ 29 files changed, 2541 insertions(+) create mode 100644 tests/__init__.py create mode 100644 tests/remote/README.md create mode 100644 tests/remote/__init__.py create mode 100644 tests/remote/conftest.py create mode 100644 tests/remote/e2e/__init__.py create mode 100644 tests/remote/e2e/conftest.py create mode 100644 tests/remote/e2e/fixtures/fixtures-example.yaml create mode 100644 tests/remote/e2e/test_live_download.py create mode 100644 tests/remote/e2e/test_live_health.py create mode 100644 tests/remote/e2e/test_live_input_bridge.py create mode 100644 tests/remote/e2e/test_live_license.py create mode 100644 tests/remote/e2e/test_live_quality.py create mode 100644 tests/remote/e2e/test_live_remote_client.py create mode 100644 tests/remote/e2e/test_live_search.py create mode 100644 tests/remote/e2e/test_live_session_lifecycle.py create mode 100644 tests/remote/unit/__init__.py create mode 100644 tests/remote/unit/test_compression.py create mode 100644 tests/remote/unit/test_download_manager.py create mode 100644 tests/remote/unit/test_errors.py create mode 100644 tests/remote/unit/test_handlers_serialize.py create mode 100644 tests/remote/unit/test_input_bridge.py create mode 100644 tests/remote/unit/test_remote_client.py create mode 100644 tests/remote/unit/test_remote_resolve.py create mode 100644 tests/remote/unit/test_remote_service_helpers.py create mode 100644 tests/remote/unit/test_routes.py create mode 100644 tests/remote/unit/test_serve_cli.py create mode 100644 tests/remote/unit/test_session_store.py diff --git a/.gitignore b/.gitignore index c7fc888..8a8754c 100644 --- a/.gitignore +++ b/.gitignore @@ -245,3 +245,6 @@ AGENTS.md marimo/_static/ marimo/_lsp/ __marimo__/ + +# Test fixtures with private/local content (services, IDs, credentials) +tests/remote/e2e/fixtures/fixtures.yaml diff --git a/pyproject.toml b/pyproject.toml index ae0ff5f..2ebd3dd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,28 @@ dev = [ "isort>=5.13.2,<8", "ruff>=0.3.7,<0.15", ] +test = [ + "pytest>=8.3.0,<9", + "pytest-asyncio>=0.24.0,<1", + "pytest-aiohttp>=1.0.5,<2", + "responses>=0.25.0,<1", + "PyYAML>=6.0.1,<7", +] + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" +markers = [ + "unit: fast, mocked tests (default)", + "live: end-to-end tests against a running serve (opt-in via --live)", + "slow: tests that hit real services and may take >10s", +] +filterwarnings = [ + "ignore::DeprecationWarning:click.*", + "ignore::DeprecationWarning:unshackle.core.services", + "ignore::DeprecationWarning:ast", +] +addopts = "-ra --strict-markers" [tool.hatch.build.targets.wheel] packages = ["unshackle"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/remote/README.md b/tests/remote/README.md new file mode 100644 index 0000000..5f54727 --- /dev/null +++ b/tests/remote/README.md @@ -0,0 +1,130 @@ +# tests/remote/ + +Test suite for the unshackle remote-services subsystem: + +- `unshackle/core/remote_service.py` — `RemoteClient`, `RemoteService`, helpers +- `unshackle/core/api/` — routes, handlers, session_store, input_bridge, + download_manager, errors, compression +- `unshackle/commands/serve.py` — CLI surface + +Two tiers: + +| Tier | Where | Network | Default | +|------|-------|---------|---------| +| **unit** | `tests/remote/unit/` | mocked (`responses`, in-process aiohttp) | runs by default | +| **e2e** | `tests/remote/e2e/` | hits an `unshackle serve` instance | skipped unless `--live` | + +## Install test deps + +```powershell +uv sync --group test +``` + +## Run unit tests + +```powershell +uv run pytest tests/remote/unit -v +``` + +Fast, hermetic, no external calls. + +## Run e2e tests + +The suite can either: +1. **Spawn its own serve** (default with `--live`): starts + `uv run unshackle serve --host 127.0.0.1 --port --no-key --remote-only` + for the session, waits for `/api/health`, tears it down at the end. +2. **Talk to an external serve** you started in another shell: pass + `--server-url http://host:port`. + +Spawn mode is controlled by `--spawn-serve {auto, always, never}` (default +`auto` — spawn only when `--server-url` is empty). + +### Auto-spawn (recommended) + +```powershell +uv run pytest tests/remote/e2e -v --live +``` + +### External serve + +```powershell +# in shell A +uv run unshackle serve --host 0.0.0.0 --no-key --remote-only + +# in shell B +uv run pytest tests/remote/e2e -v --live --server-url http://localhost:8786 +``` + +### With API key + +```powershell +uv run pytest tests/remote/e2e -v --live --secret-key your-key +``` + +### Limit which services run + +```powershell +uv run pytest tests/remote/e2e -v --live --services FOO,BAR +``` + +## Configure e2e scenarios + +Copy `tests/remote/e2e/fixtures/fixtures-example.yaml` to +`tests/remote/e2e/fixtures/fixtures.yaml` (gitignored) and fill in the +services you have access to. Schema (see the example file for full docs): + +```yaml +services: + EXAMPLE: + title_url: "..." # required + series_url: "..." # optional — overrides movie target when set + target_season: 1 + target_episode: 1 + search_query: "..." + expected_quality: + min_height: 1080 + min_codecs: [AVC] + min_ranges: [SDR] + min_track_count: 4 + runs_download_test: true # opt in to the download smoke test + runs_license_test: false # opt in to the license test + license_drm: widevine # or "playready" + license_quality: 1080 +``` + +Tests skip a service if its session can't be created (auth missing, +geofence, etc.) or if the matching `runs_*_test` flag is false. + +## Run everything + +```powershell +uv run pytest tests/remote -v --live +``` + +## CLI flags added + +| Flag | Default | Purpose | +|------|---------|---------| +| `--live` | off | Opt in to e2e tests | +| `--server-url URL` | `""` (or `$UNSHACKLE_SERVE_URL`) | Target external server; empty triggers auto-spawn | +| `--spawn-serve {auto,always,never}` | `auto` | Spawn serve when no URL given | +| `--secret-key KEY` | `""` (or `$UNSHACKLE_SECRET_KEY`) | `X-Secret-Key` header | +| `--services A,B` | (all) | Restrict e2e to these tags | + +## Markers + +- `unit` — fast, mocked (default) +- `live` — needs `--live` +- `slow` — hits real services; combine with `--live` + +Run only fast unit tests: + +```powershell +uv run pytest tests/remote -m "unit and not slow" +``` + +## Adding a new service to e2e + +1. Add a block under `services:` in your local `fixtures.yaml`. +2. No new Python needed — every e2e test is parametrized over the YAML. diff --git a/tests/remote/__init__.py b/tests/remote/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/remote/conftest.py b/tests/remote/conftest.py new file mode 100644 index 0000000..9b6b078 --- /dev/null +++ b/tests/remote/conftest.py @@ -0,0 +1,208 @@ +"""Shared fixtures + CLI flags for tests/remote/.""" + +from __future__ import annotations + +import os +import socket +import subprocess +import time +from pathlib import Path + +import pytest + + +def pytest_configure(config: pytest.Config) -> None: + config.addinivalue_line("markers", "unit: fast, mocked tests (default)") + config.addinivalue_line("markers", "live: end-to-end tests against a running serve (opt-in via --live)") + config.addinivalue_line("markers", "slow: tests that hit real services and may take >10s") + + +def pytest_addoption(parser: pytest.Parser) -> None: + parser.addoption( + "--live", + action="store_true", + default=False, + help="Run e2e tests against a running 'unshackle serve' instance.", + ) + parser.addoption( + "--server-url", + action="store", + default=os.environ.get("UNSHACKLE_SERVE_URL", ""), + help="Server URL for e2e tests. If empty, the suite will spawn its own serve " + "(via --spawn-serve, default on for --live). Override with $UNSHACKLE_SERVE_URL.", + ) + parser.addoption( + "--secret-key", + action="store", + default=os.environ.get("UNSHACKLE_SECRET_KEY", ""), + help="X-Secret-Key for e2e tests. Empty for --no-key servers.", + ) + parser.addoption( + "--services", + action="store", + default="", + help="Comma-separated service tags to run e2e against (default: all in fixtures.yaml).", + ) + parser.addoption( + "--spawn-serve", + action="store", + default="auto", + choices=["auto", "always", "never"], + help="Whether to spawn 'unshackle serve' for e2e tests. " + "'auto' (default): spawn only when --server-url is empty. " + "'always': always spawn (kills any existing process is your problem). " + "'never': require an external serve.", + ) + + +def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]) -> None: + """Skip live tests unless --live is passed.""" + if config.getoption("--live"): + return + skip_live = pytest.mark.skip(reason="needs --live") + for item in items: + if "live" in item.keywords: + item.add_marker(skip_live) + + +def _free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _wait_for_health(url: str, timeout: float = 60.0) -> bool: + import requests + + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + r = requests.get(f"{url}/api/health", timeout=2) + if r.status_code == 200: + return True + except requests.RequestException: + pass + time.sleep(0.5) + return False + + +@pytest.fixture(scope="session") +def secret_key(request: pytest.FixtureRequest) -> str: + return str(request.config.getoption("--secret-key")) + + +@pytest.fixture(scope="session") +def server_url(request: pytest.FixtureRequest): + """Provide a base URL for the live serve. + + - If --server-url is set: use it as-is (external server). + - Else (default) and --live is on: spawn 'unshackle serve' on a free + port, wait for /api/health, yield the URL, terminate at session end. + - Spawn mode controlled by --spawn-serve {auto, always, never}. + """ + cfg = request.config + explicit = str(cfg.getoption("--server-url")).rstrip("/") + mode = cfg.getoption("--spawn-serve") + is_live = cfg.getoption("--live") + + if not is_live: + yield explicit or "http://localhost:8786" + return + + if mode == "never": + if not explicit: + pytest.fail("--spawn-serve=never requires --server-url") + if not _wait_for_health(explicit, timeout=10): + pytest.fail(f"External serve not reachable at {explicit}") + yield explicit + return + + if mode == "auto" and explicit: + if not _wait_for_health(explicit, timeout=10): + pytest.fail(f"External serve not reachable at {explicit}") + yield explicit + return + + # Spawn our own serve. + port = _free_port() + cmd = [ + "uv", + "run", + "unshackle", + "serve", + "--host", + "127.0.0.1", + "--port", + str(port), + "--no-key", + "--remote-only", + ] + proc = subprocess.Popen( # nosec B603 + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + cwd=Path(__file__).resolve().parents[2], + ) + url = f"http://127.0.0.1:{port}" + try: + if not _wait_for_health(url, timeout=60): + proc.terminate() + pytest.fail(f"Spawned serve at {url} did not become healthy within 60s") + yield url + finally: + proc.terminate() + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired: + proc.kill() + + +@pytest.fixture(scope="session") +def selected_services(request: pytest.FixtureRequest) -> list[str]: + raw = str(request.config.getoption("--services")).strip() + return [s.strip().upper() for s in raw.split(",") if s.strip()] if raw else [] + + +@pytest.fixture(scope="session") +def fixtures_dir() -> Path: + return Path(__file__).parent / "e2e" / "fixtures" + + +@pytest.fixture(scope="session") +def e2e_scenarios(fixtures_dir: Path, selected_services: list[str]) -> dict: + """Load e2e service scenarios from fixtures.yaml. + + See fixtures-example.yaml for the schema. fixtures.yaml is .gitignored + so each user keeps their own private scenario set locally. + """ + import yaml + + path = fixtures_dir / "fixtures.yaml" + if not path.exists(): + return {"services": {}} + data = yaml.safe_load(path.read_text()) or {} + services = data.get("services", {}) + if selected_services: + services = {k: v for k, v in services.items() if k.upper() in selected_services} + data["services"] = services + return data + + +@pytest.fixture(scope="session") +def http_session(secret_key: str): + """Plain requests.Session with X-Secret-Key wired in.""" + import requests + + s = requests.Session() + s.headers["User-Agent"] = "unshackle-tests/1.0" + if secret_key: + s.headers["X-Secret-Key"] = secret_key + return s + + +@pytest.fixture +def remote_client(server_url: str, secret_key: str): + """RemoteClient pointed at the e2e serve instance.""" + from unshackle.core.remote_service import RemoteClient + + return RemoteClient(server_url=server_url, api_key=secret_key) diff --git a/tests/remote/e2e/__init__.py b/tests/remote/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/remote/e2e/conftest.py b/tests/remote/e2e/conftest.py new file mode 100644 index 0000000..222ae4f --- /dev/null +++ b/tests/remote/e2e/conftest.py @@ -0,0 +1,40 @@ +"""E2E-specific fixtures. + +These tests are skipped unless ``--live`` is passed on the pytest CLI. +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + + +def _load_scenarios(config: pytest.Config) -> dict: + cached = getattr(config, "_e2e_scenarios_cache", None) + if cached is not None: + return cached + + import yaml + + path = Path(__file__).parent / "fixtures" / "fixtures.yaml" + data = yaml.safe_load(path.read_text()) if path.exists() else {} + selected = (config.getoption("--services") or "").strip() + services = (data or {}).get("services", {}) or {} + if selected: + wanted = {s.strip().upper() for s in selected.split(",") if s.strip()} + services = {k: v for k, v in services.items() if k.upper() in wanted} + cached = {"services": services} + config._e2e_scenarios_cache = cached # type: ignore[attr-defined] + return cached + + +def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: + if "service_case" in metafunc.fixturenames: + scenarios = _load_scenarios(metafunc.config) + params = [pytest.param((tag, conf), id=tag) for tag, conf in (scenarios.get("services") or {}).items()] + if not params: + params = [ + pytest.param(("__none__", {}), id="no-services", marks=pytest.mark.skip(reason="no e2e fixtures")) + ] + metafunc.parametrize("service_case", params) diff --git a/tests/remote/e2e/fixtures/fixtures-example.yaml b/tests/remote/e2e/fixtures/fixtures-example.yaml new file mode 100644 index 0000000..f1eb48e --- /dev/null +++ b/tests/remote/e2e/fixtures/fixtures-example.yaml @@ -0,0 +1,67 @@ +# tests/remote/e2e/fixtures/fixtures-example.yaml +# +# Copy this file to `fixtures.yaml` in the same directory and fill in +# service-specific scenarios for the services you have access to. +# `fixtures.yaml` is .gitignored so your private IDs / queries stay local. +# +# Each entry under `services:` is keyed by the service TAG used by +# unshackle (matches the service directory name, e.g. EXAMPLE). The keys +# below are read by the e2e tests: +# +# title_url (required) — movie URL/slug/ID accepted by the service +# series_url (optional) — series URL/slug/ID; when set, the +# quality + license + download tests +# target target_season/target_episode +# instead of the movie +# target_season (optional, default 1) +# target_episode (optional, default 1) +# search_query (optional) — string to send to /api/search +# requires_auth (optional, default true) — informational +# server_cdm (optional, default false) — informational +# drm (optional) — informational ("widevine" / "playready") +# +# expected_quality (optional) — when present, the quality test +# asserts these limits against the +# discovered video tracks +# min_height (optional) — max video.height >= this +# min_codecs (optional) — codec names that must be present +# (AVC, HEVC, VC1, VP8, VP9, AV1) +# min_ranges (optional) — range names that must be present +# (SDR, HDR10, HDR10P, DV, HLG, HYBRID) +# min_track_count (optional) — at least this many video tracks +# +# runs_download_test (optional, default false) — when true, the +# download smoke test +# runs for this service +# runs_license_test (optional, default false) — when true, the +# license test runs for +# this service +# license_drm (optional, default "widevine") — DRM type the +# license test +# requests ("widevine" +# or "playready") +# license_quality (optional, default 1080) — pixel height of the +# track the license test +# targets + +services: + EXAMPLE: + title_url: "https://example.com/movie/example-movie-id" + series_url: "https://example.com/series/example-series-id" + target_season: 1 + target_episode: 1 + search_query: "example" + requires_auth: true + server_cdm: false + drm: widevine + + expected_quality: + min_height: 1080 + min_codecs: [AVC] + min_ranges: [SDR] + min_track_count: 4 + + runs_download_test: true + runs_license_test: false + license_drm: widevine + license_quality: 1080 diff --git a/tests/remote/e2e/test_live_download.py b/tests/remote/e2e/test_live_download.py new file mode 100644 index 0000000..730d87d --- /dev/null +++ b/tests/remote/e2e/test_live_download.py @@ -0,0 +1,124 @@ +"""E2E: download smoke test via segments + CDN manifest fetch. + +For any fixture service with ``runs_download_test: true``, this test: + 1. Creates a session with the full track-selection knobs. + 2. Picks a 1080p SDR video track on the configured target title. + 3. Calls /api/session/{id}/segments to resolve the CDN URL + headers. + 4. Fetches the manifest URL with the resolved headers. + 5. Asserts the body is non-empty and looks like DASH/HLS. + +It does NOT decrypt or mux — that requires a full local CDM. It proves +the end-to-end pipeline up to CDN reachability for the selected quality. +""" + +from __future__ import annotations + +import time + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def _wait_titles(http_session, server_url: str, sid: str, timeout: float = 120.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + r = http_session.get(f"{server_url}/api/session/{sid}/titles", timeout=30) + if r.status_code == 200: + return r.json() + if r.status_code == 400: + try: + body = r.json() + except Exception: + body = {} + if (body.get("details") or {}).get("auth_status") in ("authenticating", "pending_input"): + time.sleep(2.0) + continue + return None + return None + + +def _pick_target_title(titles, season: int = 1, episode: int = 1): + for t in titles: + if t.get("type") == "episode" and t.get("season") == season and t.get("number") == episode: + return t + return titles[0] if titles else None + + +def _pick_track_at_height(video_tracks, target_height: int): + same_height = [v for v in video_tracks if v.get("height") == target_height and v.get("range") == "SDR"] + return sorted(same_height, key=lambda v: v.get("bitrate") or 0)[0] if same_height else None + + +def test_download_manifest_fetch(http_session, server_url: str, service_case) -> None: + service, conf = service_case + if not conf.get("runs_download_test"): + pytest.skip(f"{service}: download test not enabled (runs_download_test)") + + target_height = int(conf.get("license_quality") or 1080) + title_input = conf.get("series_url") or conf.get("title_url") + if not title_input: + pytest.skip(f"{service}: no title/series in fixture") + + r = http_session.post( + f"{server_url}/api/session/create", + json={ + "service": service, + "title_id": title_input, + "range_": ["SDR"], + "vcodec": ["AVC"], + "quality": [target_height], + "best_available": True, + }, + timeout=120, + ) + if r.status_code >= 400: + pytest.skip(f"{service}: session create failed {r.status_code}: {r.text[:200]}") + sid = r.json()["session_id"] + + try: + body = _wait_titles(http_session, server_url, sid) + if not body: + pytest.skip(f"{service}: titles timeout") + target = _pick_target_title( + body.get("titles") or [], + season=conf.get("target_season", 1), + episode=conf.get("target_episode", 1), + ) + if not target: + pytest.skip(f"{service}: no target title") + + tr = http_session.post( + f"{server_url}/api/session/{sid}/tracks", + json={"title_id": target["id"]}, + timeout=240, + ) + assert tr.status_code == 200, tr.text + track = _pick_track_at_height(tr.json().get("video") or [], target_height) + if not track: + pytest.skip(f"{service}: no track at height={target_height}") + + seg = http_session.post( + f"{server_url}/api/session/{sid}/segments", + json={"track_ids": [track["id"]]}, + timeout=120, + ) + assert seg.status_code == 200, seg.text + info = seg.json().get("tracks", {}).get(track["id"]) + assert info, f"no segment info: {seg.text[:200]}" + + manifest_url = info.get("url") + assert manifest_url, f"no manifest url: {info}" + headers = dict(info.get("headers") or {}) + headers.pop("Host", None) + headers.pop("host", None) + + import requests + + cdn = requests.get(manifest_url, headers=headers, timeout=60) + assert cdn.status_code == 200, f"CDN fetch failed {cdn.status_code}: {cdn.text[:200]}" + assert len(cdn.content) > 256, "manifest body too small" + head = cdn.content[:128].lstrip() + assert head.startswith((b" None: + r = http_session.get(f"{server_url}/api/health", timeout=10) + assert r.status_code == 200, r.text + body = r.json() + assert body["status"] == "ok" + assert "version" in body + assert "update_check" in body + + +def test_services_endpoint(http_session, server_url: str) -> None: + r = http_session.get(f"{server_url}/api/services", timeout=10) + assert r.status_code == 200, r.text + body = r.json() + assert "services" in body + assert isinstance(body["services"], list) + + +def test_health_does_not_require_secret_key(server_url: str) -> None: + """Health bypasses auth even when the server runs with a key.""" + import requests + + r = requests.get(f"{server_url}/api/health", timeout=10) + assert r.status_code == 200 + + +def test_unknown_route_returns_404(http_session, server_url: str) -> None: + r = http_session.get(f"{server_url}/api/this-does-not-exist", timeout=10) + assert r.status_code in (404, 405) diff --git a/tests/remote/e2e/test_live_input_bridge.py b/tests/remote/e2e/test_live_input_bridge.py new file mode 100644 index 0000000..677307f --- /dev/null +++ b/tests/remote/e2e/test_live_input_bridge.py @@ -0,0 +1,46 @@ +"""E2E: interactive auth prompt round-trip. + +For services that don't naturally prompt during ``authenticate()``, the +test still verifies the GET prompt endpoint returns the documented +shape (empty when no prompt pending). +""" + +from __future__ import annotations + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def test_prompt_endpoint_shape_without_pending(http_session, server_url: str, service_case) -> None: + service, conf = service_case + r = http_session.post( + f"{server_url}/api/session/create", + json={"service": service, "title_id": conf["title_url"]}, + timeout=120, + ) + if r.status_code >= 400: + pytest.skip(f"{service}: session creation failed: {r.status_code}") + sid = r.json()["session_id"] + try: + pr = http_session.get(f"{server_url}/api/session/{sid}/prompt", timeout=10) + assert pr.status_code in (200, 404), pr.text + if pr.status_code == 200: + body = pr.json() + assert "prompt" in body or "status" in body + finally: + http_session.delete(f"{server_url}/api/session/{sid}", timeout=30) + + +def test_prompt_post_unknown_session_returns_404(http_session, server_url: str) -> None: + r = http_session.post( + f"{server_url}/api/session/bogus-session-id/prompt", + json={"response": "irrelevant"}, + timeout=10, + ) + assert r.status_code == 404 + + +def test_prompt_get_unknown_session_returns_404(http_session, server_url: str) -> None: + r = http_session.get(f"{server_url}/api/session/bogus-session-id/prompt", timeout=10) + assert r.status_code == 404 diff --git a/tests/remote/e2e/test_live_license.py b/tests/remote/e2e/test_live_license.py new file mode 100644 index 0000000..5828ce8 --- /dev/null +++ b/tests/remote/e2e/test_live_license.py @@ -0,0 +1,120 @@ +"""E2E: license acquisition via server_cdm batch mode. + +For any fixture service with ``runs_license_test: true``, this test: + 1. Creates a session for the configured target title. + 2. Picks a video track at ``license_quality`` height (default 1080). + 3. Asks the server for keys via ``mode=server_cdm`` with ``drm_type`` + equal to the configured ``license_drm`` (default ``widevine``). + 4. Asserts at least one 32-hex KID + 32-hex KEY pair is returned. + +The server uses its own configured CDM (no client CDM required). Services +without ``runs_license_test`` are skipped, so this file is service-neutral. +""" + +from __future__ import annotations + +import time + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def _wait_titles(http_session, server_url: str, sid: str, timeout: float = 120.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + r = http_session.get(f"{server_url}/api/session/{sid}/titles", timeout=30) + if r.status_code == 200: + return r.json() + if r.status_code == 400: + try: + body = r.json() + except Exception: + body = {} + if (body.get("details") or {}).get("auth_status") in ("authenticating", "pending_input"): + time.sleep(2.0) + continue + return None + return None + + +def _pick_target_title(titles, season: int = 1, episode: int = 1): + for t in titles: + if t.get("type") == "episode" and t.get("season") == season and t.get("number") == episode: + return t + return titles[0] if titles else None + + +def _pick_track_at_height(video_tracks, target_height: int): + """Prefer SDR + AVC at the requested height; smallest bitrate wins.""" + same_height = [v for v in video_tracks if v.get("height") == target_height] + preferred = [v for v in same_height if v.get("codec") == "AVC" and v.get("range") == "SDR"] + pool = preferred or [v for v in same_height if v.get("range") == "SDR"] or same_height + return sorted(pool, key=lambda v: v.get("bitrate") or 0)[0] if pool else None + + +def test_license_server_cdm(http_session, server_url: str, service_case) -> None: + service, conf = service_case + if not conf.get("runs_license_test"): + pytest.skip(f"{service}: license test not enabled (runs_license_test)") + + drm_type = (conf.get("license_drm") or "widevine").lower() + target_height = int(conf.get("license_quality") or 1080) + title_input = conf.get("series_url") or conf.get("title_url") + if not title_input: + pytest.skip(f"{service}: no title/series in fixture") + + r = http_session.post( + f"{server_url}/api/session/create", + json={ + "service": service, + "title_id": title_input, + "range_": ["SDR"], + "vcodec": ["AVC"], + "best_available": True, + }, + timeout=120, + ) + if r.status_code >= 400: + pytest.skip(f"{service}: session create failed {r.status_code}: {r.text[:200]}") + sid = r.json()["session_id"] + + try: + body = _wait_titles(http_session, server_url, sid) + if not body: + pytest.skip(f"{service}: titles timeout") + target = _pick_target_title( + body.get("titles") or [], + season=conf.get("target_season", 1), + episode=conf.get("target_episode", 1), + ) + if not target: + pytest.skip(f"{service}: no target title") + + tr = http_session.post( + f"{server_url}/api/session/{sid}/tracks", + json={"title_id": target["id"]}, + timeout=240, + ) + assert tr.status_code == 200, tr.text + track = _pick_track_at_height(tr.json().get("video") or [], target_height) + if not track: + pytest.skip(f"{service}: no track at height={target_height}") + + lic = http_session.post( + f"{server_url}/api/session/{sid}/license", + json={"track_ids": [track["id"]], "mode": "server_cdm", "drm_type": drm_type}, + timeout=120, + ) + assert lic.status_code == 200, lic.text + payload = lic.json() + keys = payload.get("keys") or {} + assert keys, f"no keys returned; payload={payload}" + + track_keys = keys.get(track["id"]) or keys + assert isinstance(track_keys, dict) and track_keys, f"unexpected keys shape: {keys}" + for kid, key in track_keys.items(): + assert len(kid) == 32, f"bad kid length: {kid}" + assert len(key) == 32, f"bad key length: {key}" + finally: + http_session.delete(f"{server_url}/api/session/{sid}", timeout=30) diff --git a/tests/remote/e2e/test_live_quality.py b/tests/remote/e2e/test_live_quality.py new file mode 100644 index 0000000..327c5ea --- /dev/null +++ b/tests/remote/e2e/test_live_quality.py @@ -0,0 +1,141 @@ +"""E2E: track quality probing per service. + +Sends a session_create with the full track-selection knob set (range_, +vcodec, best_available) so the server returns every available track. +Picks S01E01 for series, first title for movies, then asserts that the +discovered video tracks meet the expected_quality limits declared in +fixtures.yaml. If no expected block is present, the test prints what was +discovered and skips so you can copy values back into the fixture. +""" + +from __future__ import annotations + +import time +from typing import Any, Optional + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def _create_session(http_session, server_url: str, service: str, title_id: str) -> str: + payload = { + "service": service, + "title_id": title_id, + # Quality knobs — send enum names the server's session_create parser accepts. + "range_": ["SDR", "HDR10", "DV"], + "vcodec": ["AVC", "HEVC"], + "best_available": True, + } + r = http_session.post(f"{server_url}/api/session/create", json=payload, timeout=120) + if r.status_code >= 400: + pytest.skip(f"{service}: session create failed {r.status_code}: {r.text[:200]}") + return r.json()["session_id"] + + +def _wait_for_titles(http_session, server_url: str, sid: str, timeout: float = 120.0): + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + r = http_session.get(f"{server_url}/api/session/{sid}/titles", timeout=30) + if r.status_code == 200: + return r.status_code, r.json() + if r.status_code == 400: + try: + body = r.json() + except Exception: + body = {} + if (body.get("details") or {}).get("auth_status") in ("authenticating", "pending_input"): + time.sleep(2.0) + continue + return r.status_code, r.json() if r.text else {} + return 408, {"message": "auth timeout"} + + +def _pick_target_title(titles: list[dict[str, Any]], season: int = 1, episode: int = 1) -> Optional[dict[str, Any]]: + """Pick the configured target episode; fall back to the first title.""" + for t in titles: + if t.get("type") == "episode" and t.get("season") == season and t.get("number") == episode: + return t + return titles[0] if titles else None + + +def _summarize(video_tracks: list[dict[str, Any]]) -> dict[str, Any]: + max_height = max((v.get("height") or 0 for v in video_tracks), default=0) + max_width = max((v.get("width") or 0 for v in video_tracks), default=0) + codecs = sorted({v.get("codec") for v in video_tracks if v.get("codec")}) + ranges = sorted({v.get("range") for v in video_tracks if v.get("range")}) + bitrates = sorted({v.get("bitrate") for v in video_tracks if v.get("bitrate")}) + return { + "track_count": len(video_tracks), + "max_width": max_width, + "max_height": max_height, + "codecs": codecs, + "ranges": ranges, + "bitrate_min_kbps": min(bitrates) if bitrates else None, + "bitrate_max_kbps": max(bitrates) if bitrates else None, + } + + +def _resolve_title_id(conf: dict) -> Optional[str]: + """Prefer series_url so S01E01 logic kicks in; fall back to movie title_url.""" + return conf.get("series_url") or conf.get("title_url") + + +def test_track_quality_meets_expected(http_session, server_url: str, service_case, capsys) -> None: + service, conf = service_case + title_input = _resolve_title_id(conf) + if not title_input: + pytest.skip(f"{service}: no title_url/series_url in fixture") + + sid = _create_session(http_session, server_url, service, title_input) + try: + code, body = _wait_for_titles(http_session, server_url, sid) + if code != 200: + pytest.skip(f"{service}: titles fetch failed {code}: {str(body)[:200]}") + + titles = body.get("titles") or [] + target = _pick_target_title(titles, season=conf.get("target_season", 1), episode=conf.get("target_episode", 1)) + if not target: + pytest.skip(f"{service}: no titles returned") + + # Movies appear with type=='movie'; episodes with type=='episode'. + kind = target.get("type") + title_id = target.get("id") + if not title_id: + pytest.skip(f"{service}: target title has no id") + + r = http_session.post( + f"{server_url}/api/session/{sid}/tracks", + json={"title_id": title_id}, + timeout=300, + ) + if r.status_code != 200: + pytest.skip(f"{service}: tracks fetch failed {r.status_code}: {r.text[:200]}") + + tracks = r.json() + video = tracks.get("video") or [] + summary = _summarize(video) + # Print summary even when test passes — discovery aid. + with capsys.disabled(): + print(f"\n[{service}] target={kind} '{target.get('name')}' -> {summary}") + + expected = conf.get("expected_quality") or {} + if not expected: + pytest.skip(f"{service}: no expected_quality in fixtures.yaml — discovered={summary}") + + if "min_height" in expected: + assert summary["max_height"] >= expected["min_height"], ( + f"{service}: max height {summary['max_height']} < expected min {expected['min_height']}" + ) + if "min_codecs" in expected: + missing = set(expected["min_codecs"]) - set(summary["codecs"]) + assert not missing, f"{service}: missing codecs {missing}, got {summary['codecs']}" + if "min_ranges" in expected: + missing = set(expected["min_ranges"]) - set(summary["ranges"]) + assert not missing, f"{service}: missing ranges {missing}, got {summary['ranges']}" + if "min_track_count" in expected: + assert summary["track_count"] >= expected["min_track_count"], ( + f"{service}: only {summary['track_count']} video tracks, expected >= {expected['min_track_count']}" + ) + finally: + http_session.delete(f"{server_url}/api/session/{sid}", timeout=30) diff --git a/tests/remote/e2e/test_live_remote_client.py b/tests/remote/e2e/test_live_remote_client.py new file mode 100644 index 0000000..6acddee --- /dev/null +++ b/tests/remote/e2e/test_live_remote_client.py @@ -0,0 +1,29 @@ +"""E2E: drive the full RemoteClient HTTP surface against the running serve.""" + +from __future__ import annotations + +import pytest + +pytestmark = [pytest.mark.live] + + +def test_remote_client_get_health(remote_client) -> None: + body = remote_client.get("/api/health") + assert body["status"] == "ok" + assert "version" in body + + +def test_remote_client_get_services(remote_client) -> None: + body = remote_client.get("/api/services") + assert "services" in body + assert isinstance(body["services"], list) + + +def test_remote_client_get_404_raises_systemexit(remote_client) -> None: + with pytest.raises(SystemExit): + remote_client.get("/api/session/this-does-not-exist") + + +def test_remote_client_delete_404_raises_systemexit(remote_client) -> None: + with pytest.raises(SystemExit): + remote_client.delete("/api/session/this-does-not-exist") diff --git a/tests/remote/e2e/test_live_search.py b/tests/remote/e2e/test_live_search.py new file mode 100644 index 0000000..d0d5fa7 --- /dev/null +++ b/tests/remote/e2e/test_live_search.py @@ -0,0 +1,28 @@ +"""E2E: search endpoint per service.""" + +from __future__ import annotations + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def test_search_returns_results(http_session, server_url: str, service_case) -> None: + service, conf = service_case + query = conf.get("search_query") + if not query: + pytest.skip(f"no search_query configured for {service}") + + r = http_session.post( + f"{server_url}/api/search", + json={"service": service, "query": query}, + timeout=120, + ) + if r.status_code in (400, 401, 403): + pytest.skip(f"{service} search not available: {r.status_code} {r.text[:200]}") + if r.status_code == 502 and "not supported" in r.text.lower(): + pytest.skip(f"{service} does not implement search") + assert r.status_code == 200, r.text + body = r.json() + assert "results" in body + assert isinstance(body["results"], list) diff --git a/tests/remote/e2e/test_live_session_lifecycle.py b/tests/remote/e2e/test_live_session_lifecycle.py new file mode 100644 index 0000000..7ae7e51 --- /dev/null +++ b/tests/remote/e2e/test_live_session_lifecycle.py @@ -0,0 +1,116 @@ +"""E2E: full session lifecycle per service (create → info → titles → tracks → delete).""" + +from __future__ import annotations + +import time + +import pytest + +pytestmark = [pytest.mark.live, pytest.mark.slow] + + +def _create_session(http_session, server_url: str, service: str, conf: dict) -> str: + payload = {"service": service, "title_id": conf["title_url"]} + r = http_session.post(f"{server_url}/api/session/create", json=payload, timeout=120) + if r.status_code >= 400: + pytest.skip(f"Auth/setup not available for {service}: {r.status_code} {r.text[:200]}") + body = r.json() + sid = body.get("session_id") + assert sid, f"no session_id in body: {body}" + return sid + + +def _wait_for_titles(http_session, server_url: str, sid: str, timeout: float = 120.0): + """Poll /titles until auth completes. Returns (status_code, response_json). + + Server returns 400 + auth_status=authenticating while auth is in-flight, + 200 when authenticated, and other 4xx/5xx on real failure. + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + r = http_session.get(f"{server_url}/api/session/{sid}/titles", timeout=30) + if r.status_code == 200: + return r.status_code, r.json() + if r.status_code == 400: + try: + body = r.json() + except Exception: + body = {} + auth_status = (body.get("details") or {}).get("auth_status") + if auth_status in ("authenticating", "pending_input"): + time.sleep(2.0) + continue + return r.status_code, r.json() if r.text else {} + return 408, {"message": "timeout waiting for auth"} + + +def _delete_session(http_session, server_url: str, sid: str) -> None: + http_session.delete(f"{server_url}/api/session/{sid}", timeout=30) + + +def test_session_create_then_delete(http_session, server_url: str, service_case) -> None: + service, conf = service_case + sid = _create_session(http_session, server_url, service, conf) + try: + r = http_session.get(f"{server_url}/api/session/{sid}", timeout=30) + assert r.status_code == 200 + info = r.json() + assert info.get("session", {}).get("service_tag", service) == service or service in str(info) + finally: + _delete_session(http_session, server_url, sid) + + # After delete, info should 404 + r2 = http_session.get(f"{server_url}/api/session/{sid}", timeout=30) + assert r2.status_code == 404 + + +def test_session_titles_returns_list(http_session, server_url: str, service_case) -> None: + service, conf = service_case + sid = _create_session(http_session, server_url, service, conf) + try: + code, body = _wait_for_titles(http_session, server_url, sid) + if code != 200: + pytest.skip(f"{service}: titles fetch failed {code}: {str(body)[:200]}") + assert "titles" in body + assert isinstance(body["titles"], list) + assert len(body["titles"]) >= 1 + first = body["titles"][0] + assert "id" in first + assert "type" in first + finally: + _delete_session(http_session, server_url, sid) + + +def test_session_tracks_for_first_title(http_session, server_url: str, service_case) -> None: + service, conf = service_case + sid = _create_session(http_session, server_url, service, conf) + try: + code, body = _wait_for_titles(http_session, server_url, sid) + if code != 200: + pytest.skip(f"{service}: titles fetch failed {code}: {str(body)[:200]}") + titles = body.get("titles") or [] + if not titles: + pytest.skip(f"{service}: no titles returned") + title_id = titles[0]["id"] + r = http_session.post( + f"{server_url}/api/session/{sid}/tracks", + json={"title_id": title_id}, + timeout=240, + ) + assert r.status_code == 200, r.text + body = r.json() + # Server returns a flat payload with `video`, `audio`, `subtitles`, + # `chapters`, `manifests`, `attachments` keys. + assert "video" in body or "audio" in body, body + assert body.get("video") or body.get("audio"), body + finally: + _delete_session(http_session, server_url, sid) + + +def test_session_delete_idempotent_returns_404_after(http_session, server_url: str, service_case) -> None: + service, conf = service_case + sid = _create_session(http_session, server_url, service, conf) + _delete_session(http_session, server_url, sid) + + r = http_session.delete(f"{server_url}/api/session/{sid}", timeout=30) + assert r.status_code in (404, 200) # tolerate both depending on server semantics diff --git a/tests/remote/unit/__init__.py b/tests/remote/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/remote/unit/test_compression.py b/tests/remote/unit/test_compression.py new file mode 100644 index 0000000..54e73b8 --- /dev/null +++ b/tests/remote/unit/test_compression.py @@ -0,0 +1,65 @@ +"""Unit tests for unshackle.core.api.compression.compression_middleware.""" + +from __future__ import annotations + +import asyncio +import gzip +import json + +import pytest +from aiohttp import web + +from unshackle.core.api.compression import compression_middleware + +pytestmark = pytest.mark.unit + + +class _FakeReq: + def __init__(self, accept_encoding: str = "gzip") -> None: + self.headers = {"Accept-Encoding": accept_encoding} + + +def _run(coro): + return asyncio.get_event_loop().run_until_complete(coro) if False else asyncio.run(coro) + + +def test_skips_when_client_does_not_accept_gzip() -> None: + payload = b"x" * 4096 + body_json = json.dumps({"data": "x" * 4096}).encode() + + async def handler(req): # noqa: ARG001 + return web.json_response({"data": "x" * 4096}) + + req = _FakeReq(accept_encoding="identity") + resp = _run(compression_middleware(req, handler)) + assert resp.headers.get("Content-Encoding") != "gzip" + assert resp.body == body_json or len(resp.body) >= len(body_json) - 8 + + +def test_skips_when_body_below_threshold() -> None: + async def handler(req): # noqa: ARG001 + return web.json_response({"hi": "x"}) + + resp = _run(compression_middleware(_FakeReq(), handler)) + assert resp.headers.get("Content-Encoding") != "gzip" + + +def test_skips_non_json_response() -> None: + async def handler(req): # noqa: ARG001 + return web.Response(body=b"x" * 4096, content_type="text/plain") + + resp = _run(compression_middleware(_FakeReq(), handler)) + assert resp.headers.get("Content-Encoding") != "gzip" + + +def test_compresses_large_json_when_accepted() -> None: + big = {"data": "x" * 4096} + + async def handler(req): # noqa: ARG001 + return web.json_response(big) + + resp = _run(compression_middleware(_FakeReq(), handler)) + assert resp.headers.get("Content-Encoding") == "gzip" + decompressed = gzip.decompress(resp.body) + assert json.loads(decompressed) == big + assert resp.headers["Content-Length"] == str(len(resp.body)) diff --git a/tests/remote/unit/test_download_manager.py b/tests/remote/unit/test_download_manager.py new file mode 100644 index 0000000..59cc0d7 --- /dev/null +++ b/tests/remote/unit/test_download_manager.py @@ -0,0 +1,120 @@ +"""Unit tests for DownloadJob + DownloadQueueManager state machine. + +These tests focus on the queue manager's data layer (create/get/list/cancel/ +cleanup/serialize) — they do not exercise the actual subprocess download path. +""" + +from __future__ import annotations + +from datetime import datetime, timedelta + +import pytest + +from unshackle.core.api.download_manager import DownloadJob, DownloadQueueManager, JobStatus, get_download_manager + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def manager() -> DownloadQueueManager: + """Fresh manager. We never call start_workers() so no async tasks are created.""" + return DownloadQueueManager(max_concurrent_downloads=2, job_retention_hours=24) + + +def test_create_job_returns_queued_job(manager: DownloadQueueManager) -> None: + job = manager.create_job("ATV", "movie-123", profile="default") + assert isinstance(job, DownloadJob) + assert job.status is JobStatus.QUEUED + assert job.service == "ATV" + assert job.title_id == "movie-123" + assert job.parameters == {"profile": "default"} + + +def test_get_and_list_jobs(manager: DownloadQueueManager) -> None: + a = manager.create_job("ATV", "a") + b = manager.create_job("NF", "b") + assert manager.get_job(a.job_id) is a + assert manager.get_job("missing") is None + listed = manager.list_jobs() + assert {j.job_id for j in listed} == {a.job_id, b.job_id} + + +def test_to_dict_short_vs_full(manager: DownloadQueueManager) -> None: + job = manager.create_job("ATV", "t", profile="p") + short = job.to_dict() + assert "parameters" not in short + assert short["status"] == "queued" + assert short["service"] == "ATV" + full = job.to_dict(include_full_details=True) + assert full["parameters"] == {"profile": "p"} + assert "error_message" in full + assert "output_files" in full + + +def test_cancel_queued_job_sets_cancelled_and_signals_event(manager: DownloadQueueManager) -> None: + job = manager.create_job("ATV", "t") + assert manager.cancel_job(job.job_id) is True + assert job.status is JobStatus.CANCELLED + assert job.cancel_event.is_set() + + +def test_cancel_unknown_job_returns_false(manager: DownloadQueueManager) -> None: + assert manager.cancel_job("never-existed") is False + + +def test_cancel_completed_job_returns_false(manager: DownloadQueueManager) -> None: + job = manager.create_job("ATV", "t") + job.status = JobStatus.COMPLETED + assert manager.cancel_job(job.job_id) is False + + +def test_cancel_downloading_job_signals(manager: DownloadQueueManager) -> None: + job = manager.create_job("ATV", "t") + job.status = JobStatus.DOWNLOADING + assert manager.cancel_job(job.job_id) is True + assert job.status is JobStatus.CANCELLED + assert job.cancel_event.is_set() + + +def test_cleanup_old_jobs_drops_old_terminal_states(manager: DownloadQueueManager) -> None: + now = datetime.now() + old = now - timedelta(hours=48) + keep_recent = manager.create_job("ATV", "recent") + drop_old_done = manager.create_job("ATV", "old-done") + drop_old_failed = manager.create_job("ATV", "old-failed") + keep_running = manager.create_job("ATV", "running") + + keep_recent.status = JobStatus.COMPLETED + keep_recent.completed_time = now + + drop_old_done.status = JobStatus.COMPLETED + drop_old_done.completed_time = old + + drop_old_failed.status = JobStatus.FAILED + drop_old_failed.created_time = old # never set completed_time + + keep_running.status = JobStatus.DOWNLOADING + + removed = manager.cleanup_old_jobs() + assert removed == 2 + remaining = {j.job_id for j in manager.list_jobs()} + assert keep_recent.job_id in remaining + assert keep_running.job_id in remaining + assert drop_old_done.job_id not in remaining + assert drop_old_failed.job_id not in remaining + + +def test_get_download_manager_returns_singleton() -> None: + a = get_download_manager() + b = get_download_manager() + assert a is b + + +def test_job_status_values() -> None: + assert {s.value for s in JobStatus} == { + "queued", + "downloading", + "completed", + "failed", + "cancelled", + } diff --git a/tests/remote/unit/test_errors.py b/tests/remote/unit/test_errors.py new file mode 100644 index 0000000..0a34354 --- /dev/null +++ b/tests/remote/unit/test_errors.py @@ -0,0 +1,132 @@ +"""Unit tests for unshackle.core.api.errors.""" + +from __future__ import annotations + +import json + +import pytest + +from unshackle.core.api.errors import ( + APIError, + APIErrorCode, + build_error_response, + categorize_exception, + handle_api_exception, +) + +pytestmark = pytest.mark.unit + + +def _body(resp) -> dict: + return json.loads(resp.body.decode("utf-8")) + + +def test_api_error_default_http_status_per_code() -> None: + cases = { + APIErrorCode.INVALID_INPUT: 400, + APIErrorCode.INVALID_SERVICE: 400, + APIErrorCode.AUTH_REQUIRED: 401, + APIErrorCode.AUTH_FAILED: 401, + APIErrorCode.FORBIDDEN: 403, + APIErrorCode.GEOFENCE: 403, + APIErrorCode.NOT_FOUND: 404, + APIErrorCode.SESSION_NOT_FOUND: 404, + APIErrorCode.TRACK_NOT_FOUND: 404, + APIErrorCode.RATE_LIMITED: 429, + APIErrorCode.INTERNAL_ERROR: 500, + APIErrorCode.SERVICE_ERROR: 502, + APIErrorCode.DRM_ERROR: 502, + APIErrorCode.NETWORK_ERROR: 503, + APIErrorCode.SERVICE_UNAVAILABLE: 503, + } + for code, expected in cases.items(): + assert APIError(code, "x").http_status == expected, code + + +def test_api_error_explicit_http_status_overrides_default() -> None: + err = APIError(APIErrorCode.INVALID_INPUT, "x", http_status=418) + assert err.http_status == 418 + + +def test_build_error_response_from_api_error() -> None: + err = APIError( + APIErrorCode.SESSION_NOT_FOUND, + "no such session", + details={"session_id": "abc"}, + retryable=False, + ) + resp = build_error_response(err) + assert resp.status == 404 + body = _body(resp) + assert body["status"] == "error" + assert body["error_code"] == "SESSION_NOT_FOUND" + assert body["message"] == "no such session" + assert body["details"] == {"session_id": "abc"} + assert "retryable" not in body + assert "debug_info" not in body + assert "timestamp" in body + + +def test_build_error_response_retryable_flag() -> None: + err = APIError(APIErrorCode.NETWORK_ERROR, "boom", retryable=True) + body = _body(build_error_response(err)) + assert body["retryable"] is True + + +def test_build_error_response_from_generic_exception() -> None: + resp = build_error_response(RuntimeError("oops")) + assert resp.status == 500 + body = _body(resp) + assert body["error_code"] == "INTERNAL_ERROR" + assert body["message"] == "oops" + + +def test_build_error_response_debug_mode_includes_traceback() -> None: + try: + raise ValueError("kaboom") + except ValueError as e: + resp = build_error_response(e, debug_mode=True, extra_debug_info={"foo": "bar"}) + body = _body(resp) + assert body["debug_info"]["exception_type"] == "ValueError" + assert "traceback" in body["debug_info"] + assert body["debug_info"]["foo"] == "bar" + + +@pytest.mark.parametrize( + "exc, expected_code", + [ + (Exception("Invalid credentials provided"), APIErrorCode.AUTH_FAILED), + (Exception("Connection refused"), APIErrorCode.NETWORK_ERROR), + (TimeoutError("read timeout"), APIErrorCode.NETWORK_ERROR), + (Exception("Not available in your region"), APIErrorCode.GEOFENCE), + (Exception("Title not found"), APIErrorCode.NOT_FOUND), + (Exception("HTTP 429 too many requests"), APIErrorCode.RATE_LIMITED), + (Exception("DRM license fetch failed"), APIErrorCode.DRM_ERROR), + (Exception("503 service unavailable"), APIErrorCode.SERVICE_UNAVAILABLE), + (ValueError("malformed body"), APIErrorCode.INVALID_INPUT), + (RuntimeError("totally novel failure xyz"), APIErrorCode.INTERNAL_ERROR), + ], +) +def test_categorize_exception(exc: Exception, expected_code: APIErrorCode) -> None: + api_err = categorize_exception(exc, context={"service": "ATV"}) + assert api_err.error_code == expected_code + assert api_err.details.get("service") == "ATV" + + +def test_categorize_preserves_context() -> None: + api_err = categorize_exception(ValueError("bad"), context={"op": "search"}) + assert api_err.details["op"] == "search" + + +def test_handle_api_exception_with_api_error_preserves_code() -> None: + err = APIError(APIErrorCode.TRACK_NOT_FOUND, "no track") + resp = handle_api_exception(err) + body = _body(resp) + assert body["error_code"] == "TRACK_NOT_FOUND" + assert resp.status == 404 + + +def test_handle_api_exception_categorizes_generic() -> None: + resp = handle_api_exception(ConnectionError("oops")) + body = _body(resp) + assert body["error_code"] == "NETWORK_ERROR" diff --git a/tests/remote/unit/test_handlers_serialize.py b/tests/remote/unit/test_handlers_serialize.py new file mode 100644 index 0000000..038bca9 --- /dev/null +++ b/tests/remote/unit/test_handlers_serialize.py @@ -0,0 +1,273 @@ +"""Unit tests for unshackle.core.api.handlers serializers + validators.""" + +from __future__ import annotations + +import pytest +from langcodes import Language + +from unshackle.core.api.handlers import ( + sanitize_log, + serialize_audio_track, + serialize_drm, + serialize_subtitle_track, + serialize_title, + serialize_video_track, + validate_download_parameters, + validate_service, +) +from unshackle.core.titles.episode import Episode +from unshackle.core.titles.movie import Movie +from unshackle.core.tracks import Audio, Subtitle, Video +from unshackle.core.tracks.track import Track + +pytestmark = pytest.mark.unit + + +class _FakeSvc: + pass + + +def _video(**overrides) -> Video: + base = dict( + url="https://example.com/v.mpd", + language=Language.get("en"), + descriptor=Track.Descriptor.URL, + codec=Video.Codec.AVC, + range_=Video.Range.SDR, + bitrate=5_000_000, + width=1920, + height=1080, + fps=24, + id_="video-001", + ) + base.update(overrides) + return Video(**base) + + +def _audio(**overrides) -> Audio: + base = dict( + url="https://example.com/a.mpd", + language=Language.get("en"), + descriptor=Track.Descriptor.URL, + codec=Audio.Codec.AAC, + bitrate=128_000, + channels=2, + joc=0, + descriptive=False, + id_="audio-001", + ) + base.update(overrides) + return Audio(**base) + + +def _subtitle(**overrides) -> Subtitle: + base = dict( + url="https://example.com/s.vtt", + language=Language.get("en"), + descriptor=Track.Descriptor.URL, + codec=Subtitle.Codec.WebVTT, + cc=False, + sdh=False, + forced=False, + id_="sub-001", + ) + base.update(overrides) + return Subtitle(**base) + + +# ---------- sanitize_log ---------- + + +@pytest.mark.parametrize( + "raw, expected", + [ + ("hello\nworld", "helloworld"), + ("a\r\nb\x00c", "abc"), + ("clean", "clean"), + (12345, "12345"), + ], +) +def test_sanitize_log(raw, expected: str) -> None: + assert sanitize_log(raw) == expected + + +# ---------- serialize_title ---------- + + +def test_serialize_title_movie() -> None: + movie = Movie(id_="movie-0001", service=_FakeSvc, name="Title X", year=2024, language=Language.get("en")) + d = serialize_title(movie) + assert d["type"] == "movie" + assert d["name"] == "Title X" + assert d["year"] == 2024 + assert d["id"] == "movie-0001" + assert d["language"] == "en" + + +def test_serialize_title_episode_named() -> None: + ep = Episode( + id_="ep-00001", + service=_FakeSvc, + title="My Show", + season=2, + number=3, + name="Pilot", + year=2024, + language=Language.get("en"), + ) + d = serialize_title(ep) + assert d["type"] == "episode" + assert d["series_title"] == "My Show" + assert d["season"] == 2 + assert d["number"] == 3 + assert d["name"] == "Pilot" + + +def test_serialize_title_episode_unnamed_falls_back_to_number() -> None: + ep = Episode( + id_="ep-00002", + service=_FakeSvc, + title="Show", + season=1, + number=5, + name=None, + year=None, + language=Language.get("en"), + ) + d = serialize_title(ep) + assert d["name"] == "Episode 05" + + +# ---------- serialize_video/audio/subtitle ---------- + + +def test_serialize_video_track_basic() -> None: + d = serialize_video_track(_video()) + assert d["id"] == "video-001" + assert d["codec"] == "AVC" + assert d["bitrate"] == 5000 # kbps + assert d["resolution"] == "1920x1080" + assert d["fps"] == 24 + assert d["range"] == "SDR" + assert d["language"] == "en" + assert d["drm"] is None + assert "url" not in d + + +def test_serialize_video_track_include_url() -> None: + d = serialize_video_track(_video(), include_url=True) + assert d["url"] == "https://example.com/v.mpd" + + +def test_serialize_audio_track_basic() -> None: + d = serialize_audio_track(_audio()) + assert d["id"] == "audio-001" + assert d["codec"] == "AAC" + assert d["bitrate"] == 128 + assert d["channels"] == 2 + assert d["descriptive"] is False + + +def test_serialize_subtitle_track_basic() -> None: + d = serialize_subtitle_track(_subtitle(forced=True)) + assert d["id"] == "sub-001" + assert d["codec"] == "WebVTT" + assert d["forced"] is True + assert d["sdh"] is False + assert d["cc"] is False + + +# ---------- serialize_drm ---------- + + +def test_serialize_drm_none_returns_none() -> None: + assert serialize_drm(None) is None + assert serialize_drm([]) is None + + +def test_serialize_drm_widevine_minimal() -> None: + class _PSSH: + def dumps(self) -> str: + return "BASE64PSSH==" + + class _Widevine: + def __init__(self) -> None: + self._pssh = _PSSH() + self.kids = ["00112233445566778899aabbccddeeff"] + self.license_url = "https://lic.example.com/wv" + + out = serialize_drm(_Widevine()) + assert isinstance(out, list) + assert len(out) == 1 + info = out[0] + assert info["type"] == "_widevine" # class name lowercased + assert info["pssh"] == "BASE64PSSH==" + assert info["kids"] == ["00112233445566778899aabbccddeeff"] + assert info["license_url"] == "https://lic.example.com/wv" + + +# ---------- validate_service ---------- + + +def test_validate_service_unknown_returns_none() -> None: + assert validate_service("NOPE_THIS_IS_NOT_REAL_") is None + + +# ---------- validate_download_parameters ---------- + + +def test_validate_download_params_accepts_defaults() -> None: + assert validate_download_parameters({}) is None + + +@pytest.mark.parametrize( + "data, fragment", + [ + ({"vcodec": "WUT"}, "Invalid vcodec"), + ({"vcodec": 123}, "vcodec must be a string or list"), + ({"acodec": "MP9"}, "Invalid acodec"), + ({"sub_format": "doc"}, "Invalid sub_format"), + ({"vbitrate": -1}, "vbitrate"), + ({"abitrate": "no"}, "abitrate"), + ({"vbitrate_range": "no-dash-but-letters"}, None), + ({"vbitrate_range": "nope"}, "MIN-MAX"), + ({"channels": -3}, "channels"), + ({"workers": 0}, "workers"), + ({"downloads": 0}, "downloads"), + ({"video_only": True, "audio_only": True}, "exclusive"), + ({"no_subs": True, "subs_only": True}, "no_subs and subs_only"), + ({"no_audio": True, "audio_only": True}, "no_audio and audio_only"), + ({"s_lang": ["en"], "require_subs": ["en"]}, "s_lang and require_subs"), + ({"range": "UHD"}, "Invalid range"), + ({"range": ["SDR", "UHD"]}, "Invalid range value"), + ], +) +def test_validate_download_params_errors(data: dict, fragment) -> None: + result = validate_download_parameters(data) + if fragment is None: + # A dash-containing string is valid syntactically per current rule + assert result is None + else: + assert result is not None + assert fragment in result + + +def test_validate_download_params_accepts_valid_values() -> None: + assert ( + validate_download_parameters( + { + "vcodec": "H264,H265", + "acodec": ["AAC", "EAC3"], + "sub_format": "VTT", + "vbitrate": 6000, + "abitrate": 128, + "vbitrate_range": "6000-7000", + "abitrate_range": "96-192", + "channels": 5.1, + "workers": 8, + "downloads": 2, + "range": ["SDR", "HDR10"], + } + ) + is None + ) diff --git a/tests/remote/unit/test_input_bridge.py b/tests/remote/unit/test_input_bridge.py new file mode 100644 index 0000000..c55e2b5 --- /dev/null +++ b/tests/remote/unit/test_input_bridge.py @@ -0,0 +1,100 @@ +"""Unit tests for unshackle.core.api.input_bridge.InputBridge.""" + +from __future__ import annotations + +import threading +import time + +import pytest + +from unshackle.core.api.input_bridge import AuthStatus, BridgeCancelledError, InputBridge + +pytestmark = pytest.mark.unit + + +def test_initial_status_is_authenticating() -> None: + bridge = InputBridge() + assert bridge.status is AuthStatus.AUTHENTICATING + assert bridge.get_pending_prompt() is None + assert bridge.error is None + + +def test_submit_response_returns_false_when_no_prompt_pending() -> None: + bridge = InputBridge() + assert bridge.submit_response("foo") is False + + +def test_request_input_blocks_until_submit() -> None: + bridge = InputBridge() + result: list[str] = [] + + def worker() -> None: + result.append(bridge.request_input("OTP?", timeout=5)) + + t = threading.Thread(target=worker) + t.start() + for _ in range(50): + if bridge.get_pending_prompt() == "OTP?": + break + time.sleep(0.02) + assert bridge.status is AuthStatus.PENDING_INPUT + assert bridge.get_pending_prompt() == "OTP?" + + assert bridge.submit_response("123456") is True + t.join(timeout=2) + assert result == ["123456"] + assert bridge.status is AuthStatus.AUTHENTICATING + assert bridge.get_pending_prompt() is None + + +def test_request_input_times_out() -> None: + bridge = InputBridge() + with pytest.raises(TimeoutError): + bridge.request_input("hello", timeout=0.6) + assert bridge.status is AuthStatus.FAILED + assert "timed out" in (bridge.error or "") + + +def test_cancel_before_request_raises_immediately() -> None: + bridge = InputBridge() + bridge.cancel() + with pytest.raises(BridgeCancelledError): + bridge.request_input("hello", timeout=5) + assert bridge.status is AuthStatus.FAILED + + +def test_cancel_unblocks_pending_request() -> None: + bridge = InputBridge() + exc: list[Exception] = [] + + def worker() -> None: + try: + bridge.request_input("OTP?", timeout=5) + except BridgeCancelledError as e: + exc.append(e) + + t = threading.Thread(target=worker) + t.start() + for _ in range(50): + if bridge.status is AuthStatus.PENDING_INPUT: + break + time.sleep(0.02) + + bridge.cancel() + t.join(timeout=2) + assert exc and isinstance(exc[0], BridgeCancelledError) + assert bridge.status is AuthStatus.FAILED + + +def test_get_pending_prompt_returns_none_outside_pending_state() -> None: + bridge = InputBridge() + bridge.status = AuthStatus.AUTHENTICATED + assert bridge.get_pending_prompt() is None + + +def test_status_and_error_setters() -> None: + bridge = InputBridge() + bridge.status = AuthStatus.AUTHENTICATED + bridge.error = "boom" + assert bridge.status is AuthStatus.AUTHENTICATED + assert bridge.error == "boom" diff --git a/tests/remote/unit/test_remote_client.py b/tests/remote/unit/test_remote_client.py new file mode 100644 index 0000000..4f6ef20 --- /dev/null +++ b/tests/remote/unit/test_remote_client.py @@ -0,0 +1,111 @@ +"""Unit tests for unshackle.core.remote_service.RemoteClient.""" + +from __future__ import annotations + +import json + +import pytest +import responses + +from unshackle.core.remote_service import RemoteClient + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def client() -> RemoteClient: + return RemoteClient(server_url="http://srv:8786", api_key="secret-xyz") + + +def test_session_sets_secret_key_header(client: RemoteClient) -> None: + s = client.session + assert s.headers.get("X-Secret-Key") == "secret-xyz" + assert s.headers["User-Agent"].startswith("unshackle/") + + +def test_session_omits_secret_key_when_empty() -> None: + c = RemoteClient(server_url="http://srv:8786", api_key="") + assert "X-Secret-Key" not in c.session.headers + + +def test_server_url_trailing_slash_stripped() -> None: + c = RemoteClient(server_url="http://srv:8786/", api_key="") + assert c.server_url == "http://srv:8786" + + +@responses.activate +def test_get_returns_json(client: RemoteClient) -> None: + responses.add( + responses.GET, + "http://srv:8786/api/health", + json={"status": "ok"}, + status=200, + ) + assert client.get("/api/health") == {"status": "ok"} + + +@responses.activate +def test_post_sends_json_body(client: RemoteClient) -> None: + captured = {} + + def cb(request): + captured["body"] = json.loads(request.body) + return (200, {}, json.dumps({"session_id": "abc"})) + + responses.add_callback( + responses.POST, "http://srv:8786/api/session/create", callback=cb, content_type="application/json" + ) + result = client.post("/api/session/create", {"service": "ATV"}) + assert result == {"session_id": "abc"} + assert captured["body"] == {"service": "ATV"} + + +@responses.activate +def test_delete_returns_json(client: RemoteClient) -> None: + responses.add( + responses.DELETE, + "http://srv:8786/api/session/abc", + json={"status": "deleted"}, + status=200, + ) + assert client.delete("/api/session/abc") == {"status": "deleted"} + + +@responses.activate +def test_4xx_raises_systemexit_with_logged_error(client: RemoteClient, caplog: pytest.LogCaptureFixture) -> None: + responses.add( + responses.GET, + "http://srv:8786/api/session/none", + json={"error_code": "SESSION_NOT_FOUND", "message": "no such session"}, + status=404, + ) + with caplog.at_level("ERROR"), pytest.raises(SystemExit): + client.get("/api/session/none") + assert "SESSION_NOT_FOUND" in caplog.text + assert "no such session" in caplog.text + + +@responses.activate +def test_connection_error_raises_systemexit(client: RemoteClient) -> None: + import requests + + responses.add( + responses.GET, + "http://srv:8786/api/health", + body=requests.ConnectionError("boom"), + ) + with pytest.raises(SystemExit): + client.get("/api/health") + + +@responses.activate +def test_timeout_raises_systemexit(client: RemoteClient) -> None: + import requests + + responses.add( + responses.GET, + "http://srv:8786/api/health", + body=requests.Timeout("slow"), + ) + with pytest.raises(SystemExit): + client.get("/api/health") diff --git a/tests/remote/unit/test_remote_resolve.py b/tests/remote/unit/test_remote_resolve.py new file mode 100644 index 0000000..ae31e77 --- /dev/null +++ b/tests/remote/unit/test_remote_resolve.py @@ -0,0 +1,117 @@ +"""Unit tests for resolve_server / _resolve_proxy in unshackle.core.remote_service.""" + +from __future__ import annotations + +import click +import pytest + +from unshackle.core.remote_service import _resolve_proxy, resolve_server + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def empty_remote_services(monkeypatch: pytest.MonkeyPatch) -> None: + from unshackle.core import remote_service as rs + + monkeypatch.setattr(rs.config, "remote_services", {}) + + +@pytest.fixture +def single_remote_service(monkeypatch: pytest.MonkeyPatch) -> None: + from unshackle.core import remote_service as rs + + monkeypatch.setattr( + rs.config, + "remote_services", + { + "primary": { + "url": "https://primary:8080", + "api_key": "key-abc", + "services": {"ATV": True, "NF": True}, + "server_cdm": True, + } + }, + ) + + +@pytest.fixture +def multi_remote_services(monkeypatch: pytest.MonkeyPatch) -> None: + from unshackle.core import remote_service as rs + + monkeypatch.setattr( + rs.config, + "remote_services", + { + "a": {"url": "https://a:8080", "api_key": "ka"}, + "b": {"url": "https://b:8080", "api_key": "kb"}, + }, + ) + + +def test_resolve_server_no_config_raises_click(empty_remote_services) -> None: + with pytest.raises(click.ClickException) as exc: + resolve_server(None) + assert "remote_services" in str(exc.value.message) + + +def test_resolve_server_single_picks_only_entry(single_remote_service) -> None: + url, key, services = resolve_server(None) + assert url == "https://primary:8080" + assert key == "key-abc" + assert services["_server_cdm"] is True + assert services.get("ATV") is True + + +def test_resolve_server_explicit_name(single_remote_service) -> None: + url, key, services = resolve_server("primary") + assert url == "https://primary:8080" + assert services["_server_cdm"] is True + + +def test_resolve_server_unknown_name_raises(single_remote_service) -> None: + with pytest.raises(click.ClickException) as exc: + resolve_server("bogus") + assert "bogus" in str(exc.value.message) + + +def test_resolve_server_multi_requires_explicit(multi_remote_services) -> None: + with pytest.raises(click.ClickException) as exc: + resolve_server(None) + assert "--server" in str(exc.value.message) + + +def test_resolve_server_multi_with_name(multi_remote_services) -> None: + url, key, services = resolve_server("b") + assert url == "https://b:8080" + assert key == "kb" + assert services["_server_cdm"] is False + + +def test_resolve_proxy_none_returns_none() -> None: + assert _resolve_proxy(None) is None + assert _resolve_proxy("") is None + + +def test_resolve_proxy_passes_through_value(monkeypatch: pytest.MonkeyPatch) -> None: + import unshackle.core.proxies.resolve as resolve_mod + + monkeypatch.setattr(resolve_mod, "initialize_proxy_providers", lambda: []) + monkeypatch.setattr(resolve_mod, "resolve_proxy", lambda arg, providers: f"http://proxy/{arg}") + + assert _resolve_proxy("us") == "http://proxy/us" + + +def test_resolve_proxy_value_error_becomes_click(monkeypatch: pytest.MonkeyPatch) -> None: + import unshackle.core.proxies.resolve as resolve_mod + + monkeypatch.setattr(resolve_mod, "initialize_proxy_providers", lambda: []) + + def boom(*_): + raise ValueError("no such country") + + monkeypatch.setattr(resolve_mod, "resolve_proxy", boom) + + with pytest.raises(click.ClickException) as exc: + _resolve_proxy("xx") + assert "no such country" in str(exc.value.message) diff --git a/tests/remote/unit/test_remote_service_helpers.py b/tests/remote/unit/test_remote_service_helpers.py new file mode 100644 index 0000000..81665c3 --- /dev/null +++ b/tests/remote/unit/test_remote_service_helpers.py @@ -0,0 +1,157 @@ +"""Unit tests for module-level helpers in unshackle.core.remote_service.""" + +from __future__ import annotations + +from enum import Enum + +import pytest + +from unshackle.core.remote_service import ( + _build_title, + _build_tracks, + _deserialize_audio, + _deserialize_subtitle, + _deserialize_video, + _enum_get, + _match_track, + _reconstruct_drm, +) +from unshackle.core.titles.episode import Episode +from unshackle.core.titles.movie import Movie +from unshackle.core.tracks import Audio, Subtitle, Video + +pytestmark = pytest.mark.unit + + +class _Color(Enum): + RED = 1 + BLUE = 2 + + +def test_enum_get_known() -> None: + assert _enum_get(_Color, "RED") is _Color.RED + + +def test_enum_get_unknown_returns_default() -> None: + assert _enum_get(_Color, "PURPLE", default=_Color.BLUE) is _Color.BLUE + + +def test_enum_get_none_returns_default() -> None: + assert _enum_get(_Color, None, default=_Color.RED) is _Color.RED + + +def test_deserialize_video_minimal() -> None: + v = _deserialize_video({"id": "video-1", "codec": "AVC", "width": 1920, "height": 1080, "bitrate": 5000}) + assert isinstance(v, Video) + assert v.id == "video-1" + assert v.codec is Video.Codec.AVC + assert v.bitrate == 5_000_000 # kbps -> bps + assert v.width == 1920 + assert v.height == 1080 + assert v.range is Video.Range.SDR + + +def test_deserialize_video_unknown_codec_falls_back_to_none() -> None: + v = _deserialize_video({"id": "v2", "codec": "MADE_UP", "width": 0, "height": 0}) + assert v.codec is None + + +def test_deserialize_audio_atmos_flag_sets_joc() -> None: + a = _deserialize_audio({"id": "a1", "codec": "AAC", "atmos": True, "channels": 6, "bitrate": 256}) + assert isinstance(a, Audio) + assert a.joc == 1 + assert a.channels == 6 + assert a.bitrate == 256_000 + + +def test_deserialize_audio_no_atmos() -> None: + a = _deserialize_audio({"id": "a2", "codec": "AAC", "channels": 2}) + assert a.joc == 0 + + +def test_deserialize_subtitle_forced_flag() -> None: + s = _deserialize_subtitle({"id": "s1", "codec": "WebVTT", "language": "en", "forced": True}) + assert isinstance(s, Subtitle) + assert s.forced is True + assert s.sdh is False + + +def test_deserialize_subtitle_sdh_flag() -> None: + s = _deserialize_subtitle({"id": "s2", "codec": "WebVTT", "language": "en", "sdh": True}) + assert s.sdh is True + assert s.forced is False + + +def test_reconstruct_drm_empty() -> None: + assert _reconstruct_drm(None) == [] + assert _reconstruct_drm([]) == [] + + +def test_reconstruct_drm_skips_entries_without_pssh() -> None: + assert _reconstruct_drm([{"type": "widevine"}]) == [] + + +def test_reconstruct_drm_invalid_pssh_silently_dropped() -> None: + assert _reconstruct_drm([{"type": "widevine", "pssh": "not-real-pssh"}]) == [] + + +def test_build_tracks_aggregates() -> None: + data = { + "video": [{"id": "v", "codec": "AVC", "width": 1280, "height": 720, "bitrate": 2500}], + "audio": [{"id": "a", "codec": "AAC", "channels": 2, "bitrate": 128}], + "subtitles": [{"id": "s", "codec": "WebVTT", "language": "en"}], + "attachments": [], + } + t = _build_tracks(data) + assert len(t.videos) == 1 + assert len(t.audio) == 1 + assert len(t.subtitles) == 1 + + +def test_match_track_by_id() -> None: + a = _deserialize_video({"id": "v1", "codec": "AVC", "width": 1920, "height": 1080}) + b = _deserialize_video({"id": "v2", "codec": "AVC", "width": 1280, "height": 720}) + remote = _deserialize_video({"id": "v2", "codec": "AVC", "width": 1280, "height": 720}) + assert _match_track(remote, [a, b]) is b + + +def test_match_track_by_attributes_when_id_missing() -> None: + local = _deserialize_video({"id": "X", "codec": "AVC", "width": 1920, "height": 1080, "language": "en"}) + remote = _deserialize_video({"id": "Y", "codec": "AVC", "width": 1920, "height": 1080, "language": "en"}) + assert _match_track(remote, [local]) is local + + +def test_match_track_no_candidates_returns_none() -> None: + remote = _deserialize_video({"id": "X", "codec": "AVC", "width": 1, "height": 1}) + assert _match_track(remote, []) is None + + +def test_build_title_movie() -> None: + info = {"type": "movie", "id": "movie-0001", "name": "Foo", "year": 2024, "language": "en"} + title = _build_title(info, "ATV", "fallback") + assert isinstance(title, Movie) + assert title.id == "movie-0001" + assert title.name == "Foo" + + +def test_build_title_episode() -> None: + info = { + "type": "episode", + "id": "ep-00001", + "series_title": "Show", + "season": 1, + "number": 2, + "name": "Pilot", + "year": 2024, + "language": "en", + } + title = _build_title(info, "ATV", "fallback") + assert isinstance(title, Episode) + assert title.season == 1 + assert title.number == 2 + assert title.name == "Pilot" + + +def test_build_title_falls_back_to_id_when_missing() -> None: + title = _build_title({"type": "movie", "name": "x"}, "ATV", "fallback-id") + assert title.id == "fallback-id" diff --git a/tests/remote/unit/test_routes.py b/tests/remote/unit/test_routes.py new file mode 100644 index 0000000..241996f --- /dev/null +++ b/tests/remote/unit/test_routes.py @@ -0,0 +1,159 @@ +"""Unit tests for unshackle.core.api.routes.setup_routes wiring + CORS + auth gating. + +We build small aiohttp apps in-test with setup_routes(), mirroring what +unshackle/commands/serve.py does. We avoid hitting the real handlers by +stubbing the route table for selected paths. +""" + +from __future__ import annotations + +import pytest +from aiohttp import web + +from unshackle.core.api.compression import compression_middleware +from unshackle.core.api.routes import cors_middleware, setup_routes + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def make_app(): + """Factory that builds an aiohttp app for tests.""" + + def _factory(remote_only: bool = False, with_auth_middleware: bool = False): + middlewares = [cors_middleware, compression_middleware] + if with_auth_middleware: + middlewares.insert(1, _no_key_required_auth()) + app = web.Application(middlewares=middlewares) + app["config"] = {"users": {}} + app["debug_api"] = False + setup_routes(app, remote_only=remote_only) + return app + + return _factory + + +def _no_key_required_auth(): + """Mirror serve.py's api_key_authentication middleware: required X-Secret-Key + on every endpoint except /api/health.""" + + @web.middleware + async def mw(request, handler): + if request.path == "/api/health": + return await handler(request) + secret = request.headers.get("X-Secret-Key") + if not secret: + return web.json_response({"status": 401, "message": "Secret Key is Empty."}, status=401) + if secret not in request.app["config"]["users"]: + return web.json_response({"status": 401, "message": "Secret Key is Invalid."}, status=401) + return await handler(request) + + return mw + + +def _collect_paths(app: web.Application) -> list[tuple[str, str]]: + return sorted({(r.method, r.resource.canonical) for r in app.router.routes()}) + + +def test_setup_routes_full_mode_wires_all_endpoints(make_app) -> None: + app = make_app(remote_only=False) + paths = _collect_paths(app) + expected = { + ("GET", "/api/health"), + ("GET", "/api/services"), + ("POST", "/api/search"), + ("POST", "/api/list-titles"), + ("POST", "/api/list-tracks"), + ("POST", "/api/download"), + ("GET", "/api/download/jobs"), + ("GET", "/api/download/jobs/{job_id}"), + ("DELETE", "/api/download/jobs/{job_id}"), + ("POST", "/api/session/create"), + ("GET", "/api/session/{session_id}/titles"), + ("POST", "/api/session/{session_id}/tracks"), + ("POST", "/api/session/{session_id}/segments"), + ("POST", "/api/session/{session_id}/license"), + ("GET", "/api/session/{session_id}/prompt"), + ("POST", "/api/session/{session_id}/prompt"), + ("GET", "/api/session/{session_id}"), + ("DELETE", "/api/session/{session_id}"), + } + assert expected.issubset(set(paths)) + + +def test_setup_routes_remote_only_excludes_list_and_download(make_app) -> None: + app = make_app(remote_only=True) + paths = set(_collect_paths(app)) + assert ("POST", "/api/list-titles") not in paths + assert ("POST", "/api/list-tracks") not in paths + assert ("POST", "/api/download") not in paths + assert ("GET", "/api/download/jobs") not in paths + # session endpoints still present + assert ("POST", "/api/session/create") in paths + assert ("GET", "/api/session/{session_id}/titles") in paths + assert ("POST", "/api/session/{session_id}/license") in paths + + +async def test_cors_preflight_returns_headers(make_app, aiohttp_client) -> None: + app = make_app(remote_only=True) + client = await aiohttp_client(app) + resp = await client.options("/api/health") + assert resp.status == 200 + assert resp.headers["Access-Control-Allow-Origin"] == "*" + assert "GET" in resp.headers["Access-Control-Allow-Methods"] + assert "X-Secret-Key" in resp.headers["Access-Control-Allow-Headers"] + + +async def test_health_endpoint_responds_ok(make_app, aiohttp_client, monkeypatch: pytest.MonkeyPatch) -> None: + from unshackle.core.api import routes as routes_mod + + async def _no_update(_): + return None + + monkeypatch.setattr(routes_mod.UpdateChecker, "check_for_updates", _no_update) + app = make_app(remote_only=True) + client = await aiohttp_client(app) + resp = await client.get("/api/health") + assert resp.status == 200 + body = await resp.json() + assert body["status"] == "ok" + assert "version" in body + + +async def test_health_bypasses_api_key_auth_middleware(make_app, aiohttp_client, monkeypatch) -> None: + from unshackle.core.api import routes as routes_mod + + async def _no_update(_): + return None + + monkeypatch.setattr(routes_mod.UpdateChecker, "check_for_updates", _no_update) + + app = make_app(remote_only=True, with_auth_middleware=True) + client = await aiohttp_client(app) + resp = await client.get("/api/health") + assert resp.status == 200 # health bypasses auth + + +async def test_auth_middleware_rejects_missing_key(make_app, aiohttp_client) -> None: + app = make_app(remote_only=True, with_auth_middleware=True) + client = await aiohttp_client(app) + resp = await client.get("/api/session/abc") + assert resp.status == 401 + body = await resp.json() + assert "Secret Key" in body["message"] + + +async def test_auth_middleware_rejects_invalid_key(make_app, aiohttp_client) -> None: + app = make_app(remote_only=True, with_auth_middleware=True) + client = await aiohttp_client(app) + resp = await client.get("/api/session/abc", headers={"X-Secret-Key": "wrong"}) + assert resp.status == 401 + + +async def test_auth_middleware_accepts_known_key(make_app, aiohttp_client) -> None: + app = make_app(remote_only=True, with_auth_middleware=True) + app["config"]["users"]["good-key"] = {"devices": []} + client = await aiohttp_client(app) + resp = await client.get("/api/session/nonexistent", headers={"X-Secret-Key": "good-key"}) + # Auth passed; handler then 404s the session — anything other than 401 is fine here. + assert resp.status != 401 diff --git a/tests/remote/unit/test_serve_cli.py b/tests/remote/unit/test_serve_cli.py new file mode 100644 index 0000000..5ae0165 --- /dev/null +++ b/tests/remote/unit/test_serve_cli.py @@ -0,0 +1,79 @@ +"""Unit tests for the `unshackle serve` Click command flag surface.""" + +from __future__ import annotations + +import pytest +from click.testing import CliRunner + +from unshackle.commands.serve import serve + +pytestmark = pytest.mark.unit + + +@pytest.fixture +def runner() -> CliRunner: + return CliRunner() + + +def test_serve_help_lists_documented_flags(runner: CliRunner) -> None: + result = runner.invoke(serve, ["--help"]) + assert result.exit_code == 0 + out = result.output + for flag in ( + "--host", + "--port", + "--caddy", + "--api-only", + "--no-widevine", + "--no-playready", + "--no-key", + "--debug-api", + "--debug", + "--remote-only", + ): + assert flag in out, f"missing flag in --help: {flag}" + + +def test_serve_api_only_with_no_widevine_rejected(runner: CliRunner, monkeypatch: pytest.MonkeyPatch) -> None: + """`--api-only` is mutually exclusive with `--no-widevine`/`--no-playready`.""" + monkeypatch.setenv("UNSHACKLE_NO_RUN", "1") # belt-and-braces; not currently checked + + # Stub web.run_app to avoid actually starting the server if validation passes. + from aiohttp import web + + monkeypatch.setattr(web, "run_app", lambda *a, **kw: None) + + # Force a clean config.serve so no_key path doesn't blow up loading wvds. + from unshackle.core.config import config as cfg + + monkeypatch.setattr(cfg, "serve", {"api_secret": "x"}) + + result = runner.invoke(serve, ["--api-only", "--no-widevine", "--no-key"]) + assert result.exit_code != 0 + assert "Cannot use --api-only" in (result.output or str(result.exception)) + + +def test_serve_no_key_without_api_secret_does_not_require_secret( + runner: CliRunner, monkeypatch: pytest.MonkeyPatch +) -> None: + """With --no-key, the missing api_secret check is bypassed.""" + from aiohttp import web + + monkeypatch.setattr(web, "run_app", lambda *a, **kw: None) + from unshackle.core.config import config as cfg + + monkeypatch.setattr(cfg, "serve", {}) + + result = runner.invoke(serve, ["--api-only", "--no-key", "--remote-only"]) + # No exception should escape, exit code 0 means startup proceeded then run_app stub returned. + assert result.exit_code == 0, result.output + + +def test_serve_without_no_key_requires_api_secret(runner: CliRunner, monkeypatch: pytest.MonkeyPatch) -> None: + from unshackle.core.config import config as cfg + + monkeypatch.setattr(cfg, "serve", {}) # no api_secret configured + + result = runner.invoke(serve, ["--api-only"]) + assert result.exit_code != 0 + assert "api_secret" in (result.output or "").lower() or "api_secret" in str(result.exception).lower() diff --git a/tests/remote/unit/test_session_store.py b/tests/remote/unit/test_session_store.py new file mode 100644 index 0000000..077a4de --- /dev/null +++ b/tests/remote/unit/test_session_store.py @@ -0,0 +1,117 @@ +"""Unit tests for unshackle.core.api.session_store.SessionStore.""" + +from __future__ import annotations + +import asyncio + +import pytest + +from unshackle.core.api.input_bridge import AuthStatus, InputBridge +from unshackle.core.api.session_store import SessionEntry, SessionStore, get_session_store + +pytestmark = pytest.mark.unit + + +class _FakeService: + """Minimal stub Service used to fill SessionEntry.service_instance.""" + + def __init__(self, tag: str = "TEST") -> None: + self.tag = tag + + +@pytest.fixture +def store() -> SessionStore: + return SessionStore() + + +async def test_create_returns_entry_with_uuid(store: SessionStore) -> None: + entry = await store.create("ATV", _FakeService()) + assert isinstance(entry, SessionEntry) + assert entry.service_tag == "ATV" + assert entry.session_id and len(entry.session_id) >= 32 + assert store.session_count == 1 + + +async def test_create_with_explicit_session_id(store: SessionStore) -> None: + entry = await store.create("NF", _FakeService(), session_id="fixed-id") + assert entry.session_id == "fixed-id" + + +async def test_get_returns_none_for_missing(store: SessionStore) -> None: + assert await store.get("nope") is None + + +async def test_get_touches_last_accessed(store: SessionStore) -> None: + entry = await store.create("DSNP", _FakeService()) + before = entry.last_accessed + await asyncio.sleep(0.01) + fetched = await store.get(entry.session_id) + assert fetched is entry + assert fetched.last_accessed > before + + +async def test_delete_removes_and_cancels_bridge(store: SessionStore) -> None: + entry = await store.create("CRAV", _FakeService()) + entry.input_bridge = InputBridge() + assert entry.input_bridge.status is AuthStatus.AUTHENTICATING + + deleted = await store.delete(entry.session_id) + assert deleted is True + assert entry.input_bridge.status is AuthStatus.FAILED # cancelled + assert store.session_count == 0 + + +async def test_delete_returns_false_when_missing(store: SessionStore) -> None: + assert await store.delete("missing") is False + + +async def test_cleanup_expired_drops_old_authenticated(store: SessionStore, monkeypatch: pytest.MonkeyPatch) -> None: + from datetime import datetime, timedelta, timezone + + entry = await store.create("ATV", _FakeService()) + entry.last_accessed = datetime.now(timezone.utc) - timedelta(seconds=store._ttl + 100) + removed = await store.cleanup_expired() + assert removed == 1 + assert store.session_count == 0 + + +async def test_cleanup_expired_keeps_pending_input_under_grace(store: SessionStore) -> None: + """Sessions awaiting user input get a longer grace period (10 min) than authenticated TTL.""" + entry = await store.create("ATV", _FakeService()) + entry.input_bridge = InputBridge() + entry.auth_status = AuthStatus.PENDING_INPUT + removed = await store.cleanup_expired() + assert removed == 0 + assert store.session_count == 1 + + +async def test_cancel_all_bridges(store: SessionStore) -> None: + a = await store.create("ATV", _FakeService()) + b = await store.create("NF", _FakeService()) + a.input_bridge = InputBridge() + b.input_bridge = InputBridge() + + await store.cancel_all_bridges() + assert a.input_bridge.status is AuthStatus.FAILED + assert b.input_bridge.status is AuthStatus.FAILED + + +async def test_get_session_store_returns_singleton() -> None: + a = get_session_store() + b = get_session_store() + assert a is b + + +async def test_max_sessions_evicts_oldest(store: SessionStore, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(type(store), "_max_sessions", property(lambda _: 2)) + + a = await store.create("A", _FakeService(), session_id="a") + await asyncio.sleep(0.01) + b = await store.create("B", _FakeService(), session_id="b") + await asyncio.sleep(0.01) + c = await store.create("C", _FakeService(), session_id="c") + + assert store.session_count == 2 + assert await store.get("a") is None # evicted + assert (await store.get("b")) is b + assert (await store.get("c")) is c