fix(ism): derive video colour range from CodecPrivateData SPS VUI

Smooth Streaming manifests carry no range attributes, so every ISM video track was labelled SDR even when the stream is HDR10/HLG/DV, breaking range-based selection (-r HDR10 / -r DV) for ISM services.

  - ism_init: walk the full HEVC SPS (incl. scaling-list and st_ref_pic_set skippers) to read the VUI colour triple (colour_primaries, transfer_characteristics, matrix_coeffs); expose parse_codec_private_data_colour() keyed by FourCC. No unshackle imports added.
  - ism: new ISM.get_video_range() maps the CICP triple via Video.Range.from_cicp (PQ -> HDR10, HLG -> HLG, BT.709/absent -> SDR); DVHE/DVH1 FourCCs map straight to DV since DV bitstreams signal Unspecified (2,2,2) in the VUI. to_tracks() now sets range_ on every video track. Soft-fails to SDR on malformed data.
  - ism: accept RnetSession in download_track() so TLS-impersonated sessions pass the type check.
  - tests: real PQ/HLG/BT.709 (x265-minted) and Dolby Vision (live-manifest, DoViProfile=stn, out-of-order SPS,PPS,VPS NALs) CodecPrivateData samples; byte-level VUI assertions in test_ism_init and manifest->Range characterization in new test_ism_range.
This commit is contained in:
imSp4rky
2026-06-11 18:28:35 -06:00
parent 39034f2bb5
commit e207116d30
4 changed files with 278 additions and 11 deletions

View File

@@ -13,7 +13,8 @@ import struct
import pytest import pytest
from unshackle.core.manifests.ism_init import (NAL_START_CODE, PIFF_SENC_UUID, box, build_avcc, build_dec3, from unshackle.core.manifests.ism_init import (NAL_START_CODE, PIFF_SENC_UUID, box, build_avcc, build_dec3,
build_hvcc, build_init_segment, full_box, parse_hevc_sps_format, build_hvcc, build_init_segment, full_box,
parse_codec_private_data_colour, parse_hevc_sps_format,
read_per_sample_iv_size, read_track_id, remove_emulation_prevention, read_per_sample_iv_size, read_track_id, remove_emulation_prevention,
split_nal_units, synthesize_aac_codec_private_data) split_nal_units, synthesize_aac_codec_private_data)
@@ -30,6 +31,28 @@ VIDEO_HEVC10_CPD = (
"0000000140010c01ffff02200000030090000003000003003c959809000000000142010102200000030090" "0000000140010c01ffff02200000030090000003000003003c959809000000000142010102200000030090"
"000003000003003ca00a080b9f6d96566924caf0168080000003008000000c8400000000014401c172b4624000" "000003000003003ca00a080b9f6d96566924caf0168080000003008000000c8400000000014401c172b4624000"
) )
# HEVC VPS+SPS+PPS minted with x265, explicit SPS VUI colour signalling:
# PQ (bt2020/smpte2084/bt2020nc), HLG (arib-std-b67) and BT.709 SDR.
# The three samples are identical except for the three bytes near the end of
# the SPS that carry the VUI colour triple ("122012" / "122412" / "020202",
# i.e. the CICP values shifted left by one bit), so a mismatch between them
# can only come from colour parsing.
VIDEO_HEVC_PQ_CPD = (
    "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020"
    "8104d96566924caf016a12201208000003000800000300c840000000014401c172b42240"
)
VIDEO_HEVC_HLG_CPD = (
    "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020"
    "8104d96566924caf016a12241208000003000800000300c840000000014401c172b42240"
)
VIDEO_HEVC_SDR_CPD = (
    "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020"
    "8104d96566924caf016a02020208000003000800000300c840000000014401c172b42240"
)
# Real Dolby Vision (dvhe, DoViProfile "stn") CodecPrivateData from a Smooth
# manifest: NALs arrive SPS,PPS,VPS (VPS last) and the VUI colour triple is
# Unspecified (2,2,2) — DV is signalled by FourCC only, never by CICP.
VIDEO_HEVC_DV_CPD = (
    "00000001420101022000000300B00000030000030096A001E020021C4D9457B91CAF016E0404042800001F480002EE0401F4E1"
    "15EE7E0001312D00002FAF0C80000000014401C1ACBE0EC90000000140010C01FFFF022000000300B00000030000030096"
    "15C0C00000FA40001770200FA680"
)
AAC_LC_CPD = "1190" AAC_LC_CPD = "1190"
# Real Smooth EC-3 CodecPrivateData: WAVEFORMATEXTENSIBLE extension (samples # Real Smooth EC-3 CodecPrivateData: WAVEFORMATEXTENSIBLE extension (samples
# per block + channel mask + DD+ GUID) followed by the 5-byte dec3 payload. # per block + channel mask + DD+ GUID) followed by the 5-byte dec3 payload.
@@ -400,6 +423,55 @@ def test_read_track_id_truncated_tfhd_returns_none():
assert read_track_id(fragment) is None assert read_track_id(fragment) is None
def test_parse_colour_hevc_pq():
    """The PQ sample must yield the (9, 16, 9) colour triple."""
    cpd = bytes.fromhex(VIDEO_HEVC_PQ_CPD)
    colour = parse_codec_private_data_colour("HVC1", cpd)
    # bt2020 primaries (9), smpte2084 transfer (16), bt2020nc matrix (9).
    assert colour == (9, 16, 9)
