"""SSMD parser - Parse SSMD text into structured Sentence/Segment objects.
This module provides functions to parse SSMD markdown into structured data
that can be used for TTS processing or conversion to SSML.
"""
import re
from typing import TYPE_CHECKING, Any
from ssmd.paragraph import Paragraph
from ssmd.segment import Segment
from ssmd.sentence import Sentence
from ssmd.spans import AnnotationSpan, LintIssue, ParseSpansResult
from ssmd.ssml_conversions import (
PROSODY_PITCH_MAP,
PROSODY_RATE_MAP,
PROSODY_VOLUME_MAP,
SSMD_BREAK_MARKER_TO_STRENGTH,
)
from ssmd.types import (
DEFAULT_HEADING_LEVELS,
AudioAttrs,
BreakAttrs,
DirectiveAttrs,
PhonemeAttrs,
ProsodyAttrs,
SayAsAttrs,
VoiceAttrs,
)
from ssmd.utils import unescape_ssmd_syntax
if TYPE_CHECKING:
from ssmd.capabilities import TTSCapabilities
# ═══════════════════════════════════════════════════════════════════════════════
# REGEX PATTERNS
# ═══════════════════════════════════════════════════════════════════════════════
# Directive blocks: <div key="value"> ... </div>
# Both patterns must match an entire line (optional surrounding whitespace) and
# are case-insensitive. Group 1 of the start pattern is the raw attribute text.
DIV_DIRECTIVE_START = re.compile(r"^\s*<div\s+([^>]+)>\s*$", re.IGNORECASE)
DIV_DIRECTIVE_END = re.compile(r"^\s*</div>\s*$", re.IGNORECASE)
# Emphasis patterns: **strong**, *moderate*, _reduced_.
# Group 1 is the emphasized text; it cannot itself contain the delimiter character.
STRONG_EMPHASIS_PATTERN = re.compile(r"\*\*([^\*]+)\*\*")
MODERATE_EMPHASIS_PATTERN = re.compile(r"\*([^\*]+)\*")
REDUCED_EMPHASIS_PATTERN = re.compile(r"(?<!_)_(?!_)([^_]+?)(?<!_)_(?!_)")
# Annotation pattern: [text]{key="value"}
# Group 1 is the bracketed text (may be empty), group 2 the attribute body.
# A backslash escapes the next character, so "\}" does not close the body.
ANNOTATION_PATTERN = re.compile(r"\[([^\]]*)\]\{((?:\\.|[^}])*)\}")
# Break pattern: ...500ms, ...2s, ...n, ...w, ...c, ...s, ...p
# Group 1 is the modifier. The marker must be followed by whitespace,
# end of input, or one of . ! ? , ; :
BREAK_PATTERN = re.compile(r"\.\.\.(\d+(?:s|ms)|[nwcsp])(?=\s|$|[.!?,;:])")
# Mark pattern: @name (must not be preceded by a non-space character and must
# be followed by whitespace or end of input). Group 1 is the mark name.
MARK_PATTERN = re.compile(r"(?<!\S)@(\w+)(?=\s|$)")
# Heading pattern: # ## ### (one to six hashes).
# Group 1 is the run of hashes, group 2 the heading text.
HEADING_PATTERN = re.compile(r"^\s*(#{1,6})\s*(.+)$", re.MULTILINE)
# Paragraph break: two or more newlines
PARAGRAPH_PATTERN = re.compile(r"\n\n+")
# Space before punctuation (to normalize); group 1 is the punctuation character
SPACE_BEFORE_PUNCT = re.compile(r"\s+([.!?,:;])")
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN PARSING FUNCTIONS
# ═══════════════════════════════════════════════════════════════════════════════
def _normalize_text(text: str) -> str:
    """Tidy the whitespace of a run of plain text.

    Whitespace directly in front of ``. ! ? , : ;`` is dropped, every remaining
    whitespace run is replaced by a single space, and the result is trimmed at
    both ends.
    """
    tightened = SPACE_BEFORE_PUNCT.sub(r"\1", text)
    collapsed = re.sub(r"\s+", " ", tightened)
    return collapsed.strip()
[docs]
def parse_paragraphs(
    text: str,
    *,
    capabilities: "TTSCapabilities | str | None" = None,
    heading_levels: dict | None = None,
    extensions: dict | None = None,
    sentence_detection: bool = True,
    language: str = "en",
    use_spacy: bool | None = None,
    model_size: str | None = None,
    parse_yaml_header: bool = False,
    strict_parse: bool = False,
) -> list[Paragraph]:
    """Parse SSMD text into a list of Paragraphs.

    This is the main parsing function. It handles:
    - Directive blocks (<div ...> ... </div>)
    - Paragraph and sentence splitting
    - All SSMD markup (emphasis, annotations, breaks, etc.)

    Args:
        text: SSMD markdown text
        capabilities: TTS capabilities for filtering (optional); a string is
            resolved to a preset
        heading_levels: Custom heading configurations
        extensions: Custom extension handlers
        sentence_detection: If True, split text into sentences
        language: Default language for sentence detection
        use_spacy: If True, use spaCy for sentence detection
        model_size: spaCy model size ("sm", "md", "lg")
        parse_yaml_header: If True, parse YAML front matter and apply
            heading/extensions config while stripping it from the body. If False,
            YAML front matter is preserved as plain text.
        strict_parse: If True (and capabilities were given), pass all parsed
            sentences through ``_filter_sentences`` with those capabilities.

    Returns:
        List of Paragraph objects. Each sentence carries the directive's
        voice/language/prosody plus running paragraph and sentence indices.
        ``is_paragraph_end`` is set on the last sentence of every paragraph
        except the final paragraph of the final directive block.
    """
    if not text or not text.strip():
        return []

    # The helper is aliased because the ``parse_yaml_header`` flag shadows its name.
    from ssmd.utils import build_config_from_header
    from ssmd.utils import parse_yaml_header as parse_yaml_front_matter

    if parse_yaml_header:
        header, text = parse_yaml_front_matter(text)
        if header:
            header_config = build_config_from_header(header)
            heading_levels = header_config.get("heading_levels", heading_levels)
            extensions = header_config.get("extensions", extensions)

    caps = _resolve_capabilities(capabilities)
    directive_blocks = _split_directive_blocks(text)

    paragraphs: list[Paragraph] = []
    paragraph_index = 0
    sentence_index = 0
    last_block_index = len(directive_blocks) - 1

    for block_index, (directive, block_text) in enumerate(directive_blocks):
        is_last_block = block_index == last_block_index

        # Drop whitespace-only chunks *before* deciding which paragraph is last;
        # otherwise trailing blank lines (which split into an empty final chunk)
        # would make the real last paragraph look like it has a successor.
        block_paragraphs = [
            chunk
            for chunk in (raw.strip() for raw in PARAGRAPH_PATTERN.split(block_text))
            if chunk
        ]
        last_para_idx = len(block_paragraphs) - 1

        for para_idx, paragraph in enumerate(block_paragraphs):
            # A boundary follows unless this is the very last paragraph overall.
            paragraph_boundary = para_idx != last_para_idx or not is_last_block

            if sentence_detection:
                raw_sentences = _split_sentences(
                    paragraph,
                    language=language,
                    use_spacy=use_spacy,
                    model_size=model_size,
                )
            else:
                raw_sentences = [paragraph]

            # Same reasoning as above: only non-empty sentences count when
            # working out which one closes the paragraph.
            sent_texts = [
                sent for sent in (raw.strip() for raw in raw_sentences) if sent
            ]
            last_sent_idx = len(sent_texts) - 1

            paragraph_sentences: list[Sentence] = []
            for sent_idx, sent_text in enumerate(sent_texts):
                segments = _parse_segments(
                    sent_text,
                    capabilities=caps,
                    heading_levels=heading_levels,
                    extensions=extensions,
                )
                if not segments:
                    continue
                paragraph_sentences.append(
                    Sentence(
                        segments=segments,
                        voice=directive.voice,
                        language=directive.language,
                        prosody=directive.prosody,
                        is_paragraph_end=(
                            sent_idx == last_sent_idx and paragraph_boundary
                        ),
                        paragraph_index=paragraph_index,
                        sentence_index=sentence_index,
                    )
                )
                sentence_index += 1

            if paragraph_sentences:
                paragraphs.append(Paragraph(sentences=paragraph_sentences))
                paragraph_index += 1

    if strict_parse and caps:
        all_sentences = [
            sentence for paragraph in paragraphs for sentence in paragraph.sentences
        ]
        _filter_sentences(all_sentences, caps)
    return paragraphs
