diff --git a/app/agent/callback/__init__.py b/app/agent/callback/__init__.py index e4b6095a..548c4bc8 100644 --- a/app/agent/callback/__init__.py +++ b/app/agent/callback/__init__.py @@ -21,8 +21,7 @@ class StreamingHandler: """ 流式Token缓冲管理器 - 负责从 LLM 流式 token 中积累文本,供 Agent 在工具调用之间穿插发送中间消息。 - 当启用流式输出时,通过定时编辑消息将新产生的 tokens 实时推送给用户。 + 负责从 LLM 流式 token 中积累文本,并在支持消息编辑的渠道上实时推送给用户。 工作流程: 1. Agent开始处理时调用 start_streaming(),检查渠道能力并启动定时刷新 @@ -30,8 +29,9 @@ class StreamingHandler: 3. 定时器周期性调用 _flush(): - 第一次有内容时发送新消息(通过 send_direct_message 获取 message_id) - 后续有新内容时编辑同一条消息(通过 edit_message) - 4. 工具调用时 take() 被调用:取走缓冲区内容(如果已流式发送则返回空), - 重置消息状态以便工具调用后的新内容开启新的流式消息 + 4. 工具调用时: + - 流式渠道:工具消息直接 emit() 追加到 buffer,与 Agent 文字合并为同一条流式消息 + - 非流式渠道:调用 take() 取出已积累的文字,与工具消息合并独立发送 5. Agent最终完成时调用 stop_streaming():执行最后一次刷新, 返回是否已通过流式发送完所有内容(调用方据此决定是否还需额外发送) """ @@ -67,47 +67,18 @@ class StreamingHandler: """ 获取当前已积累的消息内容,获取后清空缓冲区。 - 当流式输出启用时: - 1. 先暂停 flush loop(避免与后续发送产生竞争) - 2. 执行最终一次 flush(确保已有内容完整推送到流式消息) - 3. 如果内容已全部通过流式编辑发送给用户,返回空字符串(避免重复发送) - 4. 重置消息状态,以便工具执行后 LLM 产出的新内容开启新的流式消息 - 5. 重新启动 flush loop(恢复后续流式输出能力) - """ - if self._streaming_enabled: - # 暂停 flush loop - await self._cancel_flush_task() - # 执行最终一次 flush,确保当前流式消息是完整的 - await self._flush() + 用于非流式渠道:工具调用前取出 Agent 已产出的文字, + 与工具提示合并后独立发送。 + 注意:流式渠道不调用此方法,工具消息直接 emit 到 buffer 中。 + """ with self._lock: if not self._buffer: - message = "" - already_sent = False - else: - message = self._buffer - logger.info(f"Agent消息: {message}") - - # 如果流式输出已经把内容发给用户了,工具不需要再发 - already_sent = ( - self._streaming_enabled - and self._message_response is not None - and self._sent_text == self._buffer - ) - - self._buffer = "" - - # 重置流式消息状态,下次有新内容时会开启新消息 - self._sent_text = "" - self._message_response = None - - # 恢复 flush loop(工具执行完成后 LLM 继续产出 token 时需要) - if self._streaming_enabled: - await self._restart_flush_loop() - - if already_sent or not message: - return "" - return message + return "" + message = self._buffer + logger.info(f"Agent消息: {message}") + self._buffer = "" + return message def clear(self): """ @@ -225,14 +196,6 @@ class StreamingHandler: pass self._flush_task = None - async def _restart_flush_loop(self): - """ - 重新启动定时刷新任务(用于 take() 后恢复流式输出) - """ - if not self._streaming_enabled: - return - self._flush_task = asyncio.create_task(self._flush_loop()) - async def _flush(self): """ 将当前缓冲区内容刷新到用户消息 diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index 6f0fe712..07ec5423 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -43,11 +43,6 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): 3. 调用具体工具逻辑(子类实现的 execute 方法) 4. 持久化工具结果到会话记忆 """ - # 获取工具调用前 Agent 已积累的流式文本 - agent_message = ( - await self._stream_handler.take() if self._stream_handler else "" - ) - # 获取工具执行提示消息 tool_message = self.get_tool_message(**kwargs) if not tool_message: @@ -55,16 +50,25 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): if explanation: tool_message = explanation - # 合并 Agent 消息和工具执行消息后一起发送 - messages = [] - if agent_message: - messages.append(agent_message) - if tool_message: - messages.append(f"⚙️ => {tool_message}") + if self._stream_handler and self._stream_handler.is_streaming: + # 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息 + if tool_message: + self._stream_handler.emit(f"\n\n⚙️ => {tool_message}") + else: + # 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送 + agent_message = ( + await self._stream_handler.take() if self._stream_handler else "" + ) - if messages: - merged_message = "\n\n".join(messages) - await self.send_tool_message(merged_message, title="MoviePilot助手") + messages = [] + if agent_message: + messages.append(agent_message) + if tool_message: + messages.append(f"⚙️ => {tool_message}") + + if messages: + merged_message = "\n\n".join(messages) + await self.send_tool_message(merged_message, title="MoviePilot助手") logger.debug(f"Executing tool {self.name} with args: {kwargs}")