feat(agent): Telegram与Agent相互时支持流式输出

This commit is contained in:
jxxghp
2026-03-23 19:52:26 +08:00
parent 4bc67dc816
commit c85805b15d
2 changed files with 31 additions and 64 deletions

View File

@@ -21,8 +21,7 @@ class StreamingHandler:
"""
流式Token缓冲管理器
负责从 LLM 流式 token 中积累文本,供 Agent 在工具调用之间穿插发送中间消息
当启用流式输出时,通过定时编辑消息将新产生的 tokens 实时推送给用户。
负责从 LLM 流式 token 中积累文本,并在支持消息编辑的渠道上实时推送给用户
工作流程:
1. Agent开始处理时调用 start_streaming(),检查渠道能力并启动定时刷新
@@ -30,8 +29,9 @@ class StreamingHandler:
3. 定时器周期性调用 _flush()
- 第一次有内容时发送新消息(通过 send_direct_message 获取 message_id
- 后续有新内容时编辑同一条消息(通过 edit_message
4. 工具调用时 take() 被调用:取走缓冲区内容(如果已流式发送则返回空),
重置消息状态以便工具调用后的新内容开启新的流式消息
4. 工具调用时
- 流式渠道:工具消息直接 emit() 追加到 buffer与 Agent 文字合并为同一条流式消息
- 非流式渠道:调用 take() 取出已积累的文字,与工具消息合并独立发送
5. Agent最终完成时调用 stop_streaming():执行最后一次刷新,
返回是否已通过流式发送完所有内容(调用方据此决定是否还需额外发送)
"""
@@ -67,47 +67,18 @@ class StreamingHandler:
"""
获取当前已积累的消息内容,获取后清空缓冲区。
当流式输出启用时:
1. 先暂停 flush loop避免与后续发送产生竞争
2. 执行最终一次 flush确保已有内容完整推送到流式消息
3. 如果内容已全部通过流式编辑发送给用户,返回空字符串(避免重复发送)
4. 重置消息状态,以便工具执行后 LLM 产出的新内容开启新的流式消息
5. 重新启动 flush loop恢复后续流式输出能力
"""
if self._streaming_enabled:
# 暂停 flush loop
await self._cancel_flush_task()
# 执行最终一次 flush确保当前流式消息是完整的
await self._flush()
用于非流式渠道:工具调用前取出 Agent 已产出的文字,
与工具提示合并后独立发送。
注意:流式渠道不调用此方法,工具消息直接 emit 到 buffer 中。
"""
with self._lock:
if not self._buffer:
message = ""
already_sent = False
else:
message = self._buffer
logger.info(f"Agent消息: {message}")
# 如果流式输出已经把内容发给用户了,工具不需要再发
already_sent = (
self._streaming_enabled
and self._message_response is not None
and self._sent_text == self._buffer
)
self._buffer = ""
# 重置流式消息状态,下次有新内容时会开启新消息
self._sent_text = ""
self._message_response = None
# 恢复 flush loop工具执行完成后 LLM 继续产出 token 时需要)
if self._streaming_enabled:
await self._restart_flush_loop()
if already_sent or not message:
return ""
return message
return ""
message = self._buffer
logger.info(f"Agent消息: {message}")
self._buffer = ""
return message
def clear(self):
"""
@@ -225,14 +196,6 @@ class StreamingHandler:
pass
self._flush_task = None
async def _restart_flush_loop(self):
"""
重新启动定时刷新任务(用于 take() 后恢复流式输出)
"""
if not self._streaming_enabled:
return
self._flush_task = asyncio.create_task(self._flush_loop())
async def _flush(self):
"""
将当前缓冲区内容刷新到用户消息

View File

@@ -43,11 +43,6 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
3. 调用具体工具逻辑(子类实现的 execute 方法)
4. 持久化工具结果到会话记忆
"""
# 获取工具调用前 Agent 已积累的流式文本
agent_message = (
await self._stream_handler.take() if self._stream_handler else ""
)
# 获取工具执行提示消息
tool_message = self.get_tool_message(**kwargs)
if not tool_message:
@@ -55,16 +50,25 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
if explanation:
tool_message = explanation
# 合并 Agent 消息和工具执行消息后一起发送
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
if self._stream_handler and self._stream_handler.is_streaming:
# 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息
if tool_message:
self._stream_handler.emit(f"\n\n⚙️ => {tool_message}")
else:
# 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送
agent_message = (
await self._stream_handler.take() if self._stream_handler else ""
)
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message, title="MoviePilot助手")
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message, title="MoviePilot助手")
logger.debug(f"Executing tool {self.name} with args: {kwargs}")