N_m3u8DL-RE: Improve track selection, add download arguments and option to load manifest from file (#38)

* feat: Add 'from_file', 'downloader_args' to Track

* feat: Add loading HLS playlist from file

* refactor: Improve track selection, args for n_m3u8dl_re
This commit is contained in:
stabbedbybrick
2025-11-08 21:57:52 +01:00
committed by GitHub
parent 90e4030a88
commit 9ed5133c4c
3 changed files with 319 additions and 225 deletions

View File

@@ -1,12 +1,10 @@
import logging
import os
import re
import subprocess
import warnings
from http.cookiejar import CookieJar
from itertools import chain
from pathlib import Path
from typing import Any, Generator, MutableMapping, Optional, Union
from typing import Any, Generator, MutableMapping
import requests
from requests.cookies import cookiejar_from_dict, get_cookie_header
@@ -16,255 +14,325 @@ from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import DOWNLOAD_CANCELLED
PERCENT_RE = re.compile(r"(\d+\.\d+%)")
SPEED_RE = re.compile(r"(\d+\.\d+(?:MB|KB)ps)")
SIZE_RE = re.compile(r"(\d+\.\d+(?:MB|GB|KB)/\d+\.\d+(?:MB|GB|KB))")
WARN_RE = re.compile(r"(WARN : Response.*|WARN : One or more errors occurred.*)")
ERROR_RE = re.compile(r"(ERROR.*)")
DECRYPTION_ENGINE = {
"shaka": "SHAKA_PACKAGER",
"mp4decrypt": "MP4DECRYPT",
}
# Ignore FutureWarnings
warnings.simplefilter(action="ignore", category=FutureWarning)
AUDIO_CODEC_MAP = {"AAC": "mp4a", "AC3": "ac-3", "EC3": "ec-3"}
VIDEO_CODEC_MAP = {"AVC": "avc", "HEVC": "hvc", "DV": "dvh", "HLG": "hev"}
def get_track_selection_args(track: Any) -> list[str]:
"""
Generates track selection arguments for N_m3u8dl_RE.
Args:
track: A track object with attributes like descriptor, data, and class name.
Returns:
A list of strings for track selection.
Raises:
ValueError: If the manifest type is unsupported or track selection fails.
"""
descriptor = track.descriptor.name
track_type = track.__class__.__name__
def _create_args(flag: str, parts: list[str], type_str: str, extra_args: list[str] | None = None) -> list[str]:
if not parts:
raise ValueError(f"[N_m3u8DL-RE]: Unable to select {type_str} track from {descriptor} manifest")
final_args = [flag, ":".join(parts)]
if extra_args:
final_args.extend(extra_args)
return final_args
match descriptor:
case "HLS":
# HLS playlists are direct inputs; no selection arguments needed.
return []
case "DASH":
representation = track.data.get("dash", {}).get("representation", {})
adaptation_set = track.data.get("dash", {}).get("adaptation_set", {})
parts = []
if track_type == "Audio":
if track_id := representation.get("id") or adaptation_set.get("audioTrackId"):
parts.append(rf'"id=\b{track_id}\b"')
else:
if codecs := representation.get("codecs"):
parts.append(f"codecs={codecs}")
if lang := representation.get("lang") or adaptation_set.get("lang"):
parts.append(f"lang={lang}")
if bw := representation.get("bandwidth"):
bitrate = int(bw) // 1000
parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}")
if roles := representation.findall("Role") + adaptation_set.findall("Role"):
if role := next((r.get("value") for r in roles if r.get("value", "").lower() == "main"), None):
parts.append(f"role={role}")
return _create_args("-sa", parts, "audio")
if track_type == "Video":
if track_id := representation.get("id"):
parts.append(rf'"id=\b{track_id}\b"')
else:
if width := representation.get("width"):
parts.append(f"res={width}*")
if codecs := representation.get("codecs"):
parts.append(f"codecs={codecs}")
if bw := representation.get("bandwidth"):
bitrate = int(bw) // 1000
parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}")
return _create_args("-sv", parts, "video")
if track_type == "Subtitle":
if track_id := representation.get("id"):
parts.append(rf'"id=\b{track_id}\b"')
else:
if lang := representation.get("lang"):
parts.append(f"lang={lang}")
return _create_args("-ss", parts, "subtitle", extra_args=["--auto-subtitle-fix", "false"])
case "ISM":
quality_level = track.data.get("ism", {}).get("quality_level", {})
stream_index = track.data.get("ism", {}).get("stream_index", {})
parts = []
if track_type == "Audio":
if name := stream_index.get("Name") or quality_level.get("Index"):
parts.append(rf'"id=\b{name}\b"')
else:
if codecs := quality_level.get("FourCC"):
parts.append(f"codecs={codecs}")
if lang := stream_index.get("Language"):
parts.append(f"lang={lang}")
if br := quality_level.get("Bitrate"):
bitrate = int(br) // 1000
parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}")
return _create_args("-sa", parts, "audio")
if track_type == "Video":
if name := stream_index.get("Name") or quality_level.get("Index"):
parts.append(rf'"id=\b{name}\b"')
else:
if width := quality_level.get("MaxWidth"):
parts.append(f"res={width}*")
if codecs := quality_level.get("FourCC"):
parts.append(f"codecs={codecs}")
if br := quality_level.get("Bitrate"):
bitrate = int(br) // 1000
parts.append(f"bwMin={bitrate}:bwMax={bitrate + 5}")
return _create_args("-sv", parts, "video")
# I've yet to encounter a subtitle track in ISM manifests, so this is mostly theoretical.
if track_type == "Subtitle":
if name := stream_index.get("Name") or quality_level.get("Index"):
parts.append(rf'"id=\b{name}\b"')
else:
if lang := stream_index.get("Language"):
parts.append(f"lang={lang}")
return _create_args("-ss", parts, "subtitle", extra_args=["--auto-subtitle-fix", "false"])
raise ValueError(f"[N_m3u8DL-RE]: Unsupported manifest type: {descriptor}")
def track_selection(track: object) -> list[str]:
"""Return the N_m3u8DL-RE stream selection arguments for a track."""
def build_download_args(
track_url: str,
filename: str,
output_dir: Path,
thread_count: int,
retry_count: int,
track_from_file: Path | None,
custom_args: dict[str, Any] | None,
headers: dict[str, Any] | None,
cookies: CookieJar | None,
proxy: str | None,
content_keys: dict[str, str] | None,
ad_keyword: str | None,
skip_merge: bool | None = False,
) -> list[str]:
"""Constructs the CLI arguments for N_m3u8DL-RE."""
if "dash" in track.data:
adaptation_set = track.data["dash"]["adaptation_set"]
representation = track.data["dash"]["representation"]
# Default arguments
args = {
"--save-name": filename,
"--save-dir": output_dir,
"--tmp-dir": output_dir,
"--thread-count": thread_count,
"--download-retry-count": retry_count,
"--write-meta-json": False,
"--no-log": True,
}
if proxy:
args["--custom-proxy"] = proxy
if skip_merge:
args["--skip-merge"] = skip_merge
if ad_keyword:
args["--ad-keyword"] = ad_keyword
if content_keys:
args["--key"] = next((f"{kid.hex}:{key.lower()}" for kid, key in content_keys.items()), None)
args["--decryption-engine"] = DECRYPTION_ENGINE.get(config.decryption.lower()) or "SHAKA_PACKAGER"
if custom_args:
args.update(custom_args)
track_type = track.__class__.__name__
codec = track.codec.name
bitrate = track.bitrate // 1000
language = track.language
width = track.width if track_type == "Video" else None
height = track.height if track_type == "Video" else None
range = track.range.name if track_type == "Video" else None
command = [track_from_file or track_url]
for flag, value in args.items():
if value is True:
command.append(flag)
elif value is False:
command.extend([flag, "false"])
elif value is not False and value is not None:
command.extend([flag, str(value)])
elif "ism" in track.data:
stream_index = track.data["ism"]["stream_index"]
quality_level = track.data["ism"]["quality_level"]
if headers:
for key, value in headers.items():
if key.lower() not in ("accept-encoding", "cookie"):
command.extend(["--header", f"{key}: {value}"])
track_type = track.__class__.__name__
codec = track.codec.name
bitrate = track.bitrate // 1000
language = track.language
width = track.width if track_type == "Video" else None
height = track.height if track_type == "Video" else None
range = track.range.name if track_type == "Video" else None
adaptation_set = stream_index
representation = quality_level
if cookies:
req = requests.Request(method="GET", url=track_url)
cookie_header = get_cookie_header(cookies, req)
command.extend(["--header", f"Cookie: {cookie_header}"])
else:
return []
if track_type == "Audio":
codecs = AUDIO_CODEC_MAP.get(codec)
langs = []
if adaptation_set.get("lang"):
langs.append(adaptation_set.get("lang"))
if representation is not None and representation.get("lang"):
langs.append(representation.get("lang"))
track_ids = list(
set(
v
for x in chain(adaptation_set, representation)
for v in (x.get("audioTrackId"), x.get("id"))
if v is not None
)
)
roles = adaptation_set.findall("Role") + representation.findall("Role")
role = ":role=main" if next((i for i in roles if i.get("value").lower() == "main"), None) else ""
bandwidth = f"bwMin={bitrate}:bwMax={bitrate + 5}"
if langs:
track_selection = ["-sa", f"lang={language}:codecs={codecs}:{bandwidth}{role}"]
elif len(track_ids) == 1:
track_selection = ["-sa", f"id={track_ids[0]}"]
else:
track_selection = ["-sa", f"for=best{role}"]
return track_selection
if track_type == "Video":
# adjust codec based on range
codec_adjustments = {("HEVC", "DV"): "DV", ("HEVC", "HLG"): "HLG"}
codec = codec_adjustments.get((codec, range), codec)
codecs = VIDEO_CODEC_MAP.get(codec)
bandwidth = f"bwMin={bitrate}:bwMax={bitrate + 5}"
if width and height:
resolution = f"{width}x{height}"
elif width:
resolution = f"{width}*"
else:
resolution = "for=best"
if resolution.startswith("for="):
track_selection = ["-sv", resolution]
track_selection.append(f"codecs={codecs}:{bandwidth}")
else:
track_selection = ["-sv", f"res={resolution}:codecs={codecs}:{bandwidth}"]
return track_selection
return command
def download(
urls: Union[str, dict[str, Any], list[str], list[dict[str, Any]]],
track: object,
urls: str | dict[str, Any] | list[str | dict[str, Any]],
track: Any,
output_dir: Path,
filename: str,
headers: Optional[MutableMapping[str, Union[str, bytes]]] = None,
cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None,
proxy: Optional[str] = None,
max_workers: Optional[int] = None,
content_keys: Optional[dict[str, Any]] = None,
headers: MutableMapping[str, str | bytes] | None,
cookies: MutableMapping[str, str] | CookieJar | None,
proxy: str | None,
max_workers: int | None,
content_keys: dict[str, Any] | None,
skip_merge: bool | None = False,
) -> Generator[dict[str, Any], None, None]:
if not urls:
raise ValueError("urls must be provided and not empty")
elif not isinstance(urls, (str, dict, list)):
raise TypeError(f"Expected urls to be {str} or {dict} or a list of one of them, not {type(urls)}")
if not output_dir:
raise ValueError("output_dir must be provided")
elif not isinstance(output_dir, Path):
raise TypeError(f"Expected output_dir to be {Path}, not {type(output_dir)}")
if not filename:
raise ValueError("filename must be provided")
elif not isinstance(filename, str):
raise TypeError(f"Expected filename to be {str}, not {type(filename)}")
if not isinstance(urls, (str, dict, list)):
raise TypeError(f"Expected urls to be str, dict, or list, not {type(urls)}")
if not isinstance(output_dir, Path):
raise TypeError(f"Expected output_dir to be Path, not {type(output_dir)}")
if not isinstance(filename, str) or not filename:
raise ValueError("filename must be a non-empty string")
if not isinstance(headers, (MutableMapping, type(None))):
raise TypeError(f"Expected headers to be {MutableMapping}, not {type(headers)}")
raise TypeError(f"Expected headers to be a mapping or None, not {type(headers)}")
if not isinstance(cookies, (MutableMapping, CookieJar, type(None))):
raise TypeError(f"Expected cookies to be {MutableMapping} or {CookieJar}, not {type(cookies)}")
raise TypeError(f"Expected cookies to be a mapping, CookieJar, or None, not {type(cookies)}")
if not isinstance(proxy, (str, type(None))):
raise TypeError(f"Expected proxy to be {str}, not {type(proxy)}")
if not max_workers:
max_workers = min(32, (os.cpu_count() or 1) + 4)
elif not isinstance(max_workers, int):
raise TypeError(f"Expected max_workers to be {int}, not {type(max_workers)}")
if not isinstance(urls, list):
urls = [urls]
if not binaries.N_m3u8DL_RE:
raise EnvironmentError("N_m3u8DL-RE executable not found...")
raise TypeError(f"Expected proxy to be a str or None, not {type(proxy)}")
if not isinstance(max_workers, (int, type(None))):
raise TypeError(f"Expected max_workers to be an int or None, not {type(max_workers)}")
if not isinstance(content_keys, (dict, type(None))):
raise TypeError(f"Expected content_keys to be a dict or None, not {type(content_keys)}")
if not isinstance(skip_merge, (bool, type(None))):
raise TypeError(f"Expected skip_merge to be a bool or None, not {type(skip_merge)}")
if cookies and not isinstance(cookies, CookieJar):
cookies = cookiejar_from_dict(cookies)
track_type = track.__class__.__name__
thread_count = str(config.n_m3u8dl_re.get("thread_count", max_workers))
retry_count = str(config.n_m3u8dl_re.get("retry_count", max_workers))
if not binaries.N_m3u8DL_RE:
raise EnvironmentError("N_m3u8DL-RE executable not found...")
effective_max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4)
if proxy and not config.n_m3u8dl_re.get("use_proxy", True):
proxy = None
thread_count = config.n_m3u8dl_re.get("thread_count", effective_max_workers)
retry_count = config.n_m3u8dl_re.get("retry_count", 10)
ad_keyword = config.n_m3u8dl_re.get("ad_keyword")
arguments = [
track.url,
"--save-dir",
output_dir,
"--tmp-dir",
output_dir,
"--thread-count",
thread_count,
"--download-retry-count",
retry_count,
"--no-log",
"--write-meta-json",
"false",
]
arguments = build_download_args(
track_url=track.url,
track_from_file=track.from_file,
filename=filename,
output_dir=output_dir,
thread_count=thread_count,
retry_count=retry_count,
custom_args=track.downloader_args,
headers=headers,
cookies=cookies,
proxy=proxy,
content_keys=content_keys,
skip_merge=skip_merge,
ad_keyword=ad_keyword,
)
arguments.extend(get_track_selection_args(track))
for header, value in (headers or {}).items():
if header.lower() in ("accept-encoding", "cookie"):
continue
arguments.extend(["--header", f"{header}: {value}"])
if cookies:
cookie_header = get_cookie_header(cookies, requests.Request(url=track.url))
if cookie_header:
arguments.extend(["--header", f"Cookie: {cookie_header}"])
if proxy:
arguments.extend(["--custom-proxy", proxy])
if content_keys:
for kid, key in content_keys.items():
keys = f"{kid.hex}:{key.lower()}"
arguments.extend(["--key", keys])
arguments.extend(["--use-shaka-packager"])
if ad_keyword:
arguments.extend(["--ad-keyword", ad_keyword])
if track.descriptor.name == "URL":
error = f"[N_m3u8DL-RE]: {track.descriptor} is currently not supported"
raise ValueError(error)
elif track.descriptor.name == "DASH":
arguments.extend(track_selection(track))
# TODO: improve this nonsense
percent_re = re.compile(r"(\d+\.\d+%)")
speed_re = re.compile(r"(?<!/)(\d+\.\d+MB)(?!.*\/)")
warn = re.compile(r"(WARN : Response.*)")
error = re.compile(r"(ERROR.*)")
size_patterns = [
re.compile(r"(\d+\.\d+MB/\d+\.\d+GB)"),
re.compile(r"(\d+\.\d+GB/\d+\.\d+GB)"),
re.compile(r"(\d+\.\d+MB/\d+\.\d+MB)"),
]
yield dict(total=100)
yield {"total": 100}
yield {"downloaded": "Parsing streams..."}
try:
with subprocess.Popen(
[binaries.N_m3u8DL_RE, *arguments], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
) as p:
for line in p.stdout:
[binaries.N_m3u8DL_RE, *arguments],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
) as process:
last_line = ""
track_type = track.__class__.__name__
for line in process.stdout:
output = line.strip()
if output:
percent = percent_re.search(output)
speed = speed_re.search(output)
size = next(
(pattern.search(output).group(1) for pattern in size_patterns if pattern.search(output)), ""
)
if not output:
continue
last_line = output
if speed:
yield dict(downloaded=f"{speed.group(1)}ps {size}")
if percent:
progress = int(percent.group(1).split(".")[0])
yield dict(completed=progress) if progress < 100 else dict(downloaded="Merging")
if warn_match := WARN_RE.search(output):
console.log(f"{track_type} {warn_match.group(1)}")
continue
if warn.search(output):
console.log(f"{track_type} " + warn.search(output).group(1))
if speed_match := SPEED_RE.search(output):
size = size_match.group(1) if (size_match := SIZE_RE.search(output)) else ""
yield {"downloaded": f"{speed_match.group(1)} {size}"}
p.wait()
if percent_match := PERCENT_RE.search(output):
progress = int(percent_match.group(1).split(".", 1)[0])
yield {"completed": progress} if progress < 100 else {"downloaded": "Merging"}
if p.returncode != 0:
if error.search(output):
raise ValueError(f"[N_m3u8DL-RE]: {error.search(output).group(1)}")
raise subprocess.CalledProcessError(p.returncode, arguments)
process.wait()
if process.returncode != 0:
if error_match := ERROR_RE.search(last_line):
raise ValueError(f"[N_m3u8DL-RE]: {error_match.group(1)}")
raise subprocess.CalledProcessError(process.returncode, arguments)
except ConnectionResetError:
# interrupted while passing URI to download
raise KeyboardInterrupt()
except KeyboardInterrupt:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[yellow]CANCELLED")
yield {"downloaded": "[yellow]CANCELLED"}
raise
except Exception:
DOWNLOAD_CANCELLED.set() # skip pending track downloads
yield dict(downloaded="[red]FAILED")
yield {"downloaded": "[red]FAILED"}
raise
def n_m3u8dl_re(
urls: Union[str, list[str], dict[str, Any], list[dict[str, Any]]],
track: object,
urls: str | list[str] | dict[str, Any] | list[dict[str, Any]],
track: Any,
output_dir: Path,
filename: str,
headers: Optional[MutableMapping[str, Union[str, bytes]]] = None,
cookies: Optional[Union[MutableMapping[str, str], CookieJar]] = None,
proxy: Optional[str] = None,
max_workers: Optional[int] = None,
content_keys: Optional[dict[str, Any]] = None,
headers: MutableMapping[str, str | bytes] | None = None,
cookies: MutableMapping[str, str] | CookieJar | None = None,
proxy: str | None = None,
max_workers: int | None = None,
content_keys: dict[str, Any] | None = None,
skip_merge: bool | None = False,
) -> Generator[dict[str, Any], None, None]:
"""
Download files using N_m3u8DL-RE.
@@ -279,28 +347,33 @@ def n_m3u8dl_re(
The data is in the same format accepted by rich's progress.update() function.
Parameters:
urls: Web URL(s) to file(s) to download. You can use a dictionary with the key
"url" for the URI, and other keys for extra arguments to use per-URL.
urls: Web URL(s) to file(s) to download. NOTE: This parameter is ignored for now.
track: The track to download. Used to get track attributes for the selection
process. Note that Track.Descriptor.URL is not supported by N_m3u8DL-RE.
output_dir: The folder to save the file into. If the save path's directory does
not exist then it will be made automatically.
filename: The filename or filename template to use for each file. The variables
you can use are `i` for the URL index and `ext` for the URL extension.
headers: A mapping of HTTP Header Key/Values to use for the download.
cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for the download.
filename: The filename or filename template to use for each file.
headers: A mapping of HTTP Header Key/Values to use for all downloads.
cookies: A mapping of Cookie Key/Values or a Cookie Jar to use for all downloads.
proxy: A proxy to use for all downloads.
max_workers: The maximum amount of threads to use for downloads. Defaults to
min(32,(cpu_count+4)). Can be set in config with --thread-count option.
content_keys: The content keys to use for decryption.
skip_merge: Whether to skip merging the downloaded chunks.
"""
track_type = track.__class__.__name__
log = logging.getLogger("N_m3u8DL-RE")
if proxy and not config.n_m3u8dl_re.get("use_proxy", True):
log.warning(f"{track_type}: Ignoring proxy as N_m3u8DL-RE is set to use_proxy=False")
proxy = None
yield from download(urls, track, output_dir, filename, headers, cookies, proxy, max_workers, content_keys)
yield from download(
urls=urls,
track=track,
output_dir=output_dir,
filename=filename,
headers=headers,
cookies=cookies,
proxy=proxy,
max_workers=max_workers,
content_keys=content_keys,
skip_merge=skip_merge,
)
__all__ = ("n_m3u8dl_re",)

