mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-13 07:26:45 +00:00
Restore background dispatch without channel context
This commit is contained in:
@@ -474,6 +474,7 @@ class MoviePilotAgent:
|
||||
self._tool_context = {
|
||||
"user_reply_sent": False,
|
||||
"reply_mode": None,
|
||||
"should_dispatch_reply": self.should_dispatch_reply,
|
||||
}
|
||||
self._streamed_output = ""
|
||||
|
||||
@@ -579,6 +580,9 @@ class MoviePilotAgent:
|
||||
agent = self._create_agent(streaming=use_streaming)
|
||||
|
||||
if use_streaming:
|
||||
self.stream_handler.set_dispatch_policy(
|
||||
allow_dispatch_without_context=self.should_dispatch_reply
|
||||
)
|
||||
# 流式模式:渠道支持消息编辑,启动流式输出实时推送 token
|
||||
await self.stream_handler.start_streaming(
|
||||
channel=self.channel,
|
||||
|
||||
@@ -62,9 +62,19 @@ class StreamingHandler:
|
||||
self._user_id: Optional[str] = None
|
||||
self._username: Optional[str] = None
|
||||
self._title: str = ""
|
||||
self._allow_dispatch_without_context = False
|
||||
# 非啰嗦模式下的待输出工具统计,等下一段文本到来时再统一补一句摘要
|
||||
self._pending_tool_stats: dict[str, dict[str, Any]] = {}
|
||||
|
||||
def set_dispatch_policy(
|
||||
self, allow_dispatch_without_context: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
设置在缺少渠道上下文时是否仍允许向默认通知渠道分发消息。
|
||||
后台 DISPATCH 任务允许,CAPTURE_ONLY 必须禁止。
|
||||
"""
|
||||
self._allow_dispatch_without_context = allow_dispatch_without_context
|
||||
|
||||
def emit(self, token: str) -> str:
|
||||
"""
|
||||
接收 LLM 流式 token,积累到缓冲区。
|
||||
@@ -435,8 +445,11 @@ class StreamingHandler:
|
||||
if not current_text or current_text == self._sent_text:
|
||||
# 没有新内容需要刷新
|
||||
return
|
||||
if not self._channel or not self._source:
|
||||
logger.debug("流式输出缺少渠道上下文,仅保留 buffer,不外发消息")
|
||||
if (
|
||||
(not self._channel or not self._source)
|
||||
and not self._allow_dispatch_without_context
|
||||
):
|
||||
logger.debug("流式输出缺少渠道上下文,当前模式禁止外发消息")
|
||||
return
|
||||
|
||||
chain = _StreamChain()
|
||||
|
||||
@@ -113,6 +113,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
|
||||
if tool_message:
|
||||
self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n")
|
||||
else:
|
||||
allow_dispatch_without_context = self._agent_context.get(
|
||||
"should_dispatch_reply", False
|
||||
)
|
||||
if self._channel and self._source:
|
||||
# 渠道不支持编辑:取出 Agent 文字 + 工具消息合并独立发送
|
||||
agent_message = await self._stream_handler.take()
|
||||
@@ -124,6 +127,16 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
|
||||
if messages:
|
||||
merged_message = "\n\n".join(messages)
|
||||
await self.send_tool_message(merged_message)
|
||||
elif allow_dispatch_without_context:
|
||||
agent_message = await self._stream_handler.take()
|
||||
messages = []
|
||||
if agent_message:
|
||||
messages.append(agent_message)
|
||||
if tool_message:
|
||||
messages.append(f"⚙️ => {tool_message}")
|
||||
if messages:
|
||||
merged_message = "\n\n".join(messages)
|
||||
await self.send_tool_message(merged_message)
|
||||
else:
|
||||
# 后台 capture 流程没有渠道上下文,不能把工具提示回灌到默认通知渠道。
|
||||
self._stream_handler.record_tool_call(
|
||||
|
||||
@@ -201,6 +201,32 @@ class TestAgentToolStreaming(unittest.TestCase):
|
||||
run_in_threadpool_mock.assert_not_awaited()
|
||||
self.assertFalse(handler.has_sent_message)
|
||||
|
||||
def test_flush_without_channel_context_dispatch_allowed_sends_direct_message(self):
|
||||
handler = StreamingHandler()
|
||||
handler._user_id = "10001"
|
||||
handler._username = "tester"
|
||||
handler._streaming_enabled = True
|
||||
handler.set_dispatch_policy(allow_dispatch_without_context=True)
|
||||
handler.emit("hello")
|
||||
|
||||
with patch(
|
||||
"app.agent.callback.run_in_threadpool", new_callable=AsyncMock
|
||||
) as run_in_threadpool_mock:
|
||||
run_in_threadpool_mock.return_value = MessageResponse(
|
||||
message_id=1,
|
||||
chat_id=2,
|
||||
source="telegram",
|
||||
success=True,
|
||||
)
|
||||
|
||||
asyncio.run(handler._flush())
|
||||
|
||||
self.assertEqual(run_in_threadpool_mock.await_count, 1)
|
||||
self.assertEqual(
|
||||
run_in_threadpool_mock.await_args.args[0].__name__, "send_direct_message"
|
||||
)
|
||||
self.assertTrue(handler.has_sent_message)
|
||||
|
||||
def test_verbose_background_tool_call_does_not_post_message(self):
|
||||
async def _run():
|
||||
tool = DummyTool(session_id="session-1", user_id="10001")
|
||||
@@ -225,6 +251,32 @@ class TestAgentToolStreaming(unittest.TestCase):
|
||||
send_tool_message.assert_not_awaited()
|
||||
self.assertEqual(buffered_message, "(调用了 1 次工具)\n\n")
|
||||
|
||||
def test_verbose_background_dispatch_tool_call_can_post_message(self):
|
||||
async def _run():
|
||||
tool = DummyTool(session_id="session-1", user_id="10001")
|
||||
handler = StreamingHandler()
|
||||
await handler.start_streaming()
|
||||
handler.emit("前置内容")
|
||||
tool.set_stream_handler(handler)
|
||||
tool.set_message_attr(channel=None, source=None, username="tester")
|
||||
tool.set_agent_context({"should_dispatch_reply": True})
|
||||
|
||||
with (
|
||||
patch.object(settings, "AI_AGENT_VERBOSE", True),
|
||||
patch.object(
|
||||
DummyTool, "send_tool_message", new_callable=AsyncMock
|
||||
) as send_tool_message,
|
||||
):
|
||||
result = await tool._arun(explanation="run test tool")
|
||||
buffered_message = await handler.take()
|
||||
return result, buffered_message, send_tool_message
|
||||
|
||||
result, buffered_message, send_tool_message = asyncio.run(_run())
|
||||
|
||||
self.assertEqual(result, "ok")
|
||||
send_tool_message.assert_awaited_once_with("前置内容\n\n⚙️ => run test tool")
|
||||
self.assertEqual(buffered_message, "")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user