[docs]
def parse_ssmd(
    text: str,
    *,
    capabilities: "TTSCapabilities | str | None" = None,
    heading_levels: dict | None = None,
    extensions: dict | None = None,
    sentence_detection: bool = True,
    language: str = "en",
    use_spacy: bool | None = None,
    model_size: str | None = None,
    parse_yaml_header: bool = False,
    strict_parse: bool = False,
) -> list[Paragraph]:
    """Backward-compatible alias of :func:`parse_paragraphs`.

    Every argument is forwarded unchanged and the result of
    ``parse_paragraphs`` is returned as-is.
    """
    forwarded = {
        "capabilities": capabilities,
        "heading_levels": heading_levels,
        "extensions": extensions,
        "sentence_detection": sentence_detection,
        "language": language,
        "use_spacy": use_spacy,
        "model_size": model_size,
        "parse_yaml_header": parse_yaml_header,
        "strict_parse": strict_parse,
    }
    return parse_paragraphs(text, **forwarded)
def _resolve_capabilities(
capabilities: "TTSCapabilities | str | None",
) -> "TTSCapabilities | None":
"""Resolve capabilities from string or object."""
if capabilities is None:
return None
if isinstance(capabilities, str):
from ssmd.capabilities import get_preset
return get_preset(capabilities)
return capabilities
def _split_directive_blocks(text: str) -> list[tuple[DirectiveAttrs, str]]:
    """Split text into directive blocks defined by <div ...> tags.

    Returns a list of ``(directive, block_text)`` pairs, where ``directive``
    holds the merged attributes of every ``<div>`` enclosing that block.

    The splitting logic lives in ``_split_directive_blocks_with_warnings``;
    this variant discards the warnings so the two cannot drift apart.
    """
    blocks, _warnings = _split_directive_blocks_with_warnings(text)
    return blocks
def _split_directive_blocks_with_warnings(
    text: str,
) -> tuple[list[tuple[DirectiveAttrs, str]], list[str]]:
    """Split text on ``<div ...>`` / ``</div>`` lines and report problems.

    The text is walked line by line. Lines between directive tags accumulate
    into a block that is paired with the directive on top of the nesting
    stack (each nested ``<div>`` is merged with its parent). A stray
    ``</div>`` is kept as ordinary text and produces a warning; a ``<div>``
    still open at the end produces a warning as well.

    Returns:
        ``(blocks, warnings)`` where ``blocks`` is a list of
        ``(directive, block_text)`` pairs.
    """
    blocks: list[tuple[DirectiveAttrs, str]] = []
    warnings: list[str] = []
    directive_stack: list[DirectiveAttrs] = [DirectiveAttrs()]
    pending_lines: list[str] = []

    def _emit_pending() -> None:
        # Whitespace-only runs are discarded rather than emitted as blocks.
        if pending_lines:
            joined = "\n".join(pending_lines)
            if joined.strip():
                blocks.append((directive_stack[-1], joined))
            pending_lines.clear()

    for raw_line in text.split("\n"):
        opening = DIV_DIRECTIVE_START.match(raw_line)
        if opening:
            _emit_pending()
            parsed = _parse_div_attrs(opening.group(1))
            directive_stack.append(_merge_directives(directive_stack[-1], parsed))
        elif not DIV_DIRECTIVE_END.match(raw_line):
            pending_lines.append(raw_line)
        elif len(directive_stack) > 1:
            _emit_pending()
            directive_stack.pop()
        else:
            warnings.append("Unexpected </div> without matching <div>.")
            pending_lines.append(raw_line)

    _emit_pending()
    if len(directive_stack) > 1:
        warnings.append("Unclosed <div> directive block.")
    if not blocks and text.strip():
        blocks.append((DirectiveAttrs(), text.strip()))
    return blocks, warnings
def _parse_div_attrs(params: str) -> DirectiveAttrs:
    """Build directive attributes from the attribute text of a ``<div>`` tag.

    Recognizes a language (``lang`` or ``language``), voice settings and
    prosody settings; anything else in ``params`` is ignored.
    """
    attr_map = _parse_annotation_params(params)
    result = DirectiveAttrs()

    lang_value = attr_map.get("lang") or attr_map.get("language")
    if lang_value:
        result.language = lang_value

    parsed_voice = _parse_voice_annotation_params(attr_map)
    if parsed_voice:
        result.voice = parsed_voice
    if "voice" in attr_map and result.voice:
        result.voice.name = attr_map["voice"]

    parsed_prosody = _parse_prosody_params(attr_map)
    if parsed_prosody:
        result.prosody = parsed_prosody

    return result
def _merge_directives(base: DirectiveAttrs, update: DirectiveAttrs) -> DirectiveAttrs:
    """Combine an outer directive with a nested one.

    Values from ``update`` win; ``base`` fills in whatever ``update`` leaves
    unset. Voice and prosody are merged field by field.
    """
    return DirectiveAttrs(
        voice=_merge_voice(base.voice, update.voice),
        language=update.language or base.language,
        prosody=_merge_prosody(base.prosody, update.prosody),
    )
def _merge_voice(
    base: VoiceAttrs | None, update: VoiceAttrs | None
) -> VoiceAttrs | None:
    """Merge two voice settings field by field.

    For each of name, language, gender and variant the value from ``update``
    is used unless it is ``None`` or an empty string, in which case the value
    from ``base`` is used. Returns ``None`` when both inputs are ``None`` or
    when the merged result carries no information.
    """
    if base is None and update is None:
        return None

    merged = VoiceAttrs()
    for attr in ("name", "language", "gender", "variant"):
        candidate = getattr(update, attr) if update else None
        fallback = getattr(base, attr) if base else None
        use_fallback = candidate in (None, "")
        setattr(merged, attr, fallback if use_fallback else candidate)

    has_content = (
        merged.name or merged.language or merged.gender or merged.variant is not None
    )
    return merged if has_content else None
def _merge_prosody(
    base: ProsodyAttrs | None,
    update: ProsodyAttrs | None,
) -> ProsodyAttrs | None:
    """Merge two prosody settings field by field.

    For volume, rate and pitch the value from ``update`` is used unless it is
    ``None`` or an empty string, in which case the value from ``base`` is
    used. Returns ``None`` when both inputs are ``None`` or when no field of
    the merged result is set.
    """
    if base is None and update is None:
        return None

    merged = ProsodyAttrs()
    for attr in ("volume", "rate", "pitch"):
        candidate = getattr(update, attr) if update else None
        fallback = getattr(base, attr) if base else None
        use_fallback = candidate in (None, "")
        setattr(merged, attr, fallback if use_fallback else candidate)

    if merged.volume or merged.rate or merged.pitch:
        return merged
    return None
def _split_sentences(
    text: str,
    language: str = "en",
    use_spacy: bool | None = None,
    model_size: str | None = None,
    *,
    escape_annotations: bool = True,
) -> list[str]:
    """Split text into sentences using phrasplit.

    Args:
        text: Paragraph text to split.
        language: Language tag; only the part before the first ``-`` is used
            to pick the model name.
        use_spacy: Forwarded to ``phrasplit.split_text``.
        model_size: Model size suffix; defaults to ``"sm"``.
        escape_annotations: If True, ``[text]{...}`` annotations and ``...``
            break markers are swapped for single placeholder characters
            before splitting and restored afterwards, so the splitter never
            sees their contents.

    Returns:
        The sentences. With ``escape_annotations`` enabled, sentences made up
        solely of break markers are appended to the preceding sentence and
        every sentence except the last ends in a newline. Falls back to
        ``_simple_sentence_split`` when an ``ImportError`` is raised.
    """
    try:
        from phrasplit import split_text

        base_lang = language.split("-")[0]
        # "en" and "zh" use the "web" model family, everything else "news".
        corpus = "web" if base_lang in {"en", "zh"} else "news"
        model = f"{base_lang}_core_{corpus}_{model_size or 'sm'}"

        # (placeholder character, original markup) in order of replacement
        protected: list[tuple[str, str]] = []
        shielded_text = text
        if escape_annotations:

            def _shield(match: re.Match[str]) -> str:
                token = chr(0xF100 + len(protected))
                protected.append((token, match.group(0)))
                return token

            # Annotations first, then break markers.
            shielded_text = re.sub(
                r"\[[^\]]*\]\{(?:\\.|[^}])*\}", _shield, shielded_text
            )
            shielded_text = re.sub(
                r"\.\.\.(?:\d+(?:s|ms)|[nwcsp])(?=\s|$|[.!?,;:])",
                _shield,
                shielded_text,
            )

        pieces = split_text(
            shielded_text,
            mode="sentence",
            language_model=model,
            apply_corrections=True,
            split_on_colon=True,
            use_spacy=use_spacy,
        )

        # Concatenate consecutive pieces that share a sentence id.
        sentences: list[str] = []
        buffer = ""
        previous_id = None
        for piece in pieces:
            if previous_id is not None and piece.sentence != previous_id:
                if buffer.strip():
                    sentences.append(buffer)
                buffer = ""
            buffer += piece.text
            previous_id = piece.sentence
        if buffer.strip():
            sentences.append(buffer)

        if not sentences:
            return [text]
        if not escape_annotations:
            return sentences

        # Put the original markup back in place of the placeholders.
        restored: list[str] = []
        for sentence in sentences:
            for token, original in protected:
                sentence = sentence.replace(token, original)
            restored.append(sentence)

        # A "sentence" that is nothing but break markers belongs to the
        # sentence before it.
        break_only = re.compile(r"^(?:\.\.\.(?:\d+(?:s|ms)|[nwcsp])\s*)+$")
        merged: list[str] = []
        for sentence in restored:
            core = sentence.strip()
            if core and merged and break_only.match(core):
                merged[-1] = merged[-1].rstrip() + " " + core
            else:
                merged.append(sentence)

        merged[:-1] = [sentence.rstrip() + "\n" for sentence in merged[:-1]]
        return merged
    except ImportError:
        # Fallback: simple sentence splitting
        return _simple_sentence_split(text)
def _simple_sentence_split(text: str) -> list[str]:
"""Simple regex-based sentence splitting."""
# Split on sentence-ending punctuation followed by space or newline
parts = re.split(r"(?<=[.!?])\s+", text)
return [p.strip() for p in parts if p.strip()]
def _parse_segments(  # noqa: C901
    text: str,
    capabilities: "TTSCapabilities | None" = None,
    heading_levels: dict | None = None,
    extensions: dict | None = None,
) -> list[Segment]:
    """Turn one sentence of SSMD text into a list of segments.

    A heading line is handed to ``_parse_heading`` (with ``heading_levels`` or
    the defaults). Otherwise the text is scanned for emphasis, annotations,
    break markers and marks; plain text between markup becomes plain segments
    after whitespace normalization. Breaks and marks seen before the first
    segment are queued and attached to the next segment that is created.

    ``capabilities`` is accepted but not used inside this function.
    """
    heading = HEADING_PATTERN.match(text)
    if heading:
        return _parse_heading(heading, heading_levels or DEFAULT_HEADING_LEVELS)

    # One alternation for all inline markup; longer alternatives come first.
    markup_re = re.compile(
        r"("
        r"\*\*[^\*]+\*\*"  # **strong**
        r"|\*[^\*]+\*"  # *moderate*
        r"|(?<![_a-zA-Z0-9])_(?!_)[^_]+?(?<!_)_(?![_a-zA-Z0-9])"  # _reduced_
        r"|\[[^\]]*\]\{(?:\\.|[^}])+\}"  # [text]{annotation}
        r"|\.\.\.(?:\d+(?:s|ms)|[nwcsp])(?=\s|$|[.!?,;:])"  # breaks
        r"|(?<!\S)@(?!voice[:(])\w+(?=\s|$)"  # marks
        r")"
    )

    result: list[Segment] = []
    cursor = 0
    queued_breaks: list[BreakAttrs] = []
    queued_marks: list[str] = []

    for hit in markup_re.finditer(text):
        if hit.start() > cursor:
            literal = _normalize_text(text[cursor : hit.start()])
            if literal:
                plain_seg = Segment(text=literal)
                if queued_breaks:
                    plain_seg.breaks_before, queued_breaks = queued_breaks, []
                if queued_marks:
                    plain_seg.marks_before, queued_marks = queued_marks, []
                result.append(plain_seg)

        queued_breaks, queued_marks, styled = _handle_markup(
            hit.group(0),
            result,
            queued_breaks,
            queued_marks,
            extensions,
        )
        if styled:
            result.append(styled)
        cursor = hit.end()

    # Whatever follows the last piece of markup
    if cursor < len(text):
        tail = _normalize_text(text[cursor:])
        if tail:
            tail_seg = Segment(text=tail)
            _apply_pending(tail_seg, queued_breaks, queued_marks)
            result.append(tail_seg)

    # Nothing produced a segment, yet the text is not blank: keep it verbatim.
    if not result and text.strip():
        only_seg = Segment(text=text.strip())
        _apply_pending(only_seg, queued_breaks, queued_marks)
        result.append(only_seg)

    return result
def _handle_markup(
    markup: str,
    segments: list[Segment],
    pending_breaks: list[BreakAttrs],
    pending_marks: list[str],
    extensions: dict | None,
) -> tuple[list[BreakAttrs], list[str], Segment | None]:
    """Process one markup token.

    Breaks (``...x``) and marks (``@name``) are attached after the most recent
    segment, or queued in the pending lists when no segment exists yet; no
    segment is returned for them. Any other token is converted to a segment,
    which receives the queued breaks/marks.

    Returns:
        ``(pending_breaks, pending_marks, segment_or_None)``. The pending
        lists come back empty when a segment was produced.
    """
    if markup.startswith("..."):
        target_breaks = segments[-1].breaks_after if segments else pending_breaks
        target_breaks.append(_parse_break(markup[3:]))
        return pending_breaks, pending_marks, None

    if markup.startswith("@"):
        target_marks = segments[-1].marks_after if segments else pending_marks
        target_marks.append(markup[1:])
        return pending_breaks, pending_marks, None

    built = _segment_from_markup(markup, extensions)
    if not built:
        return pending_breaks, pending_marks, None
    _apply_pending(built, pending_breaks, pending_marks)
    return [], [], built
