perf(downloader): optimize hot loop and threading efficiency

Replace `list.pop(0)` with `deque.popleft()` for O(1) speed-tracker eviction. Skip the urllib3 decode chain by setting `decode_content=False` on raw reads. Use a running total instead of `sum()` for progress reporting. Add an explicit `stream.close()` on the CurlSession path. Replace the busy-poll loop with `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`. Skip the ThreadPoolExecutor entirely for single-URL downloads. Deduplicate the previously duplicated raw/`iter_content` progress logic into a unified chunk iterator.
This commit is contained in:
Andy
2026-03-23 18:17:12 -06:00
parent 006d080416
commit 6840944738

View File

@@ -1,6 +1,8 @@
import math import math
import os import os
import time import time
from collections import deque
from concurrent.futures import FIRST_COMPLETED, wait
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from http.cookiejar import CookieJar from http.cookiejar import CookieJar
from pathlib import Path from pathlib import Path
@@ -74,7 +76,7 @@ def download(
session = session or Session() session = session or Session()
if _speed_tracker is None: if _speed_tracker is None:
_speed_tracker = {"sizes": [], "last_refresh": time.time()} _speed_tracker = {"sizes": deque(), "last_refresh": time.time()}
save_dir = save_path.parent save_dir = save_path.parent
control_file = save_path.with_name(f"{save_path.name}.!dev") control_file = save_path.with_name(f"{save_path.name}.!dev")
@@ -95,7 +97,6 @@ def download(
try: try:
while True: while True:
written = 0 written = 0
download_sizes: list[int] = []
last_speed_refresh = _time() last_speed_refresh = _time()
try: try:
@@ -127,48 +128,44 @@ def download(
# Cache f.write for hot loop # Cache f.write for hot loop
_write = f.write _write = f.write
# Build chunk iterator — raw reads for requests.Session, iter_content for CurlSession
if use_raw: if use_raw:
# Raw socket read — 30-35% faster than iter_content (benchmarked) stream.raw.decode_content = False
# Safe in worker threads with Queue-based event dispatch
_read = stream.raw.read _read = stream.raw.read
while True:
chunk = _read(chunk_size)
if not chunk:
break
_write(chunk)
download_size = len(chunk)
written += download_size
if not segmented: def _chunks() -> Generator[bytes, None, None]:
yield dict(advance=1) while True:
now = _time() chunk = _read(chunk_size)
time_since = now - last_speed_refresh if not chunk:
download_sizes.append(download_size) break
if time_since > PROGRESS_WINDOW or download_size < chunk_size: yield chunk
data_size = sum(download_sizes) stream.close()
download_speed = math.ceil(data_size / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") chunks = _chunks()
last_speed_refresh = now
download_sizes.clear()
stream.close()
else: else:
# CurlSession: use iter_content (raw not available) def _chunks_iter() -> Generator[bytes, None, None]:
for chunk in stream.iter_content(chunk_size=chunk_size): yield from stream.iter_content(chunk_size=chunk_size)
_write(chunk) stream.close()
download_size = len(chunk)
written += download_size
if not segmented: chunks = _chunks_iter()
yield dict(advance=1)
now = _time() # Unified write + progress loop
time_since = now - last_speed_refresh _data_accumulated = 0
download_sizes.append(download_size) for chunk in chunks:
if time_since > PROGRESS_WINDOW or download_size < chunk_size: _write(chunk)
data_size = sum(download_sizes) download_size = len(chunk)
download_speed = math.ceil(data_size / (time_since or 1)) written += download_size
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now if not segmented:
download_sizes.clear() yield dict(advance=1)
now = _time()
time_since = now - last_speed_refresh
_data_accumulated += download_size
if time_since > PROGRESS_WINDOW or download_size < chunk_size:
download_speed = math.ceil(_data_accumulated / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now
_data_accumulated = 0
# Truncate to actual written size in case pre-allocation overshot # Truncate to actual written size in case pre-allocation overshot
if content_length > 0 and written != content_length: if content_length > 0 and written != content_length:
@@ -187,7 +184,7 @@ def download(
sizes.append((now, written)) sizes.append((now, written))
cutoff = now - SPEED_ROLLING_WINDOW cutoff = now - SPEED_ROLLING_WINDOW
while sizes and sizes[0][0] < cutoff: while sizes and sizes[0][0] < cutoff:
sizes.pop(0) sizes.popleft()
time_since = now - _speed_tracker["last_refresh"] time_since = now - _speed_tracker["last_refresh"]
if sizes and time_since > PROGRESS_WINDOW: if sizes and time_since > PROGRESS_WINDOW:
window_start = sizes[0][0] window_start = sizes[0][0]
@@ -338,73 +335,76 @@ def requests(
yield dict(total=len(urls)) yield dict(total=len(urls))
# Per-call speed tracker — shared across threads within this call only # Per-call speed tracker — shared across threads within this call only
speed_tracker: dict[str, Any] = {"sizes": [], "last_refresh": time.time()} speed_tracker: dict[str, Any] = {"sizes": deque(), "last_refresh": time.time()}
try: try:
with ThreadPoolExecutor(max_workers=max_workers) as pool: # Fast path: single URL — no thread pool overhead
event_queue: Queue[dict[str, Any]] = Queue() if len(urls) == 1:
yield from download(
session=session,
segmented=segmented_batch,
_speed_tracker=speed_tracker,
**urls[0],
)
else:
with ThreadPoolExecutor(max_workers=max_workers) as pool:
event_queue: Queue[dict[str, Any]] = Queue()
def _download_worker(url_item: dict[str, Any]) -> None: def _download_worker(url_item: dict[str, Any]) -> None:
for event in download( for event in download(
session=session, session=session,
segmented=segmented_batch, segmented=segmented_batch,
_speed_tracker=speed_tracker, _speed_tracker=speed_tracker,
**url_item, **url_item,
): ):
event_queue.put(event) event_queue.put(event)
futures = [pool.submit(_download_worker, url) for url in urls] futures = [pool.submit(_download_worker, url) for url in urls]
pending = set(futures) pending = set(futures)
while pending: while pending:
# Drain queued progress updates for responsive UI # Drain queued progress updates for responsive UI
while True:
try:
yield event_queue.get_nowait()
except Empty:
break
# Wait efficiently for next future completion (OS condition variable)
completed, pending = wait(pending, timeout=0.1, return_when=FIRST_COMPLETED)
for future in completed:
exc = future.exception()
if isinstance(exc, KeyboardInterrupt):
DOWNLOAD_CANCELLED.set()
yield dict(downloaded="[yellow]CANCELLING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[yellow]CANCELLED")
raise exc
elif exc:
DOWNLOAD_CANCELLED.set()
yield dict(downloaded="[red]FAILING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[red]FAILED")
if debug_logger:
debug_logger.log(
level="ERROR",
operation="downloader_failed",
message=f"Download failed: {exc}",
error=exc,
context={
"url_count": len(urls),
"output_dir": str(output_dir),
},
)
raise exc
# Drain any remaining events from workers that just finished
while True: while True:
try: try:
yield event_queue.get_nowait() yield event_queue.get_nowait()
except Empty: except Empty:
break break
done = {future for future in pending if future.done()}
for future in done:
pending.remove(future)
exc = future.exception()
if isinstance(exc, KeyboardInterrupt):
DOWNLOAD_CANCELLED.set()
yield dict(downloaded="[yellow]CANCELLING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[yellow]CANCELLED")
raise exc
elif exc:
DOWNLOAD_CANCELLED.set()
yield dict(downloaded="[red]FAILING")
pool.shutdown(wait=True, cancel_futures=True)
yield dict(downloaded="[red]FAILED")
if debug_logger:
debug_logger.log(
level="ERROR",
operation="downloader_failed",
message=f"Download failed: {exc}",
error=exc,
context={
"url_count": len(urls),
"output_dir": str(output_dir),
},
)
raise exc
if pending:
try:
yield event_queue.get(timeout=0.1)
except Empty:
pass
# Drain any remaining events from workers that just finished
while True:
try:
yield event_queue.get_nowait()
except Empty:
break
if debug_logger: if debug_logger:
debug_logger.log( debug_logger.log(
level="DEBUG", level="DEBUG",