From c323db94810e40d546cd8f567cf8d1d874473fd1 Mon Sep 17 00:00:00 2001
From: Andy
Date: Thu, 19 Mar 2026 18:13:43 -0600
Subject: [PATCH] feat(downloader): consolidate into unified requests-based downloader

Replace the four separate downloaders (requests, curl_impersonate, aria2c,
n_m3u8dl_re) with a single optimized requests-based downloader featuring
adaptive chunk sizing and session passthrough for TLS fingerprinting support.

- Adaptive chunk sizing (512KB-4MB) based on content length, up from the previous fixed 1KB
- Buffered writes (1MB buffer) for improved I/O throughput
- Session passthrough: accepts both requests.Session and CurlSession
- Per-call speed tracking with a rolling window (fixes cross-track speed bleed)
- Worker count default capped at 16
- Removed all downloader.__name__ special-casing from the manifest parsers
- Removed the aria2c/curl_impersonate/n_m3u8dl_re downloader modules
- Deprecated the downloader config key in unshackle.yaml
---
 unshackle/commands/dl.py | 3 -
 unshackle/core/config.py | 16 +-
 unshackle/core/downloaders/__init__.py | 5 +-
 unshackle/core/downloaders/aria2c.py | 543 -----------------
 .../core/downloaders/curl_impersonate.py | 308 ----------
 unshackle/core/downloaders/n_m3u8dl_re.py | 548 ------------------
 unshackle/core/downloaders/requests.py | 179 +++---
 unshackle/core/manifests/dash.py | 122 +---
 unshackle/core/manifests/hls.py | 43 +-
 unshackle/core/manifests/ism.py | 79 +--
 unshackle/core/tracks/track.py | 24 +-
 11 files changed, 179 insertions(+), 1691 deletions(-)
 delete mode 100644 unshackle/core/downloaders/aria2c.py
 delete mode 100644 unshackle/core/downloaders/curl_impersonate.py
 delete mode 100644 unshackle/core/downloaders/n_m3u8dl_re.py

diff --git a/unshackle/commands/dl.py b/unshackle/commands/dl.py
index c04c0be..603c685 100644
--- a/unshackle/commands/dl.py
+++ b/unshackle/commands/dl.py
@@ -723,9 +723,6 @@ class dl: pass merge_dict(config.services.get(self.service), self.service_config) - if getattr(config, "downloader_map", None): - config.downloader = config.downloader_map.get(self.service, config.downloader) - if getattr(config, "decryption_map", None): config.decryption = config.decryption_map.get(self.service, config.decryption)
diff --git a/unshackle/core/config.py b/unshackle/core/config.py
index 686043e..51e9a9a 100644
--- a/unshackle/core/config.py
+++ b/unshackle/core/config.py
@@ -41,8 +41,6 @@ class Config: def __init__(self, **kwargs: Any): self.dl: dict = kwargs.get("dl") or {} - self.aria2c: dict = kwargs.get("aria2c") or {} - self.n_m3u8dl_re: dict = kwargs.get("n_m3u8dl_re") or {} self.cdm: dict = kwargs.get("cdm") or {} self.chapter_fallback_name: str = kwargs.get("chapter_fallback_name") or "" self.curl_impersonate: dict = kwargs.get("curl_impersonate") or {}
@@ -60,13 +58,13 @@ else: setattr(self.directories, name, Path(path).expanduser()) - downloader_cfg = kwargs.get("downloader") or "requests" - if isinstance(downloader_cfg, dict): - self.downloader_map = {k.upper(): v for k, v in downloader_cfg.items()} - self.downloader = self.downloader_map.get("DEFAULT", "requests") - else: - self.downloader_map = {} - self.downloader = downloader_cfg + downloader_cfg = kwargs.get("downloader") + if downloader_cfg and downloader_cfg != "requests": + warnings.warn( + f"downloader '{downloader_cfg}' is deprecated. 
The unified requests downloader is now used.", + DeprecationWarning, + stacklevel=2, + ) self.filenames = self._Filenames() for name, filename in (kwargs.get("filenames") or {}).items(): diff --git a/unshackle/core/downloaders/__init__.py b/unshackle/core/downloaders/__init__.py index aa0aecb..66d3b42 100644 --- a/unshackle/core/downloaders/__init__.py +++ b/unshackle/core/downloaders/__init__.py @@ -1,6 +1,3 @@ -from .aria2c import aria2c -from .curl_impersonate import curl_impersonate -from .n_m3u8dl_re import n_m3u8dl_re from .requests import requests -__all__ = ("aria2c", "curl_impersonate", "requests", "n_m3u8dl_re") +__all__ = ("requests",) diff --git a/unshackle/core/downloaders/aria2c.py b/unshackle/core/downloaders/aria2c.py deleted file mode 100644 index 8620b09..0000000 --- a/unshackle/core/downloaders/aria2c.py +++ /dev/null @@ -1,543 +0,0 @@ -import logging -import os -import subprocess -import textwrap -import threading -import time -from functools import partial -from http.cookiejar import CookieJar -from pathlib import Path -from typing import Any, Callable, Generator, MutableMapping, Optional, Union -from urllib.parse import urlparse - -import requests -from Crypto.Random import get_random_bytes -from requests import Session -from requests.cookies import cookiejar_from_dict, get_cookie_header -from rich import filesize -from rich.text import Text - -from unshackle.core import binaries -from unshackle.core.config import config -from unshackle.core.console import console -from unshackle.core.constants import DOWNLOAD_CANCELLED -from unshackle.core.utilities import get_debug_logger, get_extension, get_free_port - - -def rpc(caller: Callable, secret: str, method: str, params: Optional[list[Any]] = None) -> Any: - """Make a call to Aria2's JSON-RPC API.""" - try: - rpc_res = caller( - json={ - "jsonrpc": "2.0", - "id": get_random_bytes(16).hex(), - "method": method, - "params": [f"token:{secret}", *(params or [])], - } - ).json() - if rpc_res.get("code"): - # wrap to console width - padding - '[Aria2c]: ' - error_pretty = "\n ".join( - textwrap.wrap( - f"RPC Error: {rpc_res['message']} ({rpc_res['code']})".strip(), - width=console.width - 20, - initial_indent="", - ) - ) - console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty)) - return rpc_res["result"] - except requests.exceptions.ConnectionError: - # absorb, process likely ended as it was calling RPC - return - - -class _Aria2Manager: - """Singleton manager to run one aria2c process and enqueue downloads via RPC.""" - - def __init__(self) -> None: - self._logger = logging.getLogger(__name__) - self._proc: Optional[subprocess.Popen] = None - self._rpc_port: Optional[int] = None - self._rpc_secret: Optional[str] = None - self._rpc_uri: Optional[str] = None - self._session: Session = Session() - self._max_workers: Optional[int] = None - self._max_concurrent_downloads: int = 0 - self._max_connection_per_server: int = 1 - self._split_default: int = 5 - self._file_allocation: str = "prealloc" - self._proxy: Optional[str] = None - self._lock: threading.Lock = threading.Lock() - - def _wait_for_rpc_ready(self, timeout_s: float = 8.0, interval_s: float = 0.1) -> None: - assert self._proc is not None - assert self._rpc_uri is not None - assert self._rpc_secret is not None - - deadline = time.monotonic() + timeout_s - - payload = { - "jsonrpc": "2.0", - "id": get_random_bytes(16).hex(), - "method": "aria2.getVersion", - "params": [f"token:{self._rpc_secret}"], - } - - while time.monotonic() < deadline: - if self._proc.poll() is not 
None: - raise RuntimeError( - f"aria2c exited before RPC became ready (exit code {self._proc.returncode})" - ) - try: - res = self._session.post(self._rpc_uri, json=payload, timeout=0.25) - data = res.json() - if isinstance(data, dict) and data.get("result") is not None: - return - except (requests.exceptions.RequestException, ValueError): - # Not ready yet (connection refused / bad response / etc.) - pass - time.sleep(interval_s) - - # Timed out: ensure we don't leave a zombie/stray aria2c process behind. - try: - self._proc.terminate() - self._proc.wait(timeout=2) - except Exception: - try: - self._proc.kill() - self._proc.wait(timeout=2) - except Exception: - pass - raise TimeoutError(f"aria2c RPC did not become ready within {timeout_s:.1f}s") - - def _build_args(self) -> list[str]: - args = [ - "--continue=true", - f"--max-concurrent-downloads={self._max_concurrent_downloads}", - f"--max-connection-per-server={self._max_connection_per_server}", - f"--split={self._split_default}", - "--max-file-not-found=5", - "--max-tries=5", - "--retry-wait=2", - "--allow-overwrite=true", - "--auto-file-renaming=false", - "--console-log-level=warn", - "--download-result=default", - f"--file-allocation={self._file_allocation}", - "--summary-interval=0", - "--enable-rpc=true", - f"--rpc-listen-port={self._rpc_port}", - f"--rpc-secret={self._rpc_secret}", - ] - if self._proxy: - args.extend(["--all-proxy", self._proxy]) - return args - - def ensure_started( - self, - proxy: Optional[str], - max_workers: Optional[int], - ) -> None: - with self._lock: - if not binaries.Aria2: - debug_logger = get_debug_logger() - if debug_logger: - debug_logger.log( - level="ERROR", - operation="downloader_aria2c_binary_missing", - message="Aria2c executable not found in PATH or local binaries directory", - context={"searched_names": ["aria2c", "aria2"]}, - ) - raise EnvironmentError("Aria2c executable not found...") - - effective_proxy = proxy or None - - if not max_workers: - effective_max_workers = min(32, (os.cpu_count() or 1) + 4) - elif not isinstance(max_workers, int): - raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") - else: - effective_max_workers = max_workers - - if self._proc and self._proc.poll() is None: - if effective_proxy != self._proxy or effective_max_workers != self._max_workers: - self._logger.warning( - "aria2c process is already running; requested proxy=%r, max_workers=%r, " - "but running process will continue with proxy=%r, max_workers=%r", - effective_proxy, - effective_max_workers, - self._proxy, - self._max_workers, - ) - return - - self._rpc_port = get_free_port() - self._rpc_secret = get_random_bytes(16).hex() - self._rpc_uri = f"http://127.0.0.1:{self._rpc_port}/jsonrpc" - - self._max_workers = effective_max_workers - self._max_concurrent_downloads = int( - config.aria2c.get("max_concurrent_downloads", effective_max_workers) - ) - self._max_connection_per_server = int(config.aria2c.get("max_connection_per_server", 1)) - self._split_default = int(config.aria2c.get("split", 5)) - self._file_allocation = config.aria2c.get("file_allocation", "prealloc") - self._proxy = effective_proxy - - args = self._build_args() - self._proc = subprocess.Popen( - [binaries.Aria2, *args], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - self._wait_for_rpc_ready() - - @property - def rpc_uri(self) -> str: - assert self._rpc_uri - return self._rpc_uri - - @property - def rpc_secret(self) -> str: - assert self._rpc_secret - return self._rpc_secret - - @property - def 
session(self) -> Session: - return self._session - - def add_uris(self, uris: list[str], options: dict[str, Any]) -> str: - """Add a single download with multiple URIs via RPC.""" - gid = rpc( - caller=partial(self._session.post, url=self.rpc_uri), - secret=self.rpc_secret, - method="aria2.addUri", - params=[uris, options], - ) - return gid or "" - - def get_global_stat(self) -> dict[str, Any]: - return rpc( - caller=partial(self.session.post, url=self.rpc_uri), - secret=self.rpc_secret, - method="aria2.getGlobalStat", - ) or {} - - def tell_status(self, gid: str) -> Optional[dict[str, Any]]: - return rpc( - caller=partial(self.session.post, url=self.rpc_uri), - secret=self.rpc_secret, - method="aria2.tellStatus", - params=[gid, ["status", "errorCode", "errorMessage", "files", "completedLength", "totalLength"]], - ) - - def remove(self, gid: str) -> None: - rpc( - caller=partial(self.session.post, url=self.rpc_uri), - secret=self.rpc_secret, - method="aria2.forceRemove", - params=[gid], - ) - - -_manager = _Aria2Manager() - - -def download( - urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]], - output_dir: Path, - filename: str, - headers: Optional[MutableMapping[str, Union[str, bytes]]] = None, - cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None, - proxy: Optional[str] = None, - max_workers: Optional[int] = None, -) -> Generator[dict[str, Any], None, None]: - """Enqueue downloads to the singleton aria2c instance via stdin and track per-call progress via RPC.""" - debug_logger = get_debug_logger() - - if not urls: - raise ValueError("urls must be provided and not empty") - elif not isinstance(urls, (str, dict, list)): - raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}") - - if not output_dir: - raise ValueError("output_dir must be provided") - elif not isinstance(output_dir, Path): - raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}") - - if not filename: - raise ValueError("filename must be provided") - elif not isinstance(filename, str): - raise TypeError(f"Expected filename to be {str}, not {type(filename)}") - - if not isinstance(headers, (MutableMapping, type(None))): - raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}") - - if not isinstance(cookies, (MutableMapping, CookieJar, type(None))): - raise TypeError(f"Expected cookies to be {MutableMapping} or {CookieJar}, not {type(cookies)}") - - if not isinstance(proxy, (str, type(None))): - raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}") - - if not max_workers: - max_workers = min(32, (os.cpu_count() or 1) + 4) - elif not isinstance(max_workers, int): - raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") - - if not isinstance(urls, list): - urls = [urls] - - if cookies and not isinstance(cookies, CookieJar): - cookies = cookiejar_from_dict(cookies) - - _manager.ensure_started(proxy=proxy, max_workers=max_workers) - - if debug_logger: - first_url = urls[0] if isinstance(urls[0], str) else urls[0].get("url", "") - url_display = first_url[:200] + "..." 
if len(first_url) > 200 else first_url - debug_logger.log( - level="DEBUG", - operation="downloader_aria2c_start", - message="Starting Aria2c download", - context={ - "binary_path": str(binaries.Aria2), - "url_count": len(urls), - "first_url": url_display, - "output_dir": str(output_dir), - "filename": filename, - "has_proxy": bool(proxy), - }, - ) - - # Build options for each URI and add via RPC - gids: list[str] = [] - - for i, url in enumerate(urls): - if isinstance(url, str): - url_data = {"url": url} - else: - url_data: dict[str, Any] = url - - url_filename = filename.format(i=i, ext=get_extension(url_data["url"])) - - opts: dict[str, Any] = { - "dir": str(output_dir), - "out": url_filename, - "split": str(1 if len(urls) > 1 else int(config.aria2c.get("split", 5))), - } - - # Cookies as header - if cookies: - mock_request = requests.Request(url=url_data["url"]) - cookie_header = get_cookie_header(cookies, mock_request) - if cookie_header: - opts.setdefault("header", []).append(f"Cookie: {cookie_header}") - - # Global headers - for header, value in (headers or {}).items(): - if header.lower() == "cookie": - raise ValueError("You cannot set Cookies as a header manually, please use the `cookies` param.") - if header.lower() == "accept-encoding": - continue - if header.lower() == "referer": - opts["referer"] = str(value) - continue - if header.lower() == "user-agent": - opts["user-agent"] = str(value) - continue - opts.setdefault("header", []).append(f"{header}: {value}") - - # Per-url extra args - for key, value in url_data.items(): - if key == "url": - continue - if key == "headers": - for header_name, header_value in value.items(): - opts.setdefault("header", []).append(f"{header_name}: {header_value}") - else: - opts[key] = str(value) - - # Add via RPC - gid = _manager.add_uris([url_data["url"]], opts) - if gid: - gids.append(gid) - - yield dict(total=len(gids)) - - completed: set[str] = set() - - try: - while len(completed) < len(gids): - if DOWNLOAD_CANCELLED.is_set(): - # Remove tracked downloads on cancel - for gid in gids: - if gid not in completed: - _manager.remove(gid) - yield dict(downloaded="[yellow]CANCELLED") - raise KeyboardInterrupt() - - stats = _manager.get_global_stat() - dl_speed = int(stats.get("downloadSpeed", -1)) - - # Aggregate progress across all GIDs for this call - total_completed = 0 - total_size = 0 - - # Check each tracked GID - for gid in gids: - if gid in completed: - continue - - status = _manager.tell_status(gid) - if not status: - continue - - completed_length = int(status.get("completedLength", 0)) - total_length = int(status.get("totalLength", 0)) - total_completed += completed_length - total_size += total_length - - state = status.get("status") - if state in ("complete", "error"): - completed.add(gid) - yield dict(completed=len(completed)) - - if state == "error": - used_uri = None - try: - used_uri = next( - uri["uri"] - for file in status.get("files", []) - for uri in file.get("uris", []) - if uri.get("status") == "used" - ) - except Exception: - used_uri = "unknown" - error = f"Download Error (#{gid}): {status.get('errorMessage')} ({status.get('errorCode')}), {used_uri}" - error_pretty = "\n ".join(textwrap.wrap(error, width=console.width - 20, initial_indent="")) - console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty)) - if debug_logger: - debug_logger.log( - level="ERROR", - operation="downloader_aria2c_download_error", - message=f"Aria2c download failed: {status.get('errorMessage')}", - context={ - "gid": gid, - "error_code": 
status.get("errorCode"), - "error_message": status.get("errorMessage"), - "used_uri": used_uri[:200] + "..." if used_uri and len(used_uri) > 200 else used_uri, - "completed_length": status.get("completedLength"), - "total_length": status.get("totalLength"), - }, - ) - raise ValueError(error) - - # Yield aggregate progress for this call's downloads - progress_data = {"advance": 0} - - if len(gids) > 1: - # Multi-file mode (e.g., HLS): Return the count of completed segments - progress_data["completed"] = len(completed) - progress_data["total"] = len(gids) - else: - # Single-file mode: Return the total bytes downloaded - progress_data["completed"] = total_completed - if total_size > 0: - progress_data["total"] = total_size - else: - progress_data["total"] = None - - if dl_speed != -1: - progress_data["downloaded"] = f"{filesize.decimal(dl_speed)}/s" - - yield progress_data - - time.sleep(1) - except KeyboardInterrupt: - DOWNLOAD_CANCELLED.set() - raise - except Exception as e: - DOWNLOAD_CANCELLED.set() - yield dict(downloaded="[red]FAILED") - if debug_logger and not isinstance(e, ValueError): - debug_logger.log( - level="ERROR", - operation="downloader_aria2c_exception", - message=f"Unexpected error during Aria2c download: {e}", - error=e, - context={ - "url_count": len(urls), - "output_dir": str(output_dir), - }, - ) - raise - - -def aria2c( - urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]], - output_dir: Path, - filename: str, - headers: Optional[MutableMapping[str, Union[str, bytes]]] = None, - cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None, - proxy: Optional[str] = None, - max_workers: Optional[int] = None, -) -> Generator[dict[str, Any], None, None]: - """ - Download files using Aria2(c). - https://aria2.github.io - - Yields the following download status updates while chunks are downloading: - - - {total: 100} (100% download total) - - {completed: 1} (1% download progress out of 100%) - - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) - - The data is in the same format accepted by rich's progress.update() function. - - Parameters: - urls: Web URL(s) to file(s) to download. You can use a dictionary with the key - "url" for the URI, and other keys for extra arguments to use per-URL. - output_dir: The folder to save the file into. If the save path's directory does - not exist then it will be made automatically. - filename: The filename or filename template to use for each file. The variables - you can use are `i` for the URL index and `ext` for the URL extension. - headers: A mapping of HTTP Header Key/Values to use for all downloads. - cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads. - proxy: An optional proxy URI to route connections through for all downloads. - max_workers: The maximum amount of threads to use for downloads. Defaults to - min(32,(cpu_count+4)). Use for the --max-concurrent-downloads option. 
- """ - if proxy and not proxy.lower().startswith("http://"): - # Only HTTP proxies are supported by aria2(c) - proxy = urlparse(proxy) - - port = get_free_port() - username, password = get_random_bytes(8).hex(), get_random_bytes(8).hex() - local_proxy = f"http://{username}:{password}@localhost:{port}" - - scheme = {"https": "http+ssl", "socks5h": "socks"}.get(proxy.scheme, proxy.scheme) - - remote_server = f"{scheme}://{proxy.hostname}" - if proxy.port: - remote_server += f":{proxy.port}" - if proxy.username or proxy.password: - remote_server += "#" - if proxy.username: - remote_server += proxy.username - if proxy.password: - remote_server += f":{proxy.password}" - - p = subprocess.Popen( - ["pproxy", "-l", f"http://:{port}#{username}:{password}", "-r", remote_server], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - try: - yield from download(urls, output_dir, filename, headers, cookies, local_proxy, max_workers) - finally: - p.kill() - p.wait() - return - yield from download(urls, output_dir, filename, headers, cookies, proxy, max_workers) - - -__all__ = ("aria2c",) diff --git a/unshackle/core/downloaders/curl_impersonate.py b/unshackle/core/downloaders/curl_impersonate.py deleted file mode 100644 index d278e91..0000000 --- a/unshackle/core/downloaders/curl_impersonate.py +++ /dev/null @@ -1,308 +0,0 @@ -import math -import time -from concurrent import futures -from concurrent.futures.thread import ThreadPoolExecutor -from http.cookiejar import CookieJar -from pathlib import Path -from typing import Any, Generator, MutableMapping, Optional, Union - -from curl_cffi.requests import Session -from rich import filesize - -from unshackle.core.config import config -from unshackle.core.constants import DOWNLOAD_CANCELLED -from unshackle.core.utilities import get_debug_logger, get_extension - -MAX_ATTEMPTS = 5 -RETRY_WAIT = 2 -CHUNK_SIZE = 1024 -PROGRESS_WINDOW = 5 -BROWSER = config.curl_impersonate.get("browser", "chrome124") - - -def download(url: str, save_path: Path, session: Session, **kwargs: Any) -> Generator[dict[str, Any], None, None]: - """ - Download files using Curl Impersonate. - https://github.com/lwthiker/curl-impersonate - - Yields the following download status updates while chunks are downloading: - - - {total: 123} (there are 123 chunks to download) - - {total: None} (there are an unknown number of chunks to download) - - {advance: 1} (one chunk was downloaded) - - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) - - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) - - The data is in the same format accepted by rich's progress.update() function. The - `downloaded` key is custom and is not natively accepted by all rich progress bars. - - Parameters: - url: Web URL of a file to download. - save_path: The path to save the file to. If the save path's directory does not - exist then it will be made automatically. - session: The Requests or Curl-Impersonate Session to make HTTP requests with. - Useful to set Header, Cookie, and Proxy data. Connections are saved and - re-used with the session so long as the server keeps the connection alive. - kwargs: Any extra keyword arguments to pass to the session.get() call. Use this - for one-time request changes like a header, cookie, or proxy. For example, - to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. 
- """ - save_dir = save_path.parent - control_file = save_path.with_name(f"{save_path.name}.!dev") - - save_dir.mkdir(parents=True, exist_ok=True) - - if control_file.exists(): - # consider the file corrupt if the control file exists - save_path.unlink(missing_ok=True) - control_file.unlink() - elif save_path.exists(): - # if it exists, and no control file, then it should be safe - yield dict(file_downloaded=save_path, written=save_path.stat().st_size) - - # TODO: Design a control file format so we know how much of the file is missing - control_file.write_bytes(b"") - - attempts = 1 - try: - while True: - written = 0 - download_sizes = [] - last_speed_refresh = time.time() - - try: - stream = session.get(url, stream=True, **kwargs) - stream.raise_for_status() - - try: - content_length = int(stream.headers.get("Content-Length", "0")) - - # Skip Content-Length validation for compressed responses since - # curl_impersonate automatically decompresses but Content-Length shows compressed size - if stream.headers.get("Content-Encoding", "").lower() in ["gzip", "deflate", "br"]: - content_length = 0 - except ValueError: - content_length = 0 - - if content_length > 0: - yield dict(total=math.ceil(content_length / CHUNK_SIZE)) - else: - # we have no data to calculate total chunks - yield dict(total=None) # indeterminate mode - - with open(save_path, "wb") as f: - for chunk in stream.iter_content(chunk_size=CHUNK_SIZE): - download_size = len(chunk) - f.write(chunk) - written += download_size - - yield dict(advance=1) - - now = time.time() - time_since = now - last_speed_refresh - - download_sizes.append(download_size) - if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE: - data_size = sum(download_sizes) - download_speed = math.ceil(data_size / (time_since or 1)) - yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() - - if content_length and written < content_length: - raise IOError(f"Failed to read {content_length} bytes from the track URI.") - - yield dict(file_downloaded=save_path, written=written) - break - except Exception as e: - save_path.unlink(missing_ok=True) - if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: - raise e - time.sleep(RETRY_WAIT) - attempts += 1 - finally: - control_file.unlink() - - -def curl_impersonate( - urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]], - output_dir: Path, - filename: str, - headers: Optional[MutableMapping[str, Union[str, bytes]]] = None, - cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None, - proxy: Optional[str] = None, - max_workers: Optional[int] = None, -) -> Generator[dict[str, Any], None, None]: - """ - Download files using Curl Impersonate. - https://github.com/lwthiker/curl-impersonate - - Yields the following download status updates while chunks are downloading: - - - {total: 123} (there are 123 chunks to download) - - {total: None} (there are an unknown number of chunks to download) - - {advance: 1} (one chunk was downloaded) - - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) - - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) - - The data is in the same format accepted by rich's progress.update() function. - However, The `downloaded`, `file_downloaded` and `written` keys are custom and not - natively accepted by rich progress bars. - - Parameters: - urls: Web URL(s) to file(s) to download. 
You can use a dictionary with the key - "url" for the URI, and other keys for extra arguments to use per-URL. - output_dir: The folder to save the file into. If the save path's directory does - not exist then it will be made automatically. - filename: The filename or filename template to use for each file. The variables - you can use are `i` for the URL index and `ext` for the URL extension. - headers: A mapping of HTTP Header Key/Values to use for all downloads. - cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads. - proxy: An optional proxy URI to route connections through for all downloads. - max_workers: The maximum amount of threads to use for downloads. Defaults to - min(32,(cpu_count+4)). - """ - if not urls: - raise ValueError("urls must be provided and not empty") - elif not isinstance(urls, (str, dict, list)): - raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}") - - if not output_dir: - raise ValueError("output_dir must be provided") - elif not isinstance(output_dir, Path): - raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}") - - if not filename: - raise ValueError("filename must be provided") - elif not isinstance(filename, str): - raise TypeError(f"Expected filename to be {str}, not {type(filename)}") - - if not isinstance(headers, (MutableMapping, type(None))): - raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}") - - if not isinstance(cookies, (MutableMapping, CookieJar, type(None))): - raise TypeError(f"Expected cookies to be {MutableMapping} or {CookieJar}, not {type(cookies)}") - - if not isinstance(proxy, (str, type(None))): - raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}") - - if not isinstance(max_workers, (int, type(None))): - raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") - - debug_logger = get_debug_logger() - - if not isinstance(urls, list): - urls = [urls] - - urls = [ - dict(save_path=save_path, **url) if isinstance(url, dict) else dict(url=url, save_path=save_path) - for i, url in enumerate(urls) - for save_path in [ - output_dir / filename.format(i=i, ext=get_extension(url["url"] if isinstance(url, dict) else url)) - ] - ] - - session = Session(impersonate=BROWSER) - if headers: - headers = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"} - session.headers.update(headers) - if cookies: - session.cookies.update(cookies) - if proxy: - session.proxies.update({"all": proxy}) - - if debug_logger: - first_url = urls[0].get("url", "") if urls else "" - url_display = first_url[:200] + "..." 
if len(first_url) > 200 else first_url - debug_logger.log( - level="DEBUG", - operation="downloader_curl_impersonate_start", - message="Starting curl_impersonate download", - context={ - "url_count": len(urls), - "first_url": url_display, - "output_dir": str(output_dir), - "filename": filename, - "max_workers": max_workers, - "browser": BROWSER, - "has_proxy": bool(proxy), - }, - ) - - yield dict(total=len(urls)) - - download_sizes = [] - last_speed_refresh = time.time() - - with ThreadPoolExecutor(max_workers=max_workers) as pool: - for i, future in enumerate( - futures.as_completed((pool.submit(download, session=session, **url) for url in urls)) - ): - file_path, download_size = None, None - try: - for status_update in future.result(): - if status_update.get("file_downloaded") and status_update.get("written"): - file_path = status_update["file_downloaded"] - download_size = status_update["written"] - elif len(urls) == 1: - # these are per-chunk updates, only useful if it's one big file - yield status_update - except KeyboardInterrupt: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - yield dict(downloaded="[yellow]CANCELLING") - pool.shutdown(wait=True, cancel_futures=True) - yield dict(downloaded="[yellow]CANCELLED") - # tell dl that it was cancelled - # the pool is already shut down, so exiting loop is fine - raise - except Exception as e: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - yield dict(downloaded="[red]FAILING") - pool.shutdown(wait=True, cancel_futures=True) - yield dict(downloaded="[red]FAILED") - if debug_logger: - debug_logger.log( - level="ERROR", - operation="downloader_curl_impersonate_failed", - message=f"curl_impersonate download failed: {e}", - error=e, - context={ - "url_count": len(urls), - "output_dir": str(output_dir), - "browser": BROWSER, - }, - ) - # tell dl that it failed - # the pool is already shut down, so exiting loop is fine - raise - else: - yield dict(file_downloaded=file_path) - yield dict(advance=1) - - now = time.time() - time_since = now - last_speed_refresh - - if download_size: # no size == skipped dl - download_sizes.append(download_size) - - if download_sizes and (time_since > PROGRESS_WINDOW or i == len(urls)): - data_size = sum(download_sizes) - download_speed = math.ceil(data_size / (time_since or 1)) - yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") - last_speed_refresh = now - download_sizes.clear() - - if debug_logger: - debug_logger.log( - level="DEBUG", - operation="downloader_curl_impersonate_complete", - message="curl_impersonate download completed successfully", - context={ - "url_count": len(urls), - "output_dir": str(output_dir), - "filename": filename, - }, - ) - - -__all__ = ("curl_impersonate",) diff --git a/unshackle/core/downloaders/n_m3u8dl_re.py b/unshackle/core/downloaders/n_m3u8dl_re.py deleted file mode 100644 index c88ab3f..0000000 --- a/unshackle/core/downloaders/n_m3u8dl_re.py +++ /dev/null @@ -1,548 +0,0 @@ -import os -import re -import subprocess -import warnings -from http.cookiejar import CookieJar -from pathlib import Path -from typing import Any, Generator, MutableMapping - -import requests -from requests.cookies import cookiejar_from_dict, get_cookie_header - -from unshackle.core import binaries -from unshackle.core.binaries import FFMPEG, Mp4decrypt, ShakaPackager -from unshackle.core.config import config -from unshackle.core.console import console -from unshackle.core.constants import DOWNLOAD_CANCELLED -from unshackle.core.utilities import get_debug_logger - -PERCENT_RE 
= re.compile(r"(\d+\.\d+%)") -SPEED_RE = re.compile(r"(\d+\.\d+(?:MB|KB)ps)") -SIZE_RE = re.compile(r"(\d+\.\d+(?:MB|GB|KB)/\d+\.\d+(?:MB|GB|KB))") -WARN_RE = re.compile(r"(WARN : Response.*|WARN : One or more errors occurred.*)") -ERROR_RE = re.compile(r"(\bERROR\b.*|\bFAILED\b.*|\bException\b.*)") - -DECRYPTION_ENGINE = { - "shaka": "SHAKA_PACKAGER", - "mp4decrypt": "MP4DECRYPT", -} - -# Ignore FutureWarnings -warnings.simplefilter(action="ignore", category=FutureWarning) - - -def get_track_selection_args(track: Any) -> list[str]: - """ - Generates track selection arguments for N_m3u8dl_RE. - - Args: - track: A track object with attributes like descriptor, data, and class name. - - Returns: - A list of strings for track selection. - - Raises: - ValueError: If the manifest type is unsupported or track selection fails. - """ - descriptor = track.descriptor.name - track_type = track.__class__.__name__ - - def _create_args(flag: str, parts: list[str], type_str: str, extra_args: list[str] | None = None) -> list[str]: - if not parts: - raise ValueError(f"[N_m3u8DL-RE]: Unable to select {type_str} track from {descriptor} manifest") - - final_args = [flag, ":".join(parts)] - if extra_args: - final_args.extend(extra_args) - - return final_args - - match descriptor: - case "HLS": - # HLS playlists are direct inputs; no selection arguments needed. - return [] - - case "DASH": - representation = track.data.get("dash", {}).get("representation", {}) - adaptation_set = track.data.get("dash", {}).get("adaptation_set", {}) - parts = [] - - if track_type == "Audio": - track_id = representation.get("id") or adaptation_set.get("audioTrackId") - lang = representation.get("lang") or adaptation_set.get("lang") - - if track_id: - parts.append(rf'"id=\b{track_id}\b"') - if lang: - parts.append(f"lang={lang}") - else: - if codecs := representation.get("codecs"): - parts.append(f"codecs={codecs}") - if lang: - parts.append(f"lang={lang}") - if bw := representation.get("bandwidth"): - bitrate = int(bw) // 1000 - parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}") - if roles := representation.findall("Role") + adaptation_set.findall("Role"): - if role := next((r.get("value") for r in roles if r.get("value", "").lower() == "main"), None): - parts.append(f"role={role}") - return _create_args("-sa", parts, "audio") - - if track_type == "Video": - if track_id := representation.get("id"): - parts.append(rf'"id=\b{track_id}\b"') - else: - if width := representation.get("width"): - parts.append(f"res={width}*") - if codecs := representation.get("codecs"): - parts.append(f"codecs={codecs}") - if bw := representation.get("bandwidth"): - bitrate = int(bw) // 1000 - parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}") - return _create_args("-sv", parts, "video") - - if track_type == "Subtitle": - if track_id := representation.get("id"): - parts.append(rf'"id=\b{track_id}\b"') - else: - if lang := representation.get("lang"): - parts.append(f"lang={lang}") - return _create_args("-ss", parts, "subtitle", extra_args=["--auto-subtitle-fix", "false"]) - - case "ISM": - quality_level = track.data.get("ism", {}).get("quality_level", {}) - stream_index = track.data.get("ism", {}).get("stream_index", {}) - parts = [] - - if track_type == "Audio": - if name := stream_index.get("Name") or quality_level.get("Index"): - parts.append(rf'"id=\b{name}\b"') - else: - if codecs := quality_level.get("FourCC"): - parts.append(f"codecs={codecs}") - if lang := stream_index.get("Language"): - parts.append(f"lang={lang}") - if br := 
quality_level.get("Bitrate"): - bitrate = int(br) // 1000 - parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}") - return _create_args("-sa", parts, "audio") - - if track_type == "Video": - if name := stream_index.get("Name") or quality_level.get("Index"): - parts.append(rf'"id=\b{name}\b"') - else: - if width := quality_level.get("MaxWidth"): - parts.append(f"res={width}*") - if codecs := quality_level.get("FourCC"): - parts.append(f"codecs={codecs}") - if br := quality_level.get("Bitrate"): - bitrate = int(br) // 1000 - parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}") - return _create_args("-sv", parts, "video") - - # I've yet to encounter a subtitle track in ISM manifests, so this is mostly theoretical. - if track_type == "Subtitle": - if name := stream_index.get("Name") or quality_level.get("Index"): - parts.append(rf'"id=\b{name}\b"') - else: - if lang := stream_index.get("Language"): - parts.append(f"lang={lang}") - return _create_args("-ss", parts, "subtitle", extra_args=["--auto-subtitle-fix", "false"]) - - case "URL": - raise ValueError( - f"[N_m3u8DL-RE]: Direct URL downloads are not supported for {track_type} tracks. " - f"The track should use a different downloader (e.g., 'requests', 'aria2c')." - ) - - raise ValueError(f"[N_m3u8DL-RE]: Unsupported manifest type: {descriptor}") - - -def build_download_args( - track_url: str, - filename: str, - output_dir: Path, - thread_count: int, - retry_count: int, - track_from_file: Path | None, - custom_args: dict[str, Any] | None, - headers: dict[str, Any] | None, - cookies: CookieJar | None, - proxy: str | None, - content_keys: dict[str, str] | None, - ad_keyword: str | None, - skip_merge: bool | None = False, -) -> list[str]: - """Constructs the CLI arguments for N_m3u8DL-RE.""" - - # Default arguments - args = { - "--save-name": filename, - "--save-dir": output_dir, - "--tmp-dir": output_dir, - "--thread-count": thread_count, - "--download-retry-count": retry_count, - } - if FFMPEG: - args["--ffmpeg-binary-path"] = str(FFMPEG) - if proxy: - args["--custom-proxy"] = proxy - if skip_merge: - args["--skip-merge"] = skip_merge - if ad_keyword: - args["--ad-keyword"] = ad_keyword - # Disable segment count validation to work around N_m3u8DL-RE's Math.Ceiling - # bug in duration-based SegmentTemplate calculation (see nilaoda/N_m3u8DL-RE#108) - args["--check-segments-count"] = False - - key_args = [] - if content_keys: - for kid, key in content_keys.items(): - key_args.extend(["--key", f"{kid.hex}:{key.lower()}"]) - - decryption_config = config.decryption.lower() - engine_name = DECRYPTION_ENGINE.get(decryption_config) or "SHAKA_PACKAGER" - args["--decryption-engine"] = engine_name - - binary_path = None - if engine_name == "SHAKA_PACKAGER": - if ShakaPackager: - binary_path = str(ShakaPackager) - elif engine_name == "MP4DECRYPT": - if Mp4decrypt: - binary_path = str(Mp4decrypt) - if binary_path: - args["--decryption-binary-path"] = binary_path - - if custom_args: - args.update(custom_args) - - command = [track_from_file or track_url] - for flag, value in args.items(): - if value is True: - command.append(flag) - elif value is False: - command.extend([flag, "false"]) - elif value is not False and value is not None: - command.extend([flag, str(value)]) - - # Append all content keys (multiple --key flags supported by N_m3u8DL-RE) - command.extend(key_args) - - if headers: - for key, value in headers.items(): - if key.lower() not in ("accept-encoding", "cookie"): - command.extend(["--header", f"{key}: {value}"]) - - if cookies: - req = 
requests.Request(method="GET", url=track_url) - cookie_header = get_cookie_header(cookies, req) - command.extend(["--header", f"Cookie: {cookie_header}"]) - - return command - - -def download( - urls: str | dict[str, Any] | list[str | dict[str, Any]], - track: Any, - output_dir: Path, - filename: str, - headers: MutableMapping[str, str | bytes] | None, - cookies: MutableMapping[str, str] | CookieJar | None, - proxy: str | None, - max_workers: int | None, - content_keys: dict[str, Any] | None, - skip_merge: bool | None = False, -) -> Generator[dict[str, Any], None, None]: - debug_logger = get_debug_logger() - - if not urls: - raise ValueError("urls must be provided and not empty") - if not isinstance(urls, (str, dict, list)): - raise TypeError(f"Expected urls to be str, dict, or list, not {type(urls)}") - if not isinstance(output_dir, Path): - raise TypeError(f"Expected output_dir to be Path, not {type(output_dir)}") - if not isinstance(filename, str) or not filename: - raise ValueError("filename must be a non-empty string") - if not isinstance(headers, (MutableMapping, type(None))): - raise TypeError(f"Expected headers to be a mapping or None, not {type(headers)}") - if not isinstance(cookies, (MutableMapping, CookieJar, type(None))): - raise TypeError(f"Expected cookies to be a mapping, CookieJar, or None, not {type(cookies)}") - if not isinstance(proxy, (str, type(None))): - raise TypeError(f"Expected proxy to be a str or None, not {type(proxy)}") - if not isinstance(max_workers, (int, type(None))): - raise TypeError(f"Expected max_workers to be an int or None, not {type(max_workers)}") - if not isinstance(content_keys, (dict, type(None))): - raise TypeError(f"Expected content_keys to be a dict or None, not {type(content_keys)}") - if not isinstance(skip_merge, (bool, type(None))): - raise TypeError(f"Expected skip_merge to be a bool or None, not {type(skip_merge)}") - - if cookies and not isinstance(cookies, CookieJar): - cookies = cookiejar_from_dict(cookies) - - if not binaries.N_m3u8DL_RE: - raise EnvironmentError("N_m3u8DL-RE executable not found...") - - effective_max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4) - - if proxy and not config.n_m3u8dl_re.get("use_proxy", True): - proxy = None - - thread_count = config.n_m3u8dl_re.get("thread_count", effective_max_workers) - retry_count = config.n_m3u8dl_re.get("retry_count", 10) - ad_keyword = config.n_m3u8dl_re.get("ad_keyword") - - arguments = build_download_args( - track_url=track.url, - track_from_file=track.from_file, - filename=filename, - output_dir=output_dir, - thread_count=thread_count, - retry_count=retry_count, - custom_args=track.downloader_args, - headers=headers, - cookies=cookies, - proxy=proxy, - content_keys=content_keys, - skip_merge=skip_merge, - ad_keyword=ad_keyword, - ) - selection_args = get_track_selection_args(track) - arguments.extend(selection_args) - - log_file_path: Path | None = None - if debug_logger: - log_file_path = output_dir / f".n_m3u8dl_re_{filename}.log" - arguments.extend([ - "--log-file-path", str(log_file_path), - "--log-level", "DEBUG", - ]) - - track_url_display = track.url[:200] + "..." 
if len(track.url) > 200 else track.url - debug_logger.log( - level="DEBUG", - operation="downloader_n_m3u8dl_re_start", - message="Starting N_m3u8DL-RE download", - context={ - "binary_path": str(binaries.N_m3u8DL_RE), - "track_id": getattr(track, "id", None), - "track_type": track.__class__.__name__, - "track_url": track_url_display, - "output_dir": str(output_dir), - "filename": filename, - "thread_count": thread_count, - "retry_count": retry_count, - "has_content_keys": bool(content_keys), - "content_key_count": len(content_keys) if content_keys else 0, - "has_proxy": bool(proxy), - "skip_merge": skip_merge, - "has_custom_args": bool(track.downloader_args), - "selection_args": selection_args, - "descriptor": track.descriptor.name if hasattr(track, "descriptor") else None, - }, - ) - else: - arguments.extend(["--no-log", "true"]) - - yield {"total": 100} - yield {"downloaded": "Parsing streams..."} - - try: - with subprocess.Popen( - [binaries.N_m3u8DL_RE, *arguments], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - encoding="utf-8", - ) as process: - last_line = "" - track_type = track.__class__.__name__ - - for line in process.stdout: - output = line.strip() - if not output: - continue - last_line = output - - if warn_match := WARN_RE.search(output): - console.log(f"{track_type} {warn_match.group(1)}") - continue - - if speed_match := SPEED_RE.search(output): - size = size_match.group(1) if (size_match := SIZE_RE.search(output)) else "" - yield {"downloaded": f"{speed_match.group(1)} {size}"} - - if percent_match := PERCENT_RE.search(output): - progress = int(percent_match.group(1).split(".", 1)[0]) - yield {"completed": progress} if progress < 100 else {"downloaded": "Merging"} - - process.wait() - - if process.returncode != 0: - if debug_logger and log_file_path: - log_contents = "" - if log_file_path.exists(): - try: - log_contents = log_file_path.read_text(encoding="utf-8", errors="replace") - except Exception: - log_contents = "" - - debug_logger.log( - level="ERROR", - operation="downloader_n_m3u8dl_re_failed", - message=f"N_m3u8DL-RE exited with code {process.returncode}", - context={ - "returncode": process.returncode, - "track_id": getattr(track, "id", None), - "track_type": track.__class__.__name__, - "last_line": last_line, - "log_file_contents": log_contents, - }, - ) - if error_match := ERROR_RE.search(last_line): - raise ValueError(f"[N_m3u8DL-RE]: {error_match.group(1)}") - raise subprocess.CalledProcessError(process.returncode, arguments) - - if debug_logger: - output_dir_exists = output_dir.exists() - output_files = [] - if output_dir_exists: - try: - output_files = [f.name for f in output_dir.iterdir() if f.is_file()][:20] - except Exception: - output_files = [""] - - debug_logger.log( - level="DEBUG", - operation="downloader_n_m3u8dl_re_complete", - message="N_m3u8DL-RE download completed successfully", - context={ - "track_id": getattr(track, "id", None), - "track_type": track.__class__.__name__, - "output_dir": str(output_dir), - "output_dir_exists": output_dir_exists, - "output_files_count": len(output_files), - "output_files": output_files, - "filename": filename, - }, - ) - - # Warn if no output was produced - include N_m3u8DL-RE's logs for diagnosis - if not output_dir_exists or not output_files: - # Read N_m3u8DL-RE's log file for debugging - n_m3u8dl_log = "" - if log_file_path and log_file_path.exists(): - try: - n_m3u8dl_log = log_file_path.read_text(encoding="utf-8", errors="replace") - except Exception: - n_m3u8dl_log = "" - - 
debug_logger.log( - level="WARNING", - operation="downloader_n_m3u8dl_re_no_output", - message="N_m3u8DL-RE exited successfully but produced no output files", - context={ - "track_id": getattr(track, "id", None), - "track_type": track.__class__.__name__, - "output_dir": str(output_dir), - "output_dir_exists": output_dir_exists, - "selection_args": selection_args, - "track_url": track.url[:200] + "..." if len(track.url) > 200 else track.url, - "n_m3u8dl_re_log": n_m3u8dl_log, - }, - ) - - except ConnectionResetError: - # interrupted while passing URI to download - raise KeyboardInterrupt() - except KeyboardInterrupt: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - yield {"downloaded": "[yellow]CANCELLED"} - raise - except Exception as e: - DOWNLOAD_CANCELLED.set() # skip pending track downloads - yield {"downloaded": "[red]FAILED"} - if debug_logger and log_file_path and not isinstance(e, (subprocess.CalledProcessError, ValueError)): - log_contents = "" - if log_file_path.exists(): - try: - log_contents = log_file_path.read_text(encoding="utf-8", errors="replace") - except Exception: - log_contents = "" - - debug_logger.log( - level="ERROR", - operation="downloader_n_m3u8dl_re_exception", - message=f"Unexpected error during N_m3u8DL-RE download: {e}", - error=e, - context={ - "track_id": getattr(track, "id", None), - "track_type": track.__class__.__name__, - "log_file_contents": log_contents, - }, - ) - raise - finally: - # Clean up temporary debug files - if log_file_path and log_file_path.exists(): - try: - log_file_path.unlink() - except Exception: - pass - - -def n_m3u8dl_re( - urls: str | list[str] | dict[str, Any] | list[dict[str, Any]], - track: Any, - output_dir: Path, - filename: str, - headers: MutableMapping[str, str | bytes] | None = None, - cookies: MutableMapping[str, str] | CookieJar | None = None, - proxy: str | None = None, - max_workers: int | None = None, - content_keys: dict[str, Any] | None = None, - skip_merge: bool | None = False, -) -> Generator[dict[str, Any], None, None]: - """ - Download files using N_m3u8DL-RE. - https://github.com/nilaoda/N_m3u8DL-RE - - Yields the following download status updates while chunks are downloading: - - - {total: 100} (100% download total) - - {completed: 1} (1% download progress out of 100%) - - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) - - The data is in the same format accepted by rich's progress.update() function. - - Parameters: - urls: Web URL(s) to file(s) to download. NOTE: This parameter is ignored for now. - track: The track to download. Used to get track attributes for the selection - process. Note that Track.Descriptor.URL is not supported by N_m3u8DL-RE. - output_dir: The folder to save the file into. If the save path's directory does - not exist then it will be made automatically. - filename: The filename or filename template to use for each file. - headers: A mapping of HTTP Header Key/Values to use for all downloads. - cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads. - proxy: A proxy to use for all downloads. - max_workers: The maximum amount of threads to use for downloads. Defaults to - min(32,(cpu_count+4)). Can be set in config with --thread-count option. - content_keys: The content keys to use for decryption. - skip_merge: Whether to skip merging the downloaded chunks. 
- """ - - yield from download( - urls=urls, - track=track, - output_dir=output_dir, - filename=filename, - headers=headers, - cookies=cookies, - proxy=proxy, - max_workers=max_workers, - content_keys=content_keys, - skip_merge=skip_merge, - ) - - -__all__ = ("n_m3u8dl_re",) diff --git a/unshackle/core/downloaders/requests.py b/unshackle/core/downloaders/requests.py index 0cb6b4e..fc9c792 100644 --- a/unshackle/core/downloaders/requests.py +++ b/unshackle/core/downloaders/requests.py @@ -16,19 +16,36 @@ from unshackle.core.utilities import get_debug_logger, get_extension MAX_ATTEMPTS = 5 RETRY_WAIT = 2 -CHUNK_SIZE = 1024 -PROGRESS_WINDOW = 5 +PROGRESS_WINDOW = 2 + +# Adaptive chunk sizing — benchmarked optimal range +MIN_CHUNK = 524_288 # 512KB +MAX_CHUNK = 4_194_304 # 4MB +DEFAULT_CHUNK = 524_288 # 512KB +SPEED_ROLLING_WINDOW = 10 # seconds of history to keep for speed calculation + + +def _adaptive_chunk_size(content_length: int) -> int: + """Pick chunk size based on content length. Benchmarked sweet spot: 512KB-4MB.""" + if content_length <= 0: + return DEFAULT_CHUNK + return min(MAX_CHUNK, max(MIN_CHUNK, content_length // 4)) -DOWNLOAD_SIZES = [] -LAST_SPEED_REFRESH = time.time() def download( - url: str, save_path: Path, session: Optional[Session] = None, segmented: bool = False, **kwargs: Any + url: str, + save_path: Path, + session: Optional[Any] = None, + segmented: bool = False, + _speed_tracker: Optional[dict] = None, + **kwargs: Any, ) -> Generator[dict[str, Any], None, None]: """ - Download a file using Python Requests. - https://requests.readthedocs.io + Download a file with optimized I/O. + + Supports both requests.Session and curl_cffi CurlSession for TLS fingerprinting. + Uses adaptive chunk sizing with buffered writes for maximum throughput. Yields the following download status updates while chunks are downloading: @@ -38,74 +55,68 @@ def download( - {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s) - {file_downloaded: Path(...), written: 1024} (download finished, has the save path and size) - The data is in the same format accepted by rich's progress.update() function. The - `downloaded` key is custom and is not natively accepted by all rich progress bars. - Parameters: url: Web URL of a file to download. save_path: The path to save the file to. If the save path's directory does not exist then it will be made automatically. - session: The Requests Session to make HTTP requests with. Useful to set Header, - Cookie, and Proxy data. Connections are saved and re-used with the session - so long as the server keeps the connection alive. + session: A requests.Session or curl_cffi CurlSession to make HTTP requests with. + CurlSession preserves TLS fingerprinting for services that need it. segmented: If downloads are segments or parts of one bigger file. + _speed_tracker: Shared speed tracking state for this download batch (per-call, not global). kwargs: Any extra keyword arguments to pass to the session.get() call. Use this for one-time request changes like a header, cookie, or proxy. For example, to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. 
""" - global LAST_SPEED_REFRESH - session = session or Session() + # Per-call speed tracking (shared across threads within one requests() call) + if _speed_tracker is None: + _speed_tracker = {"sizes": [], "last_refresh": time.time()} + save_dir = save_path.parent control_file = save_path.with_name(f"{save_path.name}.!dev") save_dir.mkdir(parents=True, exist_ok=True) if control_file.exists(): - # consider the file corrupt if the control file exists save_path.unlink(missing_ok=True) control_file.unlink() elif save_path.exists(): - # if it exists, and no control file, then it should be safe yield dict(file_downloaded=save_path, written=save_path.stat().st_size) - # TODO: This should return, potential recovery bug - # TODO: Design a control file format so we know how much of the file is missing control_file.write_bytes(b"") attempts = 1 try: while True: written = 0 - - # these are for single-url speed calcs only - download_sizes = [] + download_sizes: list[int] = [] last_speed_refresh = time.time() try: stream = session.get(url, stream=True, **kwargs) stream.raise_for_status() - if not segmented: - try: - content_length = int(stream.headers.get("Content-Length", "0")) - - # Skip Content-Length validation for compressed responses since - # requests automatically decompresses but Content-Length shows compressed size - if stream.headers.get("Content-Encoding", "").lower() in ["gzip", "deflate", "br"]: - content_length = 0 - except ValueError: + # Determine content length and adaptive chunk size + try: + content_length = int(stream.headers.get("Content-Length", "0")) + if stream.headers.get("Content-Encoding", "").lower() in ["gzip", "deflate", "br"]: content_length = 0 + except ValueError: + content_length = 0 + chunk_size = _adaptive_chunk_size(content_length) + + if not segmented: if content_length > 0: - yield dict(total=math.ceil(content_length / CHUNK_SIZE)) + yield dict(total=math.ceil(content_length / chunk_size)) else: - # we have no data to calculate total chunks - yield dict(total=None) # indeterminate mode + yield dict(total=None) - with open(save_path, "wb") as f: - for chunk in stream.iter_content(chunk_size=CHUNK_SIZE): + # Buffered iter_content with adaptive chunk size + # Works with both requests.Session and CurlSession + with open(save_path, "wb", buffering=1_048_576) as f: + for chunk in stream.iter_content(chunk_size=chunk_size): download_size = len(chunk) f.write(chunk) written += download_size @@ -115,7 +126,7 @@ def download( now = time.time() time_since = now - last_speed_refresh download_sizes.append(download_size) - if time_since > PROGRESS_WINDOW or download_size < CHUNK_SIZE: + if time_since > PROGRESS_WINDOW or download_size < chunk_size: data_size = sum(download_sizes) download_speed = math.ceil(data_size / (time_since or 1)) yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") @@ -130,15 +141,21 @@ def download( if segmented: yield dict(advance=1) now = time.time() - time_since = now - LAST_SPEED_REFRESH - if written: # no size == skipped dl - DOWNLOAD_SIZES.append(written) - if DOWNLOAD_SIZES and time_since > PROGRESS_WINDOW: - data_size = sum(DOWNLOAD_SIZES) - download_speed = math.ceil(data_size / (time_since or 1)) + sizes = _speed_tracker["sizes"] + if written: + sizes.append((now, written)) + # Prune entries older than the rolling window + cutoff = now - SPEED_ROLLING_WINDOW + while sizes and sizes[0][0] < cutoff: + sizes.pop(0) + time_since = now - _speed_tracker["last_refresh"] + if sizes and time_since > PROGRESS_WINDOW: + window_start = 
sizes[0][0]
+                        window_duration = now - window_start
+                        data_size = sum(size for _, size in sizes)
+                        download_speed = math.ceil(data_size / (window_duration or 1))
                         yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
-                        LAST_SPEED_REFRESH = now
-                        DOWNLOAD_SIZES.clear()
+                        _speed_tracker["last_refresh"] = now
                 break
             except Exception as e:
                 save_path.unlink(missing_ok=True)
@@ -158,10 +175,14 @@ def requests(
     cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None,
     proxy: Optional[str] = None,
     max_workers: Optional[int] = None,
+    session: Optional[Any] = None,
 ) -> Generator[dict[str, Any], None, None]:
     """
-    Download a file using Python Requests.
-    https://requests.readthedocs.io
+    Download files with optimized I/O and adaptive chunk sizing.
+
+    Supports both requests.Session and curl_cffi CurlSession. When a CurlSession is
+    provided (e.g. from a service's get_session()), TLS fingerprinting is preserved
+    on all segment downloads.
 
     Yields the following download status updates while chunks are downloading:
 
@@ -186,7 +207,10 @@ def requests(
     cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads.
     proxy: An optional proxy URI to route connections through for all downloads.
     max_workers: The maximum amount of threads to use for downloads. Defaults to
-        min(32,(cpu_count+4)).
+        min(16,(cpu_count+4)).
+    session: An optional requests.Session or curl_cffi CurlSession to use. If provided,
+        it will be used directly (preserving TLS fingerprinting). If None, a new
+        requests.Session with HTTPAdapter connection pooling will be created.
     """
     if not urls:
         raise ValueError("urls must be provided and not empty")
 
@@ -221,7 +245,7 @@ def requests(
         urls = [urls]
 
     if not max_workers:
-        max_workers = min(32, (os.cpu_count() or 1) + 4)
+        max_workers = min(16, (os.cpu_count() or 1) + 4)
 
     urls = [
         dict(save_path=save_path, **url) if isinstance(url, dict) else dict(url=url, save_path=save_path)
@@ -231,25 +255,28 @@ def requests(
         ]
     ]
 
-    session = Session()
-    session.mount("https://", HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers, pool_block=True))
-    session.mount("http://", session.adapters["https://"])
-
-    if headers:
-        headers = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
-        session.headers.update(headers)
-    if cookies:
-        session.cookies.update(cookies)
-    if proxy:
-        session.proxies.update({"all": proxy})
+    # Use provided session or create a new optimized requests.Session
+    # When a session is provided (e.g., service's CurlSession), don't mutate it —
+    # headers/cookies/proxy are already set on it and it may be shared across tracks.
+    if session is None:
+        session = Session()
+        session.mount("https://", HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers, pool_block=True))
+        session.mount("http://", HTTPAdapter(pool_connections=max_workers, pool_maxsize=max_workers, pool_block=True))
+        if headers:
+            headers = {k: v for k, v in headers.items() if k.lower() != "accept-encoding"}
            session.headers.update(headers)
+        if cookies:
+            session.cookies.update(cookies)
+        if proxy:
+            session.proxies.update({"all": proxy})
 
     if debug_logger:
         first_url = urls[0].get("url", "") if urls else ""
         url_display = first_url[:200] + "..." 
if len(first_url) > 200 else first_url debug_logger.log( level="DEBUG", - operation="downloader_requests_start", - message="Starting requests download", + operation="downloader_start", + message="Starting download", context={ "url_count": len(urls), "first_url": url_display, @@ -257,56 +284,54 @@ def requests( "filename": filename, "max_workers": max_workers, "has_proxy": bool(proxy), + "session_type": type(session).__name__, }, ) - # If we're downloading more than one URL, treat them as "segments" for progress purposes. - # For single-URL downloads we want per-chunk progress (and the inner `download()` will yield - # a chunk-based `total`), so don't set a segment total of 1 here. segmented_batch = len(urls) > 1 if segmented_batch: yield dict(total=len(urls)) + # Per-call speed tracker — shared across threads within this call only + speed_tracker: dict[str, Any] = {"sizes": [], "last_refresh": time.time()} + try: with ThreadPoolExecutor(max_workers=max_workers) as pool: for future in as_completed( - pool.submit(download, session=session, segmented=segmented_batch, **url) for url in urls + pool.submit(download, session=session, segmented=segmented_batch, _speed_tracker=speed_tracker, **url) + for url in urls ): try: yield from future.result() except KeyboardInterrupt: - DOWNLOAD_CANCELLED.set() # skip pending track downloads + DOWNLOAD_CANCELLED.set() yield dict(downloaded="[yellow]CANCELLING") pool.shutdown(wait=True, cancel_futures=True) yield dict(downloaded="[yellow]CANCELLED") - # tell dl that it was cancelled - # the pool is already shut down, so exiting loop is fine raise except Exception as e: - DOWNLOAD_CANCELLED.set() # skip pending track downloads + DOWNLOAD_CANCELLED.set() yield dict(downloaded="[red]FAILING") pool.shutdown(wait=True, cancel_futures=True) yield dict(downloaded="[red]FAILED") if debug_logger: debug_logger.log( level="ERROR", - operation="downloader_requests_failed", - message=f"Requests download failed: {e}", + operation="downloader_failed", + message=f"Download failed: {e}", error=e, context={ "url_count": len(urls), "output_dir": str(output_dir), }, ) - # tell dl that it failed - # the pool is already shut down, so exiting loop is fine raise if debug_logger: debug_logger.log( level="DEBUG", - operation="downloader_requests_complete", - message="Requests download completed successfully", + operation="downloader_complete", + message="Download completed successfully", context={ "url_count": len(urls), "output_dir": str(output_dir), @@ -314,7 +339,7 @@ def requests( }, ) finally: - DOWNLOAD_SIZES.clear() + speed_tracker["sizes"].clear() __all__ = ("requests",) diff --git a/unshackle/core/manifests/dash.py b/unshackle/core/manifests/dash.py index 92052fd..fe338c0 100644 --- a/unshackle/core/manifests/dash.py +++ b/unshackle/core/manifests/dash.py @@ -7,7 +7,7 @@ import math import re import shutil import sys -from copy import copy, deepcopy +from copy import copy from functools import partial from pathlib import Path from typing import Any, Callable, Optional, Union @@ -18,7 +18,6 @@ from zlib import crc32 import requests from curl_cffi.requests import Session as CurlSession from langcodes import Language, tag_is_valid -from lxml import etree from lxml.etree import Element, ElementTree from pyplayready.system.pssh import PSSH as PR_PSSH from pywidevine.cdm import Cdm as WidevineCdm @@ -27,7 +26,6 @@ from requests import Session from unshackle.core.cdm.detect import is_playready_cdm from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, 
AnyTrack -from unshackle.core.downloaders import requests as requests_downloader from unshackle.core.drm import DRM_T, PlayReady, Widevine from unshackle.core.events import events from unshackle.core.tracks import Audio, Subtitle, Tracks, Video @@ -543,10 +541,6 @@ class DASH: progress(total=len(segments)) downloader = track.downloader - if downloader.__name__ == "aria2c" and any(bytes_range is not None for url, bytes_range in segments): - # aria2(c) is shit and doesn't support the Range header, fallback to the requests downloader - downloader = requests_downloader - log.warning("Falling back to the requests downloader as aria2(c) doesn't support the Range header") downloader_args = dict( urls=[ @@ -559,39 +553,9 @@ class DASH: cookies=session.cookies, proxy=proxy, max_workers=max_workers, + session=session, ) - skip_merge = False - if downloader.__name__ == "n_m3u8dl_re": - skip_merge = True - - # When periods were filtered out during to_tracks(), n_m3u8dl_re will re-parse - # the raw MPD and download ALL periods (including ads/pre-rolls). Write a filtered - # MPD with the rejected periods removed so n_m3u8dl_re downloads the correct content. - filtered_period_ids = track.data.get("dash", {}).get("filtered_period_ids", []) - if filtered_period_ids: - filtered_manifest = deepcopy(manifest) - for child in list(filtered_manifest): - if not hasattr(child.tag, "find"): - continue - if child.tag == "Period" and child.get("id") in filtered_period_ids: - filtered_manifest.remove(child) - - filtered_mpd_path = save_dir / f".{track.id}_filtered.mpd" - filtered_mpd_path.parent.mkdir(parents=True, exist_ok=True) - etree.ElementTree(filtered_manifest).write( - str(filtered_mpd_path), xml_declaration=True, encoding="utf-8" - ) - track.from_file = filtered_mpd_path - - downloader_args.update( - { - "filename": track.id, - "track": track, - "content_keys": drm.content_keys if drm else None, - } - ) - debug_logger = get_debug_logger() if debug_logger: debug_logger.log( @@ -602,10 +566,8 @@ class DASH: "track_id": getattr(track, "id", None), "track_type": track.__class__.__name__, "total_segments": len(segments), - "downloader": downloader.__name__, "has_drm": bool(track.drm), "drm_types": [drm.__class__.__name__ for drm in (track.drm or [])], - "skip_merge": skip_merge, "save_path": str(save_path), "has_init_data": bool(init_data), }, @@ -621,15 +583,6 @@ class DASH: status_update["downloaded"] = f"DASH {downloaded}" progress(**status_update) - # Clean up filtered MPD temp file before enumerating segments - filtered_mpd_path = save_dir / f".{track.id}_filtered.mpd" - if filtered_mpd_path.exists(): - filtered_mpd_path.unlink() - - # see https://github.com/devine-dl/devine/issues/71 - for control_file in save_dir.glob("*.aria2__temp"): - control_file.unlink() - # Verify output directory exists and contains files if not save_dir.exists(): error_msg = f"Output directory does not exist: {save_dir}" @@ -643,9 +596,8 @@ class DASH: "track_type": track.__class__.__name__, "save_dir": str(save_dir), "save_path": str(save_path), - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) raise FileNotFoundError(error_msg) @@ -663,9 +615,8 @@ class DASH: "save_dir_exists": save_dir.exists(), "segments_found": len(segments_to_merge), "segment_files": [f.name for f in segments_to_merge[:10]], # Limit to first 10 - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) if not segments_to_merge: @@ -682,48 +633,39 @@ class DASH: 
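
To make the calling convention concrete, here is a minimal sketch of a parser driving the requests downloader with an existing session and relaying its status updates, following the downloader_args pattern above. The URLs, output directory, filename template, worker count, and the print-based progress callback are placeholders for illustration, not values from the patch.

    # Hypothetical driver for the requests downloader; placeholder names are assumptions.
    from pathlib import Path

    from requests import Session

    from unshackle.core.downloaders import requests as requests_downloader

    session = Session()                              # or the service's curl_cffi session, passed through as-is
    segment_save_dir = Path("/tmp/track_segments")   # placeholder output directory

    downloader_args = dict(
        urls=[{"url": f"https://cdn.example.com/seg_{i:04}.m4s"} for i in range(8)],  # placeholder URLs
        output_dir=segment_save_dir,
        filename="{i:04}.mp4",                       # placeholder naming template
        headers=session.headers,
        cookies=session.cookies,
        proxy=None,
        max_workers=16,
        session=session,                             # reused for every segment request
    )

    def progress(**status):                          # stand-in for the rich progress callback
        print(status)

    for status_update in requests_downloader(**downloader_args):
        downloaded = status_update.get("downloaded")
        if downloaded and downloaded != "Downloading":
            status_update["downloaded"] = f"DASH {downloaded}"
        progress(**status_update)

Because the session is reused rather than rebuilt per call, its connection pool and any TLS fingerprint attached to it apply to every segment request.
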
"track_type": track.__class__.__name__, "save_dir": str(save_dir), "directory_contents": [str(p) for p in all_contents], - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) raise FileNotFoundError(error_msg) - if skip_merge: - # N_m3u8DL-RE handles merging and decryption internally - shutil.move(segments_to_merge[0], save_path) - if drm: - track.drm = None - events.emit(events.Types.TRACK_DECRYPTED, track=track, drm=drm, segment=None) - else: - with open(save_path, "wb") as f: - if init_data: - f.write(init_data) - if len(segments_to_merge) > 1: - progress(downloaded="Merging", completed=0, total=len(segments_to_merge)) - for segment_file in segments_to_merge: - segment_data = segment_file.read_bytes() - # TODO: fix encoding after decryption? - if ( - not drm - and isinstance(track, Subtitle) - and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML) - ): - segment_data = try_ensure_utf8(segment_data) - segment_data = ( - segment_data.decode("utf8") - .replace("‎", html.unescape("‎")) - .replace("‏", html.unescape("‏")) - .encode("utf8") - ) - f.write(segment_data) - f.flush() - segment_file.unlink() - progress(advance=1) + with open(save_path, "wb") as f: + if init_data: + f.write(init_data) + if len(segments_to_merge) > 1: + progress(downloaded="Merging", completed=0, total=len(segments_to_merge)) + for segment_file in segments_to_merge: + segment_data = segment_file.read_bytes() + if ( + not drm + and isinstance(track, Subtitle) + and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML) + ): + segment_data = try_ensure_utf8(segment_data) + segment_data = ( + segment_data.decode("utf8") + .replace("‎", html.unescape("‎")) + .replace("‏", html.unescape("‏")) + .encode("utf8") + ) + f.write(segment_data) + f.flush() + segment_file.unlink() + progress(advance=1) track.path = save_path events.emit(events.Types.TRACK_DOWNLOADED, track=track) - if not skip_merge and drm: + if drm: progress(downloaded="Decrypting", completed=0, total=100) drm.decrypt(save_path) track.drm = None diff --git a/unshackle/core/manifests/hls.py b/unshackle/core/manifests/hls.py index 2904c79..7c6ee63 100644 --- a/unshackle/core/manifests/hls.py +++ b/unshackle/core/manifests/hls.py @@ -30,7 +30,6 @@ from requests import Session from unshackle.core import binaries from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack -from unshackle.core.downloaders import requests as requests_downloader from unshackle.core.drm import DRM_T, ClearKey, MonaLisa, PlayReady, Widevine from unshackle.core.events import events from unshackle.core.tracks import Audio, Subtitle, Tracks, Video @@ -391,9 +390,6 @@ class HLS: progress(total=total_segments) downloader = track.downloader - if downloader.__name__ == "aria2c" and any(x.byterange for x in master.segments if x not in unwanted_segments): - downloader = requests_downloader - log.warning("Falling back to the requests downloader as aria2(c) doesn't support the Range header") urls: list[dict[str, Any]] = [] segment_durations: list[int] = [] @@ -422,7 +418,6 @@ class HLS: segment_save_dir = save_dir / "segments" - skip_merge = False downloader_args = dict( urls=urls, output_dir=segment_save_dir, @@ -431,22 +426,9 @@ class HLS: cookies=session.cookies, proxy=proxy, max_workers=max_workers, + session=session, ) - if downloader.__name__ == "n_m3u8dl_re": - skip_merge = True - # session_drm already has correct 
content_keys from initial licensing above - n_m3u8dl_content_keys = session_drm.content_keys if session_drm else None - - downloader_args.update( - { - "output_dir": save_dir, - "filename": track.id, - "track": track, - "content_keys": n_m3u8dl_content_keys, - } - ) - debug_logger = get_debug_logger() if debug_logger: debug_logger.log( @@ -457,10 +439,8 @@ class HLS: "track_id": getattr(track, "id", None), "track_type": track.__class__.__name__, "total_segments": total_segments, - "downloader": downloader.__name__, "has_drm": bool(session_drm), "drm_type": session_drm.__class__.__name__ if session_drm else None, - "skip_merge": skip_merge, "save_path": str(save_path), }, ) @@ -475,17 +455,6 @@ class HLS: status_update["downloaded"] = f"HLS {downloaded}" progress(**status_update) - # see https://github.com/devine-dl/devine/issues/71 - for control_file in segment_save_dir.glob("*.aria2__temp"): - control_file.unlink() - - if skip_merge: - final_save_path = HLS._finalize_n_m3u8dl_re_output(track=track, save_dir=save_dir, save_path=save_path) - progress(downloaded="Downloaded") - track.path = final_save_path - events.emit(events.Types.TRACK_DOWNLOADED, track=track) - return - progress(total=total_segments, completed=0, downloaded="Merging") name_len = len(str(total_segments)) @@ -736,9 +705,8 @@ class HLS: "save_dir_exists": save_dir.exists(), "segments_found": len(segments_to_merge), "segment_files": [f.name for f in segments_to_merge[:10]], # Limit to first 10 - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) if not segments_to_merge: @@ -755,9 +723,8 @@ class HLS: "save_dir": str(save_dir), "save_dir_exists": save_dir.exists(), "directory_contents": [str(p) for p in all_contents], - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) raise FileNotFoundError(error_msg) diff --git a/unshackle/core/manifests/ism.py b/unshackle/core/manifests/ism.py index 816e2bd..7f4d4a9 100644 --- a/unshackle/core/manifests/ism.py +++ b/unshackle/core/manifests/ism.py @@ -3,7 +3,6 @@ from __future__ import annotations import base64 import hashlib import html -import shutil import urllib.parse from functools import partial from pathlib import Path @@ -269,7 +268,6 @@ class ISM: progress(total=len(segments)) downloader = track.downloader - skip_merge = False downloader_args = dict( urls=[{"url": url} for url in segments], output_dir=save_dir, @@ -278,18 +276,9 @@ class ISM: cookies=session.cookies, proxy=proxy, max_workers=max_workers, + session=session, ) - if downloader.__name__ == "n_m3u8dl_re": - skip_merge = True - downloader_args.update( - { - "filename": track.id, - "track": track, - "content_keys": session_drm.content_keys if session_drm else None, - } - ) - debug_logger = get_debug_logger() if debug_logger: debug_logger.log( @@ -300,11 +289,10 @@ class ISM: "track_id": getattr(track, "id", None), "track_type": track.__class__.__name__, "total_segments": len(segments), - "downloader": downloader.__name__, + "downloader": "requests", "has_drm": bool(session_drm), "drm_type": session_drm.__class__.__name__ if session_drm else None, - "skip_merge": skip_merge, - "save_path": str(save_path), + "save_path": str(save_path), }, ) @@ -318,9 +306,6 @@ class ISM: status_update["downloaded"] = f"ISM {downloaded}" progress(**status_update) - for control_file in save_dir.glob("*.aria2__temp"): - control_file.unlink() - # Verify output directory exists and contains files if not save_dir.exists(): error_msg = 
f"Output directory does not exist: {save_dir}" @@ -334,9 +319,8 @@ class ISM: "track_type": track.__class__.__name__, "save_dir": str(save_dir), "save_path": str(save_path), - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) raise FileNotFoundError(error_msg) @@ -354,9 +338,8 @@ class ISM: "save_dir_exists": save_dir.exists(), "segments_found": len(segments_to_merge), "segment_files": [f.name for f in segments_to_merge[:10]], # Limit to first 10 - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) if not segments_to_merge: @@ -372,39 +355,35 @@ class ISM: "track_type": track.__class__.__name__, "save_dir": str(save_dir), "directory_contents": [str(p) for p in all_contents], - "downloader": downloader.__name__, - "skip_merge": skip_merge, - }, + "downloader": "requests", + }, ) raise FileNotFoundError(error_msg) - if skip_merge: - shutil.move(segments_to_merge[0], save_path) - else: - with open(save_path, "wb") as f: - for segment_file in segments_to_merge: - segment_data = segment_file.read_bytes() - if ( - not session_drm - and isinstance(track, Subtitle) - and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML) - ): - segment_data = try_ensure_utf8(segment_data) - segment_data = ( - segment_data.decode("utf8") - .replace("‎", html.unescape("‎")) - .replace("‏", html.unescape("‏")) - .encode("utf8") - ) - f.write(segment_data) - f.flush() - segment_file.unlink() - progress(advance=1) + with open(save_path, "wb") as f: + for segment_file in segments_to_merge: + segment_data = segment_file.read_bytes() + if ( + not session_drm + and isinstance(track, Subtitle) + and track.codec not in (Subtitle.Codec.fVTT, Subtitle.Codec.fTTML) + ): + segment_data = try_ensure_utf8(segment_data) + segment_data = ( + segment_data.decode("utf8") + .replace("‎", html.unescape("‎")) + .replace("‏", html.unescape("‏")) + .encode("utf8") + ) + f.write(segment_data) + f.flush() + segment_file.unlink() + progress(advance=1) track.path = save_path events.emit(events.Types.TRACK_DOWNLOADED, track=track) - if not skip_merge and session_drm: + if session_drm: progress(downloaded="Decrypting", completed=0, total=100) session_drm.decrypt(save_path) track.drm = None diff --git a/unshackle/core/tracks/track.py b/unshackle/core/tracks/track.py index 6031dc2..d73b528 100644 --- a/unshackle/core/tracks/track.py +++ b/unshackle/core/tracks/track.py @@ -21,7 +21,7 @@ from unshackle.core import binaries from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm from unshackle.core.config import config from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY -from unshackle.core.downloaders import aria2c, curl_impersonate, n_m3u8dl_re, requests +from unshackle.core.downloaders import requests from unshackle.core.drm import DRM_T, PlayReady, Widevine from unshackle.core.events import events from unshackle.core.utilities import get_boxes, try_ensure_utf8 @@ -88,12 +88,7 @@ class Track: raise TypeError(f"Expected drm to be an iterable, not {type(drm)}") if downloader is None: - downloader = { - "aria2c": aria2c, - "curl_impersonate": curl_impersonate, - "requests": requests, - "n_m3u8dl_re": n_m3u8dl_re, - }[config.downloader] + downloader = requests self.path: Optional[Path] = None self.url = url @@ -211,23 +206,13 @@ class Track: if track_type == "Subtitle": save_path = save_path.with_suffix(f".{self.codec.extension}") - if self.downloader.__name__ == "n_m3u8dl_re" and ( - 
self.descriptor == self.Descriptor.URL - or track_type in ("Subtitle", "Attachment") - ): - self.downloader = requests - if self.descriptor != self.Descriptor.URL: save_dir = save_path.with_name(save_path.name + "_segments") else: save_dir = save_path.parent def cleanup(): - # track file (e.g., "foo.mp4") save_path.unlink(missing_ok=True) - # aria2c control file (e.g., "foo.mp4.aria2" or "foo.mp4.aria2__temp") - save_path.with_suffix(f"{save_path.suffix}.aria2").unlink(missing_ok=True) - save_path.with_suffix(f"{save_path.suffix}.aria2__temp").unlink(missing_ok=True) if save_dir.exists() and save_dir.name.endswith("_segments"): shutil.rmtree(save_dir) @@ -328,10 +313,6 @@ class Track: if DOWNLOAD_LICENCE_ONLY.is_set(): progress(downloaded="[yellow]SKIPPED") - elif track_type != "Subtitle" and self.downloader.__name__ == "n_m3u8dl_re": - progress(downloaded="[red]FAILED") - error = f"[N_m3u8DL-RE]: {self.descriptor} is currently not supported" - raise ValueError(error) else: for status_update in self.downloader( urls=self.url, @@ -341,6 +322,7 @@ class Track: cookies=session.cookies, proxy=proxy, max_workers=max_workers, + session=session, ): file_downloaded = status_update.get("file_downloaded") if not file_downloaded:
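
Finally, a self-contained sketch of the per-call rolling-window speed tracking that requests.py now uses in place of the old module-level DOWNLOAD_SIZES / LAST_SPEED_REFRESH globals. The helper names here (make_tracker, record_segment) are assumptions for illustration and do not exist in the patch; the arithmetic mirrors the hunks above.

    # Hypothetical helpers mirroring the _speed_tracker logic above; not part of the patch.
    import math
    import time
    from typing import Optional

    SPEED_ROLLING_WINDOW = 10  # seconds of history, as in the constant above
    PROGRESS_WINDOW = 2        # minimum seconds between speed refreshes

    def make_tracker() -> dict:
        # One tracker per requests() call, shared only by that call's worker threads,
        # so one track's throughput no longer bleeds into another track's readout.
        return {"sizes": [], "last_refresh": time.time()}

    def record_segment(tracker: dict, written: int) -> Optional[str]:
        """Record one finished segment; return a speed string when a refresh is due."""
        now = time.time()
        if written:
            tracker["sizes"].append((now, written))
            # Drop samples that have aged out of the rolling window
            cutoff = now - SPEED_ROLLING_WINDOW
            while tracker["sizes"] and tracker["sizes"][0][0] < cutoff:
                tracker["sizes"].pop(0)
        if tracker["sizes"] and now - tracker["last_refresh"] > PROGRESS_WINDOW:
            window_duration = now - tracker["sizes"][0][0]
            data_size = sum(size for _, size in tracker["sizes"])
            tracker["last_refresh"] = now
            return f"{math.ceil(data_size / (window_duration or 1))} B/s"
        return None

Relative to the old module-level list, the window is keyed by timestamps, so bytes written by several worker threads are averaged over the actual elapsed time inside one call rather than over whatever happened to accumulate globally since the last refresh.
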