diff --git a/unshackle/core/manifests/hls.py b/unshackle/core/manifests/hls.py index 5353e49..1963e82 100644 --- a/unshackle/core/manifests/hls.py +++ b/unshackle/core/manifests/hls.py @@ -241,8 +241,199 @@ class HLS: ) ) + for video in tracks.videos: + has_resolution = video.width and video.height + has_codec = video.codec is not None + if has_resolution and has_codec: + continue + try: + probe = HLS._probe_ts_info(video.url, self.session) + if probe: + width, height, codec = probe + if not has_resolution: + video.width, video.height = width, height + if not has_codec: + video.codec = codec + except Exception: + pass + return tracks + @staticmethod + def _probe_ts_info( + variant_url: str, session: Optional[Union[Session, CurlSession]] = None + ) -> Optional[tuple[int, int, Video.Codec]]: + """Probe the first TS segment of a variant playlist to extract resolution and codec.""" + if not session: + session = Session() + + res = session.get(variant_url) + variant = m3u8.loads(res.text if hasattr(res, "text") else res.text, uri=variant_url) + if not variant.segments: + return None + + seg_uri = urljoin(variant_url, variant.segments[0].uri) + + # Download only the first 8KB — SPS is always near the start of the first TS packet + res = session.get(seg_uri, headers={"Range": "bytes=0-8191"}) + data = res.content + + return HLS._parse_ts_video_info(data) + + @staticmethod + def _parse_ts_video_info(data: bytes) -> Optional[tuple[int, int, Video.Codec]]: + """Parse H.264/H.265 NAL units from TS segment data to extract resolution and codec.""" + + class _BitReader: + def __init__(self, buf: bytes) -> None: + self.data = buf + self.pos = 0 + + def bits(self, n: int) -> int: + val = 0 + for _ in range(n): + val = (val << 1) | ((self.data[self.pos >> 3] >> (7 - (self.pos & 7))) & 1) + self.pos += 1 + return val + + def ue(self) -> int: + zeros = 0 + while self.bits(1) == 0: + zeros += 1 + return (1 << zeros) - 1 + self.bits(zeros) if zeros else 0 + + def se(self) -> int: + val = self.ue() + return (val + 1) // 2 if val & 1 else -(val // 2) + + # Find SPS NAL unit via start code + # H.264: NAL type 7 (SPS), identified by byte & 0x1F == 7 + # H.265: NAL type 33 (SPS), identified by (byte >> 1) & 0x3F == 33 + for i in range(len(data) - 4): + start3 = data[i:i + 3] == b"\x00\x00\x01" + start4 = data[i:i + 4] == b"\x00\x00\x00\x01" + if not start3 and not start4: + continue + offset = i + (4 if start4 else 3) + if offset >= len(data): + continue + + nal_byte = data[offset] + h264_type = nal_byte & 0x1F + h265_type = (nal_byte >> 1) & 0x3F + + # H.264 SPS (NAL type 7) + if h264_type == 7: + sps = data[offset:offset + 64] + if len(sps) < 5: + continue + try: + r = _BitReader(sps[1:]) # skip NAL header byte + profile = r.bits(8) + r.bits(8) # constraint flags + r.bits(8) # level + r.ue() # sps_id + + if profile in (100, 110, 122, 244, 44, 83, 86, 118, 128, 138, 139, 134): + chroma = r.ue() + if chroma == 3: + r.bits(1) + r.ue() # bit_depth_luma + r.ue() # bit_depth_chroma + r.bits(1) # qpprime_y_zero_transform_bypass + if r.bits(1): # scaling_matrix_present + for j in range(6 if chroma != 3 else 12): + if r.bits(1): + last = 8 + for _ in range(16 if j < 6 else 64): + if last != 0: + last = (last + r.se()) & 0xFF + + r.ue() # log2_max_frame_num + poc_type = r.ue() + if poc_type == 0: + r.ue() + elif poc_type == 1: + r.bits(1) + r.se() + r.se() + for _ in range(r.ue()): + r.se() + + r.ue() # max_num_ref_frames + r.bits(1) # gaps_in_frame_num + + w_mbs = r.ue() + 1 + h_map = r.ue() + 1 + frame_mbs_only = r.bits(1) + if not frame_mbs_only: + r.bits(1) + r.bits(1) # direct_8x8_inference + + cl = cr = ct = cb = 0 + if r.bits(1): # crop + cl, cr, ct, cb = r.ue(), r.ue(), r.ue(), r.ue() + + width = w_mbs * 16 - (cl + cr) * 2 + height = (2 - frame_mbs_only) * h_map * 16 - (ct + cb) * 2 + return (width, height, Video.Codec.AVC) + except (IndexError, ValueError): + continue + + # H.265 SPS (NAL type 33) + elif h265_type == 33: + sps = data[offset:offset + 128] + if len(sps) < 10: + continue + try: + r = _BitReader(sps[2:]) # skip 2-byte NAL header + r.bits(4) # sps_video_parameter_set_id + max_sub_layers = r.bits(3) + r.bits(1) # sps_temporal_id_nesting + + # profile_tier_level + r.bits(2) # general_profile_space + r.bits(1) # general_tier + r.bits(5) # general_profile_idc + r.bits(32) # general_profile_compatibility_flags + r.bits(48) # general_constraint_indicator_flags + r.bits(8) # general_level_idc + sub_layer_flags = [] + for _ in range(max_sub_layers - 1): + sub_layer_flags.append((r.bits(1), r.bits(1))) + if max_sub_layers - 1 > 0: + for _ in range(8 - (max_sub_layers - 1)): + r.bits(2) + for profile_present, level_present in sub_layer_flags: + if profile_present: + r.bits(2 + 1 + 5 + 32 + 48 + 8) + if level_present: + r.bits(8) + + r.ue() # sps_seq_parameter_set_id + chroma = r.ue() + if chroma == 3: + r.bits(1) + + width = r.ue() + height = r.ue() + + if r.bits(1): # conformance_window + cl = r.ue() + cr = r.ue() + ct = r.ue() + cb = r.ue() + sub_w = 2 if chroma in (1, 2) else 1 + sub_h = 2 if chroma == 1 else 1 + width -= (cl + cr) * sub_w + height -= (ct + cb) * sub_h + + return (width, height, Video.Codec.HEVC) + except (IndexError, ValueError): + continue + + return None + @staticmethod def _finalize_n_m3u8dl_re_output(*, track: AnyTrack, save_dir: Path, save_path: Path) -> Path: """