perf(downloader): parallel byte-range fetch for single-URL tracks

Single-URL tracks (those with no DASH/HLS/ISM manifest) previously streamed sequentially over one TCP connection, capping throughput at per-flow CDN shaping limits. This change probes range support via a 1-byte GET; if ranges are supported and the total size is >= 64 MB, it splits the byte range across N workers (capped by --workers), each writing to a pre-allocated file at its own offset. Each worker delegates to download() in part mode to reuse the shared retry/Range-resume semantics. A ~2-3x speedup was observed on shaped CDN edges.
This commit is contained in:
imSp4rky
2026-04-29 23:34:56 -06:00
parent b3a8a531e6
commit 605b46f723

View File

@@ -25,6 +25,9 @@ MAX_CHUNK = 4_194_304 # 4MB
DEFAULT_CHUNK = 524_288 # 512KB DEFAULT_CHUNK = 524_288 # 512KB
SPEED_ROLLING_WINDOW = 10 # seconds of history to keep for speed calculation SPEED_ROLLING_WINDOW = 10 # seconds of history to keep for speed calculation
RANGE_PARALLEL_MIN_SIZE = 64 * 1024 * 1024
RANGE_PARALLEL_PART_SIZE = 16 * 1024 * 1024
def _adaptive_chunk_size(content_length: int) -> int: def _adaptive_chunk_size(content_length: int) -> int:
"""Pick chunk size based on content length. Benchmarked sweet spot: 512KB-4MB.""" """Pick chunk size based on content length. Benchmarked sweet spot: 512KB-4MB."""
@@ -45,11 +48,137 @@ def _is_rnet_session(session: Any) -> bool:
return isinstance(session, RnetSession) return isinstance(session, RnetSession)
def _probe_ranged(url: str, session: Any, **kwargs: Any) -> tuple[int, bool]:
headers = {**(kwargs.get("headers") or {}), "Range": "bytes=0-0"}
rest = {k: v for k, v in kwargs.items() if k != "headers"}
try:
resp = session.get(url, stream=True, headers=headers, **rest)
except Exception:
return 0, False
try:
if resp.status_code != 206:
return 0, False
ce = (resp.headers.get("Content-Encoding") or resp.headers.get("content-encoding") or "").lower()
if ce in ("gzip", "deflate", "br"):
return 0, False
content_range = resp.headers.get("Content-Range") or resp.headers.get("content-range") or ""
total = content_range.rsplit("/", 1)[-1].strip()
return (int(total), True) if total.isdigit() else (0, False)
finally:
try:
resp.close()
except Exception:
pass
def _dispatch_parts(
    url: str,
    save_path: Path,
    session: Any,
    total_size: int,
    max_workers: int,
    **kwargs: Any,
) -> Generator[dict[str, Any], None, None]:
    """Download one URL as multiple parallel byte-range parts.

    Pre-allocates *save_path* at *total_size* bytes, splits the byte range
    into up to *max_workers* parts (sized by RANGE_PARALLEL_PART_SIZE), and
    runs one ``download()`` part-mode worker per part on a thread pool.
    Progress events from every worker are funnelled through one queue and
    re-emitted here as a single merged progress stream.

    Args:
        url: Source URL; must support byte ranges (see ``_probe_ranged``).
        save_path: Destination file; written at per-part offsets.
        session: Session object passed through to each ``download()`` call.
        total_size: Exact file size in bytes, from the range probe.
        max_workers: Upper bound on concurrent part downloads.
        kwargs: Extra keyword arguments forwarded to each ``download()`` call.

    Yields:
        The same event dicts as ``download()``: an initial ``total``, then
        ``advance`` and ``downloaded`` (speed) updates, and finally
        ``file_downloaded``/``written`` on success.
    """
    save_path.parent.mkdir(parents=True, exist_ok=True)
    control_file = save_path.with_name(f"{save_path.name}.!dev")

    # At least one part; never more parts than max_workers allows, nor more
    # than RANGE_PARALLEL_PART_SIZE-sized slices of the file.
    n_parts = max(1, min(max_workers, math.ceil(total_size / RANGE_PARALLEL_PART_SIZE)))
    part_size = math.ceil(total_size / n_parts)
    parts: list[tuple[int, int]] = []
    for index in range(n_parts):
        lo = index * part_size
        hi = min(total_size - 1, (index + 1) * part_size - 1)
        if lo <= hi:
            parts.append((lo, hi))

    # Control file marks an in-progress download; pre-size the target so
    # workers can write their offsets into an already-allocated file.
    control_file.write_bytes(b"")
    with open(save_path, "wb") as out:
        out.truncate(total_size)

    events: Queue[dict[str, Any]] = Queue()

    def _run_part(start: int, end: int) -> None:
        # Delegate to download() in part mode so retry/Range-resume
        # semantics are shared with the sequential path.
        for event in download(
            url=url,
            save_path=save_path,
            session=session,
            part_offset=start,
            part_end=end,
            **kwargs,
        ):
            events.put(event)

    def _drain_advances() -> int:
        # Sum all currently-queued `advance` events without blocking.
        gained = 0
        while not events.empty():
            try:
                event = events.get_nowait()
            except Empty:
                break
            step = event.get("advance")
            if step:
                gained += step
        return gained

    pool = ThreadPoolExecutor(max_workers=len(parts))
    pending = {pool.submit(_run_part, lo, hi) for lo, hi in parts}

    yield {"total": total_size}
    total_bytes = 0
    start_time = last_report = time.time()
    completed = False
    worker_error = False
    try:
        while pending:
            gained = _drain_advances()
            if gained:
                total_bytes += gained
                yield {"advance": gained}
            now = time.time()
            # Throttled average-speed report (overall bytes / elapsed time).
            if now - last_report > 0.5 and total_bytes > 0:
                speed = math.ceil(total_bytes / (now - start_time))
                yield {"downloaded": f"{filesize.decimal(speed)}/s"}
                last_report = now
            done, pending = wait(pending, timeout=0.1, return_when=FIRST_COMPLETED)
            for future in done:
                exc = future.exception()
                if exc:
                    # Signal every sibling worker to stop, then propagate.
                    worker_error = True
                    DOWNLOAD_CANCELLED.set()
                    raise exc
            # Drain again so events from just-finished workers are not lost.
            gained = _drain_advances()
            if gained:
                total_bytes += gained
                yield {"advance": gained}
        yield {"file_downloaded": save_path, "written": total_size}
        completed = True
    except KeyboardInterrupt:
        DOWNLOAD_CANCELLED.set()
        pool.shutdown(wait=False, cancel_futures=True)
        raise
    finally:
        # After a worker error, wait for the (now cancelled) siblings to
        # drain; on success everything is already finished.
        pool.shutdown(wait=worker_error, cancel_futures=True)
        if completed:
            control_file.unlink(missing_ok=True)
