diff --git a/unshackle/core/tracks/subtitle.py b/unshackle/core/tracks/subtitle.py index 2fb5594..9a6767d 100644 --- a/unshackle/core/tracks/subtitle.py +++ b/unshackle/core/tracks/subtitle.py @@ -91,6 +91,12 @@ class Subtitle(Track): return Subtitle.Codec.TimedTextMarkupLang raise ValueError(f"The Content Profile '{profile}' is not a supported Subtitle Codec") + # WebVTT sanitization patterns (compiled once for performance) + _CUE_ID_PATTERN = re.compile(r"^[A-Za-z]+\d+$") + _TIMING_START_PATTERN = re.compile(r"^\d+:\d+[:\.]") + _TIMING_LINE_PATTERN = re.compile(r"^((?:\d+:)?\d+:\d+[.,]\d+)\s*-->\s*((?:\d+:)?\d+:\d+[.,]\d+)(.*)$") + _LINE_POS_PATTERN = re.compile(r"line:(\d+(?:\.\d+)?%?)") + def __init__( self, *args: Any, @@ -239,6 +245,11 @@ class Subtitle(Track): # Sanitize WebVTT timestamps before parsing text = Subtitle.sanitize_webvtt_timestamps(text) + # Remove cue identifiers that confuse parsers like pysubs2 + text = Subtitle.sanitize_webvtt_cue_identifiers(text) + # Merge overlapping cues with line positioning into single multi-line cues + text = Subtitle.merge_overlapping_webvtt_cues(text) + preserve_formatting = config.subtitle.get("preserve_formatting", True) if preserve_formatting: @@ -277,6 +288,240 @@ class Subtitle(Track): # Replace negative timestamps with 00:00:00.000 return re.sub(r"(-\d+:\d+:\d+\.\d+)", "00:00:00.000", text) + @staticmethod + def has_webvtt_cue_identifiers(text: str) -> bool: + """ + Check if WebVTT content has cue identifiers that need removal. + + Parameters: + text: The WebVTT content as string + + Returns: + True if cue identifiers are detected, False otherwise + """ + lines = text.split("\n") + + for i, line in enumerate(lines): + line = line.strip() + if Subtitle._CUE_ID_PATTERN.match(line): + # Look ahead to see if next non-empty line is a timing line + j = i + 1 + while j < len(lines) and not lines[j].strip(): + j += 1 + if j < len(lines) and ("-->" in lines[j] or Subtitle._TIMING_START_PATTERN.match(lines[j].strip())): + return True + return False + + @staticmethod + def sanitize_webvtt_cue_identifiers(text: str) -> str: + """ + Remove WebVTT cue identifiers that can confuse subtitle parsers. + + Some services use cue identifiers like "Q0", "Q1", etc. + that appear on their own line before the timing line. These can be + incorrectly parsed as part of the previous cue's text content by + some parsers (like pysubs2). + + Parameters: + text: The WebVTT content as string + + Returns: + Sanitized WebVTT content with cue identifiers removed + """ + if not Subtitle.has_webvtt_cue_identifiers(text): + return text + + lines = text.split("\n") + sanitized_lines = [] + + i = 0 + while i < len(lines): + line = lines[i].strip() + + # Check if this line is a cue identifier followed by a timing line + if Subtitle._CUE_ID_PATTERN.match(line): + # Look ahead to see if next non-empty line is a timing line + j = i + 1 + while j < len(lines) and not lines[j].strip(): + j += 1 + if j < len(lines) and ("-->" in lines[j] or Subtitle._TIMING_START_PATTERN.match(lines[j].strip())): + # This is a cue identifier, skip it + i += 1 + continue + + sanitized_lines.append(lines[i]) + i += 1 + + return "\n".join(sanitized_lines) + + @staticmethod + def _parse_vtt_time(t: str) -> int: + """Parse WebVTT timestamp to milliseconds. Returns 0 for malformed input.""" + try: + t = t.replace(",", ".") + parts = t.split(":") + if len(parts) == 2: + m, s = parts + h = "0" + elif len(parts) >= 3: + h, m, s = parts[:3] + else: + return 0 + sec_parts = s.split(".") + secs = int(sec_parts[0]) + # Handle variable millisecond digits (e.g., .5 = 500ms, .50 = 500ms, .500 = 500ms) + ms = int(sec_parts[1].ljust(3, "0")[:3]) if len(sec_parts) > 1 else 0 + return int(h) * 3600000 + int(m) * 60000 + secs * 1000 + ms + except (ValueError, IndexError): + return 0 + + @staticmethod + def has_overlapping_webvtt_cues(text: str) -> bool: + """ + Check if WebVTT content has overlapping cues that need merging. + + Detects cues with start times within 50ms of each other and the same end time, + which indicates multi-line subtitles split into separate cues. + + Parameters: + text: The WebVTT content as string + + Returns: + True if overlapping cues are detected, False otherwise + """ + timings = [] + for line in text.split("\n"): + match = Subtitle._TIMING_LINE_PATTERN.match(line) + if match: + start_str, end_str = match.group(1), match.group(2) + timings.append((Subtitle._parse_vtt_time(start_str), Subtitle._parse_vtt_time(end_str))) + + # Check for overlapping cues (within 50ms start, same end) + for i in range(len(timings) - 1): + curr_start, curr_end = timings[i] + next_start, next_end = timings[i + 1] + if abs(curr_start - next_start) <= 50 and curr_end == next_end: + return True + + return False + + @staticmethod + def merge_overlapping_webvtt_cues(text: str) -> str: + """ + Merge WebVTT cues that have overlapping/near-identical times but different line positions. + + Some services use separate cues for each line of a multi-line subtitle, with + slightly different start times (1ms apart) and different line: positions. + This merges them into single cues with proper line ordering based on the + line: position (lower percentage = higher on screen = first line). + + Parameters: + text: The WebVTT content as string + + Returns: + WebVTT content with overlapping cues merged + """ + if not Subtitle.has_overlapping_webvtt_cues(text): + return text + + lines = text.split("\n") + cues = [] + header_lines = [] + in_header = True + i = 0 + + while i < len(lines): + line = lines[i] + + if in_header: + if "-->" in line: + in_header = False + else: + header_lines.append(line) + i += 1 + continue + + match = Subtitle._TIMING_LINE_PATTERN.match(line) + if match: + start_str, end_str, settings = match.groups() + line_pos = 100.0 # Default to bottom + line_match = Subtitle._LINE_POS_PATTERN.search(settings) + if line_match: + pos_str = line_match.group(1).rstrip("%") + line_pos = float(pos_str) + + content_lines = [] + i += 1 + while i < len(lines) and lines[i].strip() and "-->" not in lines[i]: + content_lines.append(lines[i]) + i += 1 + + cues.append( + { + "start_ms": Subtitle._parse_vtt_time(start_str), + "end_ms": Subtitle._parse_vtt_time(end_str), + "start_str": start_str, + "end_str": end_str, + "line_pos": line_pos, + "content": "\n".join(content_lines), + "settings": settings, + } + ) + else: + i += 1 + + # Merge overlapping cues (within 50ms of each other with same end time) + merged_cues = [] + i = 0 + while i < len(cues): + current = cues[i] + group = [current] + + j = i + 1 + while j < len(cues): + other = cues[j] + if abs(current["start_ms"] - other["start_ms"]) <= 50 and current["end_ms"] == other["end_ms"]: + group.append(other) + j += 1 + else: + break + + if len(group) > 1: + # Sort by line position (lower % = higher on screen = first) + group.sort(key=lambda x: x["line_pos"]) + # Use the earliest start time from the group + earliest = min(group, key=lambda x: x["start_ms"]) + merged_cues.append( + { + "start_str": earliest["start_str"], + "end_str": group[0]["end_str"], + "content": "\n".join(c["content"] for c in group), + "settings": "", + } + ) + else: + merged_cues.append( + { + "start_str": current["start_str"], + "end_str": current["end_str"], + "content": current["content"], + "settings": current["settings"], + } + ) + + i = j if len(group) > 1 else i + 1 + + result_lines = header_lines[:] + if result_lines and result_lines[-1].strip(): + result_lines.append("") + + for cue in merged_cues: + result_lines.append(f"{cue['start_str']} --> {cue['end_str']}{cue['settings']}") + result_lines.append(cue["content"]) + result_lines.append("") + + return "\n".join(result_lines) + @staticmethod def sanitize_webvtt(text: str) -> str: """ @@ -631,7 +876,7 @@ class Subtitle(Track): text = try_ensure_utf8(data).decode("utf8") text = text.replace("tt:", "") # negative size values aren't allowed in TTML/DFXP spec, replace with 0 - text = re.sub(r'-(\d+(?:\.\d+)?)(px|em|%|c|pt)', r'0\2', text) + text = re.sub(r"-(\d+(?:\.\d+)?)(px|em|%|c|pt)", r"0\2", text) caption_set = pycaption.DFXPReader().read(text) elif codec == Subtitle.Codec.fVTT: caption_lists: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList)