feat(lexiannot): Integrate LLM for advanced vocabulary processing

wumode
2025-12-10 21:22:29 +08:00
parent b961a52440
commit 00c65a0983
11 changed files with 2854 additions and 1508 deletions

View File

@@ -540,11 +540,12 @@
"name": "美剧生词标注",
"description": "根据CEFR等级为英语影视剧标注高级词汇。",
"labels": "英语",
"version": "1.1.4",
"version": "1.2.0",
"icon": "LexiAnnot.png",
"author": "wumode",
"level": 1,
"history": {
"v1.2.0": "引入大模型候选词决策和词义丰富处理链; 支持读取系统智能体配置; 添加智能体工具; 优化通知样式; 改进 UI",
"v1.1.4": "优化字幕选择决策",
"v1.1.3": "适配 Pydantic V2 (主程序版本需高于 2.8.1-1)",
"v1.1.2": "使用子进程避免 spaCy 模型常驻内存",

View File

@@ -1,26 +1,32 @@
# 美剧生词标注 (Vocabulary Annotation for American TV Shows)
Annotates advanced vocabulary in English-language films and TV shows according to CEFR levels.
___
After a film or TV episode is added to the library, LexiAnnot reads the media file's MediaInfo and file list. If the video's original language is English and it includes an English text subtitle, LexiAnnot generates a `.en.ass` subtitle file with vocabulary annotations.
After a film or TV episode is added to the library, LexiAnnot reads the media file's MediaInfo and file list. If the video's original language is English and it includes an English text subtitle, LexiAnnot generates an .ass subtitle file with vocabulary annotations.
## Features
![](https://images2.imgbox.com/d6/b6/kZu6EH2a_o.png)
![](https://images2.imgbox.com/c8/3a/rEJBWu5v_o.png)
![](https://images2.imgbox.com/97/b7/d6RXFtwD_o.png)
![](https://images2.imgbox.com/56/c0/FBhJMvRD_o.jpg)
![](https://images2.imgbox.com/8a/d4/AtgOe265_o.jpg)
# Gemini
- Detects the video's original language and the subtitle languages
- Automatically adapts to the original subtitle style
- Annotates and explains slang, coinages, and familiar words used with unfamiliar meanings
- **[Get an API key](https://aistudio.google.com/app/apikey)**
- **[Rate limits](https://ai.google.dev/gemini-api/docs/rate-limits)**
## Configuration
**Make sure the following domains are reachable**
- spaCy model
  - spaCy is used for lemmatization, POS tagging, and named entity recognition; `en_core_web_sm` or `en_core_web_md` is sufficient.
- LLM settings
  - The subtitles of a single episode usually contain several thousand words, so use a model that supports long inputs and choose an appropriate context window size.
  - Processing the subtitles of a 60-minute episode consumes roughly `60K`–`80K` tokens, depending on the subtitle content.
  - For configuration, refer to the AI agent settings section of MoviePilot.
- Agent tool
  - Use the `/ai` command in chat to tell the agent which film or episode you want annotated.
- googleapis.com
- google.dev
- aistudio.google.com
# CEFR
## CEFR
CEFR stands for the Common European Framework of Reference for Languages.
@@ -36,20 +42,18 @@ CEFR stands for the Common European Framework of Reference for Languages.
- **C1** (Advanced): Can understand a wide range of demanding, longer texts and recognise implicit meaning; can express ideas fluently and spontaneously, using language flexibly and effectively for a variety of purposes.
- **C2** (Proficient): Can understand with ease virtually everything heard or read; can express themselves very fluently and precisely, distinguishing fine shades of meaning even in the most complex situations.
# Roadmap
## Roadmap
- Bilingual subtitle support
- ~~Exam vocabulary annotation~~
# FAQ
## FAQ
- **Why is Gemini needed?**
  - The dictionary LexiAnnot uses contains only about 18,000 words and cannot cover the vast range of slang, idioms, buzzwords, and other expressions that appear in films and TV shows.
- **Can it only process videos that already have subtitles?**
  - Yes. The video must contain **English text subtitles**.
- **Why can't some videos that contain subtitles be processed?**
  - Image-based subtitles (typically styled/typeset effect subtitles) cannot currently be recognized.
# Acknowledgements
## Acknowledgements
- [coca-vocabulary-20000](https://github.com/llt22/coca-vocabulary-20000)

File diff suppressed because it is too large

View File

@@ -0,0 +1,67 @@
import asyncio
from typing import Optional, Type
from pydantic import BaseModel
from app.agent.tools.base import MoviePilotTool
from app.core.plugin import PluginManager
from .schemas import VocabularyAnnotatingToolInput
class VocabularyAnnotatingTool(MoviePilotTool):
"""自定义工具示例"""
# 工具名称
name: str = "vocabulary_annotating_tool"
# 工具描述
description: str = (
"Add new vocabulary annotation task to plugin LexiAnnot's task queue."
)
# 输入参数模型
args_schema: Type[BaseModel] = VocabularyAnnotatingToolInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据订阅参数生成友好的提示消息"""
skip_existing = kwargs.get("skip_existing", False)
video_path = kwargs.get("video_path", "")
message = f"正在添加字幕任务: {video_path!r}"
if skip_existing:
message += "(覆写方式:跳过已存在的字幕文件)"
else:
message += "(覆写方式:覆盖已存在的字幕文件)"
return message
async def run(self, video_path: str, skip_existing: bool = True, **kwargs) -> str:
"""
        Core tool logic (async method).
        :param video_path: Path to the video file
        :param skip_existing: Whether to skip existing subtitle files
        :param kwargs: Other parameters, including ``explanation`` (a note on why the tool is used)
        :return: Execution result as a string
"""
try:
            # Run the tool logic
            result = await self._perform_operation(video_path, skip_existing)
            # Return the result to the agent
if not result:
return f"成功添加词汇标注任务: {video_path!r}"
else:
return f"添加任务出错: {result}"
except Exception as e:
return f"执行失败: {str(e)}"
async def _perform_operation(
self, video_path: str, skip_existing: bool
) -> str | None:
"""内部方法,执行具体操作"""
# 实现具体业务逻辑
plugins = PluginManager().running_plugins
plugin_instance = plugins.get("LexiAnnot")
if not plugin_instance:
return "LexiAnnot 插件未运行"
await asyncio.to_thread(
plugin_instance.add_task, video_file=video_path, skip_existing=skip_existing
)
return None
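# A minimal usage sketch (not part of the plugin): how this tool could be exercised
# directly. The video path below is a hypothetical example; in normal operation the
# MoviePilot agent constructs and invokes the tool itself.
def _example_tool_usage() -> None:
    tool = VocabularyAnnotatingTool()
    # Human-readable notification text shown when the task is queued
    print(tool.get_tool_message(video_path="/media/Show/S01E01.mkv", skip_existing=True))
    # run() is async, so drive it with asyncio when calling it outside the agent loop
    print(asyncio.run(tool.run("/media/Show/S01E01.mkv", skip_existing=True)))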

View File

@@ -0,0 +1,116 @@
from typing import Literal
from pydantic import BaseModel, Field, RootModel
from .schemas import PosDef, Cefr
class CefrEntry(BaseModel):
pos: Literal[
"noun",
"adverb",
"interjection",
"preposition",
"determiner",
"have-verb",
"modal auxiliary",
"adjective",
"number",
"be-verb",
"verb",
"conjunction",
"do-verb",
"infinitive-to",
"vern",
"pos",
"pronoun",
] = Field(..., description="Part of speech")
cefr: Cefr = Field(..., description="CEFR level")
notes: str | None = Field(default=None, description="Notes")
class CefrDictionary(RootModel):
root: dict[str, list[CefrEntry]]
def get(self, word: str) -> list[CefrEntry] | None:
return self.root.get(word)
class Coca20KEntry(BaseModel):
index: int = Field(..., description="Index of the entry")
phonetics_1: str = Field(..., description="Phonetics style 1")
phonetics_2: str = Field(..., description="Phonetics style 2")
pos_defs: list[PosDef] = Field(
..., description="List of part of speech definitions"
)
class Coca20KDictionary(RootModel):
root: dict[str, Coca20KEntry]
def get(self, word: str) -> Coca20KEntry | None:
return self.root.get(word)
class ShanBayDef(BaseModel):
# 'n.', 'v.', 'adv.', 'adj.', 'phrase.', 'int.', 'pron.', 'prep.', '.', 'conj.', 'num.', 'phrase v.', 'linkv.',
# 'det.', 'ordnumber.', 'prefix.', 'un.', 'vt.', 'mod. v.', 'abbr.', 'auxv.', 'modalv.', 'vi.', 'aux. v.',
# 'interj.', 'article.', 'infinitive.', 'suff.', 'ord.', 'art.', 'exclam.', 'n.[C]'
pos: str = Field(..., description="Part of speech")
definition_cn: str = Field(..., description="Definition in Chinese")
class ShanbayEntry(BaseModel):
ipa_uk: str = Field(..., description="UK IPA pronunciation")
ipa_us: str = Field(..., description="US IPA pronunciation")
defs: list[ShanBayDef] = Field(..., description="List of definitions")
class ShanbayDictionary(BaseModel):
"""Dictionary entries for various examinations."""
cet4: dict[str, ShanbayEntry] = Field(
..., alias="CET-4", description="CET-4 dictionary entries"
)
cet6: dict[str, ShanbayEntry] = Field(
..., alias="CET-6", description="CET-6 dictionary entries"
)
npee: dict[str, ShanbayEntry] = Field(
..., alias="NPEE", description="NPEE dictionary entries"
)
ielts: dict[str, ShanbayEntry] = Field(
..., alias="IELTS", description="IELTS dictionary entries"
)
toefl: dict[str, ShanbayEntry] = Field(
..., alias="TOEFL", description="TOEFL dictionary entries"
)
gre: dict[str, ShanbayEntry] = Field(
..., alias="GRE", description="GRE dictionary entries"
)
tem4: dict[str, ShanbayEntry] = Field(
..., alias="TEM-4", description="TEM-4 dictionary entries"
)
tem8: dict[str, ShanbayEntry] = Field(
..., alias="TEM-8", description="TEM-8 dictionary entries"
)
pet: dict[str, ShanbayEntry] = Field(
..., alias="PET", description="PET dictionary entries"
)
def query(self, word: str) -> dict[str, ShanbayEntry]:
result = {}
for field_name, field_info in ShanbayDictionary.model_fields.items():
value = getattr(self, field_name)
if word in value:
result[field_info.alias] = value[word]
return result
class Lexicon(BaseModel):
cefr: CefrDictionary = Field(..., description="CEFR dictionary")
coca20k: Coca20KDictionary = Field(..., description="COCA 20K dictionary")
examinations: ShanbayDictionary = Field(
..., description="Shanbay examinations dictionary"
)
swear_words: list[str] = Field(..., description="List of swear words")
version: str = Field(..., description="Version of the lexicon")
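# A minimal loading/query sketch (assumptions: the JSON file name and the sample word
# are placeholders; the plugin ships and loads its own bundled lexicon data).
def _example_lexicon_lookup(path: str = "lexicon.json") -> None:
    with open(path, "r", encoding="utf-8") as f:
        lexicon = Lexicon.model_validate_json(f.read())
    # CEFR entries for a word (one entry per part of speech)
    print(lexicon.cefr.get("ubiquitous"))
    # COCA 20K entry with phonetics and definitions
    print(lexicon.coca20k.get("ubiquitous"))
    # Exam syllabi (CET-6, IELTS, ...) whose word lists include the word
    print(lexicon.examinations.query("ubiquitous"))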

View File

@@ -0,0 +1,736 @@
import re
import threading
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from pydantic import SecretStr
from app.core.config import settings
from app.schemas import Context
from app.schemas.types import MediaType
from app.log import logger
from .lexicon import CefrDictionary, Lexicon, Coca20KDictionary
from .schemas import (
SubtitleSegment,
PosDef,
Word,
Cefr,
WordMetadata,
SegmentList,
LlmFeedback,
UniversalPos,
LlmEnrichmentResult,
LlmTranslationResult,
)
from .spacyworker import SpacyWorker
_patterns = [
r"\d+th|\d?1st|\d?2nd|\d?3rd",
r"\w+'s$",
r"\w+'d$",
r"\w+'t$",
"[Ii]'m$",
r"\w+'re$",
r"\w+'ve$",
r"\w+'ll$",
]
filter_patterns: list[re.Pattern] = [re.compile(p) for p in _patterns]
pos_interests = {"NOUN", "VERB", "ADJ", "ADV", "ADP", "CCONJ", "SCONJ"}
UNIVERSAL_POS_MAP: dict[UniversalPos, str] = {
UniversalPos.ADJ: "adj.",
UniversalPos.ADV: "adv.",
UniversalPos.INTJ: "int.",
UniversalPos.NOUN: "n.",
UniversalPos.PROPN: "n.",
UniversalPos.VERB: "v.",
UniversalPos.AUX: "aux.",
UniversalPos.ADP: "prep.",
UniversalPos.CCONJ: "conj.",
UniversalPos.SCONJ: "conj.",
UniversalPos.DET: "det.",
UniversalPos.NUM: "num.",
UniversalPos.PART: "part.",
UniversalPos.PRON: "pron.",
UniversalPos.PUNCT: None,
UniversalPos.SYM: None,
UniversalPos.X: None,
}
def initialize_llm(
provider: str,
api_key: str,
model_name: str,
base_url: str | None,
temperature: float = 0.1,
max_retries: int = 3,
proxy: bool = False,
) -> BaseChatModel:
"""初始化LLM模型"""
if provider == "google":
if proxy:
from langchain_openai import ChatOpenAI
            return ChatOpenAI(
                model=model_name,
                api_key=SecretStr(api_key),
                max_retries=max_retries,
                base_url="https://generativelanguage.googleapis.com/v1beta/openai",
                temperature=temperature,
                openai_proxy=settings.PROXY_HOST,
            )
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(
model=model_name,
google_api_key=api_key, # noqa
max_retries=max_retries,
temperature=temperature,
)
elif provider == "deepseek":
from langchain_deepseek import ChatDeepSeek
return ChatDeepSeek(
model=model_name,
api_key=SecretStr(api_key),
max_retries=max_retries,
temperature=temperature,
)
else:
from langchain_openai import ChatOpenAI
return ChatOpenAI(
model=model_name,
api_key=SecretStr(api_key),
max_retries=max_retries,
base_url=base_url,
temperature=temperature,
openai_proxy=settings.PROXY_HOST if proxy else None,
)
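# A minimal wiring sketch (assumption): the provider, key, and model name below are
# placeholders, not values shipped with the plugin; real values come from the plugin
# or system agent configuration.
def _example_initialize_llm() -> BaseChatModel:
    return initialize_llm(
        provider="openai",         # "google", "deepseek", or any OpenAI-compatible provider
        api_key="sk-...",          # placeholder API key
        model_name="gpt-4o-mini",  # placeholder model name
        base_url=None,             # custom endpoint for OpenAI-compatible providers
        temperature=0.1,
        max_retries=3,
        proxy=False,
    )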
def convert_pos_to_spacy(pos: str):
"""
    Convert a part-of-speech string to the POS tag used by spaCy.
    :param pos: Part of speech as a string
    :returns: The corresponding spaCy POS tag, or None for tags that cannot be mapped
"""
spacy_pos_map = {
"noun": "NOUN",
"adjective": "ADJ",
"adverb": "ADV",
"verb": "VERB",
"preposition": "ADP",
"conjunction": "CCONJ",
"determiner": "DET",
"pronoun": "PRON",
"interjection": "INTJ",
"number": "NUM",
}
pos_lower = pos.lower()
if pos_lower in spacy_pos_map:
spacy_pos = spacy_pos_map[pos_lower]
elif pos_lower == "be-verb":
spacy_pos = "AUX" # Auxiliary verb (e.g., be, do, have)
elif pos_lower == "vern":
spacy_pos = "VERB" # Assuming 'vern' is a typo for 'verb'
elif pos_lower == "modal auxiliary":
spacy_pos = "AUX" # Modal verbs are also auxiliaries
elif pos_lower == "do-verb":
spacy_pos = "AUX"
elif pos_lower == "have-verb":
spacy_pos = "AUX"
elif pos_lower == "infinitive-to":
spacy_pos = "PART" # Particle (e.g., to in "to go")
elif not pos_lower: # Handle empty strings
spacy_pos = None
else:
spacy_pos = None # For unmapped POS tags
return spacy_pos
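# A small self-check sketch for the mapping above; the expected values follow directly
# from the dictionary and branches in convert_pos_to_spacy.
def _example_pos_mapping() -> None:
    assert convert_pos_to_spacy("noun") == "NOUN"
    assert convert_pos_to_spacy("modal auxiliary") == "AUX"
    assert convert_pos_to_spacy("infinitive-to") == "PART"
    assert convert_pos_to_spacy("pos") is None  # unmapped tags fall through to None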
def convert_spacy_to_universal(spacy_pos: str) -> UniversalPos:
"""
    Convert a spaCy POS tag to the UniversalPos enum.
    """
    # POS mapping table
pos_mapping = {
"ADJ": UniversalPos.ADJ,
"ADV": UniversalPos.ADV,
"INTJ": UniversalPos.INTJ,
"NOUN": UniversalPos.NOUN,
"PROPN": UniversalPos.PROPN,
"VERB": UniversalPos.VERB,
"AUX": UniversalPos.AUX,
        # Adpositions (prepositions/postpositions)
        "ADP": UniversalPos.ADP,
        # Conjunctions
        "CCONJ": UniversalPos.CCONJ,
        "SCONJ": UniversalPos.SCONJ,
        # Determiners
        "DET": UniversalPos.DET,
        # Numerals
        "NUM": UniversalPos.NUM,
        # Pronouns
        "PRON": UniversalPos.PRON,
        # Particles
        "PART": UniversalPos.PART,
        # Punctuation
        "PUNCT": UniversalPos.PUNCT,
        # Symbols
        "SYM": UniversalPos.SYM,
        # Other
        "X": UniversalPos.X,
        # Special cases: other tags spaCy may return
        "SPACE": UniversalPos.PUNCT,  # treat whitespace as punctuation
        "CONJ": UniversalPos.CCONJ,  # conjunction tag from older spaCy versions
}
    # Uppercase for consistent matching
    spacy_pos = spacy_pos.upper()
    # Direct match: return the corresponding enum
    if spacy_pos in pos_mapping:
        return pos_mapping[spacy_pos]
    # Special handling: tags that start with a known prefix
if spacy_pos.startswith("ADJ"):
return UniversalPos.ADJ
elif spacy_pos.startswith("ADV"):
return UniversalPos.ADV
elif spacy_pos.startswith("NOUN"):
return UniversalPos.NOUN
elif spacy_pos.startswith("VERB"):
return UniversalPos.VERB
elif spacy_pos.startswith("PROPN"):
return UniversalPos.PROPN
elif spacy_pos.startswith("PRON"):
return UniversalPos.PRON
    # Default to X (unknown)
return UniversalPos.X
def get_cefr_by_spacy(
lemma_: str, pos_: str, cefr_lexicon: CefrDictionary
) -> Cefr | None:
word = lemma_.lower().strip("-*'")
result = cefr_lexicon.get(word)
if result:
all_cefr: list[Cefr] = []
if len(result) > 0:
for entry in result:
if pos_ == convert_pos_to_spacy(entry.pos):
return entry.cefr
all_cefr.append(entry.cefr)
return min(all_cefr)
return None
def query_coca20k(word: str, coca20k: Coca20KDictionary):
word = word.lower().strip("-*'")
return coca20k.get(word)
def _update_word_via_lexicon(word: Word, lexi: Lexicon) -> Word:
"""
    Update a Word object with information from the lexicon.
    :param word: Word object to update
    :param lexi: Lexicon object
    :returns: The updated Word object
"""
# query dictionary
cefr = get_cefr_by_spacy(word.lemma, word.pos.value, lexi.cefr)
res_of_coca = query_coca20k(word.lemma, lexi.coca20k)
if res_of_coca and not cefr:
cefr = None
res_of_exams = lexi.examinations.query(word.lemma)
exam_tags = [exam_id for exam_id in res_of_exams if exam_id in res_of_exams]
pos_defs = []
phonetics = ""
if res_of_exams:
for exam, value in res_of_exams.items():
phonetics = value.ipa_uk
defs = {}
for pos_def in value.defs:
pos = pos_def.pos
definition_cn = pos_def.definition_cn
defs.setdefault(pos, []).append(definition_cn)
for pos, meanings in defs.items():
pos_defs.append(PosDef(pos=pos, meanings=meanings))
break
elif res_of_coca:
phonetics = res_of_coca.phonetics_1
pos_defs = res_of_coca.pos_defs
word.exams = exam_tags
word.cefr = cefr
word.pos_defs = pos_defs
word.phonetics = phonetics
return word
def extract_advanced_words(
segment: SubtitleSegment,
lexi: Lexicon,
spacy_worker: SpacyWorker,
simple_level: set[Cefr],
exams: list[str],
) -> list[Word]:
text = segment.clean_text
doc = spacy_worker.submit(text)
last_end_pos = 0
lemma_to_query = []
words = []
for token in doc.tokens:
# filter tokens
if (
len(token.text) == 1
or token.is_stop
or token.is_punct
or token.ent_iob_ != "O"
):
continue
if token.pos_ not in pos_interests:
continue
if token.lemma_ in lexi.swear_words:
continue
striped = token.lemma_.strip("-[")
if any(p.match(striped) for p in filter_patterns):
continue
if striped in lemma_to_query:
continue
else:
lemma_to_query.append(striped)
striped_text = token.text.strip("-*[")
start_pos = text.find(striped_text, last_end_pos)
end_pos = start_pos + len(striped_text)
last_end_pos = end_pos
word = Word(
text=striped_text,
lemma=striped,
pos=convert_spacy_to_universal(token.pos_),
meta=WordMetadata(
start_pos=start_pos, end_pos=end_pos, context_id=segment.index
),
)
word = _update_word_via_lexicon(word, lexi)
if word.cefr and word.cefr in simple_level:
continue
words.append(word)
return words
def _find_segment_by_word_id(
segments: list[SubtitleSegment], word_id: int
) -> SubtitleSegment | None:
for segment in segments:
for word in segment.candidate_words:
if word.meta.word_id == word_id:
return segment
return None
def _update_word_metadata(
new_text: str, meta: WordMetadata, segment: SubtitleSegment
) -> WordMetadata | None:
"""
    Relocate a word's positional metadata after its text has been corrected.
    :param new_text: Corrected word text
    :param meta: Original metadata of the word
    :param segment: Subtitle segment containing the word
    :returns: Updated metadata, or None if the new text cannot be located
"""
text = segment.clean_text
p_end = meta.end_pos
new_len = len(new_text)
i = meta.start_pos - new_len + 1
i = max(0, i)
j = p_end + min(0, (len(text) - (p_end + new_len)))
for x in range(i, j + 1):
text_view = text[x : (x + new_len)]
if text_view == new_text:
return WordMetadata(
start_pos=x,
end_pos=x + new_len,
context_id=segment.index,
word_id=meta.word_id,
)
return None
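# Worked example with illustrative values: if the detector originally marked "runni"
# at positions [8, 13) in "She was running fast" and the LLM corrects the text to
# "running", the window above scans x = 2..13 and relocates the word to [8, 15).
def _example_update_word_metadata() -> None:
    segment = SubtitleSegment(index=0, start_time=0, end_time=1000, plaintext="She was running fast")
    meta = WordMetadata(start_pos=8, end_pos=13, context_id=0)
    relocated = _update_word_metadata("running", meta, segment)
    assert relocated is not None and (relocated.start_pos, relocated.end_pos) == (8, 15)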
def format_time_extended(milliseconds: int):
"""
    Convert a millisecond count to a time string.
    :param milliseconds: Integer number of milliseconds
    :return: String in the form HH:MM:SS.mmm (prefixed with '-' for negative values)
"""
if milliseconds < 0:
sign = "-"
milliseconds = abs(milliseconds)
else:
sign = ""
hours = int(milliseconds // 3600000)
minutes = int((milliseconds % 3600000) // 60000)
seconds = (milliseconds % 60000) // 1000
milliseconds_remainder = milliseconds % 1000
return f"{sign}{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds_remainder:03d}"
def _context_process_chain(
lexi: Lexicon,
llm: BaseChatModel,
segments: list[SubtitleSegment],
start: int,
end: int,
    learner_level: str = "C1",
media_name: str | None = None,
translate_sentences: bool = False
):
feedback_parser = PydanticOutputParser(pydantic_object=LlmFeedback)
def format_input(segment_list: list[SubtitleSegment]):
media_name_prefix = (
f"The following subtitles are from '{media_name}'.\n" if media_name else ""
)
return {
"media_name_prefix": media_name_prefix,
"context_text": " ".join([seg.clean_text for seg in segment_list]),
"candidate_words": "\n".join(
[
f"- {word.text} (WORD_ID: {word.meta.word_id}, LEMMA: {word.lemma}, CEFR: {word.cefr}, POS: {word.pos})"
for seg in segment_list
for word in seg.candidate_words
]
),
"leaner_level": leaner_level,
"format_instructions": feedback_parser.get_format_instructions(),
}
def refactor_by_feedback(feedback: LlmFeedback):
# Process LLM feedback to update segments
for word in feedback.candidate_words_feedback:
seg = _find_segment_by_word_id(segments, word.word_id)
if not seg or seg.index < start or seg.index > end:
continue
# Update word info based on feedback
if not word.should_keep:
seg.candidate_words = [
w for w in seg.candidate_words if w.meta.word_id != word.word_id
]
continue
for w in seg.candidate_words:
if w.meta.word_id == word.word_id:
word_text = word.text
if word_text is not None and word.text != w.text:
# Update metadata if text changed
if word.text not in seg.clean_text:
# If the word text is not found in the segment, skip updating metadata
continue
new_meta = _update_word_metadata(word_text, w.meta, seg)
if not new_meta:
continue
w.meta = new_meta
w.text = word_text
if word.pos:
w.pos = word.pos
if word.lemma:
w.lemma = word.lemma
# Add new words identified by LLM
for new_word in feedback.llm_identified_words:
for seg in segments:
if seg.index < start or seg.index > end:
continue
start_pos = seg.clean_text.find(new_word.text)
if start_pos == -1:
continue
if any(w.text == new_word.text for w in seg.candidate_words):
continue
new_meta = WordMetadata(
start_pos=start_pos,
end_pos=start_pos + len(new_word.text),
context_id=seg.index
)
built_word = Word(
text=new_word.text,
lemma=new_word.lemma,
pos=new_word.pos,
meta=new_meta
)
built_word = _update_word_via_lexicon(built_word, lexi)
                if built_word.cefr and built_word.cefr < learner_level:
continue
seg.candidate_words.append(built_word)
prompt_template = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are an expert in linguistics and language learning. Your task is to analyze subtitle segments.
Please perform the following tasks for an English learner at {learner_level} CEFR level.
**CRITICAL INSTRUCTION**: The learner is advanced. They already know common daily vocabulary.
Your goal is to identify **only** content that helps them reach native-level proficiency.
1. **Review and Evaluate Candidate Words:**
* **Goal**: Filter out simple words and correct any errors in lemma/POS/text.
* **Action**: Return feedback items **ONLY** for words that:
1. Should be **discarded** (too simple, trivial filler, profanity without cultural value). Set `should_keep` to `False`.
2. Need **correction** (wrong lemma, POS, or text boundary). Set `should_keep` to `True` and provide correct values.
* **Implicit Rule**: If a word is appropriate for the learner and has correct info, **DO NOT** include it in the output list.
* **Keep criteria**: Keep simple words **ONLY IF** used in a non-literal, metaphorical, or idiomatic sense.
* **Discard criteria**: Discard trivial conversational fillers ('gonna', 'wanna'), simple interjections, common profanity, and words below {learner_level} level.
2. **Identify Missed Words:**
* Identify any additional single words or phrases (typically 1-3 words) from the `context_text` that may be important for {learner_level} learners. This specifically includes:
* **Slang or informal expressions.**
* **Internet terms or modern colloquialisms.**
* **Words or phrases that require specific cultural background knowledge to understand.**
* **Any other words or phrases that are challenging.**
* Avoid repeating words already listed in `candidate_words`.
* Must exist in the exact form in `context_text`.
* Provide lemma and POS.
* **Do NOT include** simple high-frequency words, common fillers ('gonna', 'gotta'), or basic swear words unless necessary for context.
-------------------------
You MUST return output strictly matching the provided Pydantic schema.
Return ONLY valid JSON.
**Here are the output format instructions you MUST follow strictly:**
{format_instructions}
""",
),
(
"human",
"""{media_name_prefix}Here is the context from the subtitles:
---
{context_text}
---
Here are the candidate words identified by a basic algorithm:
{candidate_words}
""",
),
]
)
feedback_chain = (
format_input | prompt_template | llm.with_structured_output(LlmFeedback).with_retry(stop_after_attempt=3)
)
result: LlmFeedback = feedback_chain.invoke(segments) # type: ignore
refactor_by_feedback(result)
    # Enrich word senses
if any(segment.candidate_words for segment in segments):
enrichment_prompt_template = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are a linguistics and English-learning expert. Your goal is to enhance vocabulary learning for Chinese users.\n
For each word (identified by `WORD_ID`), provide:
1. **Translation:** A concise Chinese translation.
2. **Usage or Cultural Context (optional, in Chinese)**:
* ONLY include if:
- The word has a specific meaning in this context that differs from its common definition;
- It is slang, idiom, phrasal, metaphorical, or culturally loaded;
* ONLY provide this context when learners would likely struggle to understand the word's usage without it.
**For each word, provide the `word_id` to ensure proper mapping.**
**Your judgment should be based strictly on the provided subtitle context. DO NOT fabricate context or forced explanation.**
-------------------------
You MUST return output strictly matching the provided Pydantic schema.
Return ONLY valid JSON.
**Here are the output format instructions you MUST follow strictly:**
{format_instructions}
""",
),
(
"human",
"""{media_name_prefix}Here is the context from the subtitles:
---
{context_text}
---
Here are the words you need to enrich:
{words_to_enrich}
""",
),
]
)
enrichment_parser = PydanticOutputParser(pydantic_object=LlmEnrichmentResult)
def format_enrichment_input(segment_list: list[SubtitleSegment]):
media_name_prefix = (
f"The following subtitles are from '{media_name}'.\n"
if media_name
else ""
)
words_to_enrich = []
for seg in segment_list:
if start <= seg.index <= end:
for w in seg.candidate_words:
words_to_enrich.append(
f"- {w.text} (WORD_ID: {w.meta.word_id}, LEMMA: {w.lemma}, POS: {w.pos}, DEFINITIONS: {w.pos_defs_plaintext})"
)
return {
"media_name_prefix": media_name_prefix,
"context_text": " ".join([seg.clean_text for seg in segment_list]),
"words_to_enrich": "\n".join(words_to_enrich),
"format_instructions": enrichment_parser.get_format_instructions(),
}
enrichment_chain = (
format_enrichment_input
| enrichment_prompt_template
| llm.with_structured_output(LlmEnrichmentResult).with_retry(stop_after_attempt=3)
)
enrichment_result: LlmEnrichmentResult = enrichment_chain.invoke(segments) # type: ignore
for enriched_word_data in enrichment_result.enriched_words:
for segment in segments:
if segment.index < start or segment.index > end:
continue
for candidate_word in segment.candidate_words:
if candidate_word.meta.word_id == enriched_word_data.word_id:
candidate_word.llm_translation = enriched_word_data.translation
candidate_word.llm_usage_context = enriched_word_data.usage_context
break
    # Full-sentence translation
if translate_sentences:
translation_parser = PydanticOutputParser(pydantic_object=LlmTranslationResult)
translation_prompt_template = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are a professional subtitle translator. Your task is to translate English subtitle segments into natural, idiomatic Chinese.
**Guidelines:**
1. **Tone & Style:** Maintain the original tone (e.g., casual, formal, humorous, dramatic).
2. **Context:** Use the surrounding segments to ensure continuity and correct meaning.
3. **Conciseness:** Subtitles have space constraints. Keep translations concise but accurate.
4. **Formatting:** Return the result strictly matching the provided JSON schema.
-------------------------
You MUST return output strictly matching the provided Pydantic schema.
Return ONLY valid JSON.
**Here are the output format instructions you MUST follow strictly:**
{format_instructions}
""",
),
(
"human",
"""{media_name_prefix}Here are the segments to translate:
---
{segments_text}
---
""",
),
]
)
def format_translation_input(segment_list: list[SubtitleSegment]):
media_name_prefix = (
f"The following subtitles are from '{media_name}'.\n"
if media_name
else ""
)
# Only translate segments within the current batch range (start to end)
segments_text_lines = []
for seg in segment_list:
if start <= seg.index <= end:
segments_text_lines.append(f"ID {seg.index}: {seg.clean_text}")
return {
"media_name_prefix": media_name_prefix,
"segments_text": "\n".join(segments_text_lines),
"format_instructions": translation_parser.get_format_instructions(),
}
translation_chain = (
format_translation_input
| translation_prompt_template
| llm.with_structured_output(LlmTranslationResult).with_retry(stop_after_attempt=3)
)
try:
translation_result: LlmTranslationResult = translation_chain.invoke(segments) # type: ignore
# Map translations back to segments
trans_map = {
t.index: t.translation for t in translation_result.translations
}
for segment in segments:
if segment.index in trans_map:
segment.Chinese = trans_map[segment.index]
except Exception as e:
logger.error(f"Error during sentence translation: {e}")
return [segment for segment in segments if start <= segment.index <= end]
def llm_process_chain(
lexi: Lexicon,
llm: BaseChatModel,
segments: SegmentList,
shutdown_event: threading.Event,
context_window: int = 30,
    learner_level: str = "C1",
media_context: Context | None = None,
translate_sentences: bool = False,
) -> SegmentList:
"""
    Update word information in the subtitle segments according to LLM feedback.
    :param lexi: Lexicon object
    :param llm: Chat model instance
    :param segments: Subtitle segments
    :param shutdown_event: Event used to abort processing early
    :param context_window: Number of segments per LLM batch
    :param learner_level: The learner's CEFR level
    :param media_context: Media information
    :param translate_sentences: Whether to translate whole sentences
    :returns: The updated list of subtitle segments
"""
media_name = None
if media_context and media_context.media_info and media_context.meta_info:
media_info = media_context.media_info
if media_info.type == MediaType.TV:
media_name = (
f"{media_info.title_year} {media_context.meta_info.season_episode}"
)
else:
media_name = f"{media_info.title_year}"
segments_list = []
for context, (start, end) in segments.context_generator(
context_window=context_window, extra_len=2
):
if shutdown_event.is_set():
break
logger.info(
f"Processing segments {format_time_extended(context[0].start_time)} ({context[0].index}) ->"
f" {format_time_extended(context[-1].end_time)} ({context[-1].index}) via LLM..."
)
segments_list.extend(
_context_process_chain(
            lexi, llm, context, start, end, learner_level, media_name, translate_sentences
)
)
return SegmentList(root=segments_list)
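# End-to-end wiring sketch (assumptions: the provider/model values are placeholders,
# and the lexicon and segments are assumed to be loaded elsewhere by the plugin's
# task runner).
def _example_llm_process(lexi: Lexicon, segments: SegmentList) -> SegmentList:
    llm = initialize_llm(
        provider="openai", api_key="sk-...", model_name="gpt-4o-mini", base_url=None
    )
    return llm_process_chain(
        lexi=lexi,
        llm=llm,
        segments=segments,
        shutdown_event=threading.Event(),  # never set here, so the run completes
        context_window=30,                 # roughly 30 subtitle lines per LLM call
        translate_sentences=False,
    )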

View File

@@ -1,111 +0,0 @@
import time
from typing import Generic, List, TypeVar
from google import genai
from google.genai import types
from pydantic import BaseModel
class Context(BaseModel):
original_text: str
class Vocabulary(BaseModel):
lemma: str
Chinese: str
class TaskBase(BaseModel):
id: str
class VocabularyTranslationTask(TaskBase):
vocabulary: List[Vocabulary]
context: Context
index: int
class DialogueTranslationTask(TaskBase):
original_text: str
Chinese: str
index: int
T = TypeVar("T", bound=TaskBase)
class TranslationTasks(BaseModel, Generic[T]):
tasks: List[T]
class GeminiResponse(BaseModel, Generic[T]):
tasks: List[T]
total_token_count: int
success: bool
message: str = ""
def translate(
api_key: str,
translation_tasks: TranslationTasks[T],
system_instruction: str,
gemini_model: str = "gemini-2.0-flash",
temperature: float = 0.3,
max_retries: int = 3,
retry_delay: int = 10,
) -> GeminiResponse[T]:
"""
Query the Gemini API for translation tasks with retry logic.
:param api_key: Gemini API key
:param translation_tasks: Translation tasks
:param system_instruction: System instruction
:param gemini_model: Model name to use
:param temperature: Generation temperature
:param max_retries: Number of retry attempts
:param retry_delay: Delay between retries in seconds
    :returns: GeminiResponse containing the results
"""
messages = []
response_schema = type(translation_tasks)
for attempt in range(1, max_retries + 1):
try:
client = genai.Client(api_key=api_key)
response = client.models.generate_content(
model=gemini_model,
contents=translation_tasks.model_dump_json(),
config=types.GenerateContentConfig(
system_instruction=system_instruction,
response_mime_type="application/json",
response_schema=response_schema,
temperature=temperature,
),
)
if not response.parsed:
raise ValueError("Empty response from Gemini API")
translation_res = response.parsed
total_token_count = response.usage_metadata.total_token_count
return GeminiResponse(
tasks=translation_res.tasks,
total_token_count=total_token_count or 0,
success=True,
)
except Exception as e:
messages.append(f"Attempt {attempt} failed: {str(e)}")
if attempt < max_retries:
time.sleep(attempt*retry_delay)
return GeminiResponse(
tasks=[],
total_token_count=0,
success=False,
message="All retry attempts failed. " + "\n".join(messages),
)

View File

@@ -1,5 +1,4 @@
pysubs2~=1.8.0
langdetect~=1.0.9
pymediainfo~=7.0.1
spacy~=3.8.7
google-genai~=1.48.0
spacy~=3.8.11

View File

@@ -0,0 +1,394 @@
import re
import uuid
from collections import Counter
from enum import Enum
from typing import Literal, Generator, Iterator
from pydantic import BaseModel, Field, RootModel, model_validator
from app.utils.singleton import Singleton
Cefr = Literal["C2", "C1", "B2", "B1", "A2", "A1"]
class UniversalPos(str, Enum):
"""Universal Part-of-Speech tags"""
ADJ = "ADJ" # Adjective
ADV = "ADV" # Adverb
INTJ = "INTJ" # Interjection
NOUN = "NOUN" # Noun
PROPN = "PROPN" # Proper noun
VERB = "VERB" # Verb
ADP = "ADP" # Adposition (preposition/postposition)
AUX = "AUX" # Auxiliary verb
CCONJ = "CCONJ" # Coordinating conjunction
DET = "DET" # Determiner
NUM = "NUM" # Numeral
PART = "PART" # Particle
PRON = "PRON" # Pronoun
SCONJ = "SCONJ" # Subordinating conjunction
PUNCT = "PUNCT" # Punctuation
SYM = "SYM" # Symbol
X = "X" # Other/unknown
class IDGenerator(metaclass=Singleton):
"""Singleton class for generating unique IDs."""
_counter = 0
def next_id(self):
self._counter += 1
return self._counter
def reset(self):
self._counter = 0
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
IGNORED = "ignored"
class TaskParams(BaseModel):
skip_existing: bool = Field(
default=True, description="Whether to skip existing subtitle files"
)
class TasksApiParams(BaseModel):
operation: Literal["DELETE", "RETRY", "IGNORE"] = Field(
..., description="Operation to perform on the tasks"
)
task_id: str | None = Field(
default=None, description="Unique identifier for the task"
)
class SegmentStatistics(BaseModel):
total_segments: int = Field(default=0, description="Total number of subtitle segments")
total_words: int = Field(default=0, description="Total number of candidate words")
cefr_distribution: dict[str, int] = Field(
default_factory=dict, description="Distribution of words by CEFR level"
)
pos_distribution: dict[str, int] = Field(
default_factory=dict, description="Distribution of words by Part of Speech"
)
exam_distribution: dict[str, int] = Field(
default_factory=dict, description="Distribution of words by Examination"
)
def to_string(self) -> str:
cefr_str = ", ".join(
[f"{level}({count})" for level, count in self.cefr_distribution.items()]
)
pos_str = ", ".join(
[f"{pos}({count})" for pos, count in self.pos_distribution.items()]
)
exam_str = ", ".join([f"{exam}({count})" for exam, count in self.exam_distribution.items()])
return (
f"Total Segments: {self.total_segments}\n"
f"Total Words: {self.total_words}\n"
f"CEFR Distribution: {cefr_str if cefr_str else 'N/A'}\n"
f"POS Distribution: {pos_str if pos_str else 'N/A'}\n"
f"Exam Distribution: {exam_str if exam_str else 'N/A'}"
)
class ProcessResult(BaseModel):
"""Result of processing a task."""
message: str | None = Field(
default=None, description="Additional message or error information"
)
status: TaskStatus = Field(
default=TaskStatus.PENDING, description="Current status of the task"
)
statistics: SegmentStatistics | None = Field(default=None, description="Statistics of the task")
class Task(BaseModel):
video_path: str = Field(..., description="Path to the video file")
task_id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique identifier for the task",
)
status: TaskStatus = Field(
default=TaskStatus.PENDING, description="Current status of the task"
)
add_time: str | None = Field(
default=None, description="Add time of the task, format %Y-%m-%d %H:%M:%S"
)
complete_time: str | None = Field(
default=None, description="Complete time of the task"
)
tokens_used: int = Field(default=0, description="Number of used tokens")
message: str | None = Field(
default=None, description="Additional message or error information"
)
params: TaskParams = Field(
default_factory=TaskParams, description="Parameters for the task"
)
statistics: SegmentStatistics | None = Field(default=None, description="Statistics of the task")
class WordMetadata(BaseModel):
start_pos: int = Field(
..., description="Start position of the word in the context sentence"
)
end_pos: int = Field(
..., description="End position of the word in the context sentence"
)
context_id: int = Field(..., description="Identifier of the context sentence")
word_id: int = Field(
default_factory=lambda: IDGenerator().next_id(),
description="Identifier of the word in the context",
)
class PosDef(BaseModel):
# 'art.', 'v.', 'aux.', 'conj.', 'prep.', 'adv.', 'adj.', 'n.', 'vt.', 'pron.', 'det.', 'vi.', 'int.'
# 'num.', 'abbr.', 'na.', 'quant.', 'phr.'
pos: str = Field(..., description="Part of speech")
meanings: list[str] = Field(..., description="List of definitions")
@property
def plaintext(self):
return f"{self.pos} {'; '.join(self.meanings)}"
class WordBase(BaseModel):
text: str = Field(..., description="The word or phrase")
lemma: str = Field(..., description="Lemma form of the word")
pos: UniversalPos = Field(
default=UniversalPos.X, description="Universal POS tag of the word"
)
class Word(WordBase):
phonetics: str | None = Field(
default=None, description="Phonetic transcription of the word"
)
meta: WordMetadata = Field(
        ..., description="Positional metadata of the word"
)
cefr: Cefr | None = Field(default=None, description="CEFR level")
exams: list[str] = Field(
default_factory=list,
description="Exams whose vocabulary syllabus include this word",
)
pos_defs: list[PosDef] = Field(
default_factory=list, description="Part of speech definitions"
)
llm_translation: str | None = Field(
default=None, description="LLM generated Chinese translation"
)
llm_usage_context: str | None = Field(
default=None, description="LLM generated cultural context"
)
llm_example_sentences: list[str] = Field(
default_factory=list, description="LLM generated example sentences"
)
@property
def pos_defs_plaintext(self) -> str:
return " ".join(
[
f"{index}. {pos_def.plaintext}"
                for index, pos_def in enumerate(self.pos_defs, start=1)
]
)
class SubtitleSegment(BaseModel):
index: int = Field(..., description="Index of the subtitle segment")
start_time: int = Field(
..., description="Start time of the subtitle segment in milliseconds"
)
end_time: int = Field(
..., description="End time of the subtitle segment in milliseconds"
)
plaintext: str = Field(..., description="Text content of the subtitle segment")
Chinese: str | None = Field(
default=None, description="Chinese translation of the subtitle segment"
)
candidate_words: list[Word] = Field(
default_factory=list, description="List of words worth learning in the segment"
)
def words_append(self, word: Word):
"""
        Append a word to the segment's candidate_words list.
        :param word: The Word object to append.
"""
self.candidate_words.append(word)
@staticmethod
def _replace_with_spaces(_text):
"""
        Replace [xxx] patterns in the text with spaces of equal length.
        For example, "[Hi]" becomes "    " (4 spaces).
"""
pattern = r"(\[.*?\])"
return re.sub(pattern, lambda match: " " * len(match.group(1)), _text)
@property
def clean_text(self) -> str:
"""
        Return the cleaned text: newlines are removed and [xxx] patterns are replaced with spaces.
"""
return SubtitleSegment._replace_with_spaces(self.plaintext.replace("\n", " "))
def __lt__(self, other: object):
if not isinstance(other, SubtitleSegment):
return NotImplemented
return self.index < other.index
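# A small illustration of clean_text: bracketed cues are padded to their original
# width so that word positions computed on the cleaned text stay stable.
def _example_clean_text() -> None:
    seg = SubtitleSegment(index=0, start_time=0, end_time=1000, plaintext="[Chuckles]\nOkay, deal.")
    assert seg.clean_text == " " * 11 + "Okay, deal."
    assert len(seg.clean_text) == len(seg.plaintext.replace("\n", " "))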
class SegmentList(RootModel):
root: list[SubtitleSegment] = Field(
default_factory=list, description="List of subtitle segments"
)
@property
def statistics(self) -> SegmentStatistics:
all_words = [word for seg in self.root for word in seg.candidate_words]
cefr_counts = Counter(word.cefr if word.cefr else "Other" for word in all_words)
pos_counts = Counter(word.pos.value if word.pos else "Other" for word in all_words)
exam_counts = Counter(exam for word in all_words for exam in word.exams)
        return SegmentStatistics(
            total_segments=len(self.root),
            total_words=len(all_words),
            cefr_distribution=dict(cefr_counts),
            pos_distribution=dict(pos_counts),
            exam_distribution=dict(exam_counts),
        )
def context_generator(
self, context_window: int, extra_len: int = 1
) -> Generator[tuple[list[SubtitleSegment], tuple[int, int]], None, None]:
"""
        Yield batches of subtitle segments together with surrounding context.
        :param context_window: Number of segments per batch
        :param extra_len: Extra context segments included on each side of the batch
        :yield: (segments including context, (first index, last index) of the batch itself)
"""
total_segments = len(self.root)
        for i in range((total_segments + context_window - 1) // context_window):
real_start = i * context_window
real_end = min(total_segments, (i + 1) * context_window) - 1
start_index = max(0, i * context_window - extra_len)
end_index = min(total_segments, (i + 1) * context_window + extra_len)
yield (
self.root[start_index:end_index],
(self.root[real_start].index, self.root[real_end].index),
)
def sort(self):
self.root.sort()
@model_validator(mode="after")
def sort_root(self):
self.root.sort()
return self
def __iter__(self) -> Iterator[SubtitleSegment]:
return iter(self.root)
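# Batching sketch for context_generator, assuming 65 consecutive segments whose index
# fields match their list positions. With context_window=30 and extra_len=2 it yields
# three batches: slices 0-31 (annotating 0-29), 28-61 (annotating 30-59) and 58-64
# (annotating 60-64), so each LLM call sees a little overlapping context on both sides.
def _example_context_windows(segments: SegmentList) -> None:
    for context, (first, last) in segments.context_generator(context_window=30, extra_len=2):
        print(f"context of {len(context)} segments, annotating indices {first}..{last}")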
class SpacyToken(BaseModel):
lemma_: str = Field(..., description="Lemma form of the word (string)")
pos_: str = Field(..., description="POS tag of the word")
text: str = Field(..., description="Text of the word")
is_stop: bool = Field(
default=False, description="Indicates if the word is a stop word"
)
is_punct: bool = Field(
default=False, description="Indicates if the word is punctuation"
)
ent_iob_: str = Field(..., description="Entity IOB")
class SpacyNamedEntity(BaseModel):
text: str = Field(..., description="Text of the entity")
label_: str = Field(..., description="Label of the entity")
class NlpResult(BaseModel):
tokens: list[SpacyToken] = Field(default_factory=list, description="List of tokens")
entities: list[SpacyNamedEntity] = Field(
default_factory=list, description="List of named entities"
)
class LlmFeedbackAboutCandidateWord(BaseModel):
should_keep: bool = Field(
..., description="Indicates whether to keep the candidate word"
)
# reason: str | None = Field(default=None, description="Concise reason for the decision")
word_id: int = Field(..., description="Identifier of the word in the context")
text: str | None = Field(default=None, description="The vocabulary word or phrase")
lemma: str | None = Field(default=None, description="Lemma form of the word")
pos: UniversalPos | None = Field(
default=None,
description="Universal POS tag of the word. Options: ADJ, ADV, INTJ, NOUN, PROPN, "
"VERB, ADP, AUX, CCONJ, DET, NUM, PART, PRON, SCONJ, PUNCT, SYM, X",
)
class LlmFeedback(BaseModel):
candidate_words_feedback: list[LlmFeedbackAboutCandidateWord] = Field(
default_factory=list, description="Feedback about candidate words."
)
llm_identified_words: list[WordBase] = Field(
default_factory=list, description="List of words identified by the LLM."
)
class LlmWordEnrichment(BaseModel):
word_id: int = Field(..., description="Identifier of the word in the context")
translation: str | None = Field(
default=None, description="Chinese translation of the word"
)
usage_context: str | None = Field(
default=None, description="Usage or Cultural Context"
)
class LlmEnrichmentResult(BaseModel):
enriched_words: list[LlmWordEnrichment] = Field(
default_factory=list, description="List of enriched word data."
)
class LlmSegmentTranslation(BaseModel):
index: int = Field(..., description="Index of the subtitle segment")
translation: str = Field(
..., description="Natural Chinese translation of the segment"
)
class LlmTranslationResult(BaseModel):
translations: list[LlmSegmentTranslation] = Field(
default_factory=list, description="List of segment translations"
)
class VocabularyAnnotatingToolInput(BaseModel):
explanation: str = Field(
...,
description="This is a tool for adding a new vocabulary-annotating task to AnnotLexi.",
)
video_path: str = Field(..., description="Path to the video file")
skip_existing: bool = Field(
default=True, description="Whether to skip existing subtitle files"
)

View File

@@ -1,29 +1,28 @@
from multiprocessing import Process, Queue
from typing import Dict, List
import spacy
from spacy.tokenizer import Tokenizer
from app.core.cache import cached
from app.log import logger
from .schemas import SpacyNamedEntity, SpacyToken, NlpResult
class SpacyWorker:
def __init__(self, model='en_core_web_sm'):
def __init__(self, model="en_core_web_sm"):
self.task_q = Queue()
self.result_q = Queue()
self.status_q = Queue()
self.model = model
        # Start the worker subprocess
logger.info(f"正在启动 SpacyWorker 子进程...")
logger.info("正在启动 SpacyWorker 子进程...")
self.proc = Process(target=self.run, args=(self.model,))
self.proc.start()
        # Wait for the subprocess to report the model-loading status
status, info = self.status_q.get()
if status == 'error':
if status == "error":
self.proc.join()
raise RuntimeError(f"spaCy 模型加载失败: {info}")
else:
@@ -39,35 +38,50 @@ class SpacyWorker:
try:
nlp = SpacyWorker.load_nlp(model)
infixes = list(nlp.Defaults.infixes)
infixes = [i for i in infixes if '-' not in i]
infixes = [i for i in infixes if "-" not in i]
infix_re = spacy.util.compile_infix_regex(infixes)
nlp.tokenizer = Tokenizer(
nlp.vocab,
prefix_search=nlp.tokenizer.prefix_search,
suffix_search=nlp.tokenizer.suffix_search,
infix_finditer=infix_re.finditer,
token_match=nlp.tokenizer.token_match
token_match=nlp.tokenizer.token_match,
)
except Exception as e:
self.status_q.put(('error', str(e)))
self.status_q.put(("error", str(e)))
return
        # Tell the main process that the model loaded successfully
self.status_q.put(('ok', None))
self.status_q.put(("ok", None))
while True:
text = self.task_q.get()
if text is None:
break
doc = nlp(text)
self.result_q.put([{'text': token.text, 'pos_': token.pos_, 'lemma_': token.lemma_} for token in doc])
tokens = []
entities = []
for token in doc:
tokens.append(
SpacyToken(
lemma_=token.lemma_,
pos_=token.pos_,
text=token.text,
is_stop=token.is_stop,
is_punct=token.is_punct,
ent_iob_=token.ent_iob_,
)
)
for ent in doc.ents:
entities.append(SpacyNamedEntity(text=ent.text, label_=ent.label_))
self.result_q.put(NlpResult(tokens=tokens, entities=entities))
@staticmethod
@cached(maxsize=1, ttl=3600 * 6)
def load_nlp(model: str) -> spacy.Language:
return spacy.load(model)
def submit(self, text: str) -> List[Dict[str, str]]:
def submit(self, text: str) -> NlpResult:
"""
        Submit a text to the spaCy worker process and wait for the NlpResult.
"""

View File

@@ -0,0 +1,44 @@
from typing import Generator, Any, overload
from pysubs2 import SSAEvent
from .schemas import SubtitleSegment
class SubtitleProcessor:
def __init__(self):
self._events: list[SSAEvent] = []
def append(self, event: SSAEvent):
self._events.append(event)
def segment_generator(self) -> Generator[SubtitleSegment, None, None]:
for index, event in enumerate(self._events):
yield SubtitleSegment(
index=index,
start_time=event.start,
end_time=event.end,
plaintext=event.plaintext,
)
@overload
def __getitem__(self, item: int) -> SSAEvent:
pass
@overload
def __getitem__(self, s: slice) -> list[SSAEvent]:
pass
def __getitem__(self, item: Any) -> Any:
return self._events[item]
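# A minimal usage sketch (the subtitle path is a hypothetical example): pysubs2 parses
# the .ass/.srt file and each event becomes a SubtitleSegment ready for analysis.
def _example_load_segments(path: str = "episode.en.srt") -> list[SubtitleSegment]:
    import pysubs2

    processor = SubtitleProcessor()
    for event in pysubs2.load(path):
        processor.append(event)
    return list(processor.segment_generator())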
def style_text(style: str, text: str) -> str:
"""
    Wrap text in ASS inline style-override tags.
    :param style: Style name
    :param text: Text to wrap
    :return: The text wrapped in {\rSTYLE} ... {\r} override tags
"""
return f"{{\\r{style}}}{text}{{\\r}}"