9 Commits

Author SHA1 Message Date
d18fbdb542 fix(unshackle): update remote_cdm device configuration
- Replace chrome-2 device with android device in remote_cdm list
- Change device_name to "andorid" and device_type to ANDROID
- Update system_id to 8131 and security_level to 1
- Add type field with value "decrypt_labs"
- Update host and secret fields to match decrypt_labs credentials
2025-08-28 02:21:04 +07:00
d5cbc4e088 fix(cdm): adjust scheme value based on security level
- Change scheme to "widevine" if security level is 3, otherwise use "L1"
- Apply this logic when setting init_data and license response schemes
- Ensure correct CDM scheme usage according to security level context
2025-08-28 02:20:23 +07:00
831fa10ce5 feat(dl): enhance episode folder structure and rename series folders
- Create nested folder structure for episodes: {title}/Season {season:02}/{filename}
- Keep existing folder naming for songs unchanged
- Modify episode folder naming to only show title for main folder
- Adjust episode file naming format to include separators and sanitization
- Add get_season_folder() method returning 'Season XX' for episodes
- Disable series year inclusion in folder and episode naming by default in config
- Comment out additional service and audio language tags in episode naming code
2025-08-26 21:35:22 +07:00
2a414720e7 feat(netflix): implement initial Netflix service with MSL DRM support
- Add MSL core implementation for handling Netflix message security layer
- Create MSL keys and message encryption/signature utilities
- Implement handshake to establish encrypted session keys with Netflix endpoint
- Add entity and user authentication scheme support for MSL
- Provide methods for message creation, sending, decryption, and parsing
- Implement Netflix service class with CLI integration via Click
- Support title metadata retrieval and parse movie or series accordingly
- Implement track extraction with profile and codec handling logic
- Add chapter extraction from Netflix metadata with error handling
- Implement Widevine license request using MSL messaging
- Add utility to split profiles based on video codec types
- Define schemes for key exchange, user and entity authentication with MSL
- Enable caching and loading of MSL keys with expiration checks
- Include gzip compression and base64 key decoding helpers within MSL class
2025-08-26 17:59:47 +07:00
f377bbfb74 chore(config): add comprehensive unshackle.yaml configuration file
- Define tagging options for filenames including group and metadata tags
- Configure terminal background color and file naming conventions
- Set caching parameters for title metadata with expiration controls
- Add muxing and default directories settings
- Provide flexible credentials management with profile and default support
- Configure CDM devices and remote CDMs for Widevine and PlayReady
- Define local and HTTP key vaults with options to disable pushing keys
- Set downloader preferences and specific settings for aria2c, n_m3u8dl_re, and curl_impersonate
- Introduce default parameters for download commands and subtitle conversion methods
- Add service-specific API keys, profiles, and device configurations
- Include proxy provider credentials for external VPN services
2025-08-26 17:59:21 +07:00
fb58e9f52a chore(git): update .gitignore file
- Remove unshackle.yaml from ignore list
- Delete 'services/' directory from ignore list
- Add 'Cache' to ignore list at the end of the file
2025-08-26 17:58:56 +07:00
4d2e84a45a fix(movie): adjust naming format and comment out audio and service tags
- Append " -" suffix to the movie name after replacing "$" with "S"
- Comment out adding service name to the title
- Disable appending "WEB-DL" tag to the title
- Comment out adding "DUAL" tag for two audio languages
- Comment out adding "MULTi" tag for more than two audio languages
- Comment out appending config tag suffix to video codec in name
2025-08-26 17:58:44 +07:00
f85ddce6f2 feat(downloaders): improve aria2c download progress reporting
- Added RPC calls to get detailed global and active download statistics
- Calculated total downloaded size, content size, and active download speed from active downloads
- Included stopped downloads in totals and handle error states with logged messages
- Yielded enhanced progress updates with combined downloaded sizes and speeds
- Added more granular progress dictionary keys for richer status reporting
- Added sleep delay in main aria2c function to reduce CPU usage during monitoring loop
- Updated docstring examples to reflect new progress data format and keys
2025-08-26 17:58:23 +07:00
354ba6c2e3 fix(core): correct filename sanitization regex and cleanup
- Adjust regex to replace semicolon only with spacer
- Remove colon from characters replaced by spacer, handle as removal instead
- Comment out removal of extra neighbouring spacers to prevent unintended collapses
- Refine unsafe characters removal pattern to avoid filename issues
2025-08-26 17:57:53 +07:00
17 changed files with 2194 additions and 49 deletions

3
.gitignore vendored
View File

@@ -1,5 +1,4 @@
# unshackle
unshackle.yaml
unshackle.yml
update_check.json
*.mkv
@@ -25,7 +24,6 @@ unshackle/certs/
unshackle/WVDs/
unshackle/PRDs/
temp/
services/
# Byte-compiled / optimized / DLL files
__pycache__/
@@ -235,3 +233,4 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/
Cache

View File

@@ -1159,6 +1159,12 @@ class dl:
final_filename = title.get_filename(media_info, show_service=not no_source)
if not no_folder and isinstance(title, (Episode, Song)):
if isinstance(title, Episode):
# Create nested structure: {title}/Season {season:02}/{filename}
final_dir /= title.get_filename(media_info, show_service=not no_source, folder=True)
final_dir /= title.get_season_folder()
else:
# For Song, use existing logic
final_dir /= title.get_filename(media_info, show_service=not no_source, folder=True)
final_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -71,7 +71,7 @@ class DecryptLabsRemoteCDM(RemoteCdm):
{
"init_data": self.pssh.dumps(),
"service_certificate": self.req_session.signed_device_certificate,
"scheme": "widevine",
"scheme": "widevine" if self.security_level == 3 else "L1", # Using L1 CDM,
"service": self.service_name,
},
)
@@ -103,7 +103,7 @@ class DecryptLabsRemoteCDM(RemoteCdm):
"init_data": self.pssh.dumps(),
"license_request": self.license_request,
"license_response": license_message,
"scheme": "widevine",
"scheme": "widevine" if self.security_level == 3 else "L1",
},
)

View File