def test_parse_colour_hevc_hlg():
    """The HLG sample must yield the (9, 18, 9) colour triple."""
    cpd = bytes.fromhex(VIDEO_HEVC_HLG_CPD)
    colour = parse_codec_private_data_colour("HVC1", cpd)
    assert colour == (9, 18, 9)
def test_parse_colour_hevc_bt709():
    """Both BT.709 samples must yield the (1, 1, 1) colour triple."""
    bt709 = (1, 1, 1)
    minted = parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_SDR_CPD))
    assert minted == bt709
    # The real-manifest 8-bit sample also signals BT.709 explicitly.
    real = parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_CPD))
    assert real == bt709
def test_parse_colour_dv_is_unspecified():
    """The Dolby Vision sample's VUI must parse as Unspecified (2, 2, 2)."""
    cpd = bytes.fromhex(VIDEO_HEVC_DV_CPD)
    colour = parse_codec_private_data_colour("DVHE", cpd)
    # DV carries no usable CICP; the DV decision must come from the FourCC.
    assert colour == (2, 2, 2)
def test_dv_cpd_with_vps_last_builds_init():
    """Out-of-order DV CodecPrivateData (SPS, PPS, VPS) must still split,
    parse, and build an hvcC box and an init segment."""
    raw = bytes.fromhex(VIDEO_HEVC_DV_CPD)
    units = split_nal_units(raw)
    nal_types = [(unit[0] >> 1) & 0x3F for unit in units]
    assert nal_types == [33, 34, 32]  # SPS, PPS, VPS

    sps_rbsp = remove_emulation_prevention(units[0])
    assert parse_hevc_sps_format(sps_rbsp) == (1, 2, 2)  # 4:2:0, 10-bit

    # Every parameter set must be carried verbatim inside the hvcC payload.
    hvcc = build_hvcc(raw)
    for unit in units:
        assert unit in hvcc

    init = build_init_segment(
        stream_type="video",
        fourcc="DVHE",
        codec_private_data=VIDEO_HEVC_DV_CPD,
        timescale=10000000,
        width=3840,
        height=2160,
    )
    assert b"dvh1" in init
    assert b"hvcC" in init
def test_parse_colour_absent_or_unknown_returns_none():
    """Missing colour description, non-HEVC FourCCs and truncated data all
    return None instead of raising."""
    # Real sample without a VUI colour description.
    colourless = bytes.fromhex(VIDEO_HEVC10_CPD)
    assert parse_codec_private_data_colour("HVC1", colourless) is None

    # Non-HEVC codecs (AVC has no HDR deployment) and truncated data must not raise.
    avc = bytes.fromhex(VIDEO_AVC_CPD)
    assert parse_codec_private_data_colour("H264", avc) is None
    assert parse_codec_private_data_colour("WVC1", avc) is None

    truncated_sps = b"\x00\x00\x00\x01\x42"
    assert parse_codec_private_data_colour("HVC1", truncated_sps) is None
def test_hvcc_profile_tier_level_is_nonzero(): def test_hvcc_profile_tier_level_is_nonzero():
# De-emulated PTL must yield real profile/level, not the off-by-one garbage. # De-emulated PTL must yield real profile/level, not the off-by-one garbage.
hvcc = build_hvcc(bytes.fromhex(VIDEO_HEVC_CPD)) hvcc = build_hvcc(bytes.fromhex(VIDEO_HEVC_CPD))

View File

@@ -0,0 +1,66 @@
"""Offline characterization: ISM CodecPrivateData SPS VUI -> Video.Range
(PQ -> HDR10, HLG -> HLG, BT.709/absent -> SDR). HDR10+ is per-frame SEI,
undecidable from the manifest; the post-mux bitstream probe names it."""
from __future__ import annotations
from unshackle.core.manifests import ISM
from unshackle.core.tracks import Video
from tests.core.test_ism_init import (VIDEO_HEVC10_CPD, VIDEO_HEVC_DV_CPD, VIDEO_HEVC_HLG_CPD, # isort: skip
VIDEO_HEVC_PQ_CPD, VIDEO_HEVC_SDR_CPD)
def manifest_xml(cpd: str, fourcc: str = "HVC1") -> str:
    """Build a minimal single-video Smooth Streaming manifest.

    Args:
        cpd: Text placed verbatim in the QualityLevel CodecPrivateData attribute.
        fourcc: Text placed verbatim in the QualityLevel FourCC attribute.

    Returns:
        The manifest as one XML string with no whitespace between elements.
    """
    # The braces in the Url template are literal manifest placeholders, so
    # that fragment must stay a plain (non-f) string.
    stream_index_open = (
        '<StreamIndex Type="video" Name="video" Chunks="1" QualityLevels="1" MaxWidth="3840" MaxHeight="2160" '
        'Url="QualityLevels({bitrate})/Fragments(video={start time})">'
    )
    quality_level = (
        f'<QualityLevel Index="0" Bitrate="15000000" FourCC="{fourcc}" MaxWidth="3840" MaxHeight="2160" '
        f'CodecPrivateData="{cpd}"/>'
    )
    pieces = [
        '<SmoothStreamingMedia MajorVersion="2" MinorVersion="0" TimeScale="10000000" Duration="100000000">',
        stream_index_open,
        quality_level,
        '<c t="0" d="100000000"/>',
        "</StreamIndex>",
        "</SmoothStreamingMedia>",
    ]
    return "".join(pieces)
def parse_video(cpd: str, fourcc: str = "HVC1") -> Video:
    """Parse a one-QualityLevel manifest built from *cpd* / *fourcc* and
    return its single video track."""
    manifest = ISM.from_text(manifest_xml(cpd, fourcc), url="https://x/ism/manifest")
    tracks = manifest.to_tracks(language="en")
    assert len(tracks.videos) == 1
    (only_video,) = tracks.videos[:1]
    return only_video
def test_pq_codec_private_data_yields_hdr10() -> None:
    """A PQ-signalled track is labelled HDR10."""
    video = parse_video(VIDEO_HEVC_PQ_CPD)
    assert video.range == Video.Range.HDR10
def test_hlg_codec_private_data_yields_hlg() -> None:
    """An HLG-signalled track is labelled HLG."""
    video = parse_video(VIDEO_HEVC_HLG_CPD)
    assert video.range == Video.Range.HLG
def test_bt709_codec_private_data_stays_sdr() -> None:
    """A BT.709-signalled track stays SDR."""
    video = parse_video(VIDEO_HEVC_SDR_CPD)
    assert video.range == Video.Range.SDR
def test_colourless_codec_private_data_defaults_sdr() -> None:
    """A track whose SPS has no colour description defaults to SDR."""
    # Real 10-bit sample without a VUI colour description: unspecified -> SDR.
    video = parse_video(VIDEO_HEVC10_CPD)
    assert video.range == Video.Range.SDR