def _segment_from_markup(markup: str, extensions: dict | None) -> Segment | None:
    """Create a segment for an emphasis or annotation token.

    ``**x**`` gives strong emphasis, ``*x*`` gives ``emphasis=True``, ``_x_``
    gives reduced emphasis, and ``[x]{...}`` is delegated to
    ``_parse_annotation``. Returns ``None`` for anything else or when the
    token does not match its pattern.
    """
    if markup.startswith("**"):
        pattern, level = STRONG_EMPHASIS_PATTERN, "strong"
    elif markup.startswith("*"):
        pattern, level = MODERATE_EMPHASIS_PATTERN, True
    elif markup.startswith("_") and not markup.startswith("__"):
        pattern, level = REDUCED_EMPHASIS_PATTERN, "reduced"
    elif markup.startswith("["):
        return _parse_annotation(markup, extensions)
    else:
        return None

    found = pattern.match(markup)
    return Segment(text=found.group(1), emphasis=level) if found else None
def _apply_pending(
    seg: Segment,
    pending_breaks: list[BreakAttrs],
    pending_marks: list[str],
) -> None:
    """Attach queued breaks and marks in front of a segment.

    Non-empty lists are copied onto ``seg.breaks_before`` / ``seg.marks_before``;
    empty lists leave the segment untouched. The input lists are not modified.
    """
    queued = (("breaks_before", pending_breaks), ("marks_before", pending_marks))
    for attr_name, items in queued:
        if items:
            setattr(seg, attr_name, items.copy())
def _parse_heading(
    match: re.Match,
    heading_levels: dict,
) -> list[Segment]:
    """Convert a heading match into a single-segment list.

    Group 1 of ``match`` gives the heading level (number of ``#``), group 2
    the heading text. If ``heading_levels`` has an entry for the level, its
    ``(effect_type, value)`` pairs are applied: ``emphasis``, ``pause``
    (break after), ``pause_before`` (break before) and ``prosody`` (a dict
    with volume/rate/pitch). Other effect types are ignored.
    """
    level = len(match.group(1))
    seg = Segment(text=match.group(2).strip())
    if level not in heading_levels:
        return [seg]

    for effect_type, value in heading_levels[level]:
        if effect_type == "emphasis":
            seg.emphasis = value
        elif effect_type in ("pause", "pause_before"):
            breaks = seg.breaks_after if effect_type == "pause" else seg.breaks_before
            breaks.append(BreakAttrs(time=value))
        elif effect_type == "prosody" and isinstance(value, dict):
            seg.prosody = ProsodyAttrs(
                volume=value.get("volume"),
                rate=value.get("rate"),
                pitch=value.get("pitch"),
            )
    return [seg]
def _parse_block_to_spans(
    clean_text: str,
    block_text: str,
    annotations: list[AnnotationSpan],
    warnings: list[str],
    preserve_whitespace: bool,
) -> str:
    """Append a block's plain text to ``clean_text`` and record its spans.

    With ``preserve_whitespace`` the block is parsed as a whole without text
    normalization. Otherwise it is split into paragraphs, each parsed with
    normalization, and paragraphs are separated by a blank line in the
    output. Spans are appended to ``annotations`` and parser warnings to
    ``warnings``.

    Returns:
        The extended clean text.
    """
    if not preserve_whitespace:
        for para_index, paragraph in enumerate(PARAGRAPH_PATTERN.split(block_text)):
            if not paragraph.strip():
                continue
            needs_gap = clean_text and (para_index > 0 or clean_text.endswith("\n"))
            if needs_gap:
                clean_text += "\n\n"
            clean_text = _parse_paragraph_normalized(
                clean_text,
                paragraph,
                annotations,
                warnings,
            )
        return clean_text

    parsed, seg_warnings = _parse_segments_for_spans(
        block_text,
        normalize_text=False,
    )
    warnings.extend(seg_warnings)
    for segment, attrs_override in parsed:
        clean_text = _append_segment_spans(
            clean_text,
            segment,
            annotations,
            "inline",
            attrs_override=attrs_override,
        )
    return clean_text
def _parse_paragraph_normalized(
    clean_text: str,
    paragraph: str,
    annotations: list[AnnotationSpan],
    warnings: list[str],
) -> str:
    """Parse one paragraph (with text normalization) and append it to ``clean_text``.

    Each parsed segment is added through ``_append_segment_spans_normalized``
    as an ``"inline"`` span; parser warnings are added to ``warnings``.
    Returns the extended clean text.
    """
    parsed, seg_warnings = _parse_segments_for_spans(paragraph)
    warnings.extend(seg_warnings)
    result = clean_text
    for segment, attrs_override in parsed:
        result = _append_segment_spans_normalized(
            result,
            segment,
            annotations,
            "inline",
            attrs_override=attrs_override,
        )
    return result
def _append_segment_spans(
    clean_text: str,
    segment: Segment,
    annotations: list[AnnotationSpan],
    kind: str,
    attrs_override: dict[str, str] | None = None,
) -> str:
    """Append a segment's text verbatim and record its annotation span.

    The span covers exactly the appended text. Its attributes are
    ``attrs_override`` when given (even if empty), otherwise they are derived
    from the segment; no span is recorded when the attributes are empty.
    Segments with no text leave ``clean_text`` unchanged.
    """
    rendered = segment.to_text()
    if not rendered:
        return clean_text

    start = len(clean_text)
    extended = clean_text + rendered
    if attrs_override is None:
        span_attrs = _segment_attrs_to_map(segment)
    else:
        span_attrs = attrs_override
    if span_attrs:
        annotations.append(
            AnnotationSpan(
                char_start=start,
                char_end=len(extended),
                attrs=span_attrs,
                kind=kind,
            )
        )
    return extended
def _append_segment_spans_normalized(
    clean_text: str,
    segment: Segment,
    annotations: list[AnnotationSpan],
    kind: str,
    attrs_override: dict[str, str] | None = None,
) -> str:
    """Append a segment's text with word spacing and record its annotation span.

    A single space is inserted before the text unless ``clean_text`` is empty,
    ends with a newline, or the text starts with one of ``. ! ? , : ;``. The
    recorded span excludes that separator. Attribute selection is the same as
    in ``_append_segment_spans``.
    """
    rendered = segment.to_text()
    if not rendered:
        return clean_text

    needs_space = (
        bool(clean_text)
        and not clean_text.endswith("\n")
        and not rendered.startswith((".", "!", "?", ",", ":", ";"))
    )
    separator = " " if needs_space else ""

    start = len(clean_text) + len(separator)
    extended = f"{clean_text}{separator}{rendered}"
    if attrs_override is None:
        span_attrs = _segment_attrs_to_map(segment)
    else:
        span_attrs = attrs_override
    if span_attrs:
        annotations.append(
            AnnotationSpan(
                char_start=start,
                char_end=len(extended),
                attrs=span_attrs,
                kind=kind,
            )
        )
    return extended
def _annotated_attrs_to_tagged(attrs: dict[str, str]) -> dict[str, str]:
tag: str | None = None
if "ext" in attrs:
tag = "extension"
elif "src" in attrs:
tag = "audio"
elif "sub" in attrs:
tag = "sub"
elif "ph" in attrs or "ipa" in attrs or "sampa" in attrs:
tag = "phoneme"
elif "as" in attrs:
tag = "say-as"
elif "voice" in attrs or "voice-lang" in attrs or "gender" in attrs:
tag = "voice"
elif "lang" in attrs:
tag = "lang"
elif any(k in attrs for k in ("volume", "rate", "pitch", "v", "r", "p")):
tag = "prosody"
elif "emphasis" in attrs:
tag = "emphasis"
if tag:
return {**attrs, "tag": tag}
return attrs
def _segment_attrs_to_map(segment: Segment) -> dict[str, str]:  # noqa: C901
    """Flatten a segment's features into a string-to-string attribute map.

    Only features that are set on the segment contribute entries. Emphasis of
    ``True`` or ``"moderate"`` is written as ``"moderate"``. An audio clip
    range is emitted only when both begin and end are present. The finished
    map is passed through ``_annotated_attrs_to_tagged``.
    """
    attrs: dict[str, str] = {}

    if segment.language:
        attrs["lang"] = segment.language

    voice = segment.voice
    if voice:
        for key, val in (
            ("voice", voice.name),
            ("voice-lang", voice.language),
            ("gender", voice.gender),
        ):
            if val:
                attrs[key] = val
        if voice.variant is not None:
            attrs["variant"] = str(voice.variant)

    say_as = segment.say_as
    if say_as:
        attrs["as"] = say_as.interpret_as
        if say_as.format:
            attrs["format"] = say_as.format
        if say_as.detail:
            attrs["detail"] = str(say_as.detail)

    if segment.substitution:
        attrs["sub"] = segment.substitution

    phoneme = segment.phoneme
    if phoneme:
        attrs["ph"] = phoneme.ph
        attrs["alphabet"] = phoneme.alphabet

    if segment.extension:
        attrs["ext"] = segment.extension

    prosody = segment.prosody
    if prosody:
        for key in ("volume", "rate", "pitch"):
            val = getattr(prosody, key)
            if val:
                attrs[key] = val

    emphasis = segment.emphasis
    if emphasis:
        is_moderate = emphasis is True or emphasis == "moderate"
        attrs["emphasis"] = "moderate" if is_moderate else str(emphasis)

    audio = segment.audio
    if audio:
        attrs["src"] = audio.src
        if audio.clip_begin and audio.clip_end:
            attrs["clip"] = f"{audio.clip_begin}-{audio.clip_end}"
        if audio.speed:
            attrs["speed"] = audio.speed
        if audio.repeat_count is not None:
            attrs["repeat"] = str(audio.repeat_count)
        if audio.repeat_dur:
            attrs["repeatDur"] = audio.repeat_dur
        if audio.sound_level:
            attrs["level"] = audio.sound_level
        if audio.alt_text:
            attrs["alt"] = audio.alt_text

    return _annotated_attrs_to_tagged(attrs)
def _parse_segments_with_warnings(
    text: str,
    *,
    normalize_text: bool = True,
) -> tuple[list[Segment], list[str]]:
    """Parse text into segments and return them together with parse warnings.

    Thin wrapper around ``_parse_segments_for_spans`` that drops the
    per-segment attribute maps.
    """
    pairs, warnings = _parse_segments_for_spans(text, normalize_text=normalize_text)
    bare_segments = [pair[0] for pair in pairs]
    return bare_segments, warnings
def _parse_segments_for_spans(
    text: str,
    *,
    normalize_text: bool = True,
) -> tuple[list[tuple[Segment, dict[str, str] | None]], list[str]]:
    """Parse text into ``(segment, attrs)`` pairs for span extraction.

    Works like ``_parse_segments`` but uses the default heading levels, no
    extensions, and pairs every segment with an attribute map. For
    ``[text]{...}`` annotations the map is the annotation's own attributes
    (empty values removed, ``tag`` added); if that is empty, or for any other
    segment, it is derived from the segment itself.

    Args:
        text: Text to parse.
        normalize_text: If False, plain text is kept exactly as written.

    Returns:
        ``(pairs, warnings)``. Warnings include attribute-parsing problems and
        mismatched ``[``/``]`` or ``{``/``}`` counts in ``text``.
    """
    pairs: list[tuple[Segment, dict[str, str] | None]] = []
    warnings: list[str] = []

    heading = HEADING_PATTERN.match(text)
    if heading:
        for heading_seg in _parse_heading(heading, DEFAULT_HEADING_LEVELS):
            pairs.append((heading_seg, _segment_attrs_to_map(heading_seg)))
        return pairs, warnings

    markup_re = re.compile(
        r"("
        r"\*\*[^\*]+\*\*"
        r"|\*[^\*]+\*"
        r"|(?<![_a-zA-Z0-9])_(?!_)[^_]+?(?<!_)_(?![_a-zA-Z0-9])"
        r"|\[[^\]]*\]\{(?:\\.|[^}])+\}"
        r"|\.\.\.(?:\d+(?:s|ms)|[nwcsp])(?=\s|$|[.!?,;:])"
        r"|(?<!\S)@(?!voice[:(])\w+(?=\s|$)"
        r")"
    )

    cursor = 0
    queued_breaks: list[BreakAttrs] = []
    queued_marks: list[str] = []

    for hit in markup_re.finditer(text):
        if hit.start() > cursor:
            raw = text[cursor : hit.start()]
            literal = _normalize_text(raw) if normalize_text else raw
            if literal:
                plain_seg = Segment(text=literal)
                if queued_breaks:
                    plain_seg.breaks_before, queued_breaks = queued_breaks, []
                if queued_marks:
                    plain_seg.marks_before, queued_marks = queued_marks, []
                pairs.append((plain_seg, _segment_attrs_to_map(plain_seg)))

        token = hit.group(0)
        attrs_override: dict[str, str] | None = None
        if token.startswith("["):
            annotation = ANNOTATION_PATTERN.match(token)
            if annotation:
                raw_attrs, attr_warnings = _parse_annotation_params_with_warnings(
                    annotation.group(2).strip()
                )
                warnings.extend(attr_warnings)
                non_empty = {k: v for k, v in raw_attrs.items() if v != ""}
                attrs_override = _annotated_attrs_to_tagged(non_empty)

        # _handle_markup only needs the segments, not their attribute maps.
        segments_so_far = [segment for segment, _ in pairs]
        queued_breaks, queued_marks, styled = _handle_markup(
            token,
            segments_so_far,
            queued_breaks,
            queued_marks,
            extensions=None,
        )
        if styled:
            if not attrs_override:
                attrs_override = _segment_attrs_to_map(styled)
            pairs.append((styled, attrs_override))
        cursor = hit.end()

    if cursor < len(text):
        raw_tail = text[cursor:]
        tail = _normalize_text(raw_tail) if normalize_text else raw_tail
        if tail:
            tail_seg = Segment(text=tail)
            _apply_pending(tail_seg, queued_breaks, queued_marks)
            pairs.append((tail_seg, _segment_attrs_to_map(tail_seg)))

    if not pairs and text.strip():
        content = _normalize_text(text) if normalize_text else text
        if content:
            only_seg = Segment(text=content)
            _apply_pending(only_seg, queued_breaks, queued_marks)
            pairs.append((only_seg, _segment_attrs_to_map(only_seg)))

    if text.count("[") != text.count("]"):
        warnings.append("Unbalanced annotation brackets in input.")
    if text.count("{") != text.count("}"):
        warnings.append("Unbalanced annotation braces in input.")
    return pairs, warnings
