From 23755086160f4b36a4d87268fb36fd4fb4742933 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 30 Apr 2026 07:04:59 +0800 Subject: [PATCH] Restore background dispatch without channel context --- app/agent/__init__.py | 4 +++ app/agent/callback/__init__.py | 17 ++++++++-- app/agent/tools/base.py | 13 ++++++++ tests/test_agent_tool_streaming.py | 52 ++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 2 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index d2be4ed0..a35423be 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -474,6 +474,7 @@ class MoviePilotAgent: self._tool_context = { "user_reply_sent": False, "reply_mode": None, + "should_dispatch_reply": self.should_dispatch_reply, } self._streamed_output = "" @@ -579,6 +580,9 @@ class MoviePilotAgent: agent = self._create_agent(streaming=use_streaming) if use_streaming: + self.stream_handler.set_dispatch_policy( + allow_dispatch_without_context=self.should_dispatch_reply + ) # 流式模式:渠道支持消息编辑,启动流式输出实时推送 token await self.stream_handler.start_streaming( channel=self.channel, diff --git a/app/agent/callback/__init__.py b/app/agent/callback/__init__.py index 3bfede67..b65bee66 100644 --- a/app/agent/callback/__init__.py +++ b/app/agent/callback/__init__.py @@ -62,9 +62,19 @@ class StreamingHandler: self._user_id: Optional[str] = None self._username: Optional[str] = None self._title: str = "" + self._allow_dispatch_without_context = False # 非啰嗦模式下的待输出工具统计,等下一段文本到来时再统一补一句摘要 self._pending_tool_stats: dict[str, dict[str, Any]] = {} + def set_dispatch_policy( + self, allow_dispatch_without_context: bool = False + ) -> None: + """ + 设置在缺少渠道上下文时是否仍允许向默认通知渠道分发消息。 + 后台 DISPATCH 任务允许,CAPTURE_ONLY 必须禁止。 + """ + self._allow_dispatch_without_context = allow_dispatch_without_context + def emit(self, token: str) -> str: """ 接收 LLM 流式 token,积累到缓冲区。 @@ -435,8 +445,11 @@ class StreamingHandler: if not current_text or current_text == self._sent_text: # 没有新内容需要刷新 return - if not self._channel or not self._source: - logger.debug("流式输出缺少渠道上下文,仅保留 buffer,不外发消息") + if ( + (not self._channel or not self._source) + and not self._allow_dispatch_without_context + ): + logger.debug("流式输出缺少渠道上下文,当前模式禁止外发消息") return chain = _StreamChain() diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index a40a69b3..89fb2492 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -113,6 +113,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): if tool_message: self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n") else: + allow_dispatch_without_context = self._agent_context.get( + "should_dispatch_reply", False + ) if self._channel and self._source: # 渠道不支持编辑:取出 Agent 文字 + 工具消息合并独立发送 agent_message = await self._stream_handler.take() @@ -124,6 +127,16 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): if messages: merged_message = "\n\n".join(messages) await self.send_tool_message(merged_message) + elif allow_dispatch_without_context: + agent_message = await self._stream_handler.take() + messages = [] + if agent_message: + messages.append(agent_message) + if tool_message: + messages.append(f"⚙️ => {tool_message}") + if messages: + merged_message = "\n\n".join(messages) + await self.send_tool_message(merged_message) else: # 后台 capture 流程没有渠道上下文,不能把工具提示回灌到默认通知渠道。 self._stream_handler.record_tool_call( diff --git a/tests/test_agent_tool_streaming.py b/tests/test_agent_tool_streaming.py index 617ddeb9..de427019 100644 --- a/tests/test_agent_tool_streaming.py +++ b/tests/test_agent_tool_streaming.py @@ -201,6 +201,32 @@ class TestAgentToolStreaming(unittest.TestCase): run_in_threadpool_mock.assert_not_awaited() self.assertFalse(handler.has_sent_message) + def test_flush_without_channel_context_dispatch_allowed_sends_direct_message(self): + handler = StreamingHandler() + handler._user_id = "10001" + handler._username = "tester" + handler._streaming_enabled = True + handler.set_dispatch_policy(allow_dispatch_without_context=True) + handler.emit("hello") + + with patch( + "app.agent.callback.run_in_threadpool", new_callable=AsyncMock + ) as run_in_threadpool_mock: + run_in_threadpool_mock.return_value = MessageResponse( + message_id=1, + chat_id=2, + source="telegram", + success=True, + ) + + asyncio.run(handler._flush()) + + self.assertEqual(run_in_threadpool_mock.await_count, 1) + self.assertEqual( + run_in_threadpool_mock.await_args.args[0].__name__, "send_direct_message" + ) + self.assertTrue(handler.has_sent_message) + def test_verbose_background_tool_call_does_not_post_message(self): async def _run(): tool = DummyTool(session_id="session-1", user_id="10001") @@ -225,6 +251,32 @@ class TestAgentToolStreaming(unittest.TestCase): send_tool_message.assert_not_awaited() self.assertEqual(buffered_message, "(调用了 1 次工具)\n\n") + def test_verbose_background_dispatch_tool_call_can_post_message(self): + async def _run(): + tool = DummyTool(session_id="session-1", user_id="10001") + handler = StreamingHandler() + await handler.start_streaming() + handler.emit("前置内容") + tool.set_stream_handler(handler) + tool.set_message_attr(channel=None, source=None, username="tester") + tool.set_agent_context({"should_dispatch_reply": True}) + + with ( + patch.object(settings, "AI_AGENT_VERBOSE", True), + patch.object( + DummyTool, "send_tool_message", new_callable=AsyncMock + ) as send_tool_message, + ): + result = await tool._arun(explanation="run test tool") + buffered_message = await handler.take() + return result, buffered_message, send_tool_message + + result, buffered_message, send_tool_message = asyncio.run(_run()) + + self.assertEqual(result, "ok") + send_tool_message.assert_awaited_once_with("前置内容\n\n⚙️ => run test tool") + self.assertEqual(buffered_message, "") + if __name__ == "__main__": unittest.main()