def test_get_video_range_dolby_vision_fourcc() -> None:
    """Dolby Vision FourCCs map to DV whatever the CodecPrivateData holds."""
    with_pq_cpd = ISM.get_video_range("DVH1", VIDEO_HEVC_PQ_CPD)
    assert with_pq_cpd == Video.Range.DV
    with_empty_cpd = ISM.get_video_range("DVHE", "")
    assert with_empty_cpd == Video.Range.DV
def test_dv_track_from_real_smooth_cpd() -> None:
    """A real lowercase-"dvhe" QualityLevel is labelled DV; the same bytes
    under an HEVC FourCC are labelled SDR."""
    # Live manifests ship lowercase "dvhe"; its VUI is Unspecified so the
    # FourCC short-circuit is the only thing standing between DV and SDR.
    dv_track = parse_video(VIDEO_HEVC_DV_CPD, fourcc="dvhe")
    assert dv_track.range == Video.Range.DV
    as_plain_hevc = ISM.get_video_range("hvc1", VIDEO_HEVC_DV_CPD)
    assert as_plain_hevc == Video.Range.SDR
def test_get_video_range_malformed_data_soft_fails_sdr() -> None:
    """Non-hex data, empty data and an empty FourCC all fall back to SDR."""
    sdr = Video.Range.SDR
    assert ISM.get_video_range("HVC1", "not-hex") == sdr
    assert ISM.get_video_range("HVC1", "") == sdr
    assert ISM.get_video_range("", VIDEO_HEVC_PQ_CPD) == sdr

View File

@@ -19,7 +19,8 @@ from requests import Session
from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack
from unshackle.core.drm import DRM_T, PlayReady, Widevine from unshackle.core.drm import DRM_T, PlayReady, Widevine
from unshackle.core.events import events from unshackle.core.events import events
from unshackle.core.manifests.ism_init import build_init_segment, read_per_sample_iv_size, read_track_id from unshackle.core.manifests.ism_init import (build_init_segment, parse_codec_private_data_colour,
read_per_sample_iv_size, read_track_id)
from unshackle.core.session import RnetSession from unshackle.core.session import RnetSession
from unshackle.core.tracks import Audio, Subtitle, Track, Tracks, Video from unshackle.core.tracks import Audio, Subtitle, Track, Tracks, Video
from unshackle.core.utilities import log_event, try_ensure_utf8 from unshackle.core.utilities import log_event, try_ensure_utf8
@@ -87,6 +88,22 @@ class ISM:
drm.append(PlayReady(pssh=pr_pssh, pssh_b64=data)) drm.append(PlayReady(pssh=pr_pssh, pssh_b64=data))
return drm return drm
@staticmethod
def get_video_range(fourcc: str, codec_private_data: str) -> Video.Range:
    """Derive a video track's colour range from its CodecPrivateData.

    Smooth manifests carry no range attributes, so the range is read from
    the SPS VUI colour triple embedded in CodecPrivateData. Every failure
    path soft-fails to SDR so a bad QualityLevel never aborts manifest
    parsing.

    Args:
        fourcc: QualityLevel FourCC; matched case-insensitively, and
            None/empty is tolerated.
        codec_private_data: Hex-encoded CodecPrivateData attribute value;
            None/empty is tolerated.

    Returns:
        Video.Range.DV for DVHE/DVH1 FourCCs, otherwise the result of
        Video.Range.from_cicp() on the parsed triple. Video.Range.SDR when
        the hex is invalid, no colour triple is available, or from_cicp()
        rejects the triple.
    """
    fourcc = (fourcc or "").upper()
    # Dolby Vision is signalled by FourCC only; its VUI colour triple is
    # Unspecified (2, 2, 2), so it must be decided before looking at the SPS.
    if fourcc in ("DVHE", "DVH1"):
        return Video.Range.DV
    try:
        cpd = bytes.fromhex(codec_private_data or "")
    except ValueError:
        return Video.Range.SDR
    cicp = parse_codec_private_data_colour(fourcc, cpd)
    if not cicp:
        return Video.Range.SDR
    try:
        return Video.Range.from_cicp(*cicp)
    except (ValueError, KeyError):
        # NOTE(review): the triple is three raw 0..255 values taken from
        # manifest bytes. from_cicp() is not visible from here; if it
        # rejects code points it does not model, honour the soft-fail
        # contract instead of propagating — confirm against from_cicp().
        return Video.Range.SDR
