forked from kenzuya/unshackle
fix(aria2c): warn on config mismatch and wait for RPC ready
When ensure_started() is called while aria2c is already running, it now compares the requested proxy/max_workers against the values the process was started with and logs a warning if they differ (since the running process cannot be reconfigured in-place). Startup no longer uses a fixed sleep; instead it probes the JSON-RPC endpoint with a bounded retry loop (aria2.getVersion) and only proceeds once RPC is responsive, terminating the subprocess and raising on timeout.
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import textwrap
|
||||
@@ -54,11 +55,13 @@ class _Aria2Manager:
|
||||
"""Singleton manager to run one aria2c process and enqueue downloads via RPC."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._logger = logging.getLogger(__name__)
|
||||
self._proc: Optional[subprocess.Popen] = None
|
||||
self._rpc_port: Optional[int] = None
|
||||
self._rpc_secret: Optional[str] = None
|
||||
self._rpc_uri: Optional[str] = None
|
||||
self._session: Session = Session()
|
||||
self._max_workers: Optional[int] = None
|
||||
self._max_concurrent_downloads: int = 0
|
||||
self._max_connection_per_server: int = 1
|
||||
self._split_default: int = 5
|
||||
@@ -66,6 +69,47 @@ class _Aria2Manager:
|
||||
self._proxy: Optional[str] = None
|
||||
self._lock: threading.Lock = threading.Lock()
|
||||
|
||||
def _wait_for_rpc_ready(self, timeout_s: float = 8.0, interval_s: float = 0.1) -> None:
|
||||
assert self._proc is not None
|
||||
assert self._rpc_uri is not None
|
||||
assert self._rpc_secret is not None
|
||||
|
||||
deadline = time.monotonic() + timeout_s
|
||||
|
||||
payload = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": get_random_bytes(16).hex(),
|
||||
"method": "aria2.getVersion",
|
||||
"params": [f"token:{self._rpc_secret}"],
|
||||
}
|
||||
|
||||
while time.monotonic() < deadline:
|
||||
if self._proc.poll() is not None:
|
||||
raise RuntimeError(
|
||||
f"aria2c exited before RPC became ready (exit code {self._proc.returncode})"
|
||||
)
|
||||
try:
|
||||
res = self._session.post(self._rpc_uri, json=payload, timeout=0.25)
|
||||
data = res.json()
|
||||
if isinstance(data, dict) and data.get("result") is not None:
|
||||
return
|
||||
except (requests.exceptions.RequestException, ValueError):
|
||||
# Not ready yet (connection refused / bad response / etc.)
|
||||
pass
|
||||
time.sleep(interval_s)
|
||||
|
||||
# Timed out: ensure we don't leave a zombie/stray aria2c process behind.
|
||||
try:
|
||||
self._proc.terminate()
|
||||
self._proc.wait(timeout=2)
|
||||
except Exception:
|
||||
try:
|
||||
self._proc.kill()
|
||||
self._proc.wait(timeout=2)
|
||||
except Exception:
|
||||
pass
|
||||
raise TimeoutError(f"aria2c RPC did not become ready within {timeout_s:.1f}s")
|
||||
|
||||
def _build_args(self) -> list[str]:
|
||||
args = [
|
||||
"--continue=true",
|
||||
@@ -95,9 +139,6 @@ class _Aria2Manager:
|
||||
max_workers: Optional[int],
|
||||
) -> None:
|
||||
with self._lock:
|
||||
if self._proc and self._proc.poll() is None:
|
||||
return
|
||||
|
||||
if not binaries.Aria2:
|
||||
debug_logger = get_debug_logger()
|
||||
if debug_logger:
|
||||
@@ -109,27 +150,45 @@ class _Aria2Manager:
|
||||
)
|
||||
raise EnvironmentError("Aria2c executable not found...")
|
||||
|
||||
effective_proxy = proxy or None
|
||||
|
||||
if not max_workers:
|
||||
max_workers = min(32, (os.cpu_count() or 1) + 4)
|
||||
effective_max_workers = min(32, (os.cpu_count() or 1) + 4)
|
||||
elif not isinstance(max_workers, int):
|
||||
raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}")
|
||||
else:
|
||||
effective_max_workers = max_workers
|
||||
|
||||
if self._proc and self._proc.poll() is None:
|
||||
if effective_proxy != self._proxy or effective_max_workers != self._max_workers:
|
||||
self._logger.warning(
|
||||
"aria2c process is already running; requested proxy=%r, max_workers=%r, "
|
||||
"but running process will continue with proxy=%r, max_workers=%r",
|
||||
effective_proxy,
|
||||
effective_max_workers,
|
||||
self._proxy,
|
||||
self._max_workers,
|
||||
)
|
||||
return
|
||||
|
||||
self._rpc_port = get_free_port()
|
||||
self._rpc_secret = get_random_bytes(16).hex()
|
||||
self._rpc_uri = f"http://127.0.0.1:{self._rpc_port}/jsonrpc"
|
||||
|
||||
self._max_concurrent_downloads = int(config.aria2c.get("max_concurrent_downloads", max_workers))
|
||||
self._max_workers = effective_max_workers
|
||||
self._max_concurrent_downloads = int(
|
||||
config.aria2c.get("max_concurrent_downloads", effective_max_workers)
|
||||
)
|
||||
self._max_connection_per_server = int(config.aria2c.get("max_connection_per_server", 1))
|
||||
self._split_default = int(config.aria2c.get("split", 5))
|
||||
self._file_allocation = config.aria2c.get("file_allocation", "prealloc")
|
||||
self._proxy = proxy or None
|
||||
self._proxy = effective_proxy
|
||||
|
||||
args = self._build_args()
|
||||
self._proc = subprocess.Popen(
|
||||
[binaries.Aria2, *args], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
|
||||
)
|
||||
# Give aria2c a moment to start up and bind to the RPC port
|
||||
time.sleep(0.5)
|
||||
self._wait_for_rpc_ready()
|
||||
|
||||
@property
|
||||
def rpc_uri(self) -> str:
|
||||
|
||||
Reference in New Issue
Block a user