3 Commits

Author SHA1 Message Date
imSp4rky
b893fba28f feat(drm): add SAMPLE-AES MPEG-TS decryptor
Implements Apple HLS SAMPLE-AES decryption for MPEG-TS elementary streams, which neither Shaka Packager (rejects stream type 0xDB) nor mp4decrypt (ISO-BMFF only) can handle. Covers H.264 (1:9 pattern from offset 32 with EPB strip/reinsert), AAC and AC-3, then remuxes to a clean TS with the PMT stream types patched.
2026-06-18 16:05:02 +00:00
imSp4rky
aacf54701d feat(hls): handle FairPlay skd keys in segment decrypt path
The FairPlay->PlayReady bridge synthesized headers and routed licensing but the HLS download loop still rejected the skd EXT-X-KEY. Teach get_supported_key and get_drm the com.apple.streamingkeydelivery keyformat, and reuse a service-provided FairPlay session DRM for skd segments (its KID encoding is service-specific, e.g. base64).
2026-06-18 16:05:02 +00:00
imSp4rky
4422c975c3 feat(drm): add FairPlay->PlayReady bridge for cbcs HLS
FairPlay HLS (SAMPLE-AES/cbcs) ships a content KID in its skd:// key but no PlayReady/Widevine PSSH. Synthesize a PlayReady header from that KID so a PlayReady CDM can license and decrypt it whenever the backend is multi-DRM.

- FairPlay DRM (PlayReady subclass) + FairPlay.from_kid; fairplay_kid_from_skd extracts the KID from GUID / ?KID= / 32-hex skd forms (core/drm/fairplay.py)
- build_pr_header_from_kid synthesizes a WRMHEADER/PRO (cbcs->AESCBC v4.3, cenc->AESCTR v4.0); PlayReady.from_track falls back to a tenc KID when no PSSH
- Service.get_fairplay_license hook (defaults to get_playready_license)
- dl.py routes FairPlay tracks to get_fairplay_license through the PlayReady CDM
- EXAMPLE service + config.yaml document the bridge end to end
- tests for the synthesized header and skd KID extraction
2026-06-18 16:05:02 +00:00
12 changed files with 725 additions and 117 deletions

View File