def _directive_attrs_to_map(directive: DirectiveAttrs) -> dict[str, str]:
    """Flatten a directive's language, voice and prosody into an attribute map.

    Only values that are set produce an entry; the voice variant is written
    as a string whenever it is not ``None``.
    """
    attrs: dict[str, str] = {}

    if directive.language:
        attrs["lang"] = directive.language

    voice = directive.voice
    if voice:
        for key, val in (
            ("voice", voice.name),
            ("voice-lang", voice.language),
            ("gender", voice.gender),
        ):
            if val:
                attrs[key] = val
        if voice.variant is not None:
            attrs["variant"] = str(voice.variant)

    prosody = directive.prosody
    if prosody:
        for key in ("volume", "rate", "pitch"):
            val = getattr(prosody, key)
            if val:
                attrs[key] = val

    return attrs
def _parse_break(modifier: str) -> BreakAttrs:
    """Convert the text following ``...`` into break attributes.

    A modifier found in ``SSMD_BREAK_MARKER_TO_STRENGTH`` becomes a strength
    break. Otherwise it is a duration: values ending in ``s`` (which includes
    ``ms``) are used as-is, anything else gets ``ms`` appended.
    """
    if modifier in SSMD_BREAK_MARKER_TO_STRENGTH:
        return BreakAttrs(strength=SSMD_BREAK_MARKER_TO_STRENGTH[modifier])
    duration = modifier if modifier.endswith("s") else f"{modifier}ms"
    return BreakAttrs(time=duration)
def _parse_annotation(markup: str, extensions: dict | None = None) -> Segment | None:
    """Build a segment from ``[text]{key="value" ...}`` markup.

    Returns ``None`` if ``markup`` is not an annotation. When no attributes
    can be parsed the segment carries only the text. A ``src`` attribute makes
    the segment an audio segment and all other attributes are then ignored.
    Otherwise language, voice, say-as, phoneme, substitution, emphasis,
    extension and prosody attributes are applied when present.

    ``extensions`` is accepted but not used inside this function.
    """
    found = ANNOTATION_PATTERN.match(markup)
    if not found:
        return None

    seg = Segment(text=found.group(1))
    params_map = _parse_annotation_params(found.group(2).strip())
    if not params_map:
        return seg

    if "src" in params_map:
        seg.audio = _parse_audio_annotation_params(params_map)
        return seg

    # "lang" takes precedence over "language".
    for lang_key in ("lang", "language"):
        if lang_key in params_map:
            seg.language = params_map[lang_key]
            break

    parsed_voice = _parse_voice_annotation_params(params_map)
    if parsed_voice:
        seg.voice = parsed_voice

    parsed_say_as = _parse_say_as_params(params_map)
    if parsed_say_as:
        seg.say_as = parsed_say_as

    parsed_phoneme = _parse_phoneme_params(params_map)
    if parsed_phoneme:
        seg.phoneme = parsed_phoneme

    if "sub" in params_map:
        seg.substitution = params_map["sub"]

    if "emphasis" in params_map:
        level = params_map["emphasis"].lower()
        if level in ("none", "reduced", "moderate", "strong"):
            # Moderate emphasis is represented as True on the segment.
            seg.emphasis = True if level == "moderate" else level

    if "ext" in params_map:
        seg.extension = params_map["ext"]

    parsed_prosody = _parse_prosody_params(params_map)
    if parsed_prosody:
        seg.prosody = parsed_prosody

    return seg
def _parse_annotation_params(params: str) -> dict[str, str]:
    """Parse ``key="value"`` pairs from annotation params, ignoring warnings."""
    return _parse_annotation_params_with_warnings(params)[0]
def _parse_annotation_params_with_warnings( # noqa: C901
params: str,
) -> tuple[dict[str, str], list[str]]:
values: dict[str, str] = {}
warnings: list[str] = []
if not params:
return values, warnings
key = ""
value = ""
state = "key"
quote: str | None = None
escape = False
def _commit() -> None:
nonlocal key, value
if key:
values[key.lower()] = value
key = ""
value = ""
for ch in params:
if state == "key":
if ch.isspace():
continue
if ch == "=":
if key:
state = "value"
continue
if ch.isalnum() or ch in "_-:":
key += ch
continue
warnings.append(f"Unexpected character '{ch}' in attribute key.")
continue
if state == "value":
if quote:
# Handle escaping within quoted strings
if escape:
value += ch
escape = False
continue
if ch == "\\":
escape = True
continue
if ch == quote:
_commit()
state = "key"
quote = None
else:
value += ch
continue
if ch in ('"', "'"):
quote = ch
continue
if ch.isspace() and value != "":
_commit()
state = "key"
continue
elif ch.isspace() and value == "":
continue
value += ch
if quote is not None:
warnings.append("Unterminated quote in annotation attributes.")
if key:
values[key.lower()] = value
return values, warnings
if key:
if state == "value" and quote is None:
_commit()
elif state == "key":
values[key.lower()] = ""
return values, warnings
def _parse_audio_annotation_params(params_map: dict[str, str]) -> AudioAttrs:
    """Build audio attributes from an annotation map that contains ``src``.

    Optional keys: ``clip`` (``begin-end``, split at the first ``-``),
    ``speed``, ``repeat`` (ignored unless it is an integer), ``repeatdur``,
    ``level`` and ``alt``. Empty values are ignored.
    """
    audio = AudioAttrs(src=params_map["src"])

    clip = params_map.get("clip")
    if clip and "-" in clip:
        begin, _, end = clip.partition("-")
        audio.clip_begin = begin.strip()
        audio.clip_end = end.strip()

    speed = params_map.get("speed")
    if speed:
        audio.speed = speed

    repeat = params_map.get("repeat")
    if repeat:
        try:
            audio.repeat_count = int(repeat)
        except ValueError:
            pass

    for param_key, attr_name in (
        ("repeatdur", "repeat_dur"),
        ("level", "sound_level"),
        ("alt", "alt_text"),
    ):
        param_value = params_map.get(param_key)
        if param_value:
            setattr(audio, attr_name, param_value)

    return audio
def _parse_voice_annotation_params(params_map: dict[str, str]) -> VoiceAttrs | None:
    """Build voice attributes from an annotation map.

    Returns ``None`` when none of ``voice``, ``voice-lang``, ``voice_lang``,
    ``gender`` or ``variant`` is present. The gender is lower-cased; a
    ``variant`` that is not an integer is ignored.
    """
    voice_keys = ("voice", "voice-lang", "voice_lang", "gender", "variant")
    if all(key not in params_map for key in voice_keys):
        return None

    voice = VoiceAttrs()

    name = params_map.get("voice")
    if name:
        voice.name = name

    lang = params_map.get("voice-lang") or params_map.get("voice_lang")
    if lang:
        voice.language = lang

    if "gender" in params_map:
        voice.gender = params_map["gender"].lower()  # type: ignore[assignment]

    if "variant" in params_map:
        try:
            voice.variant = int(params_map["variant"])
        except ValueError:
            pass

    return voice
def _parse_say_as_params(params_map: dict[str, str]) -> SayAsAttrs | None:
    """Build say-as attributes from an annotation map.

    The interpretation comes from ``as`` or, failing that, ``say-as``;
    without either the result is ``None``. ``format`` and ``detail`` are
    passed through when present.
    """
    interpret_as = params_map.get("as") or params_map.get("say-as")
    if interpret_as:
        return SayAsAttrs(
            interpret_as=interpret_as,
            format=params_map.get("format"),
            detail=params_map.get("detail"),
        )
    return None
