From 605b46f723dd1a566361b7b6064595875579d3b1 Mon Sep 17 00:00:00 2001 From: imSp4rky Date: Wed, 29 Apr 2026 23:34:56 -0600 Subject: [PATCH] perf(downloader): parallel byte-range fetch for single-URL tracks Single-URL tracks (no DASH/HLS/ISM manifest) previously streamed sequentially over one TCP connection, capping throughput at per-flow CDN shaping limits. Probe ranges via a 1-byte GET; if supported and total size >= 64MB, split the byte range across N workers (capped by --workers) writing to a pre-allocated file at offsets. Each worker delegates to download() in part mode for shared retry/Range-resume semantics. ~2-3x speedup observed on shaped CDN edges. --- unshackle/core/downloaders/requests.py | 280 ++++++++++++++++++++----- 1 file changed, 228 insertions(+), 52 deletions(-) diff --git a/unshackle/core/downloaders/requests.py b/unshackle/core/downloaders/requests.py index f51072d..e755a5c 100644 --- a/unshackle/core/downloaders/requests.py +++ b/unshackle/core/downloaders/requests.py @@ -25,6 +25,9 @@ MAX_CHUNK = 4_194_304 # 4MB DEFAULT_CHUNK = 524_288 # 512KB SPEED_ROLLING_WINDOW = 10 # seconds of history to keep for speed calculation +RANGE_PARALLEL_MIN_SIZE = 64 * 1024 * 1024 +RANGE_PARALLEL_PART_SIZE = 16 * 1024 * 1024 + def _adaptive_chunk_size(content_length: int) -> int: """Pick chunk size based on content length. Benchmarked sweet spot: 512KB-4MB.""" @@ -45,11 +48,137 @@ def _is_rnet_session(session: Any) -> bool: return isinstance(session, RnetSession) +def _probe_ranged(url: str, session: Any, **kwargs: Any) -> tuple[int, bool]: + headers = {**(kwargs.get("headers") or {}), "Range": "bytes=0-0"} + rest = {k: v for k, v in kwargs.items() if k != "headers"} + try: + resp = session.get(url, stream=True, headers=headers, **rest) + except Exception: + return 0, False + try: + if resp.status_code != 206: + return 0, False + ce = (resp.headers.get("Content-Encoding") or resp.headers.get("content-encoding") or "").lower() + if ce in ("gzip", "deflate", "br"): + return 0, False + content_range = resp.headers.get("Content-Range") or resp.headers.get("content-range") or "" + total = content_range.rsplit("/", 1)[-1].strip() + return (int(total), True) if total.isdigit() else (0, False) + finally: + try: + resp.close() + except Exception: + pass + + +def _dispatch_parts( + url: str, + save_path: Path, + session: Any, + total_size: int, + max_workers: int, + **kwargs: Any, +) -> Generator[dict[str, Any], None, None]: + save_path.parent.mkdir(parents=True, exist_ok=True) + control_file = save_path.with_name(f"{save_path.name}.!dev") + + n_parts = max(1, min(max_workers, math.ceil(total_size / RANGE_PARALLEL_PART_SIZE))) + part_size = math.ceil(total_size / n_parts) + parts = [ + (s, e) + for i in range(n_parts) + for s, e in [(i * part_size, min(total_size - 1, (i + 1) * part_size - 1))] + if s <= e + ] + + control_file.write_bytes(b"") + with open(save_path, "wb") as f: + f.truncate(total_size) + + events: Queue[dict[str, Any]] = Queue() + + def _worker(start: int, end: int) -> None: + for ev in download( + url=url, + save_path=save_path, + session=session, + part_offset=start, + part_end=end, + **kwargs, + ): + events.put(ev) + + pool = ThreadPoolExecutor(max_workers=len(parts)) + futures = [pool.submit(_worker, s, e) for s, e in parts] + pending = set(futures) + + yield {"total": total_size} + + total_bytes = 0 + start_time = last_report = time.time() + completed = False + worker_error = False + + try: + while pending: + advance = 0 + while not events.empty(): + try: + ev = events.get_nowait() + except Empty: + break + a = ev.get("advance") + if a: + advance += a + if advance: + total_bytes += advance + yield {"advance": advance} + + now = time.time() + if now - last_report > 0.5 and total_bytes > 0: + yield {"downloaded": f"{filesize.decimal(math.ceil(total_bytes / (now - start_time)))}/s"} + last_report = now + + done, pending = wait(pending, timeout=0.1, return_when=FIRST_COMPLETED) + for fut in done: + exc = fut.exception() + if exc: + worker_error = True + DOWNLOAD_CANCELLED.set() + raise exc + + advance = 0 + while not events.empty(): + try: + ev = events.get_nowait() + except Empty: + break + a = ev.get("advance") + if a: + advance += a + if advance: + total_bytes += advance + yield {"advance": advance} + + yield {"file_downloaded": save_path, "written": total_size} + completed = True + except KeyboardInterrupt: + DOWNLOAD_CANCELLED.set() + pool.shutdown(wait=False, cancel_futures=True) + raise + finally: + pool.shutdown(wait=worker_error, cancel_futures=True) + if completed: + control_file.unlink(missing_ok=True) + + def download( url: str, save_path: Path, session: Optional[Any] = None, segmented: bool = False, + part_offset: Optional[int] = None, + part_end: Optional[int] = None, **kwargs: Any, ) -> Generator[dict[str, Any], None, None]: """ @@ -73,42 +202,54 @@ def download( session: A requests.Session or RnetSession to make HTTP requests with. RnetSession preserves TLS fingerprinting for services that need it. segmented: If downloads are segments or parts of one bigger file. + part_offset: Byte offset to write at within a pre-allocated file. When set + (with `part_end`), enables part mode for parallel ranged downloads — + no truncate, no skip-if-exists, no control file; emits only `advance` + events; retries resume mid-part via Range. + part_end: Inclusive end byte of the part. Required when `part_offset` is set. kwargs: Any extra keyword arguments to pass to the session.get() call. Use this for one-time request changes like a header, cookie, or proxy. For example, to request Byte-ranges use e.g., `headers={"Range": "bytes=0-128"}`. """ session = session or Session() + part_mode = part_offset is not None and part_end is not None save_dir = save_path.parent control_file = save_path.with_name(f"{save_path.name}.!dev") - save_dir.mkdir(parents=True, exist_ok=True) resume_offset = 0 - if control_file.exists() and save_path.exists(): - resume_offset = save_path.stat().st_size - elif control_file.exists(): - control_file.unlink() - elif save_path.exists(): - yield dict(file_downloaded=save_path, written=save_path.stat().st_size) - return + if not part_mode: + if control_file.exists() and save_path.exists(): + resume_offset = save_path.stat().st_size + elif control_file.exists(): + control_file.unlink() + elif save_path.exists(): + yield dict(file_downloaded=save_path, written=save_path.stat().st_size) + return + control_file.write_bytes(b"") - control_file.write_bytes(b"") _time = time.time use_raw = _is_requests_session(session) attempts = 1 completed = False + written = 0 try: while True: - written = 0 + if not part_mode: + written = 0 last_speed_refresh = _time() try: use_rnet = _is_rnet_session(session) request_kwargs = dict(kwargs) - if resume_offset > 0: + if part_mode: + req_headers = dict(request_kwargs.get("headers", {}) or {}) + req_headers["Range"] = f"bytes={part_offset + written}-{part_end}" + request_kwargs["headers"] = req_headers + elif resume_offset > 0: req_headers = dict(request_kwargs.get("headers", {}) or {}) req_headers["Range"] = f"bytes={resume_offset}-" request_kwargs["headers"] = req_headers @@ -116,9 +257,11 @@ def download( stream = session.get(url, stream=True, **request_kwargs) stream.raise_for_status() - resumed = resume_offset > 0 and stream.status_code == 206 - if resume_offset > 0 and not resumed: + resumed = (not part_mode) and resume_offset > 0 and stream.status_code == 206 + if (not part_mode) and resume_offset > 0 and not resumed: resume_offset = 0 + if part_mode and stream.status_code != 206: + raise IOError(f"expected 206 for ranged part, got {stream.status_code}") if use_rnet: content_length = stream.content_length or 0 else: @@ -132,7 +275,7 @@ def download( chunk_size = _adaptive_chunk_size(content_length) total_size = (resume_offset + content_length) if resumed and content_length > 0 else content_length - if not segmented: + if not segmented and not part_mode: if total_size > 0: yield dict(total=total_size) else: @@ -140,9 +283,16 @@ def download( if resumed and resume_offset > 0: yield dict(advance=resume_offset) - file_mode = "ab" if resumed else "wb" - with open(save_path, file_mode, buffering=1_048_576) as f: - if not resumed and content_length > 0: + if part_mode: + file_mode = "r+b" + file_buffering = 0 + else: + file_mode = "ab" if resumed else "wb" + file_buffering = 1_048_576 + with open(save_path, file_mode, buffering=file_buffering) as f: + if part_mode: + f.seek(part_offset + written) + elif not resumed and content_length > 0: f.truncate(content_length) f.seek(0) @@ -151,27 +301,13 @@ def download( if use_rnet: chunks = stream.stream() elif use_raw: - _read = stream.raw.read - - def _chunks() -> Generator[bytes, None, None]: - while True: - chunk = _read(chunk_size) - if not chunk: - break - yield chunk - stream.close() - - chunks = _chunks() + chunks = iter(lambda: stream.raw.read(chunk_size), b"") else: - - def _chunks_iter() -> Generator[bytes, None, None]: - yield from stream.iter_content(chunk_size=chunk_size) - stream.close() - - chunks = _chunks_iter() + chunks = stream.iter_content(chunk_size=chunk_size) _data_accumulated = 0 _bytes_since_yield = 0 + emit_progress = (not segmented) or part_mode for chunk in chunks: if DOWNLOAD_CANCELLED.is_set(): break @@ -179,7 +315,7 @@ def download( download_size = len(chunk) written += download_size - if not segmented: + if emit_progress: _bytes_since_yield += download_size _data_accumulated += download_size now = _time() @@ -187,35 +323,51 @@ def download( if time_since > PROGRESS_WINDOW: yield dict(advance=_bytes_since_yield) _bytes_since_yield = 0 - download_speed = math.ceil(_data_accumulated / (time_since or 1)) - yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") + if not part_mode: + download_speed = math.ceil(_data_accumulated / (time_since or 1)) + yield dict(downloaded=f"{filesize.decimal(download_speed)}/s") last_speed_refresh = now _data_accumulated = 0 - if not segmented and _bytes_since_yield > 0: + if emit_progress and _bytes_since_yield > 0: yield dict(advance=_bytes_since_yield) - if not resumed and content_length > 0 and written != content_length: + try: + stream.close() + except Exception: + pass + + if not part_mode and not resumed and content_length > 0 and written != content_length: f.truncate(written) - if not segmented and content_length and written < content_length: + if part_mode: + expected = part_end - part_offset + 1 + if written < expected: + raise IOError(f"Failed to read part {part_offset}-{part_end}: got {written}/{expected}") + elif not segmented and content_length and written < content_length: raise IOError(f"Failed to read {content_length} bytes from the track URI.") - yield dict(file_downloaded=save_path, written=resume_offset + written) - - if segmented: - yield dict(advance=1) + if not part_mode: + yield dict(file_downloaded=save_path, written=resume_offset + written) + if segmented: + yield dict(advance=1) completed = True break except Exception: + try: + stream.close() + except Exception: + pass if DOWNLOAD_CANCELLED.is_set() or attempts == MAX_ATTEMPTS: + if part_mode and not DOWNLOAD_CANCELLED.is_set(): + raise return - if save_path.exists(): + if not part_mode and save_path.exists(): resume_offset = save_path.stat().st_size time.sleep(RETRY_WAIT) attempts += 1 finally: - if completed: + if completed and not part_mode: control_file.unlink(missing_ok=True) @@ -347,14 +499,38 @@ def requests( segmented_batch = len(urls) > 1 - # Fast path: single URL — no thread pool overhead if len(urls) == 1: + url_item = urls[0] try: - yield from download( - session=session, - segmented=segmented_batch, - **urls[0], - ) + ranged_used = False + if max_workers > 1: + total_size, supports_ranges = _probe_ranged(url_item["url"], session) + if supports_ranges and total_size >= RANGE_PARALLEL_MIN_SIZE: + try: + yield from _dispatch_parts( + session=session, + total_size=total_size, + max_workers=max_workers, + **url_item, + ) + ranged_used = True + except KeyboardInterrupt: + raise + except Exception: + save_path = url_item.get("save_path") + if save_path: + sp = Path(save_path) + for target in (sp, sp.with_name(f"{sp.name}.!dev")): + try: + target.unlink(missing_ok=True) + except OSError: + pass + if not ranged_used: + yield from download( + session=session, + segmented=segmented_batch, + **url_item, + ) except KeyboardInterrupt: DOWNLOAD_CANCELLED.set() yield dict(downloaded="[yellow]CANCELLED")