@@ -137,7 +137,6 @@ def download(
if len(urls) > 1:
split = 1
file_allocation = "none"
arguments = [
# [Basic Options]
"--input-file",
@@ -189,19 +188,36 @@ def download(
p.stdin.close()
while p.poll() is None:
# Get global statistics via RPC
global_stats: dict[str, Any] = (
rpc(caller=partial(rpc_session.post, url=rpc_uri), secret=rpc_secret, method="aria2.getGlobalStat")
or {}
)
number_stopped = int(global_stats.get("numStoppedTotal", 0))
download_speed = int(global_stats.get("downloadSpeed", -1))
global_download_speed = int(global_stats.get("downloadSpeed", 0))
if number_stopped:
yield dict(completed=number_stopped)
if download_speed != -1:
yield dict(downloaded=f"{filesize.decimal(download_speed)}/s")
# Get active downloads via RPC for detailed progress tracking
active_downloads: list[dict[str, Any]] = (
rpc(
caller=partial(rpc_session.post, url=rpc_uri),
secret=rpc_secret,
method="aria2.tellActive",
)
or []
)
# Calculate totals from active downloads
total_downloaded_size = 0
total_content_size = 0
active_download_speed = 0
for download in active_downloads:
total_downloaded_size += int(download.get("completedLength", 0))
total_content_size += int(download.get("totalLength", 0))
active_download_speed += int(download.get("downloadSpeed", 0))
# Get stopped downloads via RPC to check for errors and completion
stopped_downloads: list[dict[str, Any]] = (
rpc(
caller=partial(rpc_session.post, url=rpc_uri),
@@ -212,22 +228,46 @@ def download(
or []
)
for dl in stopped_downloads:
if dl["status"] == "error":
# Add completed downloads to totals and handle errors
for download in stopped_downloads:
if download["status"] == "complete":
completed_length = int(download.get("completedLength", 0))
total_downloaded_size += completed_length
total_content_size += completed_length
elif download["status"] == "error":
used_uri = next(
uri["uri"]
for file in dl["files"]
for file in download["files"]
if file["selected"] == "true"
for uri in file["uris"]
if uri["status"] == "used"
)
error = f"Download Error (#{dl['gid']}): {dl['errorMessage']} ({dl['errorCode']}), {used_uri}"
error = f"Download Error (#{download['gid']}): {download['errorMessage']} ({download['errorCode']}), {used_uri}"
error_pretty = "\n ".join(
textwrap.wrap(error, width=console.width - 20, initial_indent="")
)
console.log(Text.from_ansi("\n[Aria2c]: " + error_pretty))
raise ValueError(error)
# Yield progress information
if total_content_size > 0:
downloaded = f"{filesize.decimal(total_downloaded_size)}/{filesize.decimal(total_content_size)} {filesize.decimal(active_download_speed)}/s"
yield dict(
downloaded=downloaded,
total=total_content_size,
completed=total_downloaded_size
)
elif global_download_speed > 0:
yield dict(
downloaded=f"{filesize.decimal(global_download_speed)}/s",
speed_bytes_per_sec=global_download_speed
)
# Yield completion count
if number_stopped:
yield dict(completed=number_stopped)
# Exit when all downloads are complete
if number_stopped == len(urls):
rpc(caller=partial(rpc_session.post, url=rpc_uri), secret=rpc_secret, method="aria2.shutdown")
break
@@ -274,11 +314,14 @@ def aria2c(
Yields the following download status updates while chunks are downloading:
- {total: 100} (100% download total)
- {completed: 1} (1% download progress out of 100%)
- {downloaded: "10.1 MB/s"} (currently downloading at a rate of 10.1 MB/s)
- {total: 100} (total number of URLs to download)
- {completed: 1} (number of completed downloads)
- {downloaded: "50.2 MB/128.5 MB 10.1 MB/s", total: 134742016, completed: 52428800} (progress data)
- {downloaded: "10.1 MB/s", speed_bytes_per_sec: 10485760} (speed fallback data)
The data is in the same format accepted by rich's progress.update() function.
However, The `downloaded` and `speed_bytes_per_sec` keys are custom and not natively
accepted by all rich progress bars.
Parameters:
urls: Web URL(s) to file(s) to download. You can use a dictionary with the key
@@ -318,6 +361,7 @@ def aria2c(
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(1)
try:
yield from download(urls, output_dir, filename, headers, cookies, local_proxy, max_workers)

View File

@@ -92,14 +92,14 @@ class Episode(Title):
primary_audio_track = next(iter(media_info.audio_tracks), None)
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Title [Year] SXXEXX Name (or Title [Year] SXX if folder)
# Title [Year] SXXEXX Name (or just Title for main folder)
if folder:
name = f"{self.title}"
if self.year and config.series_year:
name += f" {self.year}"
name += f" S{self.season:02}"
name += f" ({self.year})"
return name
else:
name = "{title}{year} S{season:02}E{number:02} {name}".format(
name = "{title}{year} S{season:02}E{number:02} - {name} -".format(
title=self.title.replace("$", "S"), # e.g., Arli$$
year=f" {self.year}" if self.year and config.series_year else "",
season=self.season,
@@ -128,19 +128,19 @@ class Episode(Title):
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# if show_service:
# name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# # 'WEB-DL'
# name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# # DUAL
# if unique_audio_languages == 2:
# name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# # MULTi
# if unique_audio_languages > 2:
# name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
@@ -189,6 +189,12 @@ class Episode(Title):
# Simple naming style without technical details - use spaces instead of dots
return sanitize_filename(name, " ")
def get_season_folder(self) -> str:
"""
Get the season folder name in the format 'Season XX'.
"""
return f"Season {self.season:02d}"
class Series(SortedKeyList, ABC):
def __init__(self, iterable: Optional[Iterable] = None):

View File

@@ -56,7 +56,7 @@ class Movie(Title):
unique_audio_languages = len({x.language.split("-")[0] for x in media_info.audio_tracks if x.language})
# Name (Year)
name = str(self).replace("$", "S") # e.g., Arli$$
name = str(self).replace("$", "S") + " -" # e.g., Arli$$
if config.scene_naming:
# Resolution
@@ -78,20 +78,20 @@ class Movie(Title):
resolution = int(primary_video_track.width * (9 / 16))
name += f" {resolution}p"
# Service
if show_service:
name += f" {self.service.__name__}"
# # Service
# if show_service:
# name += f" {self.service.__name__}"
# 'WEB-DL'
name += " WEB-DL"
# # 'WEB-DL'
# name += " WEB-DL"
# DUAL
if unique_audio_languages == 2:
name += " DUAL"
# # DUAL
# if unique_audio_languages == 2:
# name += " DUAL"
# MULTi
if unique_audio_languages > 2:
name += " MULTi"
# # MULTi
# if unique_audio_languages > 2:
# name += " MULTi"
# Audio Codec + Channels (+ feature)
if primary_audio_track:
@@ -132,8 +132,8 @@ class Movie(Title):
name += " HFR"
name += f" {VIDEO_CODEC_MAP.get(codec, codec)}"
if config.tag:
name += f"-{config.tag}"
# if config.tag:
# name += f"-{config.tag}"
return sanitize_filename(name)
else:

View File

@@ -99,9 +99,9 @@ def sanitize_filename(filename: str, spacer: str = ".") -> str:
# remove or replace further characters as needed
filename = "".join(c for c in filename if unicodedata.category(c) != "Mn") # hidden characters
filename = filename.replace("/", " & ").replace(";", " & ") # e.g. multi-episode filenames
filename = re.sub(r"[:; ]", spacer, filename) # structural chars to (spacer)
filename = re.sub(r"[\\*!?¿,'\"" "()<>|$#~]", "", filename) # not filename safe chars
filename = re.sub(rf"[{spacer}]{{2,}}", spacer, filename) # remove extra neighbouring (spacer)s
filename = re.sub(r"[;]", spacer, filename) # structural chars to (spacer)
filename = re.sub(r"[\\:*!?¿,'\"""<>|$#~]", "", filename) # not filename safe chars
# filename = re.sub(rf"[{spacer}]{{2,}}", spacer, filename) # remove extra neighbouring (spacer)s
return filename

View File

@@ -0,0 +1,10 @@
from .MSLObject import MSLObject
class MSLKeys(MSLObject):
def __init__(self, encryption=None, sign=None, rsa=None, mastertoken=None, cdm_session=None):
self.encryption = encryption
self.sign = sign
self.rsa = rsa
self.mastertoken = mastertoken
self.cdm_session = cdm_session

View File

@@ -0,0 +1,6 @@
import jsonpickle
class MSLObject:
def __repr__(self):
return "<{} {}>".format(self.__class__.__name__, jsonpickle.encode(self, unpicklable=False))

View File

@@ -0,0 +1,408 @@
import base64
import gzip
import json
import logging
import os
import random
import re
import sys
import time
import zlib
from datetime import datetime
from io import BytesIO
import jsonpickle
import requests
from Cryptodome.Cipher import AES, PKCS1_OAEP
from Cryptodome.Hash import HMAC, SHA256
from Cryptodome.PublicKey import RSA
from Cryptodome.Random import get_random_bytes
from Cryptodome.Util import Padding
from unshackle.core.cacher import Cacher
from .MSLKeys import MSLKeys
from .schemes import EntityAuthenticationSchemes # noqa: F401
from .schemes import KeyExchangeSchemes
from .schemes.EntityAuthentication import EntityAuthentication
from .schemes.KeyExchangeRequest import KeyExchangeRequest
# from vinetrimmer.utils.widevine.device import RemoteDevice
class MSL:
log = logging.getLogger("MSL")
def __init__(self, session, endpoint, sender, keys, message_id, user_auth=None):
self.session = session
self.endpoint = endpoint
self.sender = sender
self.keys = keys
self.user_auth = user_auth
self.message_id = message_id
@classmethod
def handshake(cls, scheme: KeyExchangeSchemes, session: requests.Session, endpoint: str, sender: str, cache: Cacher):
cache = cache.get(sender)
message_id = random.randint(0, pow(2, 52))
msl_keys = MSL.load_cache_data(cache)
if msl_keys is not None:
cls.log.info("Using cached MSL data")
else:
msl_keys = MSLKeys()
if scheme != KeyExchangeSchemes.Widevine:
msl_keys.rsa = RSA.generate(2048)
# if not cdm:
# raise cls.log.exit("- No cached data and no CDM specified")
# if not msl_keys_path:
# raise cls.log.exit("- No cached data and no MSL key path specified")
# Key Exchange Scheme Widevine currently not implemented
# if scheme == KeyExchangeSchemes.Widevine:
# msl_keys.cdm_session = cdm.open(
# pssh=b"\x0A\x7A\x00\x6C\x38\x2B",
# raw=True,
# offline=True
# )
# keyrequestdata = KeyExchangeRequest.Widevine(
# keyrequest=cdm.get_license_challenge(msl_keys.cdm_session)
# )
# else:
keyrequestdata = KeyExchangeRequest.AsymmetricWrapped(
keypairid="superKeyPair",
mechanism="JWK_RSA",
publickey=msl_keys.rsa.publickey().exportKey(format="DER")
)
data = jsonpickle.encode({
"entityauthdata": EntityAuthentication.Unauthenticated(sender),
"headerdata": base64.b64encode(MSL.generate_msg_header(
message_id=message_id,
sender=sender,
is_handshake=True,
keyrequestdata=keyrequestdata
).encode("utf-8")).decode("utf-8"),
"signature": ""
}, unpicklable=False)
data += json.dumps({
"payload": base64.b64encode(json.dumps({
"messageid": message_id,
"data": "",
"sequencenumber": 1,
"endofmsg": True
}).encode("utf-8")).decode("utf-8"),
"signature": ""
})
try:
r = session.post(
url=endpoint,
data=data
)
except requests.HTTPError as e:
raise cls.log.exit(f"- Key exchange failed, response data is unexpected: {e.response.text}")
key_exchange = r.json() # expecting no payloads, so this is fine
if "errordata" in key_exchange:
raise cls.log.exit("- Key exchange failed: " + json.loads(base64.b64decode(
key_exchange["errordata"]
).decode())["errormsg"])
# parse the crypto keys
key_response_data = json.JSONDecoder().decode(base64.b64decode(
key_exchange["headerdata"]
).decode("utf-8"))["keyresponsedata"]
if key_response_data["scheme"] != str(scheme):
raise cls.log.exit("- Key exchange scheme mismatch occurred")
key_data = key_response_data["keydata"]
# if scheme == KeyExchangeSchemes.Widevine:
# if isinstance(cdm.device, RemoteDevice):
# msl_keys.encryption, msl_keys.sign = cdm.device.exchange(
# cdm.sessions[msl_keys.cdm_session],
# license_res=key_data["cdmkeyresponse"],
# enc_key_id=base64.b64decode(key_data["encryptionkeyid"]),
# hmac_key_id=base64.b64decode(key_data["hmackeyid"])
# )
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# else:
# cdm.parse_license(msl_keys.cdm_session, key_data["cdmkeyresponse"])
# keys = cdm.get_keys(msl_keys.cdm_session)
# msl_keys.encryption = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["encryptionkeyid"]),
# keys=keys,
# permissions=["AllowEncrypt", "AllowDecrypt"]
# )
# msl_keys.sign = MSL.get_widevine_key(
# kid=base64.b64decode(key_data["hmackeyid"]),
# keys=keys,
# permissions=["AllowSign", "AllowSignatureVerify"]
# )
# else:
cipher_rsa = PKCS1_OAEP.new(msl_keys.rsa)
msl_keys.encryption = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["encryptionkey"])
).decode("utf-8"))["k"]
)
msl_keys.sign = MSL.base64key_decode(
json.JSONDecoder().decode(cipher_rsa.decrypt(
base64.b64decode(key_data["hmackey"])
).decode("utf-8"))["k"]
)
msl_keys.mastertoken = key_response_data["mastertoken"]
MSL.cache_keys(msl_keys, cache)
cls.log.info("MSL handshake successful")
return cls(
session=session,
endpoint=endpoint,
sender=sender,
keys=msl_keys,
message_id=message_id
)
@staticmethod
def load_cache_data(cacher: Cacher):
if not cacher or cacher == {}:
return None
# with open(msl_keys_path, encoding="utf-8") as fd:
# msl_keys = jsonpickle.decode(fd.read())
msl_keys = jsonpickle.decode(cacher.data)
if msl_keys.rsa:
# noinspection PyTypeChecker
# expects RsaKey, but is a string, this is because jsonpickle can't pickle RsaKey object
# so as a workaround it exports to PEM, and then when reading, it imports that PEM back
# to an RsaKey :)
msl_keys.rsa = RSA.importKey(msl_keys.rsa)
# If it's expired or close to, return None as it's unusable
if msl_keys.mastertoken and ((datetime.utcfromtimestamp(int(json.JSONDecoder().decode(
base64.b64decode(msl_keys.mastertoken["tokendata"]).decode("utf-8")
)["expiration"])) - datetime.now()).total_seconds() / 60 / 60) < 10:
return None
return msl_keys
@staticmethod
def cache_keys(msl_keys, cache: Cacher):
# os.makedirs(os.path.dirname(cache), exist_ok=True)
if msl_keys.rsa:
# jsonpickle can't pickle RsaKey objects :(
msl_keys.rsa = msl_keys.rsa.export_key()
# with open(cache, "w", encoding="utf-8") as fd:
# fd.write()
cache.set(jsonpickle.encode(msl_keys))
if msl_keys.rsa:
# re-import now
msl_keys.rsa = RSA.importKey(msl_keys.rsa)
@staticmethod
def generate_msg_header(message_id, sender, is_handshake, userauthdata=None, keyrequestdata=None,
compression="GZIP"):
"""
The MSL header carries all MSL data used for entity and user authentication, message encryption
and verification, and service tokens. Portions of the MSL header are encrypted.
https://github.com/Netflix/msl/wiki/Messages#header-data
:param message_id: number against which payload chunks are bound to protect against replay.
:param sender: ESN
:param is_handshake: This flag is set true if the message is a handshake message and will not include any
payload chunks. It will include keyrequestdata.
:param userauthdata: UserAuthData
:param keyrequestdata: KeyRequestData
:param compression: Supported compression algorithms.
:return: The base64 encoded JSON String of the header
"""
header_data = {
"messageid": message_id,
"renewable": True, # MUST be True if is_handshake
"handshake": is_handshake,
"capabilities": {
"compressionalgos": [compression] if compression else [],
"languages": ["en-US"], # bcp-47
"encoderformats": ["JSON"]
},
"timestamp": int(time.time()),
# undocumented or unused:
"sender": sender,
"nonreplayable": False,
"recipient": "Netflix",
}
if userauthdata:
header_data["userauthdata"] = userauthdata
if keyrequestdata:
header_data["keyrequestdata"] = [keyrequestdata]
return jsonpickle.encode(header_data, unpicklable=False)
@classmethod
def get_widevine_key(cls, kid, keys, permissions):
for key in keys:
if key.kid != kid:
continue
if key.type != "OPERATOR_SESSION":
cls.log.warning(f"Widevine Key Exchange: Wrong key type (not operator session) key {key}")
continue
if not set(permissions) <= set(key.permissions):
cls.log.warning(f"Widevine Key Exchange: Incorrect permissions, key {key}, needed perms {permissions}")
continue
return key.key
return None
def send_message(self, endpoint, params, application_data, userauthdata=None):
message = self.create_message(application_data, userauthdata)
res = self.session.post(url=endpoint, data=message, params=params)
header, payload_data = self.parse_message(res.text)
if "errordata" in header:
raise self.log.exit(
"- MSL response message contains an error: {}".format(
json.loads(base64.b64decode(header["errordata"].encode("utf-8")).decode("utf-8"))
)
)
return header, payload_data
def create_message(self, application_data, userauthdata=None):
self.message_id += 1 # new message must ue a new message id
headerdata = self.encrypt(self.generate_msg_header(
message_id=self.message_id,
sender=self.sender,
is_handshake=False,
userauthdata=userauthdata
))
header = json.dumps({
"headerdata": base64.b64encode(headerdata.encode("utf-8")).decode("utf-8"),
"signature": self.sign(headerdata).decode("utf-8"),
"mastertoken": self.keys.mastertoken
})
payload_chunks = [self.encrypt(json.dumps({
"messageid": self.message_id,
"data": self.gzip_compress(json.dumps(application_data).encode("utf-8")).decode("utf-8"),
"compressionalgo": "GZIP",
"sequencenumber": 1, # todo ; use sequence_number from master token instead?
"endofmsg": True
}))]
message = header
for payload_chunk in payload_chunks:
message += json.dumps({
"payload": base64.b64encode(payload_chunk.encode("utf-8")).decode("utf-8"),
"signature": self.sign(payload_chunk).decode("utf-8")
})
return message
def decrypt_payload_chunks(self, payload_chunks):
"""
Decrypt and extract data from payload chunks
:param payload_chunks: List of payload chunks
:return: json object
"""
raw_data = ""
for payload_chunk in payload_chunks:
# todo ; verify signature of payload_chunk["signature"] against payload_chunk["payload"]
# expecting base64-encoded json string
payload_chunk = json.loads(base64.b64decode(payload_chunk["payload"]).decode("utf-8"))
# decrypt the payload
payload_decrypted = AES.new(
key=self.keys.encryption,
mode=AES.MODE_CBC,
iv=base64.b64decode(payload_chunk["iv"])
).decrypt(base64.b64decode(payload_chunk["ciphertext"]))
payload_decrypted = Padding.unpad(payload_decrypted, 16)
payload_decrypted = json.loads(payload_decrypted.decode("utf-8"))
# decode and uncompress data if compressed
payload_data = base64.b64decode(payload_decrypted["data"])
if payload_decrypted.get("compressionalgo") == "GZIP":
payload_data = zlib.decompress(payload_data, 16 + zlib.MAX_WBITS)
raw_data += payload_data.decode("utf-8")
data = json.loads(raw_data)
if "error" in data:
error = data["error"]
error_display = error.get("display")
error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", ""))
if error_display:
self.log.critical(f"- {error_display}")
if error_detail:
self.log.critical(f"- {error_detail}")
if not (error_display or error_detail):
self.log.critical(f"- {error}")
# sys.exit(1)
return data["result"]
def parse_message(self, message):
"""
Parse an MSL message into a header and list of payload chunks
:param message: MSL message
:returns: a 2-item tuple containing message and list of payload chunks if available
"""
parsed_message = json.loads("[{}]".format(message.replace("}{", "},{")))
header = parsed_message[0]
encrypted_payload_chunks = parsed_message[1:] if len(parsed_message) > 1 else []
if encrypted_payload_chunks:
payload_chunks = self.decrypt_payload_chunks(encrypted_payload_chunks)
else:
payload_chunks = {}
return header, payload_chunks
@staticmethod
def gzip_compress(data):
out = BytesIO()
with gzip.GzipFile(fileobj=out, mode="w") as fd:
fd.write(data)
return base64.b64encode(out.getvalue())
@staticmethod
def base64key_decode(payload):
length = len(payload) % 4
if length == 2:
payload += "=="
elif length == 3:
payload += "="
elif length != 0:
raise ValueError("Invalid base64 string")
return base64.urlsafe_b64decode(payload.encode("utf-8"))
def encrypt(self, plaintext):
"""
Encrypt the given Plaintext with the encryption key
:param plaintext:
:return: Serialized JSON String of the encryption Envelope
"""
iv = get_random_bytes(16)
return json.dumps({
"ciphertext": base64.b64encode(
AES.new(
self.keys.encryption,
AES.MODE_CBC,
iv
).encrypt(
Padding.pad(plaintext.encode("utf-8"), 16)
)
).decode("utf-8"),
"keyid": "{}_{}".format(self.sender, json.loads(
base64.b64decode(self.keys.mastertoken["tokendata"]).decode("utf-8")
)["sequencenumber"]),
"sha256": "AA==",
"iv": base64.b64encode(iv).decode("utf-8")
})
def sign(self, text):
"""
Calculates the HMAC signature for the given text with the current sign key and SHA256
:param text:
:return: Base64 encoded signature
"""
return base64.b64encode(HMAC.new(self.keys.sign, text.encode("utf-8"), SHA256).digest())

View File

@@ -0,0 +1,59 @@
from .. import EntityAuthenticationSchemes
from ..MSLObject import MSLObject
# noinspection PyPep8Naming
class EntityAuthentication(MSLObject):
def __init__(self, scheme, authdata):
"""
Data used to identify and authenticate the entity associated with a message.
https://github.com/Netflix/msl/wiki/Entity-Authentication-%28Configuration%29
:param scheme: Entity Authentication Scheme identifier
:param authdata: Entity Authentication data
"""
self.scheme = str(scheme)
self.authdata = authdata
@classmethod
def Unauthenticated(cls, identity):
"""
The unauthenticated entity authentication scheme does not provide encryption or authentication and only
identifies the entity. Therefore entity identities can be harvested and spoofed. The benefit of this
authentication scheme is that the entity has control over its identity. This may be useful if the identity is
derived from or related to other data, or if retaining the identity is desired across state resets or in the
event of MSL errors requiring entity re-authentication.
"""
return cls(
scheme=EntityAuthenticationSchemes.Unauthenticated,
authdata={"identity": identity}
)
@classmethod
def Widevine(cls, devtype, keyrequest):
"""
The Widevine entity authentication scheme is used by devices with the Widevine CDM. It does not provide
encryption or authentication and only identifies the entity. Therefore entity identities can be harvested
and spoofed. The entity identity is composed from the provided device type and Widevine key request data. The
Widevine CDM properties can be extracted from the key request data.
When coupled with the Widevine key exchange scheme, the entity identity can be cryptographically validated by
comparing the entity authentication key request data against the key exchange key request data.
Note that the local entity will not know its entity identity when using this scheme.
> Devtype
An arbitrary value identifying the device type the local entity wishes to assume. The data inside the Widevine
key request may be optionally used to validate the claimed device type.
:param devtype: Local entity device type
:param keyrequest: Widevine key request
"""
return cls(
scheme=EntityAuthenticationSchemes.Widevine,
authdata={
"devtype": devtype,
"keyrequest": keyrequest
}
)

View File

@@ -0,0 +1,80 @@
import base64
from .. import KeyExchangeSchemes
from ..MSLObject import MSLObject
# noinspection PyPep8Naming
class KeyExchangeRequest(MSLObject):
def __init__(self, scheme, keydata):
"""
Session key exchange data from a requesting entity.
https://github.com/Netflix/msl/wiki/Key-Exchange-%28Configuration%29
:param scheme: Key Exchange Scheme identifier
:param keydata: Key Request data
"""
self.scheme = str(scheme)
self.keydata = keydata
@classmethod
def AsymmetricWrapped(cls, keypairid, mechanism, publickey):
"""
Asymmetric wrapped key exchange uses a generated ephemeral asymmetric key pair for key exchange. It will
typically be used when there is no other data or keys from which to base secure key exchange.
This mechanism provides perfect forward secrecy but does not guarantee that session keys will only be available
to the requesting entity if the requesting MSL stack has been modified to perform the operation on behalf of a
third party.
> Key Pair ID
The key pair ID is included as a sanity check.
> Mechanism & Public Key
The following mechanisms are associated public key formats are currently supported.
Field Public Key Format Description
RSA SPKI RSA-OAEP encrypt/decrypt
ECC SPKI ECIES encrypt/decrypt
JWEJS_RSA SPKI RSA-OAEP JSON Web Encryption JSON Serialization
JWE_RSA SPKI RSA-OAEP JSON Web Encryption Compact Serialization
JWK_RSA SPKI RSA-OAEP JSON Web Key
JWK_RSAES SPKI RSA PKCS#1 JSON Web Key
:param keypairid: key pair ID
:param mechanism: asymmetric key type
:param publickey: public key
"""
return cls(
scheme=KeyExchangeSchemes.AsymmetricWrapped,
keydata={
"keypairid": keypairid,
"mechanism": mechanism,
"publickey": base64.b64encode(publickey).decode("utf-8")
}
)
@classmethod
def Widevine(cls, keyrequest):
"""
Google Widevine provides a secure key exchange mechanism. When requested the Widevine component will issue a
one-time use key request. The Widevine server library can be used to authenticate the request and return
randomly generated symmetric keys in a protected key response bound to the request and Widevine client library.
The key response also specifies the key identities, types and their permitted usage.
The Widevine key request also contains a model identifier and a unique device identifier with an expectation of
long-term persistence. These values are available from the Widevine client library and can be retrieved from
the key request by the Widevine server library.
The Widevine client library will protect the returned keys from inspection or misuse.
:param keyrequest: Base64-encoded Widevine CDM license challenge (PSSH: b'\x0A\x7A\x00\x6C\x38\x2B')
"""
if not isinstance(keyrequest, str):
keyrequest = base64.b64encode(keyrequest).decode()
return cls(
scheme=KeyExchangeSchemes.Widevine,
keydata={"keyrequest": keyrequest}
)

View File

@@ -0,0 +1,59 @@
from ..MSLObject import MSLObject
from . import UserAuthenticationSchemes
# noinspection PyPep8Naming
class UserAuthentication(MSLObject):
def __init__(self, scheme, authdata):
"""
Data used to identify and authenticate the user associated with a message.
https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29
:param scheme: User Authentication Scheme identifier
:param authdata: User Authentication data
"""
self.scheme = str(scheme)
self.authdata = authdata
@classmethod
def EmailPassword(cls, email, password):
"""
Email and password is a standard user authentication scheme in wide use.
:param email: user email address
:param password: user password
"""
return cls(
scheme=UserAuthenticationSchemes.EmailPassword,
authdata={
"email": email,
"password": password
}
)
@classmethod
def NetflixIDCookies(cls, netflixid, securenetflixid):
"""
Netflix ID HTTP cookies are used when the user has previously logged in to a web site. Possession of the
cookies serves as proof of user identity, in the same manner as they do when communicating with the web site.
The Netflix ID cookie and Secure Netflix ID cookie are HTTP cookies issued by the Netflix web site after
subscriber login. The Netflix ID cookie is encrypted and identifies the subscriber and analogous to a
subscribers username. The Secure Netflix ID cookie is tied to a Netflix ID cookie and only sent over HTTPS
and analogous to a subscribers password.
In some cases the Netflix ID and Secure Netflix ID cookies will be unavailable to the MSL stack or application.
If either or both of the Netflix ID or Secure Netflix ID cookies are absent in the above data structure the
HTTP cookie headers will be queried for it; this is only acceptable when HTTPS is used as the underlying
transport protocol.
:param netflixid: Netflix ID cookie
:param securenetflixid: Secure Netflix ID cookie
"""
return cls(
scheme=UserAuthenticationSchemes.NetflixIDCookies,
authdata={
"netflixid": netflixid,
"securenetflixid": securenetflixid
}
)

View File

@@ -0,0 +1,24 @@
from enum import Enum
class Scheme(Enum):
def __str__(self):
return str(self.value)
class EntityAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/Entity-Authentication-%28Configuration%29"""
Unauthenticated = "NONE"
Widevine = "WIDEVINE"
class UserAuthenticationSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/User-Authentication-%28Configuration%29"""
EmailPassword = "EMAIL_PASSWORD"
NetflixIDCookies = "NETFLIXID"
class KeyExchangeSchemes(Scheme):
"""https://github.com/Netflix/msl/wiki/Key-Exchange-%28Configuration%29"""
AsymmetricWrapped = "ASYMMETRIC_WRAPPED"
Widevine = "WIDEVINE"

View File

@@ -0,0 +1,978 @@
import base64
from datetime import datetime
import json
from math import e
import random
import sys
import time
import typing
from uuid import UUID
import click
import re
from typing import List, Literal, Optional, Set, Union, Tuple
from http.cookiejar import CookieJar
from itertools import zip_longest
from Crypto.Random import get_random_bytes
import jsonpickle
from pymp4.parser import Box
from pywidevine import PSSH, Cdm
import requests
from langcodes import Language
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.drm.widevine import Widevine
from unshackle.core.service import Service
from unshackle.core.titles import Titles_T, Title_T
from unshackle.core.titles.episode import Episode, Series
from unshackle.core.titles.movie import Movie, Movies
from unshackle.core.titles.title import Title
from unshackle.core.tracks import Tracks, Chapters
from unshackle.core.tracks.audio import Audio
from unshackle.core.tracks.chapter import Chapter
from unshackle.core.tracks.subtitle import Subtitle
from unshackle.core.tracks.track import Track
from unshackle.core.tracks.video import Video
from unshackle.core.utils.collections import flatten, as_list
from unshackle.core.tracks.attachment import Attachment
from unshackle.core.drm.playready import PlayReady
from unshackle.core.titles.song import Song
from unshackle.utils.base62 import decode
from .MSL import MSL, KeyExchangeSchemes
from .MSL.schemes.UserAuthentication import UserAuthentication
class Netflix(Service):
"""
Service for https://netflix.com
Version: 1.0.0
Authorization: Cookies
Security: UHD@SL3000/L1 FHD@SL3000/L1
"""
TITLE_RE = [
r"^(?:https?://(?:www\.)?netflix\.com(?:/[a-z0-9]{2})?/(?:title/|watch/|.+jbv=))?(?P<id>\d+)",
r"^https?://(?:www\.)?unogs\.com/title/(?P<id>\d+)",
]
ALIASES= ("NF", "Netflix")
NF_LANG_MAP = {
"es": "es-419",
"pt": "pt-PT",
}
@staticmethod
@click.command(name="Netflix", short_help="https://netflix.com")
@click.argument("title", type=str)
@click.option("-drm", "--drm-system", type=click.Choice(["widevine", "playready"], case_sensitive=False),
default="widevine",
help="which drm system to use")
@click.option("-p", "--profile", type=click.Choice(["MPL", "HPL", "QC", "MPL+HPL", "MPL+HPL+QC", "MPL+QC"], case_sensitive=False),
default=None,
help="H.264 profile to use. Default is best available.")
@click.option("--meta-lang", type=str, help="Language to use for metadata")
@click.option("-ht","--hydrate-track", is_flag=True, default=False, help="Hydrate missing audio and subtitle.")
@click.option("-hb", "--high-bitrate", is_flag=True, default=False, help="Get more video bitrate")
@click.pass_context
def cli(ctx, **kwargs):
return Netflix(ctx, **kwargs)
def __init__(self, ctx: click.Context, title: str, drm_system: Literal["widevine", "playready"], profile: str, meta_lang: str, hydrate_track: bool, high_bitrate: bool):
super().__init__(ctx)
# General
self.title = title
self.profile = profile
self.meta_lang = meta_lang
self.hydrate_track = hydrate_track
self.drm_system = drm_system
self.profiles: List[str] = []
self.requested_profiles: List[str] = []
self.high_bitrate = high_bitrate
# MSL
self.esn = self.cache.get("ESN")
self.msl: Optional[MSL] = None
self.userauthdata = None
# Download options
self.range = ctx.parent.params.get("range_") or [Video.Range.SDR]
self.vcodec = ctx.parent.params.get("vcodec") or Video.Codec.AVC # Defaults to H264
self.acodec : Audio.Codec = ctx.parent.params.get("acodec") or Audio.Codec.EC3
self.quality: List[int] = ctx.parent.params.get("quality")
self.audio_only = ctx.parent.params.get("audio_only")
self.subs_only = ctx.parent.params.get("subs_only")
self.chapters_only = ctx.parent.params.get("chapters_only")
def authenticate(self, cookies: Optional[CookieJar] = None, credential: Optional[Credential] = None) -> None:
# Configure first before download
self.log.debug("Authenticating Netflix service")
auth = super().authenticate(cookies, credential)
if not cookies:
raise EnvironmentError("Service requires Cookies for Authentication.")
self.configure()
return auth
def get_titles(self) -> Titles_T:
metadata = self.get_metadata(self.title)
if "video" not in metadata:
self.log.error(f"Failed to get metadata: {metadata}")
sys.exit(1)
titles: Titles_T | None = None
if metadata["video"]["type"] == "movie":
movie = Movie(
id_=self.title,
name=metadata["video"]["title"],
year=metadata["video"]["year"],
# language=self.get_original_language(self.get_manifest()),
service=self.__class__,
data=metadata["video"],
description=metadata["video"]["synopsis"]
)
movie.language = self.get_original_language(self.get_manifest(movie, self.profiles))
titles = Movies([
movie
])
else:
# self.log.warning(f"Metadata: {jsonpickle.encode(metadata, indent=2)}")
# print(metadata)
episode_list: List[Episode] = []
for season in metadata["video"]["seasons"]:
for episodes in season["episodes"]:
episode = Episode(
id_=self.title,
title=metadata["video"]["title"],
year=season["year"],
service=self.__class__,
season=season["seq"],
number=episodes["seq"],
name=episodes["title"],
data=episodes,
description=episodes["synopsis"],
)
try:
episode.language = self.get_original_language(self.get_manifest(episode, self.profiles))
self.log.debug(f"Episode S{episode.season:02d}E{episode.number:02d}: {episode.language}")
except Exception as e:
self.log.warning(f"Failed to get original language for episode S{season['seq']:02d}E{episodes['seq']:02d}: {e}")
# Fallback: try to get the original language from the first episode that worked
# or default to English if none worked
if episode_list and hasattr(episode_list[0], 'language') and episode_list[0].language:
episode.language = episode_list[0].language
else:
episode.language = Language.get("en")
self.log.info(f"Using fallback language for episode: {episode.language}")
episode_list.append(
episode
)
titles = Series(episode_list)
return titles
def get_tracks(self, title: Title_T) -> Tracks:
tracks = Tracks()
# If Video Codec is H.264 is selected but `self.profile is none` profile QC has to be requested seperately
if self.vcodec == Video.Codec.AVC:
# self.log.info(f"Profile: {self.profile}")
try:
manifest = self.get_manifest(title, self.profiles)
movie_track = self.manifest_as_tracks(manifest, title, self.hydrate_track)
tracks.add(movie_track)
if self.profile is not None:
self.log.info(f"Requested profiles: {self.profile}")
else:
qc_720_profile = [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["QC"] if "l40" not in x and 720 in self.quality]
qc_manifest = self.get_manifest(title, qc_720_profile if 720 in self.quality else self.config["profiles"]["video"][self.vcodec.extension.upper()]["QC"])
qc_tracks = self.manifest_as_tracks(qc_manifest, title, False)
tracks.add(qc_tracks.videos)
mpl_manifest = self.get_manifest(title, [x for x in self.config["profiles"]["video"][self.vcodec.extension.upper()]["MPL"] if "l40" not in x])
mpl_tracks = self.manifest_as_tracks(mpl_manifest, title, False)
tracks.add(mpl_tracks.videos)
except Exception as e:
self.log.error(e)
else:
if self.high_bitrate:
splitted_profiles = self.split_profiles(self.profiles)
for index, profile_list in enumerate(splitted_profiles):
try:
self.log.debug(f"Index: {index}. Getting profiles: {profile_list}")
manifest = self.get_manifest(title, profile_list)
manifest_tracks = self.manifest_as_tracks(manifest, title, self.hydrate_track if index == 0 else False)
tracks.add(manifest_tracks if index == 0 else manifest_tracks.videos)
except Exception:
self.log.error(f"Error getting profile: {profile_list}. Skipping")
continue
else:
try:
manifest = self.get_manifest(title, self.profiles)
manifest_tracks = self.manifest_as_tracks(manifest, title, self.hydrate_track)
tracks.add(manifest_tracks)
except Exception as e:
self.log.error(e)
# Add Attachments for profile picture
if isinstance(title, Movie):
if title.data and "boxart" in title.data and title.data["boxart"]:
tracks.add(
Attachment.from_url(
url=title.data["boxart"][0]["url"]
)
)
else:
if title.data and "stills" in title.data and title.data["stills"]:
tracks.add(
Attachment.from_url(title.data["stills"][0]["url"])
)
return tracks
def split_profiles(self, profiles: List[str]) -> List[List[str]]:
"""
Split profiles with names containing specific patterns based on video codec
For H264: uses patterns "l30", "l31", "l40" (lowercase)
For non-H264: uses patterns "L30", "L31", "L40", "L41", "L50", "L51" (uppercase)
Returns List[List[str]] type with profiles grouped by pattern
"""
# Define the profile patterns to match based on video codec
if self.vcodec == Video.Codec.AVC: # H264
patterns = ["l30", "l31", "l40"]
else:
patterns = ["L30", "L31", "L40", "L41", "L50", "L51"]
# Group profiles by pattern
result: List[List[str]] = []
for pattern in patterns:
pattern_group = []
for profile in profiles:
if pattern in profile:
pattern_group.append(profile)
if pattern_group: # Only add non-empty groups
result.append(pattern_group)
return result
def get_chapters(self, title: Title_T) -> Chapters:
chapters: Chapters = Chapters()
if not title.data:
return chapters
try:
# self.log.info(f"Title data: {title.data}")
if "skipMarkers" in title.data and "credit" in title.data["skipMarkers"]:
credits = title.data["skipMarkers"]["credit"]
if credits.get("start", 0) > 0 and credits.get("end", 0) > 0:
chapters.add(Chapter(
timestamp=credits["start"], # Milliseconds
name="Intro"
))
chapters.add(
Chapter(
timestamp=credits["end"], # Milliseconds
)
)
if "creditsOffset" in title.data and title.data["creditsOffset"] is not None:
chapters.add(Chapter(
timestamp=float(title.data["creditsOffset"]), # this is seconds, needed to assign to float
name="Credits"
))
except Exception as e:
self.log.warning(f"Failed to process chapters: {e}")
return chapters
def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
if not self.msl:
self.log.error(f"MSL Client is not intialized!")
sys.exit(1)
application_data = {
"version": 2,
"url": track.data["license_url"],
"id": int(time.time() * 10000),
"esn": self.esn.data,
"languages": ["en-US"],
# "uiVersion": "shakti-v9dddfde5",
"clientVersion": "6.0026.291.011",
"params": [{
"sessionId": base64.b64encode(get_random_bytes(16)).decode("utf-8"),
"clientTime": int(time.time()),
"challengeBase64": base64.b64encode(challenge).decode("utf-8"),
"xid": str(int((int(time.time()) + 0.1612) * 1000)),
}],
"echo": "sessionId"
}
header, payload_data = self.msl.send_message(
endpoint=self.config["endpoints"]["license"],
params={
"reqAttempt": 1,
"reqName": "license",
},
application_data=application_data,
userauthdata=self.userauthdata
)
if not payload_data:
self.log.error(f" - Failed to get license: {header['message']} [{header['code']}]")
sys.exit(1)
if "error" in payload_data[0]:
error = payload_data[0]["error"]
error_display = error.get("display")
error_detail = re.sub(r" \(E3-[^)]+\)", "", error.get("detail", ""))
if error_display:
self.log.critical(f" - {error_display}")
if error_detail:
self.log.critical(f" - {error_detail}")
if not (error_display or error_detail):
self.log.critical(f" - {error}")
sys.exit(1)
return payload_data[0]["licenseResponseBase64"]
def get_playready_license(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str | None:
return None
# return super().get_widevine_license(challenge=challenge, title=title, track=track)
def configure(self):
# self.log.info(ctx)
# if profile is none from argument let's use them all profile in video codec scope
# self.log.info(f"Requested profiles: {self.profile}")
if self.profile is None:
self.profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
if self.profile is not None:
self.requested_profiles = self.profile.split('+')
self.log.info(f"Requested profile: {self.requested_profiles}")
else:
# self.log.info(f"Video Range: {self.range}")
self.requested_profiles = self.config["profiles"]["video"][self.vcodec.extension.upper()]
# Make sure video codec is supported by Netflix
if self.vcodec.extension.upper() not in self.config["profiles"]["video"]:
raise ValueError(f"Video Codec {self.vcodec} is not supported by Netflix")
if self.range[0].name not in list(self.config["profiles"]["video"][self.vcodec.extension.upper()].keys()) and self.vcodec != Video.Codec.AVC and self.vcodec != Video.Codec.VP9:
self.log.error(f"Video range {self.range[0].name} is not supported by Video Codec: {self.vcodec}")
sys.exit(1)
if len(self.range) > 1:
self.log.error(f"Multiple video range is not supported right now.")
sys.exit(1)
if self.vcodec == Video.Codec.AVC and self.range[0] != Video.Range.SDR:
self.log.error(f"H.264 Video Codec only supports SDR")
sys.exit(1)
self.profiles = self.get_profiles()
self.log.info("Intializing a MSL client")
self.get_esn()
scheme = KeyExchangeSchemes.AsymmetricWrapped
self.log.info(f"Scheme: {scheme}")
self.msl = MSL.handshake(
scheme=scheme,
session=self.session,
endpoint=self.config["endpoints"]["manifest"],
sender=self.esn.data,
cache=self.cache.get("MSL")
)
cookie = self.session.cookies.get_dict()
self.userauthdata = UserAuthentication.NetflixIDCookies(
netflixid=cookie["NetflixId"],
securenetflixid=cookie["SecureNetflixId"]
)
def get_profiles(self):
result_profiles = []
if self.vcodec == Video.Codec.AVC:
if self.requested_profiles is not None:
for requested_profiles in self.requested_profiles:
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()][requested_profiles])))
return result_profiles
result_profiles.extend(flatten(list(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())))
return result_profiles
# Handle case for codec VP9
if self.vcodec == Video.Codec.VP9 and self.range[0] != Video.Range.HDR10:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()].values())
return result_profiles
for profiles in self.config["profiles"]["video"][self.vcodec.extension.upper()]:
for range in self.range:
if range in profiles:
result_profiles.extend(self.config["profiles"]["video"][self.vcodec.extension.upper()][range.name])
# sys.exit(1)
self.log.debug(f"Result_profiles: {result_profiles}")
return result_profiles
def get_esn(self):
ESN_GEN = "".join(random.choice("0123456789ABCDEF") for _ in range(30))
esn_value = f"NFCDIE-03-{ESN_GEN}"
# Check if ESN is expired or doesn't exist
if self.esn.data is None or self.esn.data == {} or (hasattr(self.esn, 'expired') and self.esn.expired):
# Set new ESN with 6-hour expiration
self.esn.set(esn_value, 1 * 60 * 60) # 6 hours in seconds
self.log.info(f"Generated new ESN with 1-hour expiration")
else:
self.log.info(f"Using cached ESN.")
self.log.info(f"ESN: {self.esn.data}")
def get_metadata(self, title_id: str):
"""
Obtain Metadata information about a title by it's ID.
:param title_id: Title's ID.
:returns: Title Metadata.
"""
try:
metadata = self.session.get(
self.config["endpoints"]["metadata"].format(build_id="release"),
params={
"movieid": title_id,
"drmSystem": self.config["configuration"]["drm_system"],
"isWatchlistEnabled": False,
"isShortformEnabled": False,
"languages": self.meta_lang
}
).json()
except requests.HTTPError as e:
if e.response.status_code == 500:
self.log.warning(
" - Recieved a HTTP 500 error while getting metadata, deleting cached reactContext data"
)
# self.cache.
# os.unlink(self.get_cache("web_data.json"))
# return self.get_metadata(self, title_id)
raise Exception(f"Error getting metadata: {e}")
except json.JSONDecodeError:
self.log.error(" - Failed to get metadata, title might not be available in your region.")
sys.exit(1)
else:
if "status" in metadata and metadata["status"] == "error":
self.log.error(
f" - Failed to get metadata, cookies might be expired. ({metadata['message']})"
)
sys.exit(1)
return metadata
def _get_empty_manifest(self):
"""Return an empty manifest structure to prevent crashes when manifest retrieval fails"""
return {
"video_tracks": [{
"streams": [],
"drmHeader": {"bytes": b""}
}],
"audio_tracks": [],
"timedtexttracks": [],
"links": {
"license": {"href": ""}
}
}
def get_manifest(self, title: Title_T, video_profiles: List[str], required_text_track_id: Optional[str] = None, required_audio_track_id: Optional[str] = None):
try:
# Log context information for debugging
title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
self.log.debug(f"Getting manifest for title_id: {title_id}, video_profiles_count: {len(video_profiles)}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}")
audio_profiles = self.config["profiles"]["audio"].values()
video_profiles = sorted(set(flatten(as_list(
video_profiles,
audio_profiles,
self.config["profiles"]["video"]["H264"]["BPL"] if self.vcodec == Video.Codec.AVC else [],
self.config["profiles"]["subtitles"],
))))
self.log.debug("Profiles:\n\t" + "\n\t".join(video_profiles))
if not self.msl:
self.log.error(f"MSL Client is not initialized for title_id: {title_id}")
return self._get_empty_manifest()
params = {
"reqAttempt": 1,
"reqPriority": 10,
"reqName": "manifest",
}
_, payload_chunks = self.msl.send_message(
endpoint=self.config["endpoints"]["manifest"],
params=params,
application_data={
"version": 2,
"url": "manifest",
"id": int(time.time()),
"esn": self.esn.data,
"languages": ["en-US"],
"clientVersion": "6.0026.291.011",
"params": {
"clientVersion": "6.0051.090.911",
"challenge": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"],
"challanges": {
"default": self.config["payload_challenge_pr"] if self.drm_system == 'playready' else self.config["payload_challenge"]
},
"contentPlaygraph": ["v2"],
"deviceSecurityLevel": "3000",
"drmVersion": 25,
"desiredVmaf": "plus_lts",
"desiredSegmentVmaf": "plus_lts",
"flavor": "STANDARD", # ? PRE_FETCH, SUPPLEMENTAL
"drmType": self.drm_system,
"imageSubtitleHeight": 1080,
"isBranching": False,
"isNonMember": False,
"isUIAutoPlay": False,
"licenseType": "standard",
"liveAdsCapability": "replace",
"liveMetadataFormat": "INDEXED_SEGMENT_TEMPLATE",
"manifestVersion": "v2",
"osName": "windows",
"osVersion": "10.0",
"platform": "138.0.0.0",
"profilesGroups": [{
"name": "default",
"profiles": video_profiles
}],
"profiles": video_profiles,
"preferAssistiveAudio": False,
"requestSegmentVmaf": False,
"requiredAudioTrackId": required_audio_track_id, # This is for getting missing audio tracks (value get from `new_track_id``)
"requiredTextTrackId": required_text_track_id, # This is for getting missing subtitle. (value get from `new_track_id``)
"supportsAdBreakHydration": False,
"supportsNetflixMediaEvents": True,
"supportsPartialHydration": True, # This is important if you want get available all tracks. but you must fetch each missing url tracks with "requiredAudioTracksId" or "requiredTextTrackId"
"supportsPreReleasePin": True,
"supportsUnequalizedDownloadables": True,
"supportsWatermark": True,
"titleSpecificData": {
(title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"): {"unletterboxed": False}
},
"type": "standard", # ? PREPARE
"uiPlatform": "SHAKTI",
"uiVersion": "shakti-v49577320",
"useBetterTextUrls": True,
"useHttpsStreams": True,
"usePsshBox": True,
"videoOutputInfo": [{
# todo ; make this return valid, but "secure" values, maybe it helps
"type": "DigitalVideoOutputDescriptor",
"outputType": "unknown",
"supportedHdcpVersions": self.config["configuration"]["supported_hdcp_versions"],
"isHdcpEngaged": self.config["configuration"]["is_hdcp_engaged"]
}],
"viewableId": (title.data.get("episodeId") if title.data else None) or (title.data.get("id") if title.data else "unknown"),
"xid": str(int((int(time.time()) + 0.1612) * 1000)),
"showAllSubDubTracks": True,
}
},
userauthdata=self.userauthdata
)
if "errorDetails" in payload_chunks:
self.log.error(f"Manifest call failed for title_id: {title_id}, required_audio_track_id: {required_audio_track_id}, required_text_track_id: {required_text_track_id}, error: {payload_chunks['errorDetails']}")
return self._get_empty_manifest()
# with open(f"./manifest_{"+".join(video_profiles)}.json", mode='w') as r:
# r.write(jsonpickle.encode(payload_chunks, indent=4))
return payload_chunks
except Exception as e:
title_id = title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'
profile_count = len(video_profiles) if 'video_profiles' in locals() else 0
self.log.error(f"Exception in get_manifest: {e}")
self.log.error(f"Context - title_id: {title_id}, video_profiles_count: {profile_count}, required_audio_track_id: {required_audio_track_id or 'None'}, required_text_track_id: {required_text_track_id or 'None'}")
if 'video_profiles' in locals() and video_profiles:
self.log.error(f"Video profiles being processed: {', '.join(video_profiles[:5])}{'...' if len(video_profiles) > 5 else ''}")
return self._get_empty_manifest()
@staticmethod
def get_original_language(manifest) -> Language:
try:
# First, try to find the original language from audio tracks
if "audio_tracks" in manifest and manifest["audio_tracks"]:
for language in manifest["audio_tracks"]:
if "languageDescription" in language and language["languageDescription"].endswith(" [Original]"):
return Language.get(language["language"])
# Fallback 1: Try to parse from defaultTrackOrderList
if "defaultTrackOrderList" in manifest and manifest["defaultTrackOrderList"]:
try:
media_id = manifest["defaultTrackOrderList"][0]["mediaId"]
lang_code = media_id.split(";")[2]
if lang_code:
return Language.get(lang_code)
except (IndexError, KeyError, AttributeError):
pass
# Fallback 2: Try to get the first available audio track language
if "audio_tracks" in manifest and manifest["audio_tracks"]:
for audio_track in manifest["audio_tracks"]:
if "language" in audio_track and audio_track["language"]:
return Language.get(audio_track["language"])
# Fallback 3: Default to English if all else fails
return Language.get("en")
except Exception as e:
# If anything goes wrong, default to English
return Language.get("en")
def get_widevine_service_certificate(self, *, challenge: bytes, title: Movie | Episode | Song, track: AnyTrack) -> bytes | str:
return self.config["certificate"]
def manifest_as_tracks(self, manifest, title: Title_T, hydrate_tracks = False) -> Tracks:
tracks = Tracks()
try:
# Handle empty or invalid manifest
if not manifest or not isinstance(manifest, dict):
self.log.warning("Empty or invalid manifest received, returning empty tracks")
return tracks
# Check if manifest has required structure
if "video_tracks" not in manifest or not manifest["video_tracks"]:
self.log.warning("No video tracks in manifest, returning empty tracks")
return tracks
if "links" not in manifest or "license" not in manifest["links"]:
self.log.warning("No license URL in manifest, cannot process tracks")
return tracks
original_language = self.get_original_language(manifest)
self.log.debug(f"Original language: {original_language}")
license_url = manifest["links"]["license"]["href"]
# Process video tracks
if "streams" in manifest["video_tracks"][0] and manifest["video_tracks"][0]["streams"]:
# self.log.info(f"Video: {jsonpickle.encode(manifest["video_tracks"], indent=2)}")
# self.log.info()
for video_index, video in enumerate(reversed(manifest["video_tracks"][0]["streams"])):
try:
# self.log.info(video)
id = video["downloadable_id"]
# self.log.info(f"Adding video {video["res_w"]}x{video["res_h"]}, bitrate: {(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None} with profile {video["content_profile"]}. kid: {video["drmHeaderId"]}")
tracks.add(
Video(
id_=video["downloadable_id"],
url=video["urls"][0]["url"],
codec=Video.Codec.from_netflix_profile(video["content_profile"]),
bitrate=video["bitrate"] * 1000,
width=video["res_w"],
height=video["res_h"],
fps=(float(video["framerate_value"]) / video["framerate_scale"]) if "framerate_value" in video else None,
language=Language.get(original_language),
edition=video["content_profile"],
range_=self.parse_video_range_from_profile(video["content_profile"]),
is_original_lang=True,
drm=[Widevine(
pssh=PSSH(
manifest["video_tracks"][0]["drmHeader"]["bytes"]
),
kid=video["drmHeaderId"]
)] if manifest["video_tracks"][0].get("drmHeader", {}).get("bytes") else [],
data={
'license_url': license_url
}
)
)
except Exception as e:
video_id = video.get("downloadable_id", "unknown") if isinstance(video, dict) else "unknown"
self.log.warning(f"Failed to process video track at index {video_index}, video_id: {video_id}, error: {e}")
continue
# Process audio tracks
unavailable_audio_tracks: List[Tuple[str, str]] = []
if "audio_tracks" in manifest:
for audio_index, audio in enumerate(manifest["audio_tracks"]):
try:
audio_id = audio.get("id", "unknown")
audio_lang = audio.get("language", "unknown")
if len(audio.get("streams", [])) < 1:
# This
# self.log.debug(f"Audio lang {audio["languageDescription"]} is available but no stream available.")
if "new_track_id" in audio and "id" in audio:
unavailable_audio_tracks.append((audio["new_track_id"], audio["id"])) # Assign to `unavailable_subtitle` for request missing audio tracks later
self.log.debug(f"Audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang} has no streams available")
continue
# self.log.debug(f"Adding audio lang: {audio["language"]} with profile: {audio["content_profile"]}")
is_original_lang = audio.get("language") == original_language.language
# self.log.info(f"is audio {audio["languageDescription"]} original language: {is_original_lang}")
for stream_index, stream in enumerate(audio["streams"]):
try:
stream_id = stream.get("downloadable_id", "unknown")
tracks.add(
Audio(
id_=stream["downloadable_id"],
url=stream["urls"][0]["url"],
codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
language=Language.get(self.NF_LANG_MAP.get(audio["language"]) or audio["language"]),
is_original_lang=is_original_lang,
bitrate=stream["bitrate"] * 1000,
channels=stream["channels"],
descriptive=audio.get("rawTrackType", "").lower() == "assistive",
name="[Original]" if Language.get(audio["language"]).language == original_language.language else None,
joc=6 if "atmos" in stream["content_profile"] else None
)
)
except Exception as e:
stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
self.log.warning(f"Failed to process audio stream at audio_index {audio_index}, stream_index {stream_index}, audio_id: {audio_id}, stream_id: {stream_id}, language: {audio_lang}, error: {e}")
continue
except Exception as e:
audio_id = audio.get("id", "unknown") if isinstance(audio, dict) else "unknown"
audio_lang = audio.get("language", "unknown") if isinstance(audio, dict) else "unknown"
self.log.warning(f"Failed to process audio track at index {audio_index}, audio_id: {audio_id}, language: {audio_lang}, error: {e}")
continue
# Process subtitle tracks
unavailable_subtitle: List[Tuple[str, str]] = []
if "timedtexttracks" in manifest:
for subtitle_index, subtitle in enumerate(manifest["timedtexttracks"]):
try:
subtitle_id = subtitle.get("id", "unknown")
subtitle_lang = subtitle.get("language", "unknown")
if "isNoneTrack" in subtitle and subtitle["isNoneTrack"] == True:
continue
if subtitle.get("hydrated") == False:
# This subtitles is there but has to request stream first
if "new_track_id" in subtitle and "id" in subtitle:
unavailable_subtitle.append((subtitle["new_track_id"], subtitle["id"])) # Assign to `unavailable_subtitle` for request missing subtitles later
# self.log.debug(f"Audio language: {subtitle["languageDescription"]} id: {subtitle["new_track_id"]} is not hydrated.")
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} is not hydrated")
continue
if subtitle.get("languageDescription") == 'Off':
# I don't why this subtitles is requested, i consider for skip these subtitles for now
continue
# pass
if "downloadableIds" not in subtitle or not subtitle["downloadableIds"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no downloadableIds")
continue
id = list(subtitle["downloadableIds"].values())
if not id:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has empty downloadableIds")
continue
language = Language.get(subtitle["language"])
if "ttDownloadables" not in subtitle or not subtitle["ttDownloadables"]:
self.log.debug(f"Subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang} has no ttDownloadables")
continue
profile = next(iter(subtitle["ttDownloadables"].keys()))
tt_downloadables = next(iter(subtitle["ttDownloadables"].values()))
is_original_lang = subtitle.get("language") == original_language.language
# self.log.info(f"is subtitle {subtitle["languageDescription"]} original language {is_original_lang}")
# self.log.info(f"ddd")
tracks.add(
Subtitle(
id_=id[0],
url=tt_downloadables["urls"][0]["url"],
codec=Subtitle.Codec.from_netflix_profile(profile),
language=language,
forced=subtitle.get("isForcedNarrative", False),
cc=subtitle.get("rawTrackType") == "closedcaptions",
sdh=subtitle.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitle else False,
is_original_lang=is_original_lang,
name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitle and subtitle["trackVariant"] == "DUBTITLE" else None),
)
)
except Exception as e:
subtitle_id = subtitle.get("id", "unknown") if isinstance(subtitle, dict) else "unknown"
subtitle_lang = subtitle.get("language", "unknown") if isinstance(subtitle, dict) else "unknown"
self.log.warning(f"Failed to process subtitle track at index {subtitle_index}, subtitle_id: {subtitle_id}, language: {subtitle_lang}, error: {e}")
continue
if hydrate_tracks == False:
return tracks
# Hydrate missing tracks
self.log.info(f"Getting all missing audio and subtitle tracks")
# Handle mismatched lengths - use last successful subtitle track when needed
last_successful_subtitle = ("N/A", "N/A") if not unavailable_subtitle else unavailable_subtitle[-1]
# Process audio tracks first, then handle subtitles separately if needed
max_length = max(len(unavailable_audio_tracks), len(unavailable_subtitle))
for hydration_index in range(max_length):
# Get audio track info for this index
audio_hydration = unavailable_audio_tracks[hydration_index] if hydration_index < len(unavailable_audio_tracks) else ("N/A", "N/A")
# Get subtitle track info for this index, or use last successful one if available
if hydration_index < len(unavailable_subtitle):
subtitle_hydration = unavailable_subtitle[hydration_index]
is_real_subtitle_request = True # This is a real subtitle to be added to tracks
elif unavailable_subtitle: # Use last successful subtitle track for context only
subtitle_hydration = last_successful_subtitle
is_real_subtitle_request = False # This is just for context, don't add to tracks
else:
subtitle_hydration = ("N/A", "N/A")
is_real_subtitle_request = False
try:
# Log what we're trying to hydrate
self.log.debug(f"Hydrating tracks at index {hydration_index}, audio_track_id: {audio_hydration[1] if audio_hydration[1] != 'N/A' else 'N/A'}, subtitle_track_id: {subtitle_hydration[1] if subtitle_hydration[1] != 'N/A' else 'N/A'}, is_real_subtitle: {is_real_subtitle_request}")
# Only call get_manifest if we have audio to hydrate
should_hydrate_audio = audio_hydration[0] != 'N/A' and audio_hydration[1] != 'N/A'
if not should_hydrate_audio:
self.log.debug(f"Skipping hydration at index {hydration_index} - no audio tracks to hydrate")
continue
# Always use a valid subtitle track ID for the manifest request to avoid API errors
# Use the subtitle track (real or reused) if available, otherwise use N/A
subtitle_track_for_request = subtitle_hydration[0] if subtitle_hydration[0] != 'N/A' else None
# If we still don't have a subtitle track ID, skip this hydration to avoid API error
if subtitle_track_for_request is None:
self.log.warning(f"Skipping hydration at index {hydration_index} - no subtitle track available for API request context")
continue
hydrated_manifest = self.get_manifest(title, self.profiles, subtitle_track_for_request, audio_hydration[0])
# Handle hydrated audio tracks
if should_hydrate_audio and "audio_tracks" in hydrated_manifest:
try:
audios = next((item for item in hydrated_manifest["audio_tracks"] if 'id' in item and item["id"] == audio_hydration[1]), None)
if audios and "streams" in audios:
audio_lang = audios.get("language", "unknown")
self.log.debug(f"Processing hydrated audio track_id: {audio_hydration[1]}, language: {audio_lang}, streams_count: {len(audios['streams'])}")
for stream_index, stream in enumerate(audios["streams"]):
try:
stream_id = stream.get("downloadable_id", "unknown")
tracks.add(
Audio(
id_=stream["downloadable_id"],
url=stream["urls"][0]["url"],
codec=Audio.Codec.from_netflix_profile(stream["content_profile"]),
language=Language.get(self.NF_LANG_MAP.get(audios["language"]) or audios["language"]),
is_original_lang=audios["language"] == original_language.language,
bitrate=stream["bitrate"] * 1000,
channels=stream["channels"],
descriptive=audios.get("rawTrackType", "").lower() == "assistive",
name="[Original]" if Language.get(audios["language"]).language == original_language.language else None,
joc=16 if "atmos" in stream["content_profile"] else None
)
)
except Exception as e:
stream_id = stream.get("downloadable_id", "unknown") if isinstance(stream, dict) else "unknown"
self.log.warning(f"Failed to process hydrated audio stream at hydration_index {hydration_index}, stream_index {stream_index}, audio_track_id: {audio_hydration[1]}, stream_id: {stream_id}, error: {e}")
continue
else:
self.log.warning(f"No audio streams found for hydrated audio_track_id: {audio_hydration[1]} at hydration_index {hydration_index}")
except Exception as e:
self.log.warning(f"Failed to find hydrated audio track at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1]}, error: {e}")
# Handle hydrated subtitle tracks (only if it's a real subtitle request, not reused)
if is_real_subtitle_request and subtitle_hydration[0] != 'N/A' and subtitle_hydration[1] != 'N/A' and "timedtexttracks" in hydrated_manifest:
try:
subtitles = next((item for item in hydrated_manifest["timedtexttracks"] if 'id' in item and item["id"] == subtitle_hydration[1]), None)
if subtitles and "downloadableIds" in subtitles and "ttDownloadables" in subtitles:
subtitle_lang = subtitles.get("language", "unknown")
self.log.debug(f"Processing hydrated subtitle track_id: {subtitle_hydration[1]}, language: {subtitle_lang}")
# self.log.info(jsonpickle.encode(subtitles, indent=2))
# sel
id = list(subtitles["downloadableIds"].values())
if id:
language = Language.get(subtitles["language"])
profile = next(iter(subtitles["ttDownloadables"].keys()))
tt_downloadables = next(iter(subtitles["ttDownloadables"].values()))
tracks.add(
Subtitle(
id_=id[0],
url=tt_downloadables["urls"][0]["url"],
codec=Subtitle.Codec.from_netflix_profile(profile),
language=language,
forced=subtitles.get("isForcedNarrative", False),
cc=subtitles.get("rawTrackType") == "closedcaptions",
sdh=subtitles.get("trackVariant") == 'STRIPPED_SDH' if "trackVariant" in subtitles else False,
is_original_lang=subtitles.get("language") == original_language.language,
name=("[Original]" if language.language == original_language.language else None or "[Dubbing]" if "trackVariant" in subtitles and subtitles["trackVariant"] == "DUBTITLE" else None),
)
)
else:
self.log.warning(f"No downloadable IDs found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
else:
self.log.warning(f"No subtitle data found for hydrated subtitle_track_id: {subtitle_hydration[1]} at hydration_index {hydration_index}")
except Exception as e:
self.log.warning(f"Failed to process hydrated subtitle track at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]}, error: {e}")
elif not is_real_subtitle_request and subtitle_hydration[1] != 'N/A':
self.log.debug(f"Used subtitle track context for API request at hydration_index {hydration_index}, subtitle_track_id: {subtitle_hydration[1]} (not adding to tracks)")
except Exception as e:
self.log.warning(f"Failed to hydrate tracks at hydration_index {hydration_index}, audio_track_id: {audio_hydration[1] if audio_hydration[1] != 'N/A' else 'N/A'}, subtitle_track_id: {subtitle_hydration[1] if subtitle_hydration[1] != 'N/A' else 'N/A'}, error: {e}")
continue
except Exception as e:
self.log.error(f"Exception in manifest_as_tracks: {e}")
self.log.debug(f"Failed to process manifest for title: {title.data.get('episodeId', title.data.get('id', 'unknown')) if title.data else 'unknown'}")
# Return empty tracks on any critical error
return tracks
def parse_video_range_from_profile(self, profile: str) -> Video.Range:
"""
Parse the video range from a Netflix profile string.
Args:
profile (str): The Netflix profile string (e.g., "hevc-main10-L30-dash-cenc")
Returns:
Video.Range: The corresponding Video.Range enum value
Examples:
>>> parse_video_range_from_profile("hevc-main10-L30-dash-cenc")
<Video.Range.SDR: 'SDR'>
>>> parse_video_range_from_profile("hevc-dv5-main10-L30-dash-cenc")
<Video.Range.DV: 'DV'>
"""
# Get video profiles from config
video_profiles = self.config.get("profiles", {}).get("video", {})
# Search through all codecs and ranges to find the profile
for codec, ranges in video_profiles.items():
# if codec == 'H264':
# return Video.Range.SDR # for H264 video always return SDR
for range_name, profiles in ranges.items():
# self.log.info(f"Checking range {range_name}")
if profile in profiles:
# Return the corresponding Video.Range enum value
try:
# self.log.info(f"Found {range_name}")
return Video.Range(range_name)
except ValueError:
# If range_name is not a valid Video.Range, return SDR as default
self.log.debug(f"Video range is not valid {range_name}")
return Video.Range.SDR
# If profile not found, return SDR as default
return Video.Range.SDR

File diff suppressed because one or more lines are too long

250
unshackle/unshackle.yaml Normal file
View File

@@ -0,0 +1,250 @@
# Group or Username to postfix to the end of all download filenames following a dash
tag: Kenzuya
# Enable/disable tagging with group name (default: true)
tag_group_name: true
# Enable/disable tagging with IMDB/TMDB/TVDB details (default: true)
tag_imdb_tmdb: true
# Set terminal background color (custom option not in CONFIG.md)
set_terminal_bg: false
# Set file naming convention
# true for style - Prime.Suspect.S07E01.The.Final.Act.Part.One.1080p.ITV.WEB-DL.AAC2.0.H.264
# false for style - Prime Suspect S07E01 The Final Act - Part One
scene_naming: true
# Whether to include the year in series names for episodes and folders (default: true)
# true for style - Show Name (2023) S01E01 Episode Name
# false for style - Show Name S01E01 Episode Name
series_year: false
# Check for updates from GitHub repository on startup (default: true)
update_checks: true
# How often to check for updates, in hours (default: 24)
update_check_interval: 24
# Title caching configuration
# Cache title metadata to reduce redundant API calls
title_cache_enabled: true # Enable/disable title caching globally (default: true)
title_cache_time: 1800 # Cache duration in seconds (default: 1800 = 30 minutes)
title_cache_max_retention: 86400 # Maximum cache retention for fallback when API fails (default: 86400 = 24 hours)
# Muxing configuration
muxing:
set_title: true
# Login credentials for each Service
credentials:
# Direct credentials (no profile support)
EXAMPLE: email@example.com:password
# Per-profile credentials with default fallback
SERVICE_NAME:
default: default@email.com:password # Used when no -p/--profile is specified
profile1: user1@email.com:password1
profile2: user2@email.com:password2
# Per-profile credentials without default (requires -p/--profile)
SERVICE_NAME2:
john: john@example.com:johnspassword
jane: jane@example.com:janespassword
# You can also use list format for passwords with special characters
SERVICE_NAME3:
default: ["user@email.com", ":PasswordWith:Colons"]
# Override default directories used across unshackle
directories:
cache: Cache
# cookies: Cookies
dcsl: DCSL # Device Certificate Status List
downloads: Downloads
logs: Logs
temp: Temp
wvds: WVDs
prds: PRDs
# Additional directories that can be configured:
# commands: Commands
# services:
# - /path/to/services
# - /other/path/to/services
# vaults: Vaults
# fonts: Fonts
# Pre-define which Widevine or PlayReady device to use for each Service
cdm:
# Global default CDM device (fallback for all services/profiles)
default: chromecdm
# Direct service-specific CDM
DIFFERENT_EXAMPLE: PRD_1
# Per-profile CDM configuration
EXAMPLE:
john_sd: chromecdm_903_l3 # Profile 'john_sd' uses Chrome CDM L3
jane_uhd: nexus_5_l1 # Profile 'jane_uhd' uses Nexus 5 L1
default: generic_android_l3 # Default CDM for this service
# Use pywidevine Serve-compliant Remote CDMs
remote_cdm:
- name: "chromecdm"
device_name: widevine
device_type: CHROME
system_id: 36586
security_level: 3
type: "decrypt_labs"
host: https://keyxtractor.decryptlabs.com
secret: 7547150416_41da0a32d6237d83_KeyXtractor_api_ext
- name: "android"
device_name: andorid
device_type: ANDROID
system_id: 8131
security_level: 1
type: "decrypt_labs"
host: https://keyxtractor.decryptlabs.com
secret: decrypt_labs_special_ultimate
# Key Vaults store your obtained Content Encryption Keys (CEKs)
# Use 'no_push: true' to prevent a vault from receiving pushed keys
# while still allowing it to provide keys when requested
key_vaults:
- type: SQLite
name: Local
path: key_store.db
- type: HTTP
name: "DRMLab Vault"
host: "https://api.drmlab.io/vault/"
username: "unshackle"
password: "gEX75q7I5YVkvgF5SUkcNd41IbGrDtTT"
api_mode: "json"
# Additional vault types:
# - type: API
# name: "Remote Vault"
# uri: "https://key-vault.example.com"
# token: "secret_token"
# no_push: true # This vault will only provide keys, not receive them
# - type: MySQL
# name: "MySQL Vault"
# host: "127.0.0.1"
# port: 3306
# database: vault
# username: user
# password: pass
# no_push: false # Default behavior - vault both provides and receives keys
# Choose what software to use to download data
downloader: aria2c
# Options: requests | aria2c | curl_impersonate | n_m3u8dl_re
# Can also be a mapping:
# downloader:
# NF: requests
# AMZN: n_m3u8dl_re
# DSNP: n_m3u8dl_re
# default: requests
# aria2c downloader configuration
aria2c:
max_concurrent_downloads: 4
max_connection_per_server: 3
split: 5
file_allocation: falloc # none | prealloc | falloc | trunc
# N_m3u8DL-RE downloader configuration
n_m3u8dl_re:
thread_count: 16
ad_keyword: "advertisement"
use_proxy: true
# curl_impersonate downloader configuration
curl_impersonate:
browser: chrome120
# Pre-define default options and switches of the dl command
dl:
sub_format: srt
downloads: 4
workers: 16
lang:
- orig
- id
EXAMPLE:
bitrate: CBR
# Chapter Name to use when exporting a Chapter without a Name
chapter_fallback_name: "Chapter {j:02}"
# Case-Insensitive dictionary of headers for all Services
headers:
Accept-Language: "en-US,en;q=0.8"
User-Agent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36"
# Override default filenames used across unshackle
filenames:
log: "unshackle_{name}_{time}.log"
config: "config.yaml"
root_config: "unshackle.yaml"
chapters: "Chapters_{title}_{random}.txt"
subtitle: "Subtitle_{id}_{language}.srt"
# API key for The Movie Database (TMDB)
tmdb_api_key: "8f5c14ef648a0abdd262cf809e11fcd4"
# conversion_method:
# - auto (default): Smart routing - subby for WebVTT/SAMI, standard for others
# - subby: Always use subby with advanced processing
# - pycaption: Use only pycaption library (no SubtitleEdit, no subby)
# - subtitleedit: Prefer SubtitleEdit when available, fall back to pycaption
subtitle:
conversion_method: auto
sdh_method: auto
# Configuration for pywidevine's serve functionality
serve:
users:
secret_key_for_user:
devices:
- generic_nexus_4464_l3
username: user
# devices:
# - '/path/to/device.wvd'
# Configuration data for each Service
services:
# Service-specific configuration goes here
# Profile-specific configurations can be nested under service names
# Example: with profile-specific device configs
EXAMPLE:
# Global service config
api_key: "service_api_key"
# Profile-specific device configurations
profiles:
john_sd:
device:
app_name: "AIV"
device_model: "SHIELD Android TV"
jane_uhd:
device:
app_name: "AIV"
device_model: "Fire TV Stick 4K"
# Example: Service with different regions per profile
SERVICE_NAME:
profiles:
us_account:
region: "US"
api_endpoint: "https://api.us.service.com"
uk_account:
region: "GB"
api_endpoint: "https://api.uk.service.com"
# External proxy provider services
proxy_providers:
surfsharkvpn:
username: wqKD6vru8vXxUtkXJbvx4HAL # Service credentials from https://my.surfshark.com/vpn/manual-setup/main/openvpn
password: zyFnwsYFMNxqbzpf3asu36m6 # Service credentials (not your login password)