feat(drm): add FairPlay->PlayReady bridge for cbcs HLS

FairPlay HLS (SAMPLE-AES/cbcs) ships a content KID in its skd:// key but no PlayReady/Widevine PSSH. Synthesize a PlayReady header from that KID so a PlayReady CDM can license and decrypt it whenever the backend is multi-DRM.

- FairPlay DRM (PlayReady subclass) + FairPlay.from_kid; fairplay_kid_from_skd extracts the KID from GUID / ?KID= / 32-hex skd forms (core/drm/fairplay.py)
- build_pr_header_from_kid synthesizes a WRMHEADER/PRO (cbcs->AESCBC v4.3, cenc->AESCTR v4.0); PlayReady.from_track falls back to a tenc KID when no PSSH
- Service.get_fairplay_license hook (defaults to get_playready_license)
- dl.py routes FairPlay tracks to get_fairplay_license through the PlayReady CDM
- EXAMPLE service + config.yaml document the bridge end to end
- tests for the synthesized header and skd KID extraction
This commit is contained in:
imSp4rky
2026-06-13 00:47:32 -06:00
parent a9c677c349
commit 4422c975c3
10 changed files with 331 additions and 114 deletions

View File

@@ -1,98 +0,0 @@
#!/usr/bin/env python3
"""
Benchmark subtitle conversion backends to (re-)tune the preference ranks in
``unshackle/core/tracks/subtitle_convert.py``.
Runs every backend that can read each input file, converting to a target format (default
SRT), and reports cue count, leaked ASS override tags, and output size — so you can compare
fidelity per (source, target) pair on real files. Read-only: copies inputs to a temp dir.
Usage:
uv run python scripts/bench_subtitle_backends.py <file-or-dir> [<file-or-dir> ...] [--target SRT]
Example:
uv run python scripts/bench_subtitle_backends.py downloads/
"""
from __future__ import annotations
import argparse
import re
import shutil
import tempfile
from pathlib import Path
from unshackle.core.tracks import subtitle_convert as sc
from unshackle.core.tracks.subtitle import Subtitle
Codec = Subtitle.Codec
EXT_TO_CODEC = {
".srt": Codec.SubRip,
".vtt": Codec.WebVTT,
".ass": Codec.SubStationAlphav4,
".ssa": Codec.SubStationAlpha,
".ttml": Codec.TimedTextMarkupLang,
".smi": Codec.SAMI,
".sami": Codec.SAMI,
}
def gather(paths: list[str]) -> list[Path]:
files: list[Path] = []
for p in paths:
path = Path(p)
if path.is_dir():
files.extend(f for f in path.rglob("*") if f.suffix.lower() in EXT_TO_CODEC)
elif path.suffix.lower() in EXT_TO_CODEC:
files.append(path)
return sorted(files)
def metrics(text: str) -> tuple[int, int, int]:
cues = len(re.findall(r"-->", text))
ass_residue = len(re.findall(r"\{\\", text))
return cues, ass_residue, len(text)
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument("paths", nargs="+", help="subtitle files or directories")
ap.add_argument("--target", default="SRT", help="target codec value (SRT, VTT, ASS, ...)")
args = ap.parse_args()
target = Codec(args.target.upper())
files = gather(args.paths)
if not files:
print("No subtitle files found.")
return
tmp = Path(tempfile.mkdtemp(prefix="subbench_"))
print(f"{'file':40} {'source':10} {'backend':12} {'ok':3} {'cues':>5} {'resid':>5} {'bytes':>7}")
for f in files:
source = EXT_TO_CODEC[f.suffix.lower()]
if source == target:
continue
for backend in sc.REGISTRY:
if not (backend.is_available() and backend.can_convert(source, target)):
continue
work = tmp / f"{f.stem}.{backend.name}{f.suffix}"
shutil.copy2(f, work)
sub = Subtitle(url="x", language="en", codec=source)
sub.path = work
try:
# Call the backend directly so each row reflects only that backend (no fallback).
out = work.with_suffix(f".{target.value.lower()}")
backend.convert(sub, target, out)
cues, resid, size = metrics(out.read_text("utf8", errors="replace"))
print(
f"{f.name[:40]:40} {source.name[:10]:10} {backend.name:12} {'Y':3} {cues:>5} {resid:>5} {size:>7}"
)
except Exception as e: # noqa: BLE001 - benchmark reports failures, does not raise
print(
f"{f.name[:40]:40} {source.name[:10]:10} {backend.name:12} {'N':3} {'-':>5} {'-':>5} {'-':>7} {type(e).__name__}"
)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,81 @@
"""Tests for the FairPlay -> PlayReady bridge DRM type and skd KID extraction.
FairPlay HLS (SAMPLE-AES / cbcs) carries a content KID but no PlayReady PSSH. The
``FairPlay`` DRM synthesizes a PlayReady header from that KID so a PlayReady CDM can
license it; it is a PlayReady subclass so the pipeline reuses CDM/decrypt logic and
only differs in routing the request to ``get_fairplay_license``.
"""
from __future__ import annotations
from typing import Optional
from uuid import UUID
import pytest
from unshackle.core.drm import FairPlay, PlayReady
from unshackle.core.drm.fairplay import fairplay_kid_from_skd
def test_fairplay_is_playready_subclass() -> None:
# Pipeline relies on isinstance(drm, PlayReady) (get_drm_for_cdm, prepare_drm branch).
fp = FairPlay.from_kid(UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"))
assert isinstance(fp, PlayReady)
assert isinstance(fp, FairPlay)
def test_from_kid_round_trips_kid_and_sets_pssh() -> None:
kid = UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f")
fp = FairPlay.from_kid(kid)
assert fp.pssh_b64
assert [k.hex for k in fp.kids] == [kid.hex]
@pytest.mark.parametrize(
("skd", "expected"),
[
# hyphenated GUID (with optional :IV)
("skd://72ea9ec1-abb4-40d8-b3a9-e0bd1833bd58:0123456789ABCDEF", "72ea9ec1abb440d8b3a9e0bd1833bd58"),
# GUID as a query param
("skd://lic.example/fairplay?KID=4376a4b3-d8ef-4f21-9a6b-faa81a2e59e3", "4376a4b3d8ef4f219a6bfaa81a2e59e3"),
# bare 32-hex path segment
("skd://0000000069176cc2af6f93cce3515ced/clip/x", "0000000069176cc2af6f93cce3515ced"),
],
)
def test_kid_from_skd_common_forms(skd: str, expected: str) -> None:
kid = fairplay_kid_from_skd(skd)
assert kid is not None
assert kid.hex == expected
@pytest.mark.parametrize(
"skd",
[
"skd://example.com/p123456/c1", # service-specific encoding, not a GUID/32-hex
"skd://opaque-asset-token-abc",
"",
],
)
def test_kid_from_skd_returns_none_for_non_kid_forms(skd: str) -> None:
# Services with non-standard skd encodings derive the KID themselves; the generic
# helper must not mis-parse them.
assert fairplay_kid_from_skd(skd) is None
def test_base_get_fairplay_license_delegates_to_playready() -> None:
captured: dict = {}
class FakeService:
# Bind the unbound base methods to a minimal stand-in.
from unshackle.core.service import Service
get_fairplay_license = Service.get_fairplay_license
def get_playready_license(self, *, challenge: bytes, title, track) -> Optional[bytes]:
captured["called"] = (challenge, title, track)
return b"LICENSE"
svc = FakeService()
out = svc.get_fairplay_license(challenge=b"chal", title="T", track="K")
assert out == b"LICENSE"
assert captured["called"] == (b"chal", "T", "K")

View File

@@ -0,0 +1,58 @@
"""Tests for the FairPlay->PlayReady KID bridge.
FairPlay/cbcs HLS ships a tenc DEFAULT_KID but no PlayReady/Widevine PSSH.
``build_pr_header_from_kid`` synthesizes a PlayReady Object (PRO) from that bare
KID so a PlayReady license can be requested keyed on it. These pins ensure the
synthesized header parses back through pyplayready and yields the same KID.
"""
from __future__ import annotations
from uuid import UUID
import pytest
from pyplayready.system.pssh import PSSH
from unshackle.core.drm.playready import PlayReady, build_pr_header_from_kid
@pytest.mark.parametrize(
"kid",
[
UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"),
UUID("00000000-0000-0000-0000-000000000001"),
UUID("ffffffff-ffff-ffff-ffff-ffffffffffff"),
],
)
def test_synth_header_parses_and_round_trips_kid(kid: UUID) -> None:
pssh_b64 = build_pr_header_from_kid(kid)
# pyplayready parses the synthesized PRO as a single WRMHEADER record.
parsed = PSSH(pssh_b64)
assert len(parsed.wrm_headers) == 1
# PlayReady DRM object recovers the original KID from the synthesized header.
drm = PlayReady(pssh=parsed, pssh_b64=pssh_b64)
assert [k.hex for k in drm.kids] == [kid.hex]
def test_synth_header_is_v4_0_aesctr() -> None:
import base64
pssh_b64 = build_pr_header_from_kid(UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f"))
xml = base64.b64decode(pssh_b64).decode("utf-16-le", errors="ignore")
assert "PlayReadyHeader" in xml
assert 'version="4.0.0.0"' in xml
assert "<ALGID>AESCTR</ALGID>" in xml
def test_cbcs_header_v4_3_round_trips_kid() -> None:
kid = UUID("279926a3-d9b9-4b6f-8b2c-1a2b3c4d5e6f")
pssh_b64 = build_pr_header_from_kid(kid, scheme="cbcs", version="4.3")
drm = PlayReady(pssh=PSSH(pssh_b64), pssh_b64=pssh_b64)
assert [k.hex for k in drm.kids] == [kid.hex]
def test_cbcs_requires_v4_3() -> None:
with pytest.raises(ValueError, match="4.3"):
build_pr_header_from_kid(UUID(int=1), scheme="cbcs", version="4.0")

View File

@@ -46,7 +46,7 @@ from unshackle.core.config import config
from unshackle.core.console import console from unshackle.core.console import console
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack, context_settings from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack, context_settings
from unshackle.core.credential import Credential from unshackle.core.credential import Credential
from unshackle.core.drm import DRM_T, ClearKeyCENC, MonaLisa, PlayReady, Widevine from unshackle.core.drm import DRM_T, ClearKeyCENC, FairPlay, MonaLisa, PlayReady, Widevine
from unshackle.core.events import events from unshackle.core.events import events
from unshackle.core.music import ( from unshackle.core.music import (
MusicAudioIntegrityError, MusicAudioIntegrityError,
@@ -2829,6 +2829,11 @@ class dl:
title=title, title=title,
track=track, track=track,
), ),
fairplay_licence=partial(
service.get_fairplay_license,
title=title,
track=track,
),
clearkey_licence=partial( clearkey_licence=partial(
service.get_clearkey_license, service.get_clearkey_license,
title=title, title=title,
@@ -3530,6 +3535,7 @@ class dl:
title: Title_T, title: Title_T,
certificate: Callable, certificate: Callable,
licence: Callable, licence: Callable,
fairplay_licence: Optional[Callable] = None,
clearkey_licence: Optional[Callable] = None, clearkey_licence: Optional[Callable] = None,
track_kid: Optional[UUID] = None, track_kid: Optional[UUID] = None,
table: Table = None, table: Table = None,
@@ -3927,8 +3933,12 @@ class dl:
if need_license and not vaults_only: if need_license and not vaults_only:
from_vaults = drm.content_keys.copy() from_vaults = drm.content_keys.copy()
# FairPlay tracks are licensed through the PlayReady CDM but route their
# license request to the service's get_fairplay_license.
pr_licence = fairplay_licence if (fairplay_licence and isinstance(drm, FairPlay)) else licence
try: try:
drm.get_content_keys(cdm=self.cdm, licence=licence, certificate=certificate) drm.get_content_keys(cdm=self.cdm, licence=pr_licence, certificate=certificate)
except Exception as e: except Exception as e:
if drm.content_keys: if drm.content_keys:
self.log.debug(f"License call failed but keys already in content_keys: {e}") self.log.debug(f"License call failed but keys already in content_keys: {e}")

View File

@@ -4,11 +4,13 @@ from uuid import UUID
from unshackle.core.drm.clearkey import ClearKey from unshackle.core.drm.clearkey import ClearKey
from unshackle.core.drm.clearkey_cenc import ClearKeyCENC from unshackle.core.drm.clearkey_cenc import ClearKeyCENC
from unshackle.core.drm.fairplay import FairPlay
from unshackle.core.drm.monalisa import MonaLisa from unshackle.core.drm.monalisa import MonaLisa
from unshackle.core.drm.playready import PlayReady from unshackle.core.drm.playready import PlayReady
from unshackle.core.drm.widevine import Widevine from unshackle.core.drm.widevine import Widevine
DRM_T = Union[ClearKey, ClearKeyCENC, Widevine, PlayReady, MonaLisa] # FairPlay is a PlayReady subclass; listed for explicit isinstance/type use.
DRM_T = Union[ClearKey, ClearKeyCENC, Widevine, PlayReady, FairPlay, MonaLisa]
def drm_from_dict(data: dict[str, Any]) -> Union[Widevine, PlayReady, ClearKeyCENC]: def drm_from_dict(data: dict[str, Any]) -> Union[Widevine, PlayReady, ClearKeyCENC]:
@@ -44,4 +46,4 @@ def drm_from_dict(data: dict[str, Any]) -> Union[Widevine, PlayReady, ClearKeyCE
return drm return drm
__all__ = ("ClearKey", "ClearKeyCENC", "Widevine", "PlayReady", "MonaLisa", "DRM_T", "drm_from_dict") __all__ = ("ClearKey", "ClearKeyCENC", "Widevine", "PlayReady", "FairPlay", "MonaLisa", "DRM_T", "drm_from_dict")

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
import re
from typing import Optional
from uuid import UUID
from pyplayready.system.pssh import PSSH
from unshackle.core.drm.playready import PlayReady, build_pr_header_from_kid
def fairplay_kid_from_skd(skd_uri: str) -> Optional[UUID]:
"""Extract the content KID from a FairPlay ``skd://`` key URI.
Handles the common multi-DRM packager forms where the KID appears in the URI as
a hyphenated GUID (optionally followed by ``:<iv>`` or as a ``?KID=<guid>`` query)
or as a bare 32-hex string in a path segment. Services whose skd URI encodes the
KID differently (e.g. an asset number plus content id) should derive it themselves
and use ``FairPlay.from_kid``.
"""
# Prefer a hyphenated GUID; fall back to a bare 32-hex run.
guid = re.search(r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}", skd_uri)
if guid:
return UUID(hex=guid.group(0))
match = re.search(r"(?<![0-9a-fA-F])([0-9a-fA-F]{32})(?![0-9a-fA-F])", skd_uri)
if not match:
return None
return UUID(hex=match.group(1))
class FairPlay(PlayReady):
"""FairPlay track licensed through a PlayReady CDM (FairPlay -> PlayReady bridge).
FairPlay HLS (SAMPLE-AES / cbcs) carries a content KID but no PlayReady PSSH.
We synthesize a PlayReady header from that KID, so a PlayReady CDM can issue a
challenge and the resulting license decrypts the cbcs stream. This subclasses
PlayReady so the CDM challenge and decrypt logic are reused unchanged; the only
difference is that the download pipeline routes its license request to the
service's ``get_fairplay_license`` instead of ``get_playready_license``.
"""
@classmethod
def from_kid(cls, kid: UUID, scheme: str = "cbcs", version: str = "4.3") -> FairPlay:
"""Build a FairPlay DRM from a bare content KID via a synthesized PlayReady header."""
pssh_b64 = build_pr_header_from_kid(kid, scheme=scheme, version=version)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import base64 import base64
import shutil import shutil
import struct
import subprocess import subprocess
import textwrap import textwrap
import time import time
@@ -24,6 +25,42 @@ from unshackle.core.constants import AnyTrack
from unshackle.core.utilities import get_boxes, log_event from unshackle.core.utilities import get_boxes, log_event
from unshackle.core.utils.subprocess import ffprobe from unshackle.core.utils.subprocess import ffprobe
# CENC/CBC scheme -> PlayReady ALGID (AESCTR for ctr schemes, AESCBC for cbc schemes).
PR_SCHEME_ALGID = {"cenc": "AESCTR", "cens": "AESCTR", "cbc1": "AESCBC", "cbcs": "AESCBC"}
def build_pr_header_from_kid(kid: UUID, scheme: str = "cenc", version: str = "4.0") -> str:
"""Synthesize a base64 PlayReady Object from a bare KID.
For FairPlay/cbcs HLS streams that carry a content KID but no PlayReady or
Widevine PSSH, this lets us request a PlayReady license keyed on that KID.
``scheme`` selects the ALGID (cbcs -> AESCBC). ``version`` selects the WRMHEADER
layout: 4.0 uses a bare ``<KID>``, 4.3 uses the ``<KIDS><KID ALGID VALUE>`` form
used for cbcs content.
"""
algid = PR_SCHEME_ALGID.get(scheme.lower())
if not algid:
raise ValueError(f"Unsupported encryption scheme for PlayReady: {scheme}")
if algid == "AESCBC" and version < "4.3":
raise ValueError("AESCBC (cbcs) requires PlayReady WRMHEADER 4.3 or higher")
# PlayReady stores the KID as a little-endian GUID; UUID.bytes_le applies that byte order.
xml_kid = base64.b64encode(kid.bytes_le).decode()
if version == "4.0":
protect_info = f"<KEYLEN>16</KEYLEN><ALGID>{algid}</ALGID></PROTECTINFO><KID>{xml_kid}</KID>"
else:
protect_info = f'<KIDS><KID ALGID="{algid}" VALUE="{xml_kid}"></KID></KIDS></PROTECTINFO>'
header_xml = (
f'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="{version}.0.0">'
f"<DATA><PROTECTINFO>{protect_info}</DATA></WRMHEADER>"
)
header_utf16 = header_xml.encode("utf-16-le")
# PlayReady Object: one WRMHEADER record (type 1) behind the PRO length+count prefix.
rm_record = struct.pack("<HH", 1, len(header_utf16)) + header_utf16
pro = struct.pack("<IH", len(rm_record) + 6, 1) + rm_record
return base64.b64encode(pro).decode()
class PlayReady: class PlayReady:
"""PlayReady DRM System.""" """PlayReady DRM System."""
@@ -197,16 +234,22 @@ class PlayReady:
pssh_boxes.extend(list(get_boxes(init_data, b"pssh"))) pssh_boxes.extend(list(get_boxes(init_data, b"pssh")))
tenc_boxes.extend(list(get_boxes(init_data, b"tenc"))) tenc_boxes.extend(list(get_boxes(init_data, b"tenc")))
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if not pssh:
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
tenc = next(iter(tenc_boxes), None) tenc = next(iter(tenc_boxes), None)
if not kid and tenc and tenc.key_ID.int != 0: if not kid and tenc and tenc.key_ID.int != 0:
kid = tenc.key_ID kid = tenc.key_ID
pssh_bytes = Box.build(pssh) pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode()) if pssh:
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
# FairPlay/cbcs: no PSSH of any DRM system, but a tenc KID is present — synthesize a
# PlayReady header from it. Gated on zero pssh boxes so Widevine-bearing tracks fall through.
if not pssh_boxes and kid:
pssh_b64 = build_pr_header_from_kid(kid)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
@classmethod @classmethod
def from_init_data(cls, init_data: bytes) -> PlayReady: def from_init_data(cls, init_data: bytes) -> PlayReady:
@@ -226,16 +269,22 @@ class PlayReady:
if enc_key_id: if enc_key_id:
kid = UUID(bytes=base64.b64decode(enc_key_id)) kid = UUID(bytes=base64.b64decode(enc_key_id))
pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
if not pssh:
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
tenc = next(iter(tenc_boxes), None) tenc = next(iter(tenc_boxes), None)
if not kid and tenc and tenc.key_ID.int != 0: if not kid and tenc and tenc.key_ID.int != 0:
kid = tenc.key_ID kid = tenc.key_ID
pssh_bytes = Box.build(pssh) pssh = next((b for b in pssh_boxes if b.system_ID == PSSH.SYSTEM_ID), None)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode()) if pssh:
pssh_bytes = Box.build(pssh)
return cls(pssh=PSSH(pssh_bytes), kid=kid, pssh_b64=base64.b64encode(pssh_bytes).decode())
# FairPlay/cbcs: no PSSH of any DRM system, but a tenc KID is present — synthesize a
# PlayReady header from it. Gated on zero pssh boxes so Widevine-bearing tracks fall through.
if not pssh_boxes and kid:
pssh_b64 = build_pr_header_from_kid(kid)
return cls(pssh=PSSH(pssh_b64), kid=kid, pssh_b64=pssh_b64)
raise PlayReady.Exceptions.PSSHNotFound("PSSH was not found in track data.")
@property @property
def pssh(self) -> PSSH: def pssh(self) -> PSSH:

View File

@@ -438,6 +438,28 @@ class Service(metaclass=ABCMeta):
# Delegates license handling to the Widevine license method by default if a service-specific PlayReady implementation is not provided. # Delegates license handling to the Widevine license method by default if a service-specific PlayReady implementation is not provided.
return self.get_widevine_license(challenge=challenge, title=title, track=track) return self.get_widevine_license(challenge=challenge, title=title, track=track)
def get_fairplay_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str]]:
"""
Get a license for a FairPlay track bridged through a PlayReady CDM.
The `challenge` is a PlayReady License Request generated by the PlayReady CDM
from a header synthesized off the FairPlay content KID (see `FairPlay`). The
response is the PlayReady license, read back by the CDM to recover the keys.
By default this delegates to `get_playready_license`, which is correct for
services whose PlayReady endpoint also serves FairPlay content (e.g. multi-DRM
backends). Override it when the FairPlay license request needs different
shaping (endpoint, headers, body) from the PlayReady one.
:param challenge: The license challenge from the PlayReady CDM.
:param title: The current `Title` from get_titles that is being executed.
:param track: The current `Track` needing decryption.
:return: The License response as Bytes or a Base64 string, returned as is.
"""
return self.get_playready_license(challenge=challenge, title=title, track=track)
def get_clearkey_license( def get_clearkey_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str, dict]]: ) -> Optional[Union[bytes, str, dict]]:

