From 6840944738f56e82bde5164f843e305c0043993d Mon Sep 17 00:00:00 2001
From: Andy
Date: Mon, 23 Mar 2026 18:17:12 -0600
Subject: [PATCH] perf(downloader): optimize hot loop and threading efficiency

- Replace list.pop(0) with deque.popleft() for O(1) speed tracker
  eviction.
- Skip the urllib3 decode chain with decode_content=False on raw reads.
- Use a running total instead of sum() for progress reporting.
- Add an explicit stream.close() on the CurlSession path.
- Replace the busy-poll loop with concurrent.futures.wait(FIRST_COMPLETED).
- Skip ThreadPoolExecutor for single-URL downloads.
- DRY up the duplicated raw/iter_content progress logic into a unified
  chunk iterator.
---
 unshackle/core/downloaders/requests.py | 192 ++++++++++----------
 1 file changed, 96 insertions(+), 96 deletions(-)

diff --git a/unshackle/core/downloaders/requests.py b/unshackle/core/downloaders/requests.py
index 800b26d..74732d7 100644
--- a/unshackle/core/downloaders/requests.py
+++ b/unshackle/core/downloaders/requests.py
@@ -1,6 +1,8 @@
 import math
 import os
 import time
+from collections import deque
+from concurrent.futures import FIRST_COMPLETED, wait
 from concurrent.futures.thread import ThreadPoolExecutor
 from http.cookiejar import CookieJar
 from pathlib import Path
@@ -74,7 +76,7 @@ def download(
     session = session or Session()
 
     if _speed_tracker is None:
-        _speed_tracker = {"sizes": [], "last_refresh": time.time()}
+        _speed_tracker = {"sizes": deque(), "last_refresh": time.time()}
 
     save_dir = save_path.parent
     control_file = save_path.with_name(f"{save_path.name}.!dev")
@@ -95,7 +97,6 @@ def download(
     try:
         while True:
             written = 0
-            download_sizes: list[int] = []
             last_speed_refresh = _time()
 
             try:
@@ -127,48 +128,44 @@
                     # Cache f.write for hot loop
                     _write = f.write
 
+                    # Build chunk iterator — raw reads for requests.Session, iter_content for CurlSession
                     if use_raw:
-                        # Raw socket read — 30-35% faster than iter_content (benchmarked)
-                        # Safe in worker threads with Queue-based event dispatch
+                        stream.raw.decode_content = False
                         _read = stream.raw.read
-                        while True:
-                            chunk = _read(chunk_size)
-                            if not chunk:
-                                break
-                            _write(chunk)
-                            download_size = len(chunk)
-                            written += download_size
-                            if not segmented:
-                                yield dict(advance=1)
-                                now = _time()
-                                time_since = now - last_speed_refresh
-                                download_sizes.append(download_size)
-                                if time_since > PROGRESS_WINDOW or download_size < chunk_size:
-                                    data_size = sum(download_sizes)
-                                    download_speed = math.ceil(data_size / (time_since or 1))
-                                    yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
-                                    last_speed_refresh = now
-                                    download_sizes.clear()
-                        stream.close()
+                        def _chunks() -> Generator[bytes, None, None]:
+                            while True:
+                                chunk = _read(chunk_size)
+                                if not chunk:
+                                    break
+                                yield chunk
+                            stream.close()
+
+                        chunks = _chunks()
                     else:
-                        # CurlSession: use iter_content (raw not available)
-                        for chunk in stream.iter_content(chunk_size=chunk_size):
-                            _write(chunk)
-                            download_size = len(chunk)
-                            written += download_size
+                        def _chunks_iter() -> Generator[bytes, None, None]:
+                            yield from stream.iter_content(chunk_size=chunk_size)
+                            stream.close()
 
-                            if not segmented:
-                                yield dict(advance=1)
-                                now = _time()
-                                time_since = now - last_speed_refresh
-                                download_sizes.append(download_size)
-                                if time_since > PROGRESS_WINDOW or download_size < chunk_size:
-                                    data_size = sum(download_sizes)
-                                    download_speed = math.ceil(data_size / (time_since or 1))
-                                    yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
-                                    last_speed_refresh = now
-                                    download_sizes.clear()
+                        chunks = _chunks_iter()
 
+                    # Unified write + progress loop
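+                    # _data_accumulated is a running byte total for the current
+                    # refresh window; bumping it per chunk and resetting it after
+                    # a refresh avoids re-summing a list on every iteration.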
+                    _data_accumulated = 0
+                    for chunk in chunks:
+                        _write(chunk)
+                        download_size = len(chunk)
+                        written += download_size
+
+                        if not segmented:
+                            yield dict(advance=1)
+                            now = _time()
+                            time_since = now - last_speed_refresh
+                            _data_accumulated += download_size
+                            if time_since > PROGRESS_WINDOW or download_size < chunk_size:
+                                download_speed = math.ceil(_data_accumulated / (time_since or 1))
+                                yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
+                                last_speed_refresh = now
+                                _data_accumulated = 0
 
                 # Truncate to actual written size in case pre-allocation overshot
                 if content_length > 0 and written != content_length:
@@ -187,7 +184,7 @@ def download(
             sizes.append((now, written))
             cutoff = now - SPEED_ROLLING_WINDOW
             while sizes and sizes[0][0] < cutoff:
-                sizes.pop(0)
+                sizes.popleft()
             time_since = now - _speed_tracker["last_refresh"]
             if sizes and time_since > PROGRESS_WINDOW:
                 window_start = sizes[0][0]
@@ -338,73 +335,76 @@ def requests(
     yield dict(total=len(urls))
 
     # Per-call speed tracker — shared across threads within this call only
-    speed_tracker: dict[str, Any] = {"sizes": [], "last_refresh": time.time()}
+    speed_tracker: dict[str, Any] = {"sizes": deque(), "last_refresh": time.time()}
 
     try:
-        with ThreadPoolExecutor(max_workers=max_workers) as pool:
-            event_queue: Queue[dict[str, Any]] = Queue()
+        # Fast path: single URL — no thread pool overhead
+        if len(urls) == 1:
+            yield from download(
+                session=session,
+                segmented=segmented_batch,
+                _speed_tracker=speed_tracker,
+                **urls[0],
+            )
+        else:
+            with ThreadPoolExecutor(max_workers=max_workers) as pool:
+                event_queue: Queue[dict[str, Any]] = Queue()
 
-            def _download_worker(url_item: dict[str, Any]) -> None:
-                for event in download(
-                    session=session,
-                    segmented=segmented_batch,
-                    _speed_tracker=speed_tracker,
-                    **url_item,
-                ):
-                    event_queue.put(event)
+                def _download_worker(url_item: dict[str, Any]) -> None:
+                    for event in download(
+                        session=session,
+                        segmented=segmented_batch,
+                        _speed_tracker=speed_tracker,
+                        **url_item,
+                    ):
+                        event_queue.put(event)
 
-            futures = [pool.submit(_download_worker, url) for url in urls]
-            pending = set(futures)
+                futures = [pool.submit(_download_worker, url) for url in urls]
+                pending = set(futures)
 
-            while pending:
-                # Drain queued progress updates for responsive UI
+                while pending:
+                    # Drain queued progress updates for responsive UI
+                    while True:
+                        try:
+                            yield event_queue.get_nowait()
+                        except Empty:
+                            break
+
+                    # Wait efficiently for next future completion (OS condition variable)
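+                    # wait() returns a (done, not_done) pair: FIRST_COMPLETED wakes
+                    # this thread as soon as any worker finishes, while the 0.1s
+                    # timeout keeps the progress queue draining in the meantime.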
+                    completed, pending = wait(pending, timeout=0.1, return_when=FIRST_COMPLETED)
+                    for future in completed:
+                        exc = future.exception()
+                        if isinstance(exc, KeyboardInterrupt):
+                            DOWNLOAD_CANCELLED.set()
+                            yield dict(downloaded="[yellow]CANCELLING")
+                            pool.shutdown(wait=True, cancel_futures=True)
+                            yield dict(downloaded="[yellow]CANCELLED")
+                            raise exc
+                        elif exc:
+                            DOWNLOAD_CANCELLED.set()
+                            yield dict(downloaded="[red]FAILING")
+                            pool.shutdown(wait=True, cancel_futures=True)
+                            yield dict(downloaded="[red]FAILED")
+                            if debug_logger:
+                                debug_logger.log(
+                                    level="ERROR",
+                                    operation="downloader_failed",
+                                    message=f"Download failed: {exc}",
+                                    error=exc,
+                                    context={
+                                        "url_count": len(urls),
+                                        "output_dir": str(output_dir),
+                                    },
+                                )
+                            raise exc
+
+                    # Drain any remaining events from workers that just finished
                     while True:
                         try:
                             yield event_queue.get_nowait()
                         except Empty:
                             break
 
-                done = {future for future in pending if future.done()}
-                for future in done:
-                    pending.remove(future)
-                    exc = future.exception()
-                    if isinstance(exc, KeyboardInterrupt):
-                        DOWNLOAD_CANCELLED.set()
-                        yield dict(downloaded="[yellow]CANCELLING")
-                        pool.shutdown(wait=True, cancel_futures=True)
-                        yield dict(downloaded="[yellow]CANCELLED")
-                        raise exc
-                    elif exc:
-                        DOWNLOAD_CANCELLED.set()
-                        yield dict(downloaded="[red]FAILING")
-                        pool.shutdown(wait=True, cancel_futures=True)
-                        yield dict(downloaded="[red]FAILED")
-                        if debug_logger:
-                            debug_logger.log(
-                                level="ERROR",
-                                operation="downloader_failed",
-                                message=f"Download failed: {exc}",
-                                error=exc,
-                                context={
-                                    "url_count": len(urls),
-                                    "output_dir": str(output_dir),
-                                },
-                            )
-                        raise exc
-
-                if pending:
-                    try:
-                        yield event_queue.get(timeout=0.1)
-                    except Empty:
-                        pass
-
-            # Drain any remaining events from workers that just finished
-            while True:
-                try:
-                    yield event_queue.get_nowait()
-                except Empty:
-                    break
-
         if debug_logger:
             debug_logger.log(
                 level="DEBUG",