View File

@@ -249,17 +249,20 @@ class HLS:
log = logging.getLogger("HLS")
# Get the playlist text and handle both session types
response = session.get(track.url)
if isinstance(response, requests.Response):
if not response.ok:
log.error(f"Failed to request the invariant M3U8 playlist: {response.status_code}")
sys.exit(1)
playlist_text = response.text
if track.from_file:
master = m3u8.load(str(track.from_file))
else:
raise TypeError(f"Expected response to be a requests.Response or curl_cffi.Response, not {type(response)}")
# Get the playlist text and handle both session types
response = session.get(track.url)
if isinstance(response, requests.Response):
if not response.ok:
log.error(f"Failed to request the invariant M3U8 playlist: {response.status_code}")
sys.exit(1)
playlist_text = response.text
else:
raise TypeError(f"Expected response to be a requests.Response or curl_cffi.Response, not {type(response)}")
master = m3u8.loads(playlist_text, uri=track.url)
master = m3u8.loads(playlist_text, uri=track.url)
if not master.segments:
log.error("Track's HLS playlist has no segments, expecting an invariant M3U8 playlist.")

View File

@@ -25,7 +25,7 @@ from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY
from unshackle.core.downloaders import aria2c, curl_impersonate, n_m3u8dl_re, requests
from unshackle.core.drm import DRM_T, PlayReady, Widevine
from unshackle.core.events import events
from unshackle.core.utilities import get_boxes, try_ensure_utf8
from unshackle.core.utilities import get_boxes, try_ensure_utf8, get_extension
from unshackle.core.utils.subprocess import ffprobe
@@ -47,6 +47,8 @@ class Track:
drm: Optional[Iterable[DRM_T]] = None,
edition: Optional[str] = None,
downloader: Optional[Callable] = None,
downloader_args: Optional[dict] = None,
from_file: Optional[Path] = None,
data: Optional[Union[dict, defaultdict]] = None,
id_: Optional[str] = None,
extra: Optional[Any] = None,
@@ -69,6 +71,10 @@ class Track:
raise TypeError(f"Expected edition to be a {str}, not {type(edition)}")
if not isinstance(downloader, (Callable, type(None))):
raise TypeError(f"Expected downloader to be a {Callable}, not {type(downloader)}")
if not isinstance(downloader_args, (dict, type(None))):
raise TypeError(f"Expected downloader_args to be a {dict}, not {type(downloader_args)}")
if not isinstance(from_file, (Path, type(None))):
raise TypeError(f"Expected from_file to be a {Path}, not {type(from_file)}")
if not isinstance(data, (dict, defaultdict, type(None))):
raise TypeError(f"Expected data to be a {dict} or {defaultdict}, not {type(data)}")
@@ -100,6 +106,8 @@ class Track:
self.drm = drm
self.edition: str = edition
self.downloader = downloader
self.downloader_args = downloader_args
self.from_file = from_file
self._data: defaultdict[Any, Any] = defaultdict(dict)
self.data = data or {}
self.extra: Any = extra or {} # allow anything for extra, but default to a dict
@@ -203,7 +211,17 @@ class Track:
save_path = config.directories.temp / f"{track_type}_{self.id}.mp4"
if track_type == "Subtitle":
save_path = save_path.with_suffix(f".{self.codec.extension}")
if self.downloader.__name__ == "n_m3u8dl_re":
# n_m3u8dl_re doesn't support directly downloading subtitles
if self.downloader.__name__ == "n_m3u8dl_re" and get_extension(self.url) in {
".srt",
".vtt",
".ttml",
".ssa",
".ass",
".stpp",
".wvtt",
".xml",
}:
self.downloader = requests
if self.descriptor != self.Descriptor.URL: