feat(hls): probe TS segments for resolution and codec when master playlist lacks RESOLUTION/CODECS tags

Some HLS services serve master playlists without RESOLUTION or CODECS attributes, leaving video tracks with no width/height/codec info which causes crashes in by_resolutions() quality selection.

After parsing tracks in to_tracks(), any video track missing resolution or codec is now probed by fetching the first 8KB of its first TS segment and parsing the H.264/H.265 SPS NAL unit to extract the actual width, height, and codec. This approach mirrors how cat-catch/hls.js determines resolution from the media data rather than relying on playlist metadata.
This commit is contained in:
imSp4rky
2026-03-29 15:50:07 -06:00
parent e323f6f3b3
commit 3d5e46a2e3

View File

@@ -241,8 +241,199 @@ class HLS:
) )
) )
for video in tracks.videos:
has_resolution = video.width and video.height
has_codec = video.codec is not None
if has_resolution and has_codec:
continue
try:
probe = HLS._probe_ts_info(video.url, self.session)
if probe:
width, height, codec = probe
if not has_resolution:
video.width, video.height = width, height
if not has_codec:
video.codec = codec
except Exception:
pass
return tracks return tracks
@staticmethod
def _probe_ts_info(
variant_url: str, session: Optional[Union[Session, CurlSession]] = None
) -> Optional[tuple[int, int, Video.Codec]]:
"""Probe the first TS segment of a variant playlist to extract resolution and codec."""
if not session:
session = Session()
res = session.get(variant_url)
variant = m3u8.loads(res.text if hasattr(res, "text") else res.text, uri=variant_url)
if not variant.segments:
return None
seg_uri = urljoin(variant_url, variant.segments[0].uri)
# Download only the first 8KB — SPS is always near the start of the first TS packet
res = session.get(seg_uri, headers={"Range": "bytes=0-8191"})
data = res.content
return HLS._parse_ts_video_info(data)
@staticmethod
def _parse_ts_video_info(data: bytes) -> Optional[tuple[int, int, Video.Codec]]:
"""Parse H.264/H.265 NAL units from TS segment data to extract resolution and codec."""
class _BitReader:
def __init__(self, buf: bytes) -> None:
self.data = buf
self.pos = 0
def bits(self, n: int) -> int:
val = 0
for _ in range(n):
val = (val << 1) | ((self.data[self.pos >> 3] >> (7 - (self.pos & 7))) & 1)
self.pos += 1
return val
def ue(self) -> int:
zeros = 0
while self.bits(1) == 0:
zeros += 1
return (1 << zeros) - 1 + self.bits(zeros) if zeros else 0
def se(self) -> int:
val = self.ue()
return (val + 1) // 2 if val & 1 else -(val // 2)
# Find SPS NAL unit via start code
# H.264: NAL type 7 (SPS), identified by byte & 0x1F == 7
# H.265: NAL type 33 (SPS), identified by (byte >> 1) & 0x3F == 33
for i in range(len(data) - 4):
start3 = data[i:i + 3] == b"\x00\x00\x01"
start4 = data[i:i + 4] == b"\x00\x00\x00\x01"
if not start3 and not start4:
continue
offset = i + (4 if start4 else 3)
if offset >= len(data):
continue
nal_byte = data[offset]
h264_type = nal_byte & 0x1F
h265_type = (nal_byte >> 1) & 0x3F
# H.264 SPS (NAL type 7)
if h264_type == 7:
sps = data[offset:offset + 64]
if len(sps) < 5:
continue
try:
r = _BitReader(sps[1:]) # skip NAL header byte
profile = r.bits(8)
r.bits(8) # constraint flags
r.bits(8) # level
r.ue() # sps_id
if profile in (100, 110, 122, 244, 44, 83, 86, 118, 128, 138, 139, 134):
chroma = r.ue()
if chroma == 3:
r.bits(1)
r.ue() # bit_depth_luma
r.ue() # bit_depth_chroma
r.bits(1) # qpprime_y_zero_transform_bypass
if r.bits(1): # scaling_matrix_present
for j in range(6 if chroma != 3 else 12):
if r.bits(1):
last = 8
for _ in range(16 if j < 6 else 64):
if last != 0:
last = (last + r.se()) & 0xFF
r.ue() # log2_max_frame_num
poc_type = r.ue()
if poc_type == 0:
r.ue()
elif poc_type == 1:
r.bits(1)
r.se()
r.se()
for _ in range(r.ue()):
r.se()
r.ue() # max_num_ref_frames
r.bits(1) # gaps_in_frame_num
w_mbs = r.ue() + 1
h_map = r.ue() + 1
frame_mbs_only = r.bits(1)
if not frame_mbs_only:
r.bits(1)
r.bits(1) # direct_8x8_inference
cl = cr = ct = cb = 0
if r.bits(1): # crop
cl, cr, ct, cb = r.ue(), r.ue(), r.ue(), r.ue()
width = w_mbs * 16 - (cl + cr) * 2
height = (2 - frame_mbs_only) * h_map * 16 - (ct + cb) * 2
return (width, height, Video.Codec.AVC)
except (IndexError, ValueError):
continue
# H.265 SPS (NAL type 33)
elif h265_type == 33:
sps = data[offset:offset + 128]
if len(sps) < 10:
continue
try:
r = _BitReader(sps[2:]) # skip 2-byte NAL header
r.bits(4) # sps_video_parameter_set_id
max_sub_layers = r.bits(3)
r.bits(1) # sps_temporal_id_nesting
# profile_tier_level
r.bits(2) # general_profile_space
r.bits(1) # general_tier
r.bits(5) # general_profile_idc
r.bits(32) # general_profile_compatibility_flags
r.bits(48) # general_constraint_indicator_flags
r.bits(8) # general_level_idc
sub_layer_flags = []
for _ in range(max_sub_layers - 1):
sub_layer_flags.append((r.bits(1), r.bits(1)))
if max_sub_layers - 1 > 0:
for _ in range(8 - (max_sub_layers - 1)):
r.bits(2)
for profile_present, level_present in sub_layer_flags:
if profile_present:
r.bits(2 + 1 + 5 + 32 + 48 + 8)
if level_present:
r.bits(8)
r.ue() # sps_seq_parameter_set_id
chroma = r.ue()
if chroma == 3:
r.bits(1)
width = r.ue()
height = r.ue()
if r.bits(1): # conformance_window
cl = r.ue()
cr = r.ue()
ct = r.ue()
cb = r.ue()
sub_w = 2 if chroma in (1, 2) else 1
sub_h = 2 if chroma == 1 else 1
width -= (cl + cr) * sub_w
height -= (ct + cb) * sub_h
return (width, height, Video.Codec.HEVC)
except (IndexError, ValueError):
continue
return None
@staticmethod @staticmethod
def _finalize_n_m3u8dl_re_output(*, track: AnyTrack, save_dir: Path, save_path: Path) -> Path: def _finalize_n_m3u8dl_re_output(*, track: AnyTrack, save_dir: Path, save_path: Path) -> Path:
""" """