From c7d4a68cbff84e0e8319235444aed6ea2a741114 Mon Sep 17 00:00:00 2001 From: Andy Date: Sat, 7 Feb 2026 19:04:39 -0700 Subject: [PATCH] fix(aria2c): warn on config mismatch and wait for RPC ready When ensure_started() is called while aria2c is already running, it now compares the requested proxy/max_workers against the values the process was started with and logs a warning if they differ (since the running process cannot be reconfigured in-place). Startup no longer uses a fixed sleep; instead it probes the JSON-RPC endpoint with a bounded retry loop (aria2.getVersion) and only proceeds once RPC is responsive, terminating the subprocess and raising on timeout. --- unshackle/core/downloaders/aria2c.py | 75 +++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/unshackle/core/downloaders/aria2c.py b/unshackle/core/downloaders/aria2c.py index af7d34f..c4125a7 100644 --- a/unshackle/core/downloaders/aria2c.py +++ b/unshackle/core/downloaders/aria2c.py @@ -1,3 +1,4 @@ +import logging import os import subprocess import textwrap @@ -54,11 +55,13 @@ class _Aria2Manager: """Singleton manager to run one aria2c process and enqueue downloads via RPC.""" def __init__(self) -> None: + self._logger = logging.getLogger(__name__) self._proc: Optional[subprocess.Popen] = None self._rpc_port: Optional[int] = None self._rpc_secret: Optional[str] = None self._rpc_uri: Optional[str] = None self._session: Session = Session() + self._max_workers: Optional[int] = None self._max_concurrent_downloads: int = 0 self._max_connection_per_server: int = 1 self._split_default: int = 5 @@ -66,6 +69,47 @@ class _Aria2Manager: self._proxy: Optional[str] = None self._lock: threading.Lock = threading.Lock() + def _wait_for_rpc_ready(self, timeout_s: float = 8.0, interval_s: float = 0.1) -> None: + assert self._proc is not None + assert self._rpc_uri is not None + assert self._rpc_secret is not None + + deadline = time.monotonic() + timeout_s + + payload = { + "jsonrpc": "2.0", + "id": get_random_bytes(16).hex(), + "method": "aria2.getVersion", + "params": [f"token:{self._rpc_secret}"], + } + + while time.monotonic() < deadline: + if self._proc.poll() is not None: + raise RuntimeError( + f"aria2c exited before RPC became ready (exit code {self._proc.returncode})" + ) + try: + res = self._session.post(self._rpc_uri, json=payload, timeout=0.25) + data = res.json() + if isinstance(data, dict) and data.get("result") is not None: + return + except (requests.exceptions.RequestException, ValueError): + # Not ready yet (connection refused / bad response / etc.) + pass + time.sleep(interval_s) + + # Timed out: ensure we don't leave a zombie/stray aria2c process behind. + try: + self._proc.terminate() + self._proc.wait(timeout=2) + except Exception: + try: + self._proc.kill() + self._proc.wait(timeout=2) + except Exception: + pass + raise TimeoutError(f"aria2c RPC did not become ready within {timeout_s:.1f}s") + def _build_args(self) -> list[str]: args = [ "--continue=true", @@ -95,9 +139,6 @@ class _Aria2Manager: max_workers: Optional[int], ) -> None: with self._lock: - if self._proc and self._proc.poll() is None: - return - if not binaries.Aria2: debug_logger = get_debug_logger() if debug_logger: @@ -109,27 +150,45 @@ class _Aria2Manager: ) raise EnvironmentError("Aria2c executable not found...") + effective_proxy = proxy or None + if not max_workers: - max_workers = min(32, (os.cpu_count() or 1) + 4) + effective_max_workers = min(32, (os.cpu_count() or 1) + 4) elif not isinstance(max_workers, int): raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}") + else: + effective_max_workers = max_workers + + if self._proc and self._proc.poll() is None: + if effective_proxy != self._proxy or effective_max_workers != self._max_workers: + self._logger.warning( + "aria2c process is already running; requested proxy=%r, max_workers=%r, " + "but running process will continue with proxy=%r, max_workers=%r", + effective_proxy, + effective_max_workers, + self._proxy, + self._max_workers, + ) + return self._rpc_port = get_free_port() self._rpc_secret = get_random_bytes(16).hex() self._rpc_uri = f"http://127.0.0.1:{self._rpc_port}/jsonrpc" - self._max_concurrent_downloads = int(config.aria2c.get("max_concurrent_downloads", max_workers)) + self._max_workers = effective_max_workers + self._max_concurrent_downloads = int( + config.aria2c.get("max_concurrent_downloads", effective_max_workers) + ) self._max_connection_per_server = int(config.aria2c.get("max_connection_per_server", 1)) self._split_default = int(config.aria2c.get("split", 5)) self._file_allocation = config.aria2c.get("file_allocation", "prealloc") - self._proxy = proxy or None + self._proxy = effective_proxy args = self._build_args() self._proc = subprocess.Popen( [binaries.Aria2, *args], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) - # Give aria2c a moment to start up and bind to the RPC port - time.sleep(0.5) + self._wait_for_rpc_ready() @property def rpc_uri(self) -> str: