diff --git a/app/agent/__init__.py b/app/agent/__init__.py index e3315a28..80af5d8e 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -219,6 +219,8 @@ class ReplyMode(str, Enum): HEARTBEAT_SESSION_PREFIX = "__agent_heartbeat_" UNSUPPORTED_IMAGE_INPUT_MESSAGE = "当前模型不支持图片输入,请更换支持图片输入的模型,或在系统设置中关闭图片输入支持后重试。" +AGENT_EXECUTION_ERROR_PREFIX = "智能助手执行失败" +AGENT_EXECUTION_ERROR_MESSAGE = "智能助手执行失败,请稍后重试。" class MoviePilotAgent: @@ -495,6 +497,77 @@ class MoviePilotAgent: ) ) + @staticmethod + def _payload_error_message(payload: Any) -> str: + """ + 从 SDK 返回的结构化错误体里提取 message 字段。 + 许多 OpenAI-compatible 服务会把真正原因放在 body.error.message 中。 + """ + if isinstance(payload, dict): + error = payload.get("error") + if isinstance(error, dict) and error.get("message"): + return str(error["message"]) + for key in ("message", "detail", "error_description"): + if payload.get(key): + return str(payload[key]) + return "" + + @staticmethod + def _sanitize_execution_error_message(message: str) -> str: + """ + 清理执行错误中的密钥和尾部长说明,避免把敏感字段或 SDK 调参文档直接发给用户。 + """ + sanitized = re.sub(r"\s+", " ", str(message or "")).strip() + if settings.LLM_API_KEY: + sanitized = sanitized.replace(settings.LLM_API_KEY, "***") + sanitized = re.sub( + r"(?i)(api[_-]?key\s*[:=]\s*)([^\s,;]+)", + r"\1***", + sanitized, + ) + sanitized = re.sub( + r"(?i)authorization\s*:\s*bearer\s+[^\s,;]+", + "Authorization: ***", + sanitized, + ) + for marker in ( + " Tune or disable via ", + " See also ", + " Traceback ", + " - Traceback ", + ): + if marker in sanitized: + sanitized = sanitized.split(marker, 1)[0].strip() + return sanitized + + @classmethod + def _primary_exception_message(cls, error: Exception) -> str: + """ + 从异常对象中抽取最主要的错误消息。 + 优先使用结构化 message,其次回退到异常字符串,保持用户回复直接反映真实失败原因。 + """ + candidates = [ + getattr(error, "message", None), + cls._payload_error_message(getattr(error, "body", None)), + str(error), + ] + for candidate in candidates: + message = cls._sanitize_execution_error_message(candidate) + if message: + return message + return "" + + @classmethod + def _friendly_execution_error_message(cls, error: Exception) -> str: + """ + 将 Agent 执行异常转换为用户可读消息。 + 回复只携带主错误信息,完整 traceback 保留在日志中排查。 + """ + message = cls._primary_exception_message(error) + if not message: + return AGENT_EXECUTION_ERROR_MESSAGE + return f"{AGENT_EXECUTION_ERROR_PREFIX}: {message}" + async def _dispatch_execution_notice(self, message: str) -> None: """ 将执行层可预期的失败转成用户可读提示。 @@ -677,7 +750,10 @@ class MoviePilotAgent: messages.append(HumanMessage(content=content)) # 执行推理 - await self._execute_agent(messages) + result = await self._execute_agent(messages) + if isinstance(result, tuple) and result: + return result[0] + return result except Exception as e: error_message = f"处理消息时发生错误: {str(e)}" @@ -884,7 +960,9 @@ class MoviePilotAgent: await self._dispatch_execution_notice(UNSUPPORTED_IMAGE_INPUT_MESSAGE) return UNSUPPORTED_IMAGE_INPUT_MESSAGE, {} logger.error(f"Agent执行失败: {e} - {traceback.format_exc()}") - return str(e), {} + friendly_message = self._friendly_execution_error_message(e) + await self._dispatch_execution_notice(friendly_message) + return friendly_message, {} finally: # 确保停止流式输出 await self.stream_handler.stop_streaming() diff --git a/tests/test_agent_background_output.py b/tests/test_agent_background_output.py index 7bcb4730..6bff97e0 100644 --- a/tests/test_agent_background_output.py +++ b/tests/test_agent_background_output.py @@ -50,6 +50,10 @@ class _FakeStreamingFailingAgent(_FakeFailingAgent): yield None +class StreamChunkTimeoutError(RuntimeError): + """模拟 langchain_openai 的流式分块超时异常。""" + + class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): async def test_background_non_streaming_does_not_send_by_default(self): agent = MoviePilotAgent(session_id="bg-test", user_id="system") @@ -153,6 +157,45 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase): ) agent._save_agent_message_to_db.assert_not_awaited() + async def test_streaming_model_chunk_timeout_sends_friendly_notice(self): + """流式模型分块超时时应只把主错误信息发给用户。""" + agent = MoviePilotAgent(session_id="timeout-test", user_id="user-1") + agent.channel = "Telegram" + agent.source = "telegram-test" + agent._tool_context = {"user_reply_sent": False} + agent._streamed_output = "" + agent.stream_handler = SimpleNamespace( + set_dispatch_policy=lambda allow_dispatch_without_context=False: None, + start_streaming=AsyncMock(), + flush_pending_tool_summary=lambda: "", + stop_streaming=AsyncMock(return_value=(False, "")), + ) + agent._should_stream = lambda: True + raw_error = StreamChunkTimeoutError( + "No streaming chunk received for 120.0s " + "(model=mimo-v2.5-pro, chunks_received=1). " + "Tune or disable via the `stream_chunk_timeout` constructor kwarg." + ) + agent._create_agent = AsyncMock( + return_value=_FakeStreamingFailingAgent(raw_error) + ) + agent.send_agent_message = AsyncMock() + agent._save_agent_message_to_db = AsyncMock() + + result, _ = await agent._execute_agent([HumanMessage(content="测试超时")]) + + expected = ( + "智能助手执行失败: No streaming chunk received for 120.0s " + "(model=mimo-v2.5-pro, chunks_received=1)." + ) + self.assertEqual(expected, result) + agent.send_agent_message.assert_awaited_once_with(expected, title="") + sent_message = agent.send_agent_message.await_args.args[0] + self.assertIn("No streaming chunk received for 120.0s", sent_message) + self.assertNotIn("Tune or disable", sent_message) + self.assertEqual(expected, agent._streamed_output) + agent._save_agent_message_to_db.assert_not_awaited() + async def test_background_non_streaming_sends_when_reply_mode_dispatch(self): agent = MoviePilotAgent(session_id="bg-test", user_id="system") agent.channel = None