@staticmethod @staticmethod
def _init_segment( def _init_segment(
track: AnyTrack, session_drm: Optional[DRM_T], first_segment: Optional[bytes] = None track: AnyTrack, session_drm: Optional[DRM_T], first_segment: Optional[bytes] = None
@@ -280,6 +297,7 @@ class ISM:
id_=track_id, id_=track_id,
url=self.url, url=self.url,
codec=vcodec, codec=vcodec,
range_=self.get_video_range(codec or "", ql.get("CodecPrivateData") or ""),
language=track_lang or language, language=track_lang or language,
is_original_lang=bool(language and track_lang and str(track_lang) == str(language)), is_original_lang=bool(language and track_lang and str(track_lang) == str(language)),
bitrate=ql.get("Bitrate"), bitrate=ql.get("Bitrate"),
@@ -351,7 +369,7 @@ class ISM:
save_path: Path, save_path: Path,
save_dir: Path, save_dir: Path,
progress: partial, progress: partial,
session: Optional[Session] = None, session: Optional[Union[Session, RnetSession]] = None,
proxy: Optional[str] = None, proxy: Optional[str] = None,
max_workers: Optional[int] = None, max_workers: Optional[int] = None,
license_widevine: Optional[Callable] = None, license_widevine: Optional[Callable] = None,
@@ -360,8 +378,8 @@ class ISM:
) -> None: ) -> None:
if not session: if not session:
session = Session() session = Session()
elif not isinstance(session, Session): elif not isinstance(session, (Session, RnetSession)):
raise TypeError(f"Expected session to be a {Session}, not {session!r}") raise TypeError(f"Expected session to be a {Session} or {RnetSession}, not {session!r}")
if proxy: if proxy:
session.proxies.update({"all": proxy}) session.proxies.update({"all": proxy})

View File

@@ -129,13 +129,15 @@ class BitReader:
raise ValueError("Invalid exp-Golomb code") raise ValueError("Invalid exp-Golomb code")
return (1 << zeros) - 1 + (self.read_bits(zeros) if zeros else 0) return (1 << zeros) - 1 + (self.read_bits(zeros) if zeros else 0)
def read_se(self) -> int:
    """Read one signed exp-Golomb value, se(v).

    The unsigned code number from read_ue() maps 0,1,2,3,4,... onto
    0,1,-1,2,-2,...: odd code numbers are positive, even ones negative.
    """
    code_num = self.read_ue()
    magnitude = (code_num + 1) // 2
    if code_num % 2:
        return magnitude
    return -magnitude
def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]:
""" def read_hevc_sps_to_bit_depth(r: BitReader) -> tuple[int, int, int, int]:
Parse (chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8) """Advance reader through bit_depth_chroma_minus8; returns
from a de-emulated HEVC SPS RBSP (including its 2-byte NAL header). (chroma_format_idc, bit_depth_luma, bit_depth_chroma, max_sub_layers_minus1)."""
"""
r = BitReader(sps_rbsp)
r.read_bits(16) # NAL unit header r.read_bits(16) # NAL unit header
r.read_bits(4) # sps_video_parameter_set_id r.read_bits(4) # sps_video_parameter_set_id
max_sub_layers_minus1 = r.read_bits(3) max_sub_layers_minus1 = r.read_bits(3)
@@ -164,9 +166,118 @@ def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]:
r.read_ue() r.read_ue()
bit_depth_luma_minus8 = r.read_ue() bit_depth_luma_minus8 = r.read_ue()
bit_depth_chroma_minus8 = r.read_ue() bit_depth_chroma_minus8 = r.read_ue()
return chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8, max_sub_layers_minus1
def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]:
    """
    Return (chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8)
    read from a de-emulated HEVC SPS RBSP that still carries its 2-byte NAL
    header.
    """
    reader = BitReader(sps_rbsp)
    # The shared walker also reports max_sub_layers_minus1; drop it here.
    chroma_format, luma_minus8, chroma_minus8, _sub_layers = read_hevc_sps_to_bit_depth(reader)
    return chroma_format, luma_minus8, chroma_minus8