@@ -1,98 +0,0 @@
#!/usr/bin/env python3
"""
Benchmark subtitle conversion backends to (re-)tune the preference ranks in
``unshackle/core/tracks/subtitle_convert.py``.
Runs every backend that can read each input file, converting to a target format (default
SRT), and reports cue count, leaked ASS override tags, and output size — so you can compare
fidelity per (source, target) pair on real files. Read-only: copies inputs to a temp dir.
Usage:
uv run python scripts/bench_subtitle_backends.py <file-or-dir> [<file-or-dir> ...] [--target SRT]
Example:
uv run python scripts/bench_subtitle_backends.py downloads/
"""
from __future__ import annotations
import argparse
import re
import shutil
import tempfile
from pathlib import Path
from unshackle.core.tracks import subtitle_convert as sc
from unshackle.core.tracks.subtitle import Subtitle
Codec = Subtitle.Codec
EXT_TO_CODEC = {
".srt": Codec.SubRip,
".vtt": Codec.WebVTT,
".ass": Codec.SubStationAlphav4,
".ssa": Codec.SubStationAlpha,
".ttml": Codec.TimedTextMarkupLang,
".smi": Codec.SAMI,
".sami": Codec.SAMI,
}
def gather(paths: list[str]) -> list[Path]:
files: list[Path] = []
for p in paths:
path = Path(p)
if path.is_dir():
files.extend(f for f in path.rglob("*") if f.suffix.lower() in EXT_TO_CODEC)
elif path.suffix.lower() in EXT_TO_CODEC:
files.append(path)
return sorted(files)
def metrics(text: str) -> tuple[int, int, int]:
cues = len(re.findall(r"-->", text))
ass_residue = len(re.findall(r"\{\\", text))
return cues, ass_residue, len(text)
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("paths", nargs="+", help="subtitle files or directories")
ap.add_argument("--target", default="SRT", help="target codec value (SRT, VTT, ASS, ...)")
args = ap.parse_args()
target = Codec(args.target.upper())
files = gather(args.paths)
if not files:
print("No subtitle files found.")
return
tmp = Path(tempfile.mkdtemp(prefix="subbench_"))
print(f"{'file':40} {'source':10} {'backend':12} {'ok':3} {'cues':>5} {'resid':>5} {'bytes':>7}")
for f in files:
source = EXT_TO_CODEC[f.suffix.lower()]
if source == target:
continue
for backend in sc.REGISTRY:
if not (backend.is_available() and backend.can_convert(source, target)):
continue
work = tmp / f"{f.stem}.{backend.name}{f.suffix}"
shutil.copy2(f, work)
sub = Subtitle(url="x", language="en", codec=source)
sub.path = work
try:
# Call the backend directly so each row reflects only that backend (no fallback).
out = work.with_suffix(f".{target.value.lower()}")
backend.convert(sub, target, out)
cues, resid, size = metrics(out.read_text("utf8", errors="replace"))
print(
f"{f.name[:40]:40} {source.name[:10]:10} {backend.name:12} {'Y':3} {cues:>5} {resid:>5} {size:>7}"
)
except Exception as e: # noqa: BLE001 - benchmark reports failures, does not raise
print(
f"{f.name[:40]:40} {source.name[:10]:10} {backend.name:12} {'N':3} {'-':>5} {'-':>5} {'-':>7} {type(e).__name__}"
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,81 @@
"""Tests for the FairPlay -> PlayReady bridge DRM type and skd KID extraction.
FairPlay HLS (SAMPLE-AES / cbcs) carries a content KID but no PlayReady PSSH. The
``FairPlay`` DRM synthesizes a PlayReady header from that KID so a PlayReady CDM can
license it; it is a PlayReady subclass so the pipeline reuses CDM/decrypt logic and
only differs in routing the request to ``get_fairplay_license``.
"""
from __future__ import annotations
from typing import Optional
from uuid import UUID
import pytest
from unshackle.core.drm import FairPlay, PlayReady
from unshackle.core.drm.fairplay import fairplay_kid_from_skd
def test_fairplay_is_playready_subclass() -> None:
# Pipeline relies on isinstance(drm, PlayReady) (get_drm_for_cdm, prepare_drm branch).
fp = FairPlay.from_kid(UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"))
assert isinstance(fp, PlayReady)
assert isinstance(fp, FairPlay)
def test_from_kid_round_trips_kid_and_sets_pssh() -> None:
kid = UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f")
fp = FairPlay.from_kid(kid)
assert fp.pssh_b64
assert [k.hex for k in fp.kids] == [kid.hex]
@pytest.mark.parametrize(
("skd", "expected"),
[
# hyphenated GUID (with optional :IV)
("skd://72ea9ec1-abb4-40d8-b3a9-e0bd1833bd58:0123456789ABCDEF", "72ea9ec1abb440d8b3a9e0bd1833bd58"),
# GUID as a query param
("skd://lic.example/fairplay?KID=4376a4b3-d8ef-4f21-9a6b-faa81a2e59e3", "4376a4b3d8ef4f219a6bfaa81a2e59e3"),
# bare 32-hex path segment
("skd://0000000069176cc2af6f93cce3515ced/clip/x", "0000000069176cc2af6f93cce3515ced"),
],
)
def test_kid_from_skd_common_forms(skd: str, expected: str) -> None:
kid = fairplay_kid_from_skd(skd)
assert kid is not None
assert kid.hex == expected
@pytest.mark.parametrize(
"skd",
[
"skd://example.com/p123456/c1", # service-specific encoding, not a GUID/32-hex
"skd://opaque-asset-token-abc",
"",
],
)
def test_kid_from_skd_returns_none_for_non_kid_forms(skd: str) -> None:
# Services with non-standard skd encodings derive the KID themselves; the generic
# helper must not mis-parse them.
assert fairplay_kid_from_skd(skd) is None
def test_base_get_fairplay_license_delegates_to_playready() -> None:
captured: dict = {}
class FakeService:
# Bind the unbound base methods to a minimal stand-in.
from unshackle.core.service import Service
get_fairplay_license = Service.get_fairplay_license
def get_playready_license(self, *, challenge: bytes, title, track) -> Optional[bytes]:
captured["called"] = (challenge, title, track)
return b"LICENSE"
svc = FakeService()
out = svc.get_fairplay_license(challenge=b"chal", title="T", track="K")
assert out == b"LICENSE"
assert captured["called"] == (b"chal", "T", "K")

View File

@@ -0,0 +1,58 @@
"""Tests for the FairPlay->PlayReady KID bridge.
FairPlay/cbcs HLS ships a tenc DEFAULT_KID but no PlayReady/Widevine PSSH.
``build_pr_header_from_kid`` synthesizes a PlayReady Object (PRO) from that bare
KID so a PlayReady license can be requested keyed on it. These pins ensure the
synthesized header parses back through pyplayready and yields the same KID.
"""
from __future__ import annotations
from uuid import UUID
import pytest
from pyplayready.system.pssh import PSSH
from unshackle.core.drm.playready import PlayReady, build_pr_header_from_kid
@pytest.mark.parametrize(
"kid",
[
UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"),
UUID("00000000-0000-0000-0000-000000000001"),
UUID("ffffffff-ffff-ffff-ffff-ffffffffffff"),
],
)
def test_synth_header_parses_and_round_trips_kid(kid: UUID) -> None:
pssh_b64 = build_pr_header_from_kid(kid)
# pyplayready parses the synthesized PRO as a single WRMHEADER record.
parsed = PSSH(pssh_b64)
assert len(parsed.wrm_headers) == 1
# PlayReady DRM object recovers the original KID from the synthesized header.
drm = PlayReady(pssh=parsed, pssh_b64=pssh_b64)
assert [k.hex for k in drm.kids] == [kid.hex]
def test_synth_header_is_v4_0_aesctr() -> None:
import base64
pssh_b64 = build_pr_header_from_kid(UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"))
xml = base64.b64decode(pssh_b64).decode("utf-16-le", errors="ignore")
assert "PlayReadyHeader" in xml
assert 'version="4.0.0.0"' in xml
assert "<ALGID>AESCTR</ALGID>" in xml
def test_cbcs_header_v4_3_round_trips_kid() -> None:
kid = UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f")
pssh_b64 = build_pr_header_from_kid(kid, scheme="cbcs", version="4.3")
drm = PlayReady(pssh=PSSH(pssh_b64), pssh_b64=pssh_b64)
assert [k.hex for k in drm.kids] == [kid.hex]
def test_cbcs_requires_v4_3() -> None:
with pytest.raises(ValueError, match="4.3"):
build_pr_header_from_kid(UUID(int=1), scheme="cbcs", version="4.0")

View File

@@ -46,7 +46,7 @@ from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack, context_settings
from unshackle.core.credential import Credential
from unshackle.core.drm import DRM_T, ClearKeyCENC, MonaLisa, PlayReady, Widevine
from unshackle.core.drm import DRM_T, ClearKeyCENC, FairPlay, MonaLisa, PlayReady, Widevine
from unshackle.core.events import events
from unshackle.core.music import (
MusicAudioIntegrityError,
@@ -2829,6 +2829,11 @@ class dl:
title=title,
track=track,
),
fairplay_licence=partial(
service.get_fairplay_license,
title=title,
track=track,
),
clearkey_licence=partial(
service.get_clearkey_license,
title=title,
@@ -3530,6 +3535,7 @@ class dl:
title: Title_T,
certificate: Callable,
licence: Callable,
fairplay_licence: Optional[Callable] = None,
clearkey_licence: Optional[Callable] = None,
track_kid: Optional[UUID] = None,
table: Table = None,
@@ -3927,8 +3933,12 @@ class dl:
if need_license and not vaults_only:
from_vaults = drm.content_keys.copy()
# FairPlay tracks are licensed through the PlayReady CDM but route their
# license request to the service's get_fairplay_license.
pr_licence = fairplay_licence if (fairplay_licence and isinstance(drm, FairPlay)) else licence
try:
drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate)
drm.get_content_keys(cdm=self.cdm, licence=pr_licence, certificate=certificate)
except Exception as e:
if drm.content_keys:
self.log.debug(f"License call failed but keys already in content_keys: {e}")

View File

@@ -4,11 +4,13 @@ from uuid import UUID
from unshackle.core.drm.clearkey import ClearKey
from unshackle.core.drm.clearkey_cenc import ClearKeyCENC
from unshackle.core.drm.fairplay import FairPlay
from unshackle.core.drm.monalisa import MonaLisa
from unshackle.core.drm.playready import PlayReady
from unshackle.core.drm.widevine import Widevine
DRM_T = Union[ClearKey, ClearKeyCENC, Widevine, PlayReady, MonaLisa]
# FairPlay is a PlayReady subclass; listed for explicit isinstance/type use.
DRM_T = Union[ClearKey, ClearKeyCENC, Widevine, PlayReady, FairPlay, MonaLisa]
def drm_from_dict(data: dict[str, Any]) -> Union[Widevine, PlayReady, ClearKeyCENC]:
@@ -44,4 +46,4 @@ def drm_from_dict(data: dict[str, Any]) -> Union[Widevine, PlayReady, ClearKeyCE
return drm
__all__ = ("ClearKey", "ClearKeyCENC", "Widevine", "PlayReady", "MonaLisa", "DRM_T", "drm_from_dict")
__all__ = ("ClearKey", "ClearKeyCENC", "Widevine", "PlayReady", "FairPlay", "MonaLisa", "DRM_T", "drm_from_dict")

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
import re
from typing import Optional
from uuid import UUID
from pyplayready.system.pssh import PSSH
from unshackle.core.drm.playready import PlayReady, build_pr_header_from_kid
def fairplay_kid_from_skd(skd_uri: str) -> Optional[UUID]:
"""Extract the content KID from a FairPlay ``skd://`` key URI.
Handles the common multi-DRM packager forms where the KID appears in the URI as
a hyphenated GUID (optionally followed by ``:<iv>`` or as a ``?KID=<guid>`` query)
or as a bare 32-hex string in a path segment. Services whose skd URI encodes the
KID differently (e.g. an asset number plus content id) should derive it themselves
and use ``FairPlay.from_kid``.
"""
# Prefer a hyphenated GUID; fall back to a bare 32-hex run.
guid = re.search(r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}", skd_uri)
if guid:
return UUID(hex=guid.group(0))
match = re.search(r"(?<![0-9a-fA-F])([0-9a-fA-F]{32})(?![0-9a-fA-F])", skd_uri)
if not match:
return None
return UUID(hex=match.group(1))
class FairPlay(PlayReady):
"""FairPlay track licensed through a PlayReady CDM (FairPlay -> PlayReady bridge).
FairPlay HLS (SAMPLE-AES / cbcs) carries a content KID but no PlayReady PSSH.
We synthesize a PlayReady header from that KID, so a PlayReady CDM can issue a
challenge and the resulting license decrypts the cbcs stream. This subclasses
PlayReady so the CDM challenge and decrypt logic are reused unchanged; the only
difference is that the download pipeline routes its license request to the
service's ``get_fairplay_license`` instead of ``get_playready_license``.
"""
@classmethod
def from_kid(cls, kid: UUID, scheme: str = "cbcs", version: str = "4.3") -> FairPlay:
"""Build a FairPlay DRM from a bare content KID via a synthesized PlayReady header."""
pssh_b64 = build_pr_header_from_kid(kid, scheme=scheme, version=version)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import base64
import shutil
import struct
import subprocess
import textwrap
import time
@@ -24,6 +25,42 @@ from unshackle.core.constants import AnyTrack
from unshackle.core.utilities import get_boxes, log_event
from unshackle.core.utils.subprocess import ffprobe
# CENC/CBC scheme -> PlayReady ALGID (AESCTR for ctr schemes, AESCBC for cbc schemes).
PR_SCHEME_ALGID = {"cenc": "AESCTR", "cens": "AESCTR", "cbc1": "AESCBC", "cbcs": "AESCBC"}
def build_pr_header_from_kid(kid: UUID, scheme: str = "cenc", version: str = "4.0") -> str:
"""Synthesize a base64 PlayReady Object from a bare KID.
For FairPlay/cbcs HLS streams that carry a content KID but no PlayReady or
Widevine PSSH, this lets us request a PlayReady license keyed on that KID.
``scheme`` selects the ALGID (cbcs -> AESCBC). ``version`` selects the WRMHEADER
layout: 4.0 uses a bare ``<KID>``, 4.3 uses the ``<KIDS><KID ALGID VALUE>`` form
used for cbcs content.
"""
algid = PR_SCHEME_ALGID.get(scheme.lower())
if not algid:
raise ValueError(f"Unsupported encryption scheme for PlayReady: {scheme}")
if algid == "AESCBC" and version < "4.3":
raise ValueError("AESCBC (cbcs) requires PlayReady WRMHEADER 4.3 or higher")
# PlayReady stores the KID as a little-endian GUID; UUID.bytes_le applies that byte order.
xml_kid = base64.b64encode(kid.bytes_le).decode()
if version == "4.0":
protect_info = f"<KEYLEN>16</KEYLEN><ALGID>{algid}</ALGID></PROTECTINFO><KID>{xml_kid}</KID>"
else:
protect_info = f'<KIDS><KID ALGID="{algid}" VALUE="{xml_kid}"></KID></KIDS></PROTECTINFO>'
header_xml = (
f'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="{version}.0.0">'
f"<DATA><PROTECTINFO>{protect_info}</DATA></WRMHEADER>"
)
header_utf16 = header_xml.encode("utf-16-le")
# PlayReady Object: one WRMHEADER record (type 1) behind the PRO length+count prefix.
rm_record = struct.pack("<HH", 1, len(header_utf16)) + header_utf16
pro = struct.pack("<IH", len(rm_record) + 6, 1) + rm_record
return base64.b64encode(pro).decode()
class PlayReady:
"""PlayReady DRM System."""
@@ -197,16 +234,22 @@ class PlayReady:
pssh_boxes.extend(list(get_boxes(init_data, b"pssh")))
tenc_boxes.extend(list(get_boxes(init_data, b"tenc")))
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if not pssh:
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
tenc = next(iter(tenc_boxes), None)
if not kid and tenc and tenc.key_ID.int != 0:
kid = tenc.key_ID
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if pssh:
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
# FairPlay/cbcs: no PSSH of any DRM system, but a tenc KID is present — synthesize a
# PlayReady header from it. Gated on zero pssh boxes so Widevine-bearing tracks fall through.
if not pssh_boxes and kid:
pssh_b64 = build_pr_header_from_kid(kid)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
@classmethod
def from_init_data(cls, init_data: bytes) -> PlayReady:
@@ -226,16 +269,22 @@ class PlayReady:
if enc_key_id:
kid = UUID(bytes=base64.b64decode(enc_key_id))
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if not pssh:
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
tenc = next(iter(tenc_boxes), None)
if not kid and tenc and tenc.key_ID.int != 0:
kid = tenc.key_ID
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if pssh:
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
# FairPlay/cbcs: no PSSH of any DRM system, but a tenc KID is present — synthesize a
# PlayReady header from it. Gated on zero pssh boxes so Widevine-bearing tracks fall through.
if not pssh_boxes and kid:
pssh_b64 = build_pr_header_from_kid(kid)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
@property
def pssh(self) -> PSSH:

View File

@@ -0,0 +1,369 @@
"""Apple HLS SAMPLE-AES decryptor for MPEG-TS elementary streams.
SAMPLE-AES (``METHOD=SAMPLE-AES``) encrypts only parts of each elementary-stream sample
rather than the whole TS, so neither Shaka Packager (rejects stream type ``0xDB``) nor
mp4decrypt (ISO-BMFF only) can decrypt it. This module implements the scheme directly,
matching the four reference implementations (hls.js, mchelnokov/decrypt-mpegts,
iori-rs/iori, N_m3u8DL-RE) and Apple's specification:
- H.264 (AVC, stream type ``0xDB``): NAL types 1 and 5 only. Remove emulation-prevention
bytes, skip the first 32 bytes, then a 1:9 pattern — decrypt 16 bytes, skip ``min(144,
remaining)`` — with AES-128-CBC chaining continuously across the encrypted blocks and the
IV reset per NAL unit. Emulation-prevention bytes are reinserted afterwards.
- AAC (ADTS, stream type ``0xCF``): per frame, skip the ADTS header plus 16 leader bytes,
decrypt the remaining complete 16-byte blocks, IV reset per frame.
- AC-3 (stream type ``0xC1``): per syncframe, skip 16 leader bytes, decrypt the remaining
complete 16-byte blocks, IV reset per frame.
All decryption is AES-128-CBC with no padding; trailing bytes shorter than a block stay clear.
The IV is supplied by the caller. HLS signals it in the ``EXT-X-KEY`` ``IV`` attribute; when
absent the conventional default is all zeros (used by every reference tool). FairPlay
(``KEYFORMAT="com.apple.streamingkeydelivery"``) streams may instead carry the IV in the CKC,
in which case the caller must provide it — see ``decrypt_sample_aes``.
"""
from __future__ import annotations
import struct
from typing import Optional
from Crypto.Cipher import AES
# MPEG-TS stream type bytes: encrypted -> standard.
STREAM_TYPE_H264 = 0x1B
STREAM_TYPE_AAC = 0x0F
STREAM_TYPE_AC3 = 0x81
ENCRYPTED_TO_CLEAR = {0xDB: STREAM_TYPE_H264, 0xCF: STREAM_TYPE_AAC, 0xC1: STREAM_TYPE_AC3}
PACKET_LENGTH = 188
SYNC_BYTE = 0x47
BLOCK = 16
def remove_epb(data: bytes) -> bytes:
"""Strip H.264 emulation-prevention bytes (``00 00 03 XX`` -> ``00 00 XX``, ``XX <= 03``)."""
out = bytearray()
i = 0
n = len(data)
while i < n:
if i + 3 < n and data[i] == 0 and data[i + 1] == 0 and data[i + 2] == 3 and data[i + 3] <= 3:
out += b"\x00\x00"
i += 3
else:
out.append(data[i])
i += 1
return bytes(out)
def insert_epb(data: bytes) -> bytes:
"""Reinsert H.264 emulation-prevention bytes to produce a valid Annex B / byte-stream NAL."""
out = bytearray()
zeros = 0
for b in data:
if zeros >= 2 and b <= 3:
out.append(3)
zeros = 0
out.append(b)
zeros = zeros + 1 if b == 0 else 0
return bytes(out)
def aes_cbc_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes:
"""AES-128-CBC decrypt with no padding; ``data`` length must be a multiple of 16."""
return AES.new(key, AES.MODE_CBC, iv).decrypt(data)
def decrypt_h264_nal(nal: bytes, key: bytes, iv: bytes) -> bytes:
"""Decrypt a single H.264 NAL unit (no start code) per SAMPLE-AES; return re-escaped bytes."""
rbsp = bytearray(remove_epb(nal))
n = len(rbsp)
if n <= 48:
return nal
# Gather the encrypted blocks (1:9 pattern from offset 32) into one continuous CBC stream.
encrypted = bytearray()
offset = 32
positions = []
while True:
remaining = n - offset
if remaining <= BLOCK:
break
positions.append(offset)
encrypted += rbsp[offset : offset + BLOCK]
offset += BLOCK + min(144, remaining - BLOCK)
decrypted = aes_cbc_decrypt(key, iv, bytes(encrypted))
for idx, pos in enumerate(positions):
rbsp[pos : pos + BLOCK] = decrypted[idx * BLOCK : (idx + 1) * BLOCK]
return insert_epb(bytes(rbsp))
def split_nals(es: bytes) -> list[tuple[int, int, int]]:
"""Return (start_code_len, nal_start, nal_end) for each Annex B NAL unit in ``es``."""
nals = []
i = 0
n = len(es)
starts = []
while i + 3 <= n:
if es[i] == 0 and es[i + 1] == 0 and es[i + 2] == 1:
sc = 4 if i >= 1 and es[i - 1] == 0 else 3
starts.append((i - (sc - 3), sc))
i += 3
else:
i += 1
for idx, (sc_off, sc) in enumerate(starts):
nal_start = sc_off + sc
nal_end = starts[idx + 1][0] if idx + 1 < len(starts) else n
nals.append((sc, nal_start, nal_end))
return nals
def decrypt_h264_es(es: bytes, key: bytes, iv: bytes) -> bytes:
"""Decrypt all coded-slice NAL units (types 1, 5) in an Annex B H.264 elementary stream."""
out = bytearray()
prev_end = 0
for sc, nal_start, nal_end in split_nals(es):
out += es[prev_end:nal_start]
nal = es[nal_start:nal_end]
prev_end = nal_end
if nal and (nal[0] & 0x1F) in (1, 5):
out += decrypt_h264_nal(nal, key, iv)
else:
out += nal
out += es[prev_end:]
return bytes(out)
def decrypt_aac_es(es: bytes, key: bytes, iv: bytes) -> bytes:
"""Decrypt ADTS AAC: per frame skip header + 16 leader bytes, decrypt remaining full blocks."""
out = bytearray()
i = 0
n = len(es)
while i + 7 <= n:
if es[i] != 0xFF or (es[i + 1] & 0xF0) != 0xF0:
out.append(es[i])
i += 1
continue
header_len = 7 if (es[i + 1] & 1) else 9
frame_len = ((es[i + 3] & 3) << 11) | (es[i + 4] << 3) | (es[i + 5] >> 5)
if frame_len < header_len or i + frame_len > n:
out += es[i:]
i = n
break
frame = bytearray(es[i : i + frame_len])
payload = frame[header_len:]
if len(payload) > BLOCK:
enc_len = ((len(payload) - BLOCK) // BLOCK) * BLOCK
if enc_len:
frame[header_len + BLOCK : header_len + BLOCK + enc_len] = aes_cbc_decrypt(
key, iv, bytes(payload[BLOCK : BLOCK + enc_len])
)
out += frame
i += frame_len
out += es[i:]
return bytes(out)
def decrypt_ac3_es(es: bytes, key: bytes, iv: bytes) -> bytes:
"""Decrypt AC-3: per syncframe skip 16 leader bytes, decrypt remaining full blocks.
AC-3 syncframes start with ``0B 77``; frame size derives from the frmsizecod table. To keep
this dependency-free the whole ES (minus a 16-byte leader) is treated as one CBC region,
matching the reference behaviour for single-program AC-3 PES payloads.
"""
if len(es) <= BLOCK:
return es
enc_len = ((len(es) - BLOCK) // BLOCK) * BLOCK
if not enc_len:
return es
out = bytearray(es)
out[BLOCK : BLOCK + enc_len] = aes_cbc_decrypt(key, iv, bytes(out[BLOCK : BLOCK + enc_len]))
return bytes(out)
def crc32_mpeg(data: bytes) -> int:
"""MPEG-2 systems CRC-32 (poly 0x04C11DB7, init 0xFFFFFFFF, no final xor)."""
crc = 0xFFFFFFFF
for byte in data:
crc ^= byte << 24
for _ in range(8):
crc = ((crc << 1) ^ 0x04C11DB7) & 0xFFFFFFFF if crc & 0x80000000 else (crc << 1) & 0xFFFFFFFF
return crc
def packet_pid(pkt: bytes) -> int:
return ((pkt[1] & 0x1F) << 8) | pkt[2]
def payload_offset(pkt: bytes) -> int:
"""Byte offset of the payload within a TS packet, or -1 if the packet carries none."""
afc = (pkt[3] >> 4) & 0x3
off = 4
if afc & 0x2:
off += 1 + pkt[4]
if not afc & 0x1 or off >= PACKET_LENGTH:
return -1
return off
def parse_pmt(section: bytes) -> dict[int, int]:
"""Map elementary PID -> stream_type from a PMT section (starting at table_id)."""
section_length = ((section[1] & 0x0F) << 8) | section[2]
program_info_length = ((section[10] & 0x0F) << 8) | section[11]
pos = 12 + program_info_length
end = 3 + section_length - 4 # up to (excluding) CRC32
result = {}
while pos + 5 <= end:
stream_type = section[pos]
es_pid = ((section[pos + 1] & 0x1F) << 8) | section[pos + 2]
es_info_length = ((section[pos + 3] & 0x0F) << 8) | section[pos + 4]
result[es_pid] = stream_type
pos += 5 + es_info_length
return result
def patch_pmt_section(section: bytes) -> bytes:
"""Rewrite encrypted stream-type bytes to their clear equivalents and fix the CRC."""
section = bytearray(section)
section_length = ((section[1] & 0x0F) << 8) | section[2]
program_info_length = ((section[10] & 0x0F) << 8) | section[11]
pos = 12 + program_info_length
end = 3 + section_length - 4
while pos + 5 <= end:
if section[pos] in ENCRYPTED_TO_CLEAR:
section[pos] = ENCRYPTED_TO_CLEAR[section[pos]]
es_info_length = ((section[pos + 3] & 0x0F) << 8) | section[pos + 4]
pos += 5 + es_info_length
crc = crc32_mpeg(bytes(section[:end]))
section[end : end + 4] = struct.pack(">I", crc)
return bytes(section)
def repacketize(pid: int, pes: bytes, continuity: dict[int, int]) -> bytes:
"""Split a PES packet into 188-byte TS packets for ``pid`` with stuffing on the final one."""
out = bytearray()
pos = 0
first = True
n = len(pes)
while pos < n:
remaining = n - pos
cc = continuity[pid]
continuity[pid] = (cc + 1) & 0x0F
if remaining >= 184:
header = bytes([SYNC_BYTE, (0x40 if first else 0x00) | (pid >> 8), pid & 0xFF, 0x10 | cc])
out += header + pes[pos : pos + 184]
pos += 184
else:
stuffing = 184 - remaining
header = bytes([SYNC_BYTE, (0x40 if first else 0x00) | (pid >> 8), pid & 0xFF, 0x30 | cc])
if stuffing == 1:
adaptation = bytes([0x00])
else:
adaptation = bytes([stuffing - 1, 0x00]) + b"\xFF" * (stuffing - 2)
out += header + adaptation + pes[pos:]
pos = n
first = False
return out
def split_pes_payload(pes: bytes) -> tuple[bytes, bytes]:
"""Split a PES packet into (header_prefix_including_optional_header, elementary_payload)."""
if pes[:3] != b"\x00\x00\x01":
return pes, b""
stream_id = pes[3]
# Stream ids without an optional PES header (padding, private_2, etc.) carry payload directly.
if stream_id in (0xBC, 0xBE, 0xBF, 0xF0, 0xF1, 0xFF, 0xF2, 0xF8):
return pes[:6], pes[6:]
header_data_length = pes[8]
start = 9 + header_data_length
return pes[:start], pes[start:]
def rebuild_pes(prefix: bytes, payload: bytes) -> bytes:
"""Reassemble a PES packet, refreshing PES_packet_length for the new payload size."""
prefix = bytearray(prefix)
body_length = (len(prefix) - 6) + len(payload)
prefix[4:6] = struct.pack(">H", body_length if body_length <= 0xFFFF else 0)
return bytes(prefix) + payload
def decrypt_sample_aes(data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes:
"""Decrypt a SAMPLE-AES MPEG-TS byte string and return a clean (standard) MPEG-TS.
``key`` is the 16-byte content key. ``iv`` is the 16-byte initialization vector; when not
given it defaults to all zeros (the HLS convention for an absent ``EXT-X-KEY`` IV). The
output has its PMT stream types patched to the standard values, so it muxes like any TS.
"""
if iv is None:
iv = b"\x00" * BLOCK
sync = data.find(bytes([SYNC_BYTE]))
if sync < 0:
raise ValueError("Not an MPEG-TS stream (no sync byte found)")
packets = [data[i : i + PACKET_LENGTH] for i in range(sync, len(data) - PACKET_LENGTH + 1, PACKET_LENGTH)]
# First pass: locate the PMT and the elementary stream types.
pmt_pid: Optional[int] = None
pid_types: dict[int, int] = {}
for pkt in packets:
if pkt[0] != SYNC_BYTE:
continue
pid = packet_pid(pkt)
off = payload_offset(pkt)
if off < 0:
continue
if pid == 0 and pkt[1] & 0x40: # PAT
section = pkt[off + 1 + pkt[off] :]
pmt_pid = ((section[10] & 0x1F) << 8) | section[11]
elif pmt_pid is not None and pid == pmt_pid and pkt[1] & 0x40:
section = pkt[off + 1 + pkt[off] :]
pid_types = parse_pmt(section)
break
decrypters = {
0xDB: decrypt_h264_es,
0xCF: decrypt_aac_es,
0xC1: decrypt_ac3_es,
}
encrypted_pids = {pid: t for pid, t in pid_types.items() if t in decrypters}
if not encrypted_pids:
return data # nothing SAMPLE-AES encrypted
# Second pass: rebuild the stream, decrypting elementary PIDs and patching the PMT.
out = bytearray()
continuity: dict[int, int] = {pid: 0 for pid in encrypted_pids}
pending: dict[int, bytearray] = {}
def flush(pid: int) -> None:
pes = bytes(pending.pop(pid))
prefix, payload = split_pes_payload(pes)
decrypted = decrypters[encrypted_pids[pid]](payload, key, iv)
out.extend(repacketize(pid, rebuild_pes(prefix, decrypted), continuity))
for pkt in packets:
if pkt[0] != SYNC_BYTE:
continue
pid = packet_pid(pkt)
if pid in encrypted_pids:
off = payload_offset(pkt)
if pkt[1] & 0x40: # PUSI -> new PES
if pid in pending:
flush(pid)
pending[pid] = bytearray(pkt[off:] if off >= 0 else b"")
elif pid in pending and off >= 0:
pending[pid] += pkt[off:]
continue
if pmt_pid is not None and pid == pmt_pid and pkt[1] & 0x40:
off = payload_offset(pkt)
pointer = pkt[off]
head = pkt[: off + 1 + pointer]
section = pkt[off + 1 + pointer :]
patched = patch_pmt_section(section)
packet = head + patched
packet = packet[:PACKET_LENGTH] + b"\xFF" * (PACKET_LENGTH - len(packet))
out += packet[:PACKET_LENGTH]
continue
out += pkt
for pid in list(pending):
flush(pid)
return bytes(out)

View File

@@ -30,7 +30,8 @@ from requests import Session
from unshackle.core import binaries
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.drm import DRM_T, ClearKey, MonaLisa, PlayReady, Widevine
from unshackle.core.drm import DRM_T, ClearKey, FairPlay, MonaLisa, PlayReady, Widevine
from unshackle.core.drm.fairplay import fairplay_kid_from_skd
from unshackle.core.events import events
from unshackle.core.session import RnetResponse, RnetSession
from unshackle.core.tracks import Audio, Subtitle, Tracks, Video
@@ -38,6 +39,9 @@ from unshackle.core.utilities import get_extension, is_close_match, log_event, t
from unshackle.core.utils.redact import safe_display_url
from unshackle.core.utils.subprocess import log_tool_run
# FairPlay HLS (cbcs) EXT-X-KEY keyformat; bridged to PlayReady (see drm/fairplay.py).
FAIRPLAY_KEYFORMAT = "com.apple.streamingkeydelivery"
class HLS:
SUPP_CODECS_RE = re.compile(r'SUPPLEMENTAL-CODECS="([^"]+)"', re.IGNORECASE)
@@ -902,8 +906,16 @@ class HLS:
if key is None:
encryption_data = None
elif not encryption_data or encryption_data[0] != key:
drm = HLS.get_drm(key, session)
if isinstance(drm, (Widevine, PlayReady)):
if (
key.keyformat
and key.keyformat.lower() == FAIRPLAY_KEYFORMAT
and isinstance(session_drm, FairPlay)
):
# Reuse already-licensed service FairPlay; skd KID encoding is service-specific.
drm = session_drm
else:
drm = HLS.get_drm(key, session)
if isinstance(drm, (Widevine, PlayReady)) and not getattr(drm, "content_keys", None):
try:
if map_data:
track_kid = track.get_key_id(map_data[1])
@@ -1247,6 +1259,9 @@ class HLS:
"com.microsoft.playready",
}:
return key
elif key.keyformat and key.keyformat.lower() == FAIRPLAY_KEYFORMAT:
# FairPlay (cbcs) bridged to PlayReady; key licensed via FairPlay DRM.
return key
else:
unsupported_systems.append(key.method + (f" ({key.keyformat})" if key.keyformat else ""))
else:
@@ -1287,6 +1302,13 @@ class HLS:
pssh=PR_PSSH(key.uri.split(",")[-1]),
pssh_b64=key.uri.split(",")[-1],
)
elif key.keyformat and key.keyformat.lower() == FAIRPLAY_KEYFORMAT:
# FairPlay -> PlayReady: synthesize FairPlay DRM from the skd KID. Services whose skd
# KID isn't a plain GUID/hex should supply the DRM via get_track_drm instead.
kid = fairplay_kid_from_skd(key.uri)
if not kid:
raise NotImplementedError(f"Could not derive a FairPlay content KID from key: {key}")
drm = FairPlay.from_kid(kid)
else:
raise NotImplementedError(f"The key system is not supported: {key}")

View File

@@ -438,6 +438,28 @@ class Service(metaclass=ABCMeta):
# Delegates license handling to the Widevine license method by default if a service-specific PlayReady implementation is not provided.
return self.get_widevine_license(challenge=challenge, title=title, track=track)
def get_fairplay_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str]]:
"""
Get a license for a FairPlay track bridged through a PlayReady CDM.
The `challenge` is a PlayReady License Request generated by the PlayReady CDM
from a header synthesized off the FairPlay content KID (see `FairPlay`). The
response is the PlayReady license, read back by the CDM to recover the keys.
By default this delegates to `get_playready_license`, which is correct for
services whose PlayReady endpoint also serves FairPlay content (e.g. multi-DRM
backends). Override it when the FairPlay license request needs different
shaping (endpoint, headers, body) from the PlayReady one.
:param challenge: The license challenge from the PlayReady CDM.
:param title: The current `Title` from get_titles that is being executed.
:param track: The current `Track` needing decryption.
:return: The License response as Bytes or a Base64 string, returned as is.
"""
return self.get_playready_license(challenge=challenge, title=title, track=track)
def get_clearkey_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str, dict]]:

View File

@@ -10,11 +10,14 @@ from http.cookiejar import CookieJar
from typing import Any, Optional, Union
import click
import m3u8 # used by get_track_drm() to read the FairPlay skd key from a media playlist
from langcodes import Language
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential
from unshackle.core.drm import FairPlay # FairPlay->PlayReady bridge, see get_track_drm()
from unshackle.core.drm.fairplay import fairplay_kid_from_skd
from unshackle.core.manifests import DASH # also: HLS, ISM - see get_tracks() alternates
from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service
@@ -65,6 +68,8 @@ class EXAMPLE(Service):
get_widevine_* service cert + license (per-segment PSSH via `track`)
get_playready_license PlayReady challenge POST
get_clearkey_license DASH org.w3.clearkey JWK Set POST (Laurl fallback)
get_track_drm FairPlay->PlayReady bridge: skd KID -> FairPlay DRM
get_fairplay_license license a FairPlay-bridged track (PlayReady challenge)
"""
# ALIASES: extra CLI tags that resolve to this service (e.g. `dl EX ...`).
@@ -499,6 +504,47 @@ class EXAMPLE(Service):
response.raise_for_status()
return response.json()
def get_track_drm(self, track: AnyTrack) -> Optional[list]:
# FairPlay -> PlayReady bridge (HLS only). FairPlay HLS (SAMPLE-AES / cbcs) ships a
# content KID in its `skd://` EXT-X-KEY but no PlayReady/Widevine PSSH. We derive
# that KID, synthesize a PlayReady header from it (FairPlay.from_kid) and license it
# through a PlayReady CDM. This only yields keys when the backend is multi-DRM (the
# same content key is licensable via PlayReady) - a FairPlay-only backend cannot be
# bridged. The framework calls this hook during DRM loading for tracks flagged
# needs_drm_loading; return None to fall back to the normal manifest-PSSH path.
if not is_playready_cdm(self.cdm):
return None # bridge needs a PlayReady CDM to build the challenge
playlist = m3u8.loads(self.session.get(track.url).text, track.url)
skd = next(
(k.uri for k in (playlist.keys or []) if k and k.keyformat == "com.apple.streamingkeydelivery"),
None,
)
if not skd:
return None
# Generic extractor handles GUID / ?KID= / 32-hex skd forms. If your service encodes
# the KID differently, parse the UUID yourself and call FairPlay.from_kid(kid) directly.
kid = fairplay_kid_from_skd(skd)
if not kid:
return None
return [FairPlay.from_kid(kid)]
def get_fairplay_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str]]:
# `challenge` is a PlayReady challenge built from the synthesized header. The base
# class defaults to get_playready_license, which is correct when one endpoint serves
# both FairPlay and PlayReady. Override (like this) only when the FairPlay license
# request needs a different endpoint, headers, or body from the PlayReady one.
license_url = self.config["endpoints"].get("fairplay_license") or self.config["endpoints"].get(
"playready_license"
)
if not license_url:
raise ValueError("FairPlay/PlayReady license endpoint not configured")
response = self.session.post(url=license_url, data=challenge)
response.raise_for_status()
return response.content
# For HLS AES-128 ClearKey or unencrypted content there is no license callback;
# the key comes from the manifest or a side endpoint and is attached to the
# track's DRM directly. Vaults (`self.cache` is separate) cache KID:KEY so repeat

View File

@@ -11,6 +11,7 @@ endpoints:
widevine_license: https://api.domain.com/v1/license/widevine
playready_license: https://api.domain.com/v1/license/playready
clearkey_license: https://api.domain.com/v1/license/clearkey # DASH org.w3.clearkey (omit to use manifest Laurl)
fairplay_license: https://api.domain.com/v1/license/fairplay # FairPlay->PlayReady bridge (omit to reuse playready_license)
# Base64 Widevine service certificate (enables privacy-mode license requests).
certificate: null