def _parse_phoneme_params(params_map: dict[str, str]) -> PhonemeAttrs | None:
    """Build phoneme attributes from an annotation map.

    Checked in order: ``ipa`` (alphabet ``ipa``), ``sampa`` (alphabet
    ``x-sampa``), then ``ph`` with an optional ``alphabet`` key (default
    ``ipa``, lower-cased, ``sampa`` rewritten to ``x-sampa``). Returns
    ``None`` when none of these keys is present.
    """
    for shorthand, alphabet_name in (("ipa", "ipa"), ("sampa", "x-sampa")):
        if shorthand in params_map:
            return PhonemeAttrs(ph=params_map[shorthand], alphabet=alphabet_name)

    if "ph" not in params_map:
        return None

    alphabet = params_map.get("alphabet", "ipa").lower()
    return PhonemeAttrs(
        ph=params_map["ph"],
        alphabet="x-sampa" if alphabet == "sampa" else alphabet,
    )
def _parse_prosody_params(params_map: dict[str, str]) -> ProsodyAttrs | None:
"""Parse prosody params from annotation map."""
volume = params_map.get("volume") or params_map.get("v")
rate = params_map.get("rate") or params_map.get("r")
pitch = params_map.get("pitch") or params_map.get("p")
if not any([volume, rate, pitch]):
return None
prosody = ProsodyAttrs()
if volume:
prosody.volume = _normalize_prosody_value(volume, PROSODY_VOLUME_MAP)
if rate:
prosody.rate = _normalize_prosody_value(rate, PROSODY_RATE_MAP)
if pitch:
prosody.pitch = _normalize_prosody_value(pitch, PROSODY_PITCH_MAP)
return prosody
def _normalize_prosody_value(value: str, mapping: dict[str, str]) -> str:
"""Normalize prosody values to named levels where possible."""
stripped = value.strip()
if stripped.isdigit() and stripped in mapping:
return mapping[stripped]
lowered = stripped.lower()
if lowered in mapping.values():
return lowered
return stripped
def _is_language_code(value: str) -> bool:
return bool(re.match(r"^[a-z]{2}(-[A-Z]{2})?$", value))
def _parse_voice_annotation(params: str) -> VoiceAttrs:
    """Parse a voice annotation parameter string.

    Two forms are handled: a bare voice name or language code (``"Joanna"``,
    ``"en-US"``), or a comma-separated list whose first field is the name or
    language and whose remaining fields are ``gender:<value>`` and/or
    ``variant:<int>`` modifiers.

    Args:
        params: Raw parameter string. Fields of the comma-separated form are
            stripped; a bare value is used exactly as given.

    Returns:
        A VoiceAttrs with ``language`` set when the first field looks like a
        language code (``xx`` or ``xx-YY``), otherwise with ``name`` set, plus
        any recognised modifiers. Unrecognised modifiers and non-integer
        variants are ignored.
    """
    voice = VoiceAttrs()

    if "," in params:
        fields = [field.strip() for field in params.split(",")]
        first, modifiers = fields[0], fields[1:]
    else:
        first, modifiers = params, []

    # The leading field is either a language code or a voice name.
    if re.match(r"^[a-z]{2}(-[A-Z]{2})?$", first):
        voice.language = first
    else:
        voice.name = first

    for modifier in modifiers:
        if modifier.startswith("gender:"):
            gender = modifier[len("gender:"):].strip().lower()
            voice.gender = gender  # type: ignore[assignment]
        elif modifier.startswith("variant:"):
            try:
                voice.variant = int(modifier[len("variant:"):].strip())
            except ValueError:
                # Same policy as _parse_voice_annotation_params: a malformed
                # variant is dropped instead of aborting the whole parse.
                pass

    return voice
# ═══════════════════════════════════════════════════════════════════════════════
# BACKWARD COMPATIBILITY
# ═══════════════════════════════════════════════════════════════════════════════
# Re-export old names for compatibility.
# Each alias is bound to the very same class object as the current name, so
# code that still imports the SSMD-prefixed names from this module keeps
# working.
SSMDSegment = Segment
SSMDSentence = Sentence
SSMDParagraph = Paragraph
[docs]
def parse_sentences(
    ssmd_text: str,
    *,
    capabilities: "TTSCapabilities | str | None" = None,
    include_default_voice: bool = True,
    sentence_detection: bool = True,
    language: str = "en",
    model_size: str | None = None,
    spacy_model: str | None = None,
    use_spacy: bool | None = None,
    heading_levels: dict | None = None,
    extensions: dict | None = None,
    parse_yaml_header: bool = False,
    strict_parse: bool = False,
) -> list[Sentence]:
    """Parse SSMD text into a flat list of sentences (backward compatible API).

    Thin wrapper around ``parse_paragraphs()`` that accepts the old parameter
    names and concatenates the sentences of every returned paragraph, in
    order. The sentences are returned exactly as ``parse_paragraphs()``
    produced them.

    Args:
        ssmd_text: SSMD formatted text to parse.
        capabilities: TTS capabilities or preset name.
        include_default_voice: If False, drop sentences whose ``voice`` is
            None.
        sentence_detection: Enable/disable sentence splitting.
        language: Language code for sentence detection.
        model_size: Size of spacy model (sm/md/lg).
        spacy_model: Full spacy model name (deprecated, use model_size). Only
            consulted when ``model_size`` is not given; the part after the
            last underscore is used as the model size.
        use_spacy: Force use of spacy for sentence detection.
        heading_levels: Custom heading configurations.
        extensions: Custom extension handlers.
        parse_yaml_header: If True, parse YAML front matter and apply
            heading/extensions config while stripping it from the body. If
            False, YAML front matter is preserved as plain text.
        strict_parse: If True, strip unsupported features based on
            capabilities.

    Returns:
        List of Sentence objects.
    """
    # Resolve the model size, falling back to the deprecated spacy_model name.
    if model_size:
        resolved_size = model_size
    elif spacy_model:
        resolved_size = spacy_model.rsplit("_", 1)[-1]
    else:
        resolved_size = None

    paragraphs = parse_paragraphs(
        ssmd_text,
        capabilities=capabilities,
        sentence_detection=sentence_detection,
        language=language,
        model_size=resolved_size,
        use_spacy=use_spacy,
        heading_levels=heading_levels,
        extensions=extensions,
        parse_yaml_header=parse_yaml_header,
        strict_parse=strict_parse,
    )

    flattened: list[Sentence] = []
    for paragraph in paragraphs:
        flattened.extend(paragraph.sentences)

    if include_default_voice:
        return flattened
    # Keep only sentences that carry an explicit voice.
    return [sentence for sentence in flattened if sentence.voice is not None]
[docs]
def parse_segments(
    ssmd_text: str,
    *,
    capabilities: "TTSCapabilities | str | None" = None,
    voice_context: VoiceAttrs | None = None,
) -> list[Segment]:
    """Parse SSMD text into segments (backward compatible API).

    Args:
        ssmd_text: SSMD formatted text to parse.
        capabilities: TTS capabilities or preset name; resolved through
            ``_resolve_capabilities`` before parsing.
        voice_context: Accepted for backward compatibility only. The value is
            never used and does not affect the result.

    Returns:
        The segments produced by ``_parse_segments``.
    """
    # Deliberately unused: the parameter is kept so old call sites still work.
    del voice_context
    return _parse_segments(
        ssmd_text, capabilities=_resolve_capabilities(capabilities)
    )
[docs]
def parse_voice_blocks(ssmd_text: str) -> list[tuple[DirectiveAttrs, str]]:
    """Split SSMD text into directive blocks (backward compatible API).

    Public name kept for older callers; the work is delegated unchanged to
    ``_split_directive_blocks``.

    Args:
        ssmd_text: SSMD formatted text.

    Returns:
        List of ``(DirectiveAttrs, text)`` tuples.
    """
    directive_blocks = _split_directive_blocks(ssmd_text)
    return directive_blocks
[docs]
def parse_spans(
    text: str,
    *,
    normalize: bool = True,
    default_lang: str | None = None,
    preserve_whitespace: bool | None = None,
) -> ParseSpansResult:
    """Parse SSMD text into clean text plus annotation spans.

    Args:
        text: SSMD markdown text.
        normalize: If True (default), normalize whitespace between segments.
        default_lang: Optional language; when given and the clean text is
            non-empty, a ``language`` span covering the whole clean text is
            inserted as the first annotation.
        preserve_whitespace: Deprecated. Use ``normalize=False`` instead. When
            not None it overrides ``normalize``.

    Returns:
        ParseSpansResult with clean text, annotations, and warnings. Empty
        input yields an empty result.

    Note:
        Offsets are 0-based, half-open ``[start, end)`` intervals referring to
        ``clean_text``.
    """
    if not text:
        return ParseSpansResult(clean_text="", annotations=[], warnings=[])

    # The deprecated flag, when supplied, takes precedence over ``normalize``.
    if preserve_whitespace is None:
        keep_whitespace = not normalize
    else:
        keep_whitespace = bool(preserve_whitespace)

    spans: list[AnnotationSpan] = []
    blocks, block_warnings = _split_directive_blocks_with_warnings(text)
    messages: list[str] = list(block_warnings)

    output = ""
    for directive, raw_block in blocks:
        start = len(output)
        # The helper appends this block's text to ``output`` and records inline
        # annotations/warnings in the lists passed to it.
        output = _parse_block_to_spans(
            output,
            raw_block,
            spans,
            messages,
            preserve_whitespace=keep_whitespace,
        )
        stop = len(output)

        attrs = _directive_attrs_to_map(directive)
        if not attrs or stop <= start:
            # No directive attributes, or the block added no text: no div span.
            continue

        # Add "tag" attribute for consistency with inline annotations
        attrs["tag"] = "div"
        spans.append(
            AnnotationSpan(
                char_start=start,
                char_end=stop,
                attrs=attrs,
                kind="div",
            )
        )

    # NOTE(review): offsets above were computed before unescaping; confirm
    # that unescape_ssmd_syntax never changes the length of the text.
    output = unescape_ssmd_syntax(output)

    if default_lang and output:
        document_span = AnnotationSpan(
            char_start=0,
            char_end=len(output),
            attrs={"lang": default_lang},
            kind="language",
        )
        spans.insert(0, document_span)

    return ParseSpansResult(
        clean_text=output, annotations=spans, warnings=messages
    )
[docs]
def iter_sentences_spans(
text_or_doc: str | Any,
*,
preserve_whitespace: bool = False,
language: str = "en",
use_spacy: bool | None = None,
model_size: str | None = None,
) -> list[tuple[str, int, int]]:
"""Iterate over sentence spans in clean text coordinates."""
if not text_or_doc:
return []
text = text_or_doc
if not isinstance(text_or_doc, str):
text = text_or_doc.ssmd
clean_text = parse_spans(text, preserve_whitespace=preserve_whitespace).clean_text
if not clean_text:
return []
sent_texts = _split_sentences(
clean_text,
language=language,
use_spacy=use_spacy,
model_size=model_size,
escape_annotations=False,
)
spans: list[tuple[str, int, int]] = []
cursor = 0
for sent_text in sent_texts:
if not sent_text:
continue
if preserve_whitespace:
sentence = sent_text
start = cursor
end = start + len(sentence)
spans.append((sentence, start, end))
cursor = end
continue
sentence = sent_text.strip()
if not sentence:
continue
start = cursor
while start < len(clean_text) and clean_text[start].isspace():
start += 1
end = start + len(sentence)
spans.append((sentence, start, end))
cursor = end
return spans
[docs]
def lint(text: str, profile: str = "ssmd-core") -> list[LintIssue]:
    """Lint SSMD text against a capability profile.

    Offsets in lint issues refer to the clean text coordinate system.

    Args:
        text: SSMD text to check.
        profile: Name of the capability profile to check against.

    Returns:
        Issues in this order: one ``warn`` per parser warning, then, per
        annotation, either an ``error`` for a tag the profile does not list or
        ``warn`` entries for attributes the profile does not allow on that tag.
    """
    # Imported here rather than at module level (presumably to avoid an import
    # cycle with ssmd.capabilities).
    from ssmd.capabilities import get_profile

    parsed = parse_spans(text)
    profile_data = get_profile(profile)

    issues: list[LintIssue] = []
    issues.extend(
        LintIssue(severity="warn", message=message) for message in parsed.warnings
    )

    for span in parsed.annotations:
        attrs = span.attrs
        tag = attrs.get("tag") or span.kind
        if not tag:
            continue

        if tag not in profile_data.inline_tags and tag not in profile_data.block_tags:
            issues.append(
                LintIssue(
                    severity="error",
                    message=f"Tag '{tag}' is not supported by profile '{profile}'.",
                    char_start=span.char_start,
                    char_end=span.char_end,
                )
            )
            # Attributes of an unsupported tag are not checked individually.
            continue

        allowed_attrs = profile_data.attributes.get(tag, set())
        if not allowed_attrs:
            # An empty (or missing) attribute set means no attribute checks.
            continue

        for key in attrs:
            # "tag" and "name" are never reported.
            if key in {"tag", "name"} or key in allowed_attrs:
                continue
            issues.append(
                LintIssue(
                    severity="warn",
                    message=(
                        f"Attribute '{key}' is not supported for '{tag}' "
                        f"in profile '{profile}'."
                    ),
                    char_start=span.char_start,
                    char_end=span.char_end,
                )
            )

    return issues
def _filter_sentences(sentences: list[Sentence], caps: "TTSCapabilities") -> None: # noqa: C901
for sentence in sentences:
if sentence.language and not caps.language_scopes.get("sentence", True):
sentence.language = None
if sentence.prosody:
if not caps.prosody:
sentence.prosody = None
else:
if not caps.volume:
sentence.prosody.volume = None
if not caps.rate:
sentence.prosody.rate = None
if not caps.pitch:
sentence.prosody.pitch = None
if not any(
[
sentence.prosody.volume,
sentence.prosody.rate,
sentence.prosody.pitch,
]
):
sentence.prosody = None
for segment in sentence.segments:
if segment.audio and not caps.audio:
segment.audio = None
if segment.say_as and not caps.say_as:
segment.say_as = None
if segment.emphasis and not caps.emphasis:
segment.emphasis = False
if segment.language and not caps.language_scopes.get("sentence", True):
segment.language = None
if segment.phoneme and not caps.phoneme:
segment.phoneme = None
if segment.substitution and not caps.substitution:
segment.substitution = None
if segment.extension and not caps.supports_extension(segment.extension):
segment.extension = None
if segment.prosody:
if not caps.prosody:
segment.prosody = None
else:
if not caps.volume:
segment.prosody.volume = None
if not caps.rate:
segment.prosody.rate = None
if not caps.pitch:
segment.prosody.pitch = None
if not any(
[
segment.prosody.volume,
segment.prosody.rate,
segment.prosody.pitch,
]
):
segment.prosody = None
if not caps.break_tags:
segment.breaks_before = []
segment.breaks_after = []
if not caps.mark:
segment.marks_before = []
segment.marks_after = []