View File

@@ -10,11 +10,14 @@ from http.cookiejar import CookieJar
from typing import Any, Optional, Union from typing import Any, Optional, Union
import click import click
import m3u8 # used by get_track_drm() to read the FairPlay skd key from a media playlist
from langcodes import Language from langcodes import Language
from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm from unshackle.core.cdm.detect import is_playready_cdm, is_widevine_cdm
from unshackle.core.constants import AnyTrack from unshackle.core.constants import AnyTrack
from unshackle.core.credential import Credential from unshackle.core.credential import Credential
from unshackle.core.drm import FairPlay # FairPlay->PlayReady bridge, see get_track_drm()
from unshackle.core.drm.fairplay import fairplay_kid_from_skd
from unshackle.core.manifests import DASH # also: HLS, ISM - see get_tracks() alternates from unshackle.core.manifests import DASH # also: HLS, ISM - see get_tracks() alternates
from unshackle.core.search_result import SearchResult from unshackle.core.search_result import SearchResult
from unshackle.core.service import Service from unshackle.core.service import Service
@@ -65,6 +68,8 @@ class EXAMPLE(Service):
get_widevine_* service cert + license (per-segment PSSH via `track`) get_widevine_* service cert + license (per-segment PSSH via `track`)
get_playready_license PlayReady challenge POST get_playready_license PlayReady challenge POST
get_clearkey_license DASH org.w3.clearkey JWK Set POST (Laurl fallback) get_clearkey_license DASH org.w3.clearkey JWK Set POST (Laurl fallback)
get_track_drm FairPlay->PlayReady bridge: skd KID -> FairPlay DRM
get_fairplay_license license a FairPlay-bridged track (PlayReady challenge)
""" """
# ALIASES: extra CLI tags that resolve to this service (e.g. `dl EX ...`). # ALIASES: extra CLI tags that resolve to this service (e.g. `dl EX ...`).
@@ -499,6 +504,47 @@ class EXAMPLE(Service):
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
def get_track_drm(self, track: AnyTrack) -> Optional[list]:
# FairPlay -> PlayReady bridge (HLS only). FairPlay HLS (SAMPLE-AES / cbcs) ships a
# content KID in its `skd://` EXT-X-KEY but no PlayReady/Widevine PSSH. We derive
# that KID, synthesize a PlayReady header from it (FairPlay.from_kid) and license it
# through a PlayReady CDM. This only yields keys when the backend is multi-DRM (the
# same content key is licensable via PlayReady) - a FairPlay-only backend cannot be
# bridged. The framework calls this hook during DRM loading for tracks flagged
# needs_drm_loading; return None to fall back to the normal manifest-PSSH path.
if not is_playready_cdm(self.cdm):
return None # bridge needs a PlayReady CDM to build the challenge
playlist = m3u8.loads(self.session.get(track.url).text, track.url)
skd = next(
(k.uri for k in (playlist.keys or []) if k and k.keyformat == "com.apple.streamingkeydelivery"),
None,
)
if not skd:
return None
# Generic extractor handles GUID / ?KID= / 32-hex skd forms. If your service encodes
# the KID differently, parse the UUID yourself and call FairPlay.from_kid(kid) directly.
kid = fairplay_kid_from_skd(skd)
if not kid:
return None
return [FairPlay.from_kid(kid)]
def get_fairplay_license(
self, *, challenge: bytes, title: Title_T, track: AnyTrack
) -> Optional[Union[bytes, str]]:
# `challenge` is a PlayReady challenge built from the synthesized header. The base
# class defaults to get_playready_license, which is correct when one endpoint serves
# both FairPlay and PlayReady. Override (like this) only when the FairPlay license
# request needs a different endpoint, headers, or body from the PlayReady one.
license_url = self.config["endpoints"].get("fairplay_license") or self.config["endpoints"].get(
"playready_license"
)
if not license_url:
raise ValueError("FairPlay/PlayReady license endpoint not configured")
response = self.session.post(url=license_url, data=challenge)
response.raise_for_status()
return response.content
# For HLS AES-128 ClearKey or unencrypted content there is no license callback; # For HLS AES-128 ClearKey or unencrypted content there is no license callback;
# the key comes from the manifest or a side endpoint and is attached to the # the key comes from the manifest or a side endpoint and is attached to the
# track's DRM directly. Vaults (`self.cache` is separate) cache KID:KEY so repeat # track's DRM directly. Vaults (`self.cache` is separate) cache KID:KEY so repeat

View File

@@ -11,6 +11,7 @@ endpoints:
widevine_license: https://api.domain.com/v1/license/widevine widevine_license: https://api.domain.com/v1/license/widevine
playready_license: https://api.domain.com/v1/license/playready playready_license: https://api.domain.com/v1/license/playready
clearkey_license: https://api.domain.com/v1/license/clearkey # DASH org.w3.clearkey (omit to use manifest Laurl) clearkey_license: https://api.domain.com/v1/license/clearkey # DASH org.w3.clearkey (omit to use manifest Laurl)
fairplay_license: https://api.domain.com/v1/license/fairplay # FairPlay->PlayReady bridge (omit to reuse playready_license)
# Base64 Widevine service certificate (enables privacy-mode license requests). # Base64 Widevine service certificate (enables privacy-mode license requests).
certificate: null certificate: null