def download( def download(
url: str, url: str,
save_path: Path, save_path: Path,
session: Optional[Any] = None, session: Optional[Any] = None,
segmented: bool = False, segmented: bool = False,
part_offset: Optional[int] = None,
part_end: Optional[int] = None,
**kwargs: Any, **kwargs: Any,
) -> Generator[dict[str, Any], None, None]: ) -> Generator[dict[str, Any], None, None]:
""" """
@@ -73,42 +202,54 @@ def download(
session: A requests.Session or RnetSession to make HTTP requests with. session: A requests.Session or RnetSession to make HTTP requests with.
RnetSession preserves TLS fingerprinting for services that need it. RnetSession preserves TLS fingerprinting for services that need it.
segmented: If downloads are segments or parts of one bigger file. segmented: If downloads are segments or parts of one bigger file.
part_offset: Byte offset to write at within a pre-allocated file. When set
(with `part_end`), enables part mode for parallel ranged downloads —
no truncate, no skip-if-exists, no control file; emits only `advance`
events; retries resume mid-part via Range.
part_end: Inclusive end byte of the part. Required when `part_offset` is set.
kwargs: Any extra keyword arguments to pass to the session.get() call. Use this kwargs: Any extra keyword arguments to pass to the session.get() call. Use this
for one-time request changes like a header, cookie, or proxy. For example, for one-time request changes like a header, cookie, or proxy. For example,
to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`.
""" """
session = session or Session() session = session or Session()
part_mode = part_offset is not None and part_end is not None
save_dir = save_path.parent save_dir = save_path.parent
control_file = save_path.with_name(f"{save_path.name}.!dev") control_file = save_path.with_name(f"{save_path.name}.!dev")
save_dir.mkdir(parents=True, exist_ok=True) save_dir.mkdir(parents=True, exist_ok=True)
resume_offset = 0 resume_offset = 0
if control_file.exists() and save_path.exists(): if not part_mode:
resume_offset = save_path.stat().st_size if control_file.exists() and save_path.exists():
elif control_file.exists(): resume_offset = save_path.stat().st_size
control_file.unlink() elif control_file.exists():
elif save_path.exists(): control_file.unlink()
yield dict(file_downloaded=save_path, written=save_path.stat().st_size) elif save_path.exists():
return yield dict(file_downloaded=save_path, written=save_path.stat().st_size)
return
control_file.write_bytes(b"")
control_file.write_bytes(b"")
_time = time.time _time = time.time
use_raw = _is_requests_session(session) use_raw = _is_requests_session(session)
attempts = 1 attempts = 1
completed = False completed = False
written = 0
try: try:
while True: while True:
written = 0 if not part_mode:
written = 0
last_speed_refresh = _time() last_speed_refresh = _time()
try: try:
use_rnet = _is_rnet_session(session) use_rnet = _is_rnet_session(session)
request_kwargs = dict(kwargs) request_kwargs = dict(kwargs)
if resume_offset > 0: if part_mode:
req_headers = dict(request_kwargs.get("headers", {}) or {})
req_headers["Range"] = f"bytes={part_offset + written}-{part_end}"
request_kwargs["headers"] = req_headers
elif resume_offset > 0:
req_headers = dict(request_kwargs.get("headers", {}) or {}) req_headers = dict(request_kwargs.get("headers", {}) or {})
req_headers["Range"] = f"bytes={resume_offset}-" req_headers["Range"] = f"bytes={resume_offset}-"
request_kwargs["headers"] = req_headers request_kwargs["headers"] = req_headers
@@ -116,9 +257,11 @@ def download(
stream = session.get(url, stream=True, **request_kwargs) stream = session.get(url, stream=True, **request_kwargs)
stream.raise_for_status() stream.raise_for_status()
resumed = resume_offset > 0 and stream.status_code == 206 resumed = (not part_mode) and resume_offset > 0 and stream.status_code == 206
if resume_offset > 0 and not resumed: if (not part_mode) and resume_offset > 0 and not resumed:
resume_offset = 0 resume_offset = 0
if part_mode and stream.status_code != 206:
raise IOError(f"expected 206 for ranged part, got {stream.status_code}")
if use_rnet: if use_rnet:
content_length = stream.content_length or 0 content_length = stream.content_length or 0
else: else:
@@ -132,7 +275,7 @@ def download(
chunk_size = _adaptive_chunk_size(content_length) chunk_size = _adaptive_chunk_size(content_length)
total_size = (resume_offset + content_length) if resumed and content_length > 0 else content_length total_size = (resume_offset + content_length) if resumed and content_length > 0 else content_length
if not segmented: if not segmented and not part_mode:
if total_size > 0: if total_size > 0:
yield dict(total=total_size) yield dict(total=total_size)
else: else:
@@ -140,9 +283,16 @@ def download(
if resumed and resume_offset > 0: if resumed and resume_offset > 0:
yield dict(advance=resume_offset) yield dict(advance=resume_offset)
file_mode = "ab" if resumed else "wb" if part_mode:
with open(save_path, file_mode, buffering=1_048_576) as f: file_mode = "r+b"
if not resumed and content_length > 0: file_buffering = 0
else:
file_mode = "ab" if resumed else "wb"
file_buffering = 1_048_576
with open(save_path, file_mode, buffering=file_buffering) as f:
if part_mode:
f.seek(part_offset + written)
elif not resumed and content_length > 0:
f.truncate(content_length) f.truncate(content_length)
f.seek(0) f.seek(0)
@@ -151,27 +301,13 @@ def download(
if use_rnet: if use_rnet:
chunks = stream.stream() chunks = stream.stream()
elif use_raw: elif use_raw:
_read = stream.raw.read chunks = iter(lambda: stream.raw.read(chunk_size), b"")
def _chunks() -> Generator[bytes, None, None]:
while True:
chunk = _read(chunk_size)
if not chunk:
break
yield chunk
stream.close()
chunks = _chunks()
else: else:
chunks = stream.iter_content(chunk_size=chunk_size)
def _chunks_iter() -> Generator[bytes, None, None]:
yield from stream.iter_content(chunk_size=chunk_size)
stream.close()
chunks = _chunks_iter()
_data_accumulated = 0 _data_accumulated = 0
_bytes_since_yield = 0 _bytes_since_yield = 0
emit_progress = (not segmented) or part_mode
for chunk in chunks: for chunk in chunks:
if DOWNLOAD_CANCELLED.is_set(): if DOWNLOAD_CANCELLED.is_set():
break break
@@ -179,7 +315,7 @@ def download(
download_size = len(chunk) download_size = len(chunk)
written += download_size written += download_size
if not segmented: if emit_progress:
_bytes_since_yield += download_size _bytes_since_yield += download_size
_data_accumulated += download_size _data_accumulated += download_size
now = _time() now = _time()
@@ -187,35 +323,51 @@ def download(
if time_since > PROGRESS_WINDOW: if time_since > PROGRESS_WINDOW:
yield dict(advance=_bytes_since_yield) yield dict(advance=_bytes_since_yield)
_bytes_since_yield = 0 _bytes_since_yield = 0
download_speed = math.ceil(_data_accumulated / (time_since or 1)) if not part_mode:
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") download_speed = math.ceil(_data_accumulated / (time_since or 1))
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
last_speed_refresh = now last_speed_refresh = now
_data_accumulated = 0 _data_accumulated = 0
if not segmented and _bytes_since_yield > 0: if emit_progress and _bytes_since_yield > 0:
yield dict(advance=_bytes_since_yield) yield dict(advance=_bytes_since_yield)
if not resumed and content_length > 0 and written != content_length: try:
stream.close()
except Exception:
pass
if not part_mode and not resumed and content_length > 0 and written != content_length:
f.truncate(written) f.truncate(written)
if not segmented and content_length and written < content_length: if part_mode:
expected = part_end - part_offset + 1
if written < expected:
raise IOError(f"Failed to read part {part_offset}-{part_end}: got {written}/{expected}")
elif not segmented and content_length and written < content_length:
raise IOError(f"Failed to read {content_length} bytes from the track URI.") raise IOError(f"Failed to read {content_length} bytes from the track URI.")
yield dict(file_downloaded=save_path, written=resume_offset + written) if not part_mode:
yield dict(file_downloaded=save_path, written=resume_offset + written)
if segmented: if segmented:
yield dict(advance=1) yield dict(advance=1)
completed = True completed = True
break break
except Exception: except Exception:
try:
stream.close()
except Exception:
pass
if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS:
if part_mode and not DOWNLOAD_CANCELLED.is_set():
raise
return return
if save_path.exists(): if not part_mode and save_path.exists():
resume_offset = save_path.stat().st_size resume_offset = save_path.stat().st_size
time.sleep(RETRY_WAIT) time.sleep(RETRY_WAIT)
attempts += 1 attempts += 1
finally: finally:
if completed: if completed and not part_mode:
control_file.unlink(missing_ok=True) control_file.unlink(missing_ok=True)
@@ -347,14 +499,38 @@ def requests(
segmented_batch = len(urls) > 1 segmented_batch = len(urls) > 1
# Fast path: single URL — no thread pool overhead
if len(urls) == 1: if len(urls) == 1:
url_item = urls[0]
try: try:
yield from download( ranged_used = False
session=session, if max_workers > 1:
segmented=segmented_batch, total_size, supports_ranges = _probe_ranged(url_item["url"], session)
**urls[0], if supports_ranges and total_size >= RANGE_PARALLEL_MIN_SIZE:
) try:
yield from _dispatch_parts(
session=session,
total_size=total_size,
max_workers=max_workers,
**url_item,
)
ranged_used = True
except KeyboardInterrupt:
raise
except Exception:
save_path = url_item.get("save_path")
if save_path:
sp = Path(save_path)
for target in (sp, sp.with_name(f"{sp.name}.!dev")):
try:
target.unlink(missing_ok=True)
except OSError:
pass
if not ranged_used:
yield from download(
session=session,
segmented=segmented_batch,
**url_item,
)
except KeyboardInterrupt: except KeyboardInterrupt:
DOWNLOAD_CANCELLED.set() DOWNLOAD_CANCELLED.set()
yield dict(downloaded="[yellow]CANCELLED") yield dict(downloaded="[yellow]CANCELLED")