diff --git a/tests/core/test_ism_init.py b/tests/core/test_ism_init.py index c2eb7d1..94e9855 100644 --- a/tests/core/test_ism_init.py +++ b/tests/core/test_ism_init.py @@ -13,7 +13,8 @@ import struct import pytest from unshackle.core.manifests.ism_init import (NAL_START_CODE, PIFF_SENC_UUID, box, build_avcc, build_dec3, - build_hvcc, build_init_segment, full_box, parse_hevc_sps_format, + build_hvcc, build_init_segment, full_box, + parse_codec_private_data_colour, parse_hevc_sps_format, read_per_sample_iv_size, read_track_id, remove_emulation_prevention, split_nal_units, synthesize_aac_codec_private_data) @@ -30,6 +31,28 @@ VIDEO_HEVC10_CPD = ( "0000000140010c01ffff02200000030090000003000003003c959809000000000142010102200000030090" "000003000003003ca00a080b9f6d96566924caf0168080000003008000000c8400000000014401c172b4624000" ) +# HEVC VPS+SPS+PPS minted with x265, explicit SPS VUI colour signalling: +# PQ (bt2020/smpte2084/bt2020nc), HLG (arib-std-b67) and BT.709 SDR. +VIDEO_HEVC_PQ_CPD = ( + "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020" + "8104d96566924caf016a12201208000003000800000300c840000000014401c172b42240" +) +VIDEO_HEVC_HLG_CPD = ( + "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020" + "8104d96566924caf016a12241208000003000800000300c840000000014401c172b42240" +) +VIDEO_HEVC_SDR_CPD = ( + "0000000140010c01ffff02200000030090000003000003001e9598090000000142010102200000030090000003000003001ea020" + "8104d96566924caf016a02020208000003000800000300c840000000014401c172b42240" +) +# Real Dolby Vision (dvhe, DoViProfile "stn") CodecPrivateData from a Smooth +# manifest: NALs arrive SPS,PPS,VPS (VPS last) and the VUI colour triple is +# Unspecified (2,2,2) — DV is signalled by FourCC only, never by CICP. +VIDEO_HEVC_DV_CPD = ( + "00000001420101022000000300B00000030000030096A001E020021C4D9457B91CAF016E0404042800001F480002EE0401F4E1" + "15EE7E0001312D00002FAF0C80000000014401C1ACBE0EC90000000140010C01FFFF022000000300B00000030000030096" + "15C0C00000FA40001770200FA680" +) AAC_LC_CPD = "1190" # Real Smooth EC-3 CodecPrivateData: WAVEFORMATEXTENSIBLE extension (samples # per block + channel mask + DD+ GUID) followed by the 5-byte dec3 payload. @@ -400,6 +423,55 @@ def test_read_track_id_truncated_tfhd_returns_none(): assert read_track_id(fragment) is None +def test_parse_colour_hevc_pq(): + # PQ master: bt2020 primaries (9), smpte2084 transfer (16), bt2020nc matrix (9). + assert parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_PQ_CPD)) == (9, 16, 9) + + +def test_parse_colour_hevc_hlg(): + assert parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_HLG_CPD)) == (9, 18, 9) + + +def test_parse_colour_hevc_bt709(): + assert parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_SDR_CPD)) == (1, 1, 1) + # The real-manifest 8-bit sample also signals BT.709 explicitly. + assert parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC_CPD)) == (1, 1, 1) + + +def test_parse_colour_dv_is_unspecified(): + # DV carries no usable CICP; the DV decision must come from the FourCC. + assert parse_codec_private_data_colour("DVHE", bytes.fromhex(VIDEO_HEVC_DV_CPD)) == (2, 2, 2) + + +def test_dv_cpd_with_vps_last_builds_init(): + cpd = bytes.fromhex(VIDEO_HEVC_DV_CPD) + nals = split_nal_units(cpd) + assert [(n[0] >> 1) & 0x3F for n in nals] == [33, 34, 32] # SPS, PPS, VPS + sps = remove_emulation_prevention(nals[0]) + assert parse_hevc_sps_format(sps) == (1, 2, 2) # 4:2:0, 10-bit + hvcc = build_hvcc(cpd) + for nal in nals: + assert nal in hvcc + init = build_init_segment( + stream_type="video", + fourcc="DVHE", + codec_private_data=VIDEO_HEVC_DV_CPD, + timescale=10000000, + width=3840, + height=2160, + ) + assert b"dvh1" in init and b"hvcC" in init + + +def test_parse_colour_absent_or_unknown_returns_none(): + # Real sample without a VUI colour description. + assert parse_codec_private_data_colour("HVC1", bytes.fromhex(VIDEO_HEVC10_CPD)) is None + # Non-HEVC codecs (AVC has no HDR deployment) and truncated data must not raise. + assert parse_codec_private_data_colour("H264", bytes.fromhex(VIDEO_AVC_CPD)) is None + assert parse_codec_private_data_colour("WVC1", bytes.fromhex(VIDEO_AVC_CPD)) is None + assert parse_codec_private_data_colour("HVC1", b"\x00\x00\x00\x01\x42") is None + + def test_hvcc_profile_tier_level_is_nonzero(): # De-emulated PTL must yield real profile/level, not the off-by-one garbage. hvcc = build_hvcc(bytes.fromhex(VIDEO_HEVC_CPD)) diff --git a/tests/core/test_ism_range.py b/tests/core/test_ism_range.py new file mode 100644 index 0000000..1cb88eb --- /dev/null +++ b/tests/core/test_ism_range.py @@ -0,0 +1,66 @@ +"""Offline characterization: ISM CodecPrivateData SPS VUI -> Video.Range +(PQ -> HDR10, HLG -> HLG, BT.709/absent -> SDR). HDR10+ is per-frame SEI, +undecidable from the manifest; the post-mux bitstream probe names it.""" + +from __future__ import annotations + +from unshackle.core.manifests import ISM +from unshackle.core.tracks import Video + +from tests.core.test_ism_init import (VIDEO_HEVC10_CPD, VIDEO_HEVC_DV_CPD, VIDEO_HEVC_HLG_CPD, # isort: skip + VIDEO_HEVC_PQ_CPD, VIDEO_HEVC_SDR_CPD) + + +def manifest_xml(cpd: str, fourcc: str = "HVC1") -> str: + return ( + '' + '' + f'' + '' + "" + "" + ) + + +def parse_video(cpd: str, fourcc: str = "HVC1") -> Video: + tracks = ISM.from_text(manifest_xml(cpd, fourcc), url="https://x/ism/manifest").to_tracks(language="en") + assert len(tracks.videos) == 1 + return tracks.videos[0] + + +def test_pq_codec_private_data_yields_hdr10() -> None: + assert parse_video(VIDEO_HEVC_PQ_CPD).range == Video.Range.HDR10 + + +def test_hlg_codec_private_data_yields_hlg() -> None: + assert parse_video(VIDEO_HEVC_HLG_CPD).range == Video.Range.HLG + + +def test_bt709_codec_private_data_stays_sdr() -> None: + assert parse_video(VIDEO_HEVC_SDR_CPD).range == Video.Range.SDR + + +def test_colourless_codec_private_data_defaults_sdr() -> None: + # Real 10-bit sample without a VUI colour description: unspecified -> SDR. + assert parse_video(VIDEO_HEVC10_CPD).range == Video.Range.SDR + + +def test_get_video_range_dolby_vision_fourcc() -> None: + assert ISM.get_video_range("DVH1", VIDEO_HEVC_PQ_CPD) == Video.Range.DV + assert ISM.get_video_range("DVHE", "") == Video.Range.DV + + +def test_dv_track_from_real_smooth_cpd() -> None: + # Live manifests ship lowercase "dvhe"; its VUI is Unspecified so the + # FourCC short-circuit is the only thing standing between DV and SDR. + video = parse_video(VIDEO_HEVC_DV_CPD, fourcc="dvhe") + assert video.range == Video.Range.DV + assert ISM.get_video_range("hvc1", VIDEO_HEVC_DV_CPD) == Video.Range.SDR + + +def test_get_video_range_malformed_data_soft_fails_sdr() -> None: + assert ISM.get_video_range("HVC1", "not-hex") == Video.Range.SDR + assert ISM.get_video_range("HVC1", "") == Video.Range.SDR + assert ISM.get_video_range("", VIDEO_HEVC_PQ_CPD) == Video.Range.SDR diff --git a/unshackle/core/manifests/ism.py b/unshackle/core/manifests/ism.py index 7cd2634..98dca0f 100644 --- a/unshackle/core/manifests/ism.py +++ b/unshackle/core/manifests/ism.py @@ -19,7 +19,8 @@ from requests import Session from unshackle.core.constants import DOWNLOAD_CANCELLED, DOWNLOAD_LICENCE_ONLY, AnyTrack from unshackle.core.drm import DRM_T, PlayReady, Widevine from unshackle.core.events import events -from unshackle.core.manifests.ism_init import build_init_segment, read_per_sample_iv_size, read_track_id +from unshackle.core.manifests.ism_init import (build_init_segment, parse_codec_private_data_colour, + read_per_sample_iv_size, read_track_id) from unshackle.core.session import RnetSession from unshackle.core.tracks import Audio, Subtitle, Track, Tracks, Video from unshackle.core.utilities import log_event, try_ensure_utf8 @@ -87,6 +88,22 @@ class ISM: drm.append(PlayReady(pssh=pr_pssh, pssh_b64=data)) return drm + @staticmethod + def get_video_range(fourcc: str, codec_private_data: str) -> Video.Range: + """Derive colour range from the SPS VUI in CodecPrivateData — Smooth + manifests carry no range attributes. Soft-fails to SDR.""" + fourcc = (fourcc or "").upper() + if fourcc in ("DVHE", "DVH1"): + return Video.Range.DV + try: + cpd = bytes.fromhex(codec_private_data or "") + except ValueError: + return Video.Range.SDR + cicp = parse_codec_private_data_colour(fourcc, cpd) + if not cicp: + return Video.Range.SDR + return Video.Range.from_cicp(*cicp) + @staticmethod def _init_segment( track: AnyTrack, session_drm: Optional[DRM_T], first_segment: Optional[bytes] = None @@ -280,6 +297,7 @@ class ISM: id_=track_id, url=self.url, codec=vcodec, + range_=self.get_video_range(codec or "", ql.get("CodecPrivateData") or ""), language=track_lang or language, is_original_lang=bool(language and track_lang and str(track_lang) == str(language)), bitrate=ql.get("Bitrate"), @@ -351,7 +369,7 @@ class ISM: save_path: Path, save_dir: Path, progress: partial, - session: Optional[Session] = None, + session: Optional[Union[Session, RnetSession]] = None, proxy: Optional[str] = None, max_workers: Optional[int] = None, license_widevine: Optional[Callable] = None, @@ -360,8 +378,8 @@ class ISM: ) -> None: if not session: session = Session() - elif not isinstance(session, Session): - raise TypeError(f"Expected session to be a {Session}, not {session!r}") + elif not isinstance(session, (Session, RnetSession)): + raise TypeError(f"Expected session to be a {Session} or {RnetSession}, not {session!r}") if proxy: session.proxies.update({"all": proxy}) diff --git a/unshackle/core/manifests/ism_init.py b/unshackle/core/manifests/ism_init.py index 8ed55f0..a762dcb 100644 --- a/unshackle/core/manifests/ism_init.py +++ b/unshackle/core/manifests/ism_init.py @@ -129,13 +129,15 @@ class BitReader: raise ValueError("Invalid exp-Golomb code") return (1 << zeros) - 1 + (self.read_bits(zeros) if zeros else 0) + def read_se(self) -> int: + # se(v): 0,1,2,3,... -> 0,1,-1,2,... + value = self.read_ue() + return (value + 1) >> 1 if value & 1 else -(value >> 1) -def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]: - """ - Parse (chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8) - from a de-emulated HEVC SPS RBSP (including its 2-byte NAL header). - """ - r = BitReader(sps_rbsp) + +def read_hevc_sps_to_bit_depth(r: BitReader) -> tuple[int, int, int, int]: + """Advance reader through bit_depth_chroma_minus8; returns + (chroma_format_idc, bit_depth_luma, bit_depth_chroma, max_sub_layers_minus1).""" r.read_bits(16) # NAL unit header r.read_bits(4) # sps_video_parameter_set_id max_sub_layers_minus1 = r.read_bits(3) @@ -164,9 +166,118 @@ def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]: r.read_ue() bit_depth_luma_minus8 = r.read_ue() bit_depth_chroma_minus8 = r.read_ue() + return chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8, max_sub_layers_minus1 + + +def parse_hevc_sps_format(sps_rbsp: bytes) -> tuple[int, int, int]: + """ + Parse (chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8) + from a de-emulated HEVC SPS RBSP (including its 2-byte NAL header). + """ + r = BitReader(sps_rbsp) + chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8, _ = read_hevc_sps_to_bit_depth(r) return chroma_format_idc, bit_depth_luma_minus8, bit_depth_chroma_minus8 +def skip_hevc_scaling_list_data(r: BitReader) -> None: + """Skip an HEVC scaling_list_data() syntax structure (H.265 7.3.4).""" + for size_id in range(4): + matrix_id = 0 + while matrix_id < 6: + if not r.read_bits(1): # scaling_list_pred_mode_flag + r.read_ue() # scaling_list_pred_matrix_id_delta + else: + coef_num = min(64, 1 << (4 + (size_id << 1))) + if size_id > 1: + r.read_se() # scaling_list_dc_coef_minus8 + for _ in range(coef_num): + r.read_se() # scaling_list_delta_coef + matrix_id += 3 if size_id == 3 else 1 + + +def skip_hevc_st_ref_pic_set(r: BitReader, idx: int, num_delta_pocs: list[int]) -> int: + """Skip one st_ref_pic_set() (H.265 7.3.7); returns its NumDeltaPocs.""" + if idx and r.read_bits(1): # inter_ref_pic_set_prediction_flag + r.read_bits(1) # delta_rps_sign + r.read_ue() # abs_delta_rps_minus1 + count = 0 + for _ in range(num_delta_pocs[idx - 1] + 1): + used_by_curr_pic = r.read_bits(1) + use_delta = 1 if used_by_curr_pic else r.read_bits(1) + if used_by_curr_pic or use_delta: + count += 1 + return count + num_negative = r.read_ue() + num_positive = r.read_ue() + for _ in range(num_negative + num_positive): + r.read_ue() # delta_poc_sX_minus1 + r.read_bits(1) # used_by_curr_pic_sX_flag + return num_negative + num_positive + + +def parse_hevc_sps_colour(sps_rbsp: bytes) -> Optional[tuple[int, int, int]]: + """VUI (colour_primaries, transfer_characteristics, matrix_coeffs) from a + de-emulated HEVC SPS, or None when no colour description is present.""" + r = BitReader(sps_rbsp) + _, _, _, max_sub_layers_minus1 = read_hevc_sps_to_bit_depth(r) + log2_max_poc_lsb_minus4 = r.read_ue() + sub_layer_ordering_info = r.read_bits(1) + for _ in range(max_sub_layers_minus1 + 1 if sub_layer_ordering_info else 1): + r.read_ue() # sps_max_dec_pic_buffering_minus1 + r.read_ue() # sps_max_num_reorder_pics + r.read_ue() # sps_max_latency_increase_plus1 + for _ in range(6): # luma coding/transform block sizes + transform hierarchy depths + r.read_ue() + if r.read_bits(1) and r.read_bits(1): # scaling_list_enabled + sps_scaling_list_data_present + skip_hevc_scaling_list_data(r) + r.read_bits(2) # amp_enabled_flag + sample_adaptive_offset_enabled_flag + if r.read_bits(1): # pcm_enabled_flag + r.read_bits(8) # pcm sample bit depths (4 + 4) + r.read_ue() # log2_min_pcm_luma_coding_block_size_minus3 + r.read_ue() # log2_diff_max_min_pcm_luma_coding_block_size + r.read_bits(1) # pcm_loop_filter_disabled_flag + num_delta_pocs: list[int] = [] + for idx in range(r.read_ue()): # num_short_term_ref_pic_sets + num_delta_pocs.append(skip_hevc_st_ref_pic_set(r, idx, num_delta_pocs)) + if r.read_bits(1): # long_term_ref_pics_present_flag + for _ in range(r.read_ue()): # num_long_term_ref_pics_sps + r.read_bits(log2_max_poc_lsb_minus4 + 4) # lt_ref_pic_poc_lsb_sps + r.read_bits(1) # used_by_curr_pic_lt_sps_flag + r.read_bits(2) # sps_temporal_mvp_enabled + strong_intra_smoothing_enabled + if not r.read_bits(1): # vui_parameters_present_flag + return None + if r.read_bits(1): # aspect_ratio_info_present_flag + if r.read_bits(8) == 255: # aspect_ratio_idc == EXTENDED_SAR + r.read_bits(32) # sar_width + sar_height + if r.read_bits(1): # overscan_info_present_flag + r.read_bits(1) # overscan_appropriate_flag + if not r.read_bits(1): # video_signal_type_present_flag + return None + r.read_bits(3) # video_format + r.read_bits(1) # video_full_range_flag + if not r.read_bits(1): # colour_description_present_flag + return None + return r.read_bits(8), r.read_bits(8), r.read_bits(8) + + +HEVC_FOURCCS = frozenset(("HVC1", "HEV1", "HEVC", "H265", "DVHE", "DVH1")) + + +def parse_codec_private_data_colour(fourcc: str, codec_private_data: bytes) -> Optional[tuple[int, int, int]]: + """SPS VUI colour triple from HEVC CodecPrivateData; None when the codec + is unsupported, no colour description, or malformed data.""" + if (fourcc or "").upper() not in HEVC_FOURCCS: + return None + try: + nals = split_nal_units(codec_private_data) + sps = next((n for n in nals if (n[0] >> 1) & 0x3F == 33), None) + if sps is None: + return None + return parse_hevc_sps_colour(remove_emulation_prevention(sps)) + except (IndexError, ValueError): + return None + + def iter_boxes(data: bytes, start: int, end: int) -> Iterator[tuple[bytes, Optional[bytes], int, int]]: """Yield (type, uuid_usertype, payload_start, box_end) for each child box.""" offset = start