fix(subs): handle WebVTT cue identifiers and overlapping multi-line cues

Some services use WebVTT files with:
- Cue identifiers (Q0, Q1, etc.) before timing lines that pysubs2/pycaption incorrectly parses as subtitle text
- Multi-line subtitles split into separate cues with 1ms offset times and different line: positions (e.g., line:77% for top, line:84% for bottom)

Added detection and sanitization functions:
- has_webvtt_cue_identifiers(): detects cue identifiers before timing
- sanitize_webvtt_cue_identifiers(): removes problematic cue identifiers
- has_overlapping_webvtt_cues(): detects overlapping cues needing merge
- merge_overlapping_webvtt_cues(): merges cues sorted by line position
This commit is contained in:
Andy
2026-01-18 04:44:08 +00:00
parent 4e11f69a58
commit e99cfddaec

View File

@@ -91,6 +91,12 @@ class Subtitle(Track):
return Subtitle.Codec.TimedTextMarkupLang return Subtitle.Codec.TimedTextMarkupLang
raise ValueError(f"The Content Profile '{profile}' is not a supported Subtitle Codec") raise ValueError(f"The Content Profile '{profile}' is not a supported Subtitle Codec")
# WebVTT sanitization patterns (compiled once for performance)
# Cue-identifier candidate: a whole line made of one or more ASCII letters
# followed by one or more digits (e.g. "Q0", "Q12").
_CUE_ID_PATTERN = re.compile(r"^[A-Za-z]+\d+$")
# Prefix check for a timestamp: digits, ":", digits, then ":" or ".".
# Only the start of the line is examined; the rest is not validated.
_TIMING_START_PATTERN = re.compile(r"^\d+:\d+[:\.]")
# Full cue timing line "start --> end<rest>". Group 1 is the start timestamp,
# group 2 the end timestamp, group 3 whatever follows (the cue settings).
# The hours field is optional and "." or "," is accepted before the fraction.
_TIMING_LINE_PATTERN = re.compile(r"^((?:\d+:)?\d+:\d+[.,]\d+)\s*-->\s*((?:\d+:)?\d+:\d+[.,]\d+)(.*)$")
# Value of a "line:" cue setting: a number, optionally fractional, optionally
# followed by "%". Group 1 is that value including the "%" when present.
_LINE_POS_PATTERN = re.compile(r"line:(\d+(?:\.\d+)?%?)")
def __init__( def __init__(
self, self,
*args: Any, *args: Any,
@@ -239,6 +245,11 @@ class Subtitle(Track):
# Sanitize WebVTT timestamps before parsing # Sanitize WebVTT timestamps before parsing
text = Subtitle.sanitize_webvtt_timestamps(text) text = Subtitle.sanitize_webvtt_timestamps(text)
# Remove cue identifiers that confuse parsers like pysubs2
text = Subtitle.sanitize_webvtt_cue_identifiers(text)
# Merge overlapping cues with line positioning into single multi-line cues
text = Subtitle.merge_overlapping_webvtt_cues(text)
preserve_formatting = config.subtitle.get("preserve_formatting", True) preserve_formatting = config.subtitle.get("preserve_formatting", True)
if preserve_formatting: if preserve_formatting:
@@ -277,6 +288,240 @@ class Subtitle(Track):
# Replace negative timestamps with 00:00:00.000 # Replace negative timestamps with 00:00:00.000
return re.sub(r"(-\d+:\d+:\d+\.\d+)", "00:00:00.000", text) return re.sub(r"(-\d+:\d+:\d+\.\d+)", "00:00:00.000", text)
@staticmethod
def has_webvtt_cue_identifiers(text: str) -> bool:
    """
    Check if WebVTT content has cue identifiers that need removal.

    A line counts as a cue identifier when it matches ``_CUE_ID_PATTERN``
    (letters followed by digits, e.g. "Q0"), the next non-empty line is a
    timing line, and the line is either directly above that timing line or
    is the first line of its block (start of text or preceded by a blank
    line).

    The last condition keeps cue *text* from being mistaken for an
    identifier: a payload line such as "Apollo13" is followed by a blank
    line and then the next cue's timing line, but it is preceded by its own
    timing line (or more payload), so it is not reported.

    Parameters:
        text: The WebVTT content as string

    Returns:
        True if cue identifiers are detected, False otherwise
    """
    lines = text.split("\n")
    total = len(lines)
    for i, raw in enumerate(lines):
        if not Subtitle._CUE_ID_PATTERN.match(raw.strip()):
            continue

        # Find the next non-empty line after the candidate.
        j = i + 1
        while j < total and not lines[j].strip():
            j += 1
        if j >= total:
            continue

        following = lines[j]
        if "-->" not in following and not Subtitle._TIMING_START_PATTERN.match(following.strip()):
            continue

        # Accept only a real identifier position: glued to the timing line,
        # or opening its own block. Anything else is cue payload text.
        directly_above_timing = j == i + 1
        starts_block = i == 0 or not lines[i - 1].strip()
        if directly_above_timing or starts_block:
            return True
    return False
@staticmethod
def sanitize_webvtt_cue_identifiers(text: str) -> str:
    """
    Remove WebVTT cue identifiers that can confuse subtitle parsers.

    Some services use cue identifiers like "Q0", "Q1", etc.
    that appear on their own line before the timing line. These can be
    incorrectly parsed as part of the previous cue's text content by
    some parsers (like pysubs2).

    A line is removed only when all of the following hold:
    - it matches ``_CUE_ID_PATTERN`` (letters followed by digits);
    - the next non-empty line is a timing line;
    - it is directly above that timing line, or it is the first line of its
      block (start of text or preceded by a blank line).

    The last rule protects cue text: a payload line such as "Apollo13"
    that happens to be followed by a blank line and the next cue's timing
    line is kept, because it sits under its own timing line rather than at
    the start of a block.

    Parameters:
        text: The WebVTT content as string

    Returns:
        Sanitized WebVTT content with cue identifiers removed. When nothing
        is removed the returned string equals the input.
    """
    lines = text.split("\n")
    total = len(lines)
    kept = []
    for i, raw in enumerate(lines):
        if Subtitle._CUE_ID_PATTERN.match(raw.strip()):
            # Find the next non-empty line after the candidate.
            j = i + 1
            while j < total and not lines[j].strip():
                j += 1
            followed_by_timing = j < total and (
                "-->" in lines[j] or Subtitle._TIMING_START_PATTERN.match(lines[j].strip())
            )
            directly_above_timing = j == i + 1
            starts_block = i == 0 or not lines[i - 1].strip()
            if followed_by_timing and (directly_above_timing or starts_block):
                # Genuine cue identifier: drop it.
                continue
        kept.append(raw)
    return "\n".join(kept)
@staticmethod
def _parse_vtt_time(t: str) -> int:
    """
    Convert a WebVTT timestamp into milliseconds.

    Accepts "MM:SS.mmm" and "HH:MM:SS.mmm"; a comma is accepted in place of
    the dot. Colon-separated fields beyond the third are ignored. The
    fraction is read as up to three digits, right-padded with zeros
    (".5" -> 500 ms).

    Parameters:
        t: The timestamp text

    Returns:
        The timestamp in milliseconds, or 0 when the text cannot be parsed
    """
    try:
        fields = t.replace(",", ".").split(":")
        if len(fields) < 2:
            return 0
        if len(fields) == 2:
            # No hours field present: treat it as zero hours.
            fields = ["0", *fields]
        hours_txt, minutes_txt, seconds_txt = fields[:3]

        seconds_bits = seconds_txt.split(".")
        whole_seconds = int(seconds_bits[0])
        if len(seconds_bits) > 1:
            # Normalise the fraction to exactly three digits before converting.
            millis = int(seconds_bits[1].ljust(3, "0")[:3])
        else:
            millis = 0

        total_minutes = int(hours_txt) * 60 + int(minutes_txt)
        return total_minutes * 60000 + whole_seconds * 1000 + millis
    except (ValueError, IndexError):
        return 0
@staticmethod
def has_overlapping_webvtt_cues(text: str) -> bool:
    """
    Tell whether the WebVTT content contains cues that should be merged.

    Two consecutive timing lines qualify when their start times differ by
    at most 50ms and their end times are equal, which is how multi-line
    subtitles look after being split into one cue per line.

    Parameters:
        text: The WebVTT content as string

    Returns:
        True if at least one such pair of neighbouring cues exists,
        False otherwise
    """
    # (start_ms, end_ms) for every line that parses as a timing line, in file order.
    spans = [
        (Subtitle._parse_vtt_time(found.group(1)), Subtitle._parse_vtt_time(found.group(2)))
        for found in map(Subtitle._TIMING_LINE_PATTERN.match, text.split("\n"))
        if found
    ]
    # Compare each cue only with its immediate successor.
    return any(
        abs(first_start - second_start) <= 50 and first_end == second_end
        for (first_start, first_end), (second_start, second_end) in zip(spans, spans[1:])
    )
@staticmethod
def merge_overlapping_webvtt_cues(text: str) -> str:
    """
    Merge WebVTT cues that have overlapping/near-identical times but different line positions.

    Some services use separate cues for each line of a multi-line subtitle, with
    slightly different start times (1ms apart) and different line: positions.
    This merges them into single cues with proper line ordering based on the
    line: position (lower percentage = higher on screen = first line).

    When no overlapping cues are detected the text is returned untouched.
    Otherwise the whole document is rebuilt, which has these side effects:
    - everything before the first line containing "-->" is kept as header,
      except a cue identifier belonging to the first cue, which is dropped
      like the identifiers of all later cues;
    - lines between cues that are not timing lines or cue text (identifiers,
      NOTE blocks, ...) are dropped;
    - a merged cue loses its cue settings; unmerged cues keep theirs.

    Parameters:
        text: The WebVTT content as string

    Returns:
        WebVTT content with overlapping cues merged
    """
    if not Subtitle.has_overlapping_webvtt_cues(text):
        return text

    lines = text.split("\n")
    total = len(lines)

    # The header is every line before the first one that contains "-->".
    first_cue = next((idx for idx, line in enumerate(lines) if "-->" in line), total)
    header_lines = lines[:first_cue]

    # A non-blank line that starts its own block and sits directly above the
    # first timing line is that cue's identifier, not header content. Later
    # identifiers are discarded by the parsing loop below, so discard this one
    # as well instead of emitting it as an orphan block.
    if len(header_lines) >= 2 and header_lines[-1].strip() and not header_lines[-2].strip():
        header_lines.pop()

    # Collect the cues: timing line plus the text lines that follow it.
    cues = []
    i = first_cue
    while i < total:
        match = Subtitle._TIMING_LINE_PATTERN.match(lines[i])
        i += 1
        if not match:
            # Not a timing line (identifier, NOTE, stray text): skipped.
            continue

        start_str, end_str, settings = match.groups()
        line_match = Subtitle._LINE_POS_PATTERN.search(settings)
        # Cues without a line: setting sort last (treated as bottom).
        line_pos = float(line_match.group(1).rstrip("%")) if line_match else 100.0

        # Cue text runs until a blank line or the next timing line.
        payload_start = i
        while i < total and lines[i].strip() and "-->" not in lines[i]:
            i += 1

        cues.append(
            {
                "start_ms": Subtitle._parse_vtt_time(start_str),
                "end_ms": Subtitle._parse_vtt_time(end_str),
                "start_str": start_str,
                "end_str": end_str,
                "line_pos": line_pos,
                "content": "\n".join(lines[payload_start:i]),
                "settings": settings,
            }
        )

    # Group consecutive cues that start within 50ms of the group's first cue
    # and share its end time, then emit one cue per group.
    merged_cues = []
    i = 0
    while i < len(cues):
        anchor = cues[i]
        j = i + 1
        while (
            j < len(cues)
            and abs(anchor["start_ms"] - cues[j]["start_ms"]) <= 50
            and anchor["end_ms"] == cues[j]["end_ms"]
        ):
            j += 1
        group = cues[i:j]

        if len(group) > 1:
            # Sort by line position (lower % = higher on screen = first)
            group.sort(key=lambda cue: cue["line_pos"])
            # Use the earliest start time from the group
            earliest = min(group, key=lambda cue: cue["start_ms"])
            merged_cues.append(
                {
                    "start_str": earliest["start_str"],
                    "end_str": group[0]["end_str"],
                    "content": "\n".join(cue["content"] for cue in group),
                    # Per-line positioning is meaningless once the lines are combined.
                    "settings": "",
                }
            )
        else:
            merged_cues.append(
                {
                    "start_str": anchor["start_str"],
                    "end_str": anchor["end_str"],
                    "content": anchor["content"],
                    "settings": anchor["settings"],
                }
            )
        i = j

    # Rebuild the document: header, a separating blank line, then the cues.
    result_lines = header_lines[:]
    if result_lines and result_lines[-1].strip():
        result_lines.append("")

    for cue in merged_cues:
        result_lines.append(f"{cue['start_str']} --> {cue['end_str']}{cue['settings']}")
        result_lines.append(cue["content"])
        result_lines.append("")

    return "\n".join(result_lines)
@staticmethod @staticmethod
def sanitize_webvtt(text: str) -> str: def sanitize_webvtt(text: str) -> str:
""" """
@@ -631,7 +876,7 @@ class Subtitle(Track):
text = try_ensure_utf8(data).decode("utf8") text = try_ensure_utf8(data).decode("utf8")
text = text.replace("tt:", "") text = text.replace("tt:", "")
# negative size values aren't allowed in TTML/DFXP spec, replace with 0 # negative size values aren't allowed in TTML/DFXP spec, replace with 0
text = re.sub(r'-(\d+(?:\.\d+)?)(px|em|%|c|pt)', r'0\2', text) text = re.sub(r"-(\d+(?:\.\d+)?)(px|em|%|c|pt)", r"0\2", text)
caption_set = pycaption.DFXPReader().read(text) caption_set = pycaption.DFXPReader().read(text)
elif codec == Subtitle.Codec.fVTT: elif codec == Subtitle.Codec.fVTT:
caption_lists: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList) caption_lists: dict[str, pycaption.CaptionList] = defaultdict(pycaption.CaptionList)