def skip_hevc_scaling_list_data(r: BitReader) -> None:
    """Consume an HEVC scaling_list_data() structure (H.265 7.3.4) from *r*.

    Nothing is returned; the reader is simply left positioned on the first
    bit after the structure.
    """
    for size_id in range(4):
        coef_num = min(64, 1 << (4 + 2 * size_id))
        # sizeId 3 only carries matrices 0 and 3; the others carry all six.
        matrix_step = 3 if size_id == 3 else 1
        for _matrix_id in range(0, 6, matrix_step):
            pred_mode_flag = r.read_bits(1)  # scaling_list_pred_mode_flag
            if not pred_mode_flag:
                r.read_ue()  # scaling_list_pred_matrix_id_delta
                continue
            if size_id > 1:
                r.read_se()  # scaling_list_dc_coef_minus8
            for _ in range(coef_num):
                r.read_se()  # scaling_list_delta_coef
def skip_hevc_st_ref_pic_set(r: BitReader, idx: int, num_delta_pocs: list[int]) -> int:
    """Consume one st_ref_pic_set() (H.265 7.3.7) and return its NumDeltaPocs.

    Args:
        r: Reader positioned at the start of the structure.
        idx: Index of this set within the SPS; set 0 can never be predicted.
        num_delta_pocs: NumDeltaPocs of the sets already parsed, in order;
            entry ``idx - 1`` is the reference for a predicted set.
    """
    # inter_ref_pic_set_prediction_flag is only coded for idx > 0.
    predicted = bool(idx) and bool(r.read_bits(1))
    if not predicted:
        negative_pics = r.read_ue()
        positive_pics = r.read_ue()
        total = negative_pics + positive_pics
        for _ in range(total):
            r.read_ue()  # delta_poc_sX_minus1
            r.read_bits(1)  # used_by_curr_pic_sX_flag
        return total

    r.read_bits(1)  # delta_rps_sign
    r.read_ue()  # abs_delta_rps_minus1
    kept = 0
    for _ in range(num_delta_pocs[idx - 1] + 1):
        # use_delta_flag is only coded when used_by_curr_pic_flag is 0;
        # an entry counts when either flag is set.
        if r.read_bits(1) or r.read_bits(1):
            kept += 1
    return kept
def parse_hevc_sps_colour(sps_rbsp: bytes) -> Optional[tuple[int, int, int]]:
    """VUI (colour_primaries, transfer_characteristics, matrix_coeffs) from a
    de-emulated HEVC SPS, or None when no colour description is present.

    The SPS is walked field by field in bitstream order because the VUI sits
    near its end: every preceding syntax element has to be read or skipped to
    find the right bit offset. Statement order therefore mirrors the SPS
    syntax and must not be rearranged.

    Args:
        sps_rbsp: SPS RBSP with emulation-prevention bytes already removed,
            including its 2-byte NAL unit header.

    Returns:
        The three 8-bit code points, or None when vui_parameters_present_flag,
        video_signal_type_present_flag or colour_description_present_flag is 0.

    NOTE(review): no exception is handled here; presumably BitReader raises
    when the data runs out — parse_codec_private_data_colour() catches
    IndexError/ValueError around this call. Confirm against BitReader.
    """
    r = BitReader(sps_rbsp)
    # Only max_sub_layers_minus1 is needed from the SPS prefix; it sizes the
    # sub-layer ordering loop below.
    _, _, _, max_sub_layers_minus1 = read_hevc_sps_to_bit_depth(r)
    # Kept because it fixes the bit width of lt_ref_pic_poc_lsb_sps further down.
    log2_max_poc_lsb_minus4 = r.read_ue()
    sub_layer_ordering_info = r.read_bits(1)
    # One entry per sub-layer when the flag is set, otherwise a single entry.
    for _ in range(max_sub_layers_minus1 + 1 if sub_layer_ordering_info else 1):
        r.read_ue()  # sps_max_dec_pic_buffering_minus1
        r.read_ue()  # sps_max_num_reorder_pics
        r.read_ue()  # sps_max_latency_increase_plus1
    for _ in range(6):  # luma coding/transform block sizes + transform hierarchy depths
        r.read_ue()
    # Short-circuit matters: the second flag is only coded when the first is 1.
    if r.read_bits(1) and r.read_bits(1):  # scaling_list_enabled + sps_scaling_list_data_present
        skip_hevc_scaling_list_data(r)
    r.read_bits(2)  # amp_enabled_flag + sample_adaptive_offset_enabled_flag
    if r.read_bits(1):  # pcm_enabled_flag
        r.read_bits(8)  # pcm sample bit depths (4 + 4)
        r.read_ue()  # log2_min_pcm_luma_coding_block_size_minus3
        r.read_ue()  # log2_diff_max_min_pcm_luma_coding_block_size
        r.read_bits(1)  # pcm_loop_filter_disabled_flag
    # Each short-term RPS may be predicted from the previous one, so the
    # NumDeltaPocs of every set parsed so far is carried along.
    num_delta_pocs: list[int] = []
    for idx in range(r.read_ue()):  # num_short_term_ref_pic_sets
        num_delta_pocs.append(skip_hevc_st_ref_pic_set(r, idx, num_delta_pocs))
    if r.read_bits(1):  # long_term_ref_pics_present_flag
        for _ in range(r.read_ue()):  # num_long_term_ref_pics_sps
            r.read_bits(log2_max_poc_lsb_minus4 + 4)  # lt_ref_pic_poc_lsb_sps
            r.read_bits(1)  # used_by_curr_pic_lt_sps_flag
    r.read_bits(2)  # sps_temporal_mvp_enabled + strong_intra_smoothing_enabled
    if not r.read_bits(1):  # vui_parameters_present_flag
        return None
    # --- vui_parameters(): only the fields preceding the colour description ---
    if r.read_bits(1):  # aspect_ratio_info_present_flag
        if r.read_bits(8) == 255:  # aspect_ratio_idc == EXTENDED_SAR
            r.read_bits(32)  # sar_width + sar_height
    if r.read_bits(1):  # overscan_info_present_flag
        r.read_bits(1)  # overscan_appropriate_flag
    if not r.read_bits(1):  # video_signal_type_present_flag
        return None
    r.read_bits(3)  # video_format
    r.read_bits(1)  # video_full_range_flag
    if not r.read_bits(1):  # colour_description_present_flag
        return None
    # Tuple elements evaluate left to right: primaries, transfer, matrix.
    return r.read_bits(8), r.read_bits(8), r.read_bits(8)
# FourCCs whose CodecPrivateData is parsed as HEVC NAL units (Dolby Vision
# FourCCs included).
HEVC_FOURCCS = frozenset({"HVC1", "HEV1", "HEVC", "H265", "DVHE", "DVH1"})


def parse_codec_private_data_colour(fourcc: str, codec_private_data: bytes) -> Optional[tuple[int, int, int]]:
    """Return the SPS VUI colour triple found in HEVC CodecPrivateData.

    Args:
        fourcc: Track FourCC, matched case-insensitively; None/empty is
            treated as an unsupported codec.
        codec_private_data: Raw (already hex-decoded) CodecPrivateData bytes.

    Returns:
        (colour_primaries, transfer_characteristics, matrix_coeffs), or None
        when the FourCC is not an HEVC one, no SPS NAL unit is present, the
        SPS has no colour description, or parsing raises IndexError/ValueError.
    """
    normalised = fourcc.upper() if fourcc else ""
    if normalised not in HEVC_FOURCCS:
        return None
    try:
        for nal in split_nal_units(codec_private_data):
            # nal_unit_type is bits 1..6 of the first header byte; 33 is SPS.
            if (nal[0] >> 1) & 0x3F == 33:
                return parse_hevc_sps_colour(remove_emulation_prevention(nal))
    except (IndexError, ValueError):
        return None
    return None
def iter_boxes(data: bytes, start: int, end: int) -> Iterator[tuple[bytes, Optional[bytes], int, int]]: def iter_boxes(data: bytes, start: int, end: int) -> Iterator[tuple[bytes, Optional[bytes], int, int]]:
"""Yield (type, uuid_usertype, payload_start, box_end) for each child box.""" """Yield (type, uuid_usertype, payload_start, box_end) for each child box."""
offset = start offset = start