Files
archived-MoviePilot-Plugins/plugins.v2/lexiannot/agenttool.py
2025-12-22 18:01:19 +08:00

131 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from typing import Optional, Type
from pydantic import BaseModel
from app.agent.tools.base import MoviePilotTool
from app.core.plugin import PluginManager
from .schemas import VocabularyAnnotatingToolInput, QueryAnnotationTasksToolInput, Task
class VocabularyAnnotatingTool(MoviePilotTool):
"""词汇标注工具"""
# 工具名称
name: str = "vocabulary_annotating_tool"
# 工具描述
description: str = "Add new vocabulary annotation task to plugin LexiAnnot's task queue."
# 输入参数模型
args_schema: Type[BaseModel] = VocabularyAnnotatingToolInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据订阅参数生成友好的提示消息"""
skip_existing = kwargs.get("skip_existing", False)
video_path = kwargs.get("video_path", "")
message = f"正在添加字幕任务: {video_path!r}"
if skip_existing:
message += "(覆写方式:跳过已存在的字幕文件)"
else:
message += "(覆写方式:覆盖已存在的字幕文件)"
return message
async def run(self, video_path: str, skip_existing: bool = True, **kwargs) -> str:
"""
实现工具的核心逻辑(异步方法)
:param video_path: Path to the video file
:param skip_existing: Whether to skip existing subtitle files
:param kwargs: 其他参数,包含 explanation工具使用说明
:return: 工具执行结果,返回字符串格式
"""
try:
# 执行工具逻辑
result = await self._perform_operation(video_path, skip_existing)
# 返回执行结果
if not result:
return f"成功添加词汇标注任务: {video_path!r}"
else:
return f"添加任务出错: {result}"
except Exception as e:
return f"执行失败: {str(e)}"
async def _perform_operation(
self, video_path: str, skip_existing: bool
) -> str | None:
"""内部方法,执行具体操作"""
# 实现具体业务逻辑
plugins = PluginManager().running_plugins
plugin_instance = plugins.get("LexiAnnot")
if not plugin_instance:
return "LexiAnnot 插件未运行"
res = await asyncio.to_thread(
plugin_instance.add_task, video_file=video_path, skip_existing=skip_existing
)
if not res:
return "任务添加失败"
return None
class QueryAnnotationTasksTool(MoviePilotTool):
"""词汇标注任务查询工具"""
# 工具名称
name: str = "query_annotation_tasks_tool"
# 工具描述
description: str = "Query the latest vocabulary annotation tasks from plugin LexiAnnot."
# 输入参数模型
args_schema: Type[BaseModel] = QueryAnnotationTasksToolInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据订阅参数生成友好的提示消息"""
count = kwargs.get("count", 5)
return f"正在查询最近的 {count} 条字幕标注任务"
async def run(self, count: int, **kwargs) -> str:
"""
实现工具的核心逻辑(异步方法)
:param count: The max number of returned annotation tasks
:param kwargs: 其他参数,包含 explanation工具使用说明
:return: 工具执行结果,返回字符串格式
"""
try:
# 执行工具逻辑
plugins = PluginManager().running_plugins
plugin_instance = plugins.get("LexiAnnot")
if not plugin_instance:
return "LexiAnnot 插件未运行"
total: list[Task] = plugin_instance.get_tasks()
# Handle potential None in add_time
total.sort(key=lambda t: t.add_time or "", reverse=True)
tasks = total[:count]
if not tasks:
return "未查询到相关任务"
result_lines = [f"最近 {len(tasks)} 条标注任务:"]
for task in tasks:
status_val = (
task.status.value
if hasattr(task.status, "value")
else str(task.status)
)
info = f"\n🎥 **{task.video_path}**"
info += f"\n ID: {task.task_id}"
info += f"\n Status: {status_val}"
info += f"\n Added: {task.add_time or 'N/A'}"
if task.complete_time:
info += f"\n Completed: {task.complete_time}"
if task.message:
info += f"\n Message: {task.message}"
if task.statistics:
info += f"\n Words: {task.statistics.total_words} | Segments: {task.statistics.total_segments}"
result_lines.append(info)
return "\n".join(result_lines)
except Exception as e:
return f"执行失败: {str(e)}"