diff --git a/app/agent/middleware/subagents.py b/app/agent/middleware/subagents.py index 9dd76d6d..529097b4 100644 --- a/app/agent/middleware/subagents.py +++ b/app/agent/middleware/subagents.py @@ -459,6 +459,9 @@ class _SubAgentAgentProvider: return profile.name, cached_agent subagent_tools = _select_tools(self._tools, profile) + logger.info( + f"创建子代理图: subagent_type={profile.name}, tools={len(subagent_tools)}" + ) agent = create_agent( model=self._model, tools=subagent_tools, @@ -478,20 +481,35 @@ class _SubAgentAgentProvider: """调用指定子代理并只返回供主代理读取的结果。""" agent_name, agent = self.get_agent(subagent_type) thread_suffix = task_id or uuid.uuid4().hex - result = await agent.ainvoke( - {"messages": [HumanMessage(content=description)]}, - config={ - "configurable": { - "thread_id": f"subagent-{agent_name}-{thread_suffix}", - SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE, - }, - "metadata": { - "lc_agent_name": agent_name, - SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE, - }, - }, + log_task_id = task_id or "-" + logger.info( + f"开始调用子代理: subagent_type={agent_name}, task_id={log_task_id}" ) + try: + result = await agent.ainvoke( + {"messages": [HumanMessage(content=description)]}, + config={ + "configurable": { + "thread_id": f"subagent-{agent_name}-{thread_suffix}", + SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE, + }, + "metadata": { + "lc_agent_name": agent_name, + SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE, + }, + }, + ) + except Exception as err: + logger.error( + f"子代理调用失败: subagent_type={agent_name}, " + f"task_id={log_task_id}, error={err}" + ) + raise final_text = _extract_final_text(result) + logger.info( + f"子代理调用完成: subagent_type={agent_name}, " + f"task_id={log_task_id}, result_chars={len(final_text)}" + ) return final_text or "The subagent did not return a usable result." @@ -713,13 +731,26 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): """执行受调度器管理的子代理任务。""" async with self._semaphore: record.started_at = datetime.now() + logger.info( + f"异步子代理任务开始执行: task_id={record.task_id}, " + f"subagent_type={record.subagent_type}" + ) try: - return await self._provider.run_task( + result = await self._provider.run_task( description=record.description, subagent_type=record.subagent_type, task_id=record.task_id, ) + logger.info( + f"异步子代理任务执行完成: task_id={record.task_id}, " + f"subagent_type={record.subagent_type}, result_chars={len(result)}" + ) + return result except asyncio.CancelledError: + logger.info( + f"异步子代理任务已取消: task_id={record.task_id}, " + f"subagent_type={record.subagent_type}" + ) raise except Exception as err: logger.error(f"子代理任务执行失败: task_id={record.task_id}, error={err}") @@ -762,6 +793,10 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): ) self._tasks[task_id] = record records.append(record) + logger.info( + f"已启动子代理任务: task_id={task_id}, " + f"subagent_type={record.subagent_type}" + ) return records async def _wait_records( @@ -774,28 +809,48 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): """按等待模式等待一组任务完成。""" pending_tasks = [record.task for record in records if not record.task.done()] if not pending_tasks: + if records: + logger.info(f"子代理任务无需等待: tasks={len(records)}") return - timeout = self._normalize_timeout_ms(timeout_ms) / 1000 + normalized_timeout_ms = self._normalize_timeout_ms(timeout_ms) + timeout = normalized_timeout_ms / 1000 if timeout <= 0: + logger.info( + f"子代理任务等待超时时间为 0,跳过等待: tasks={len(pending_tasks)}" + ) return - return_when = asyncio.FIRST_COMPLETED if wait_mode == "any" else asyncio.ALL_COMPLETED + return_when = ( + asyncio.FIRST_COMPLETED if wait_mode == "any" else asyncio.ALL_COMPLETED + ) + logger.info( + f"开始等待子代理任务: tasks={len(pending_tasks)}, " + f"wait_mode={wait_mode}, timeout_ms={normalized_timeout_ms}" + ) await asyncio.wait( pending_tasks, timeout=timeout, return_when=return_when, ) + finished_count = sum(1 for task in pending_tasks if task.done()) + logger.info( + f"子代理任务等待结束: finished={finished_count}, " + f"pending={len(pending_tasks) - finished_count}" + ) async def _cancel_records(self, records: list[_SubAgentRuntimeTask]) -> None: """取消一组尚未完成的任务。""" cancellable_tasks = [ record.task for record in records if not record.task.done() ] + if cancellable_tasks: + logger.info(f"开始取消子代理任务: tasks={len(cancellable_tasks)}") for task in cancellable_tasks: task.cancel() if cancellable_tasks: await asyncio.gather(*cancellable_tasks, return_exceptions=True) + logger.info(f"子代理任务取消完成: tasks={len(cancellable_tasks)}") async def _control_task( self, @@ -809,6 +864,7 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): timeout_ms: Optional[int] = SUBAGENT_DEFAULT_WAIT_TIMEOUT_MS, ) -> str: """管理异步子代理任务。""" + logger.info(f"收到子代理管控操作: action={action}") if action in {"start", "run"}: specs, error = self._normalize_specs( description=description, @@ -816,8 +872,10 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): tasks=tasks, ) if error: + logger.info(f"子代理管控操作未启动任务: action={action}, error={error}") return self._json_response({"success": False, "error": error}) + logger.info(f"准备启动子代理任务: action={action}, tasks={len(specs)}") records = self._start_tasks(specs) if action == "run": await self._wait_records( @@ -842,13 +900,23 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): ) if action == "wait": + logger.info( + f"准备等待子代理任务: selected={len(records)}, missing={len(missing_ids)}" + ) await self._wait_records( records=records, wait_mode=wait_mode, timeout_ms=timeout_ms, ) elif action == "cancel": + logger.info( + f"准备取消子代理任务: selected={len(records)}, missing={len(missing_ids)}" + ) await self._cancel_records(records) + elif action == "status": + logger.info( + f"查询子代理任务状态: selected={len(records)}, missing={len(missing_ids)}" + ) return self._json_response( { @@ -865,6 +933,8 @@ class SubAgentTaskControlMiddleware(AgentMiddleware): unfinished_records = [ record for record in self._tasks.values() if not record.task.done() ] + if unfinished_records: + logger.info(f"Agent 结束,取消未完成子代理任务: tasks={len(unfinished_records)}") await self._cancel_records(unfinished_records) @@ -882,20 +952,39 @@ class SubAgentCallSummaryMiddleware(AgentMiddleware): ) -> Any: """在子代理任务工具执行时记录聚合摘要。""" tool = request.tool - if ( - tool - and getattr(tool, "name", None) - in {SUBAGENT_TASK_TOOL_NAME, SUBAGENT_CONTROL_TOOL_NAME} - and self.stream_handler - and getattr(self.stream_handler, "is_streaming", False) - ): + tool_name = getattr(tool, "name", None) + is_subagent_tool = tool_name in { + SUBAGENT_TASK_TOOL_NAME, + SUBAGENT_CONTROL_TOOL_NAME, + } + if is_subagent_tool: tool_call = request.tool_call or {} - self.stream_handler.record_tool_call( - tool_name=getattr(tool, "name", SUBAGENT_TASK_TOOL_NAME), - tool_message="Subagent invoked", - tool_kwargs=tool_call.get("args") or {}, + tool_args = tool_call.get("args") or {} + if not isinstance(tool_args, dict): + tool_args = {} + logger.info( + f"开始执行子代理工具: tool_name={tool_name}, " + f"action={tool_args.get('action') or '-'}, " + f"subagent_type={tool_args.get('subagent_type') or '-'}" ) - return await handler(request) + if ( + self.stream_handler + and getattr(self.stream_handler, "is_streaming", False) + ): + self.stream_handler.record_tool_call( + tool_name=tool_name or SUBAGENT_TASK_TOOL_NAME, + tool_message="Subagent invoked", + tool_kwargs=tool_args, + ) + try: + result = await handler(request) + except Exception as err: + if is_subagent_tool: + logger.error(f"子代理工具执行失败: tool_name={tool_name}, error={err}") + raise + if is_subagent_tool: + logger.info(f"子代理工具执行完成: tool_name={tool_name}") + return result def _deepagents_spec( diff --git a/tests/test_agent_subagents.py b/tests/test_agent_subagents.py index da4eac9e..757a0363 100644 --- a/tests/test_agent_subagents.py +++ b/tests/test_agent_subagents.py @@ -12,6 +12,7 @@ from app.agent.middleware.subagents import ( MoviePilotSubAgentMiddleware, SUBAGENT_CONTROL_TOOL_NAME, SUBAGENT_TASK_TOOL_NAME, + SubAgentCallSummaryMiddleware, SubAgentTaskControlMiddleware, create_subagent_middlewares, ) @@ -93,6 +94,30 @@ class TestAgentSubagents(unittest.TestCase): class TestSubAgentTaskControlMiddleware(unittest.IsolatedAsyncioTestCase): + async def test_call_summary_middleware_logs_subagent_tool_operations(self): + """子代理工具包装层应记录工具执行开始和完成日志。""" + middleware = SubAgentCallSummaryMiddleware() + request = SimpleNamespace( + tool=SimpleNamespace(name=SUBAGENT_CONTROL_TOOL_NAME), + tool_call={ + "args": { + "action": "status", + "subagent_type": "general-purpose", + } + }, + ) + + async def _fake_handler(_request): + return "ok" + + with patch.object(subagent_module.logger, "info") as log_info: + result = await middleware.awrap_tool_call(request, _fake_handler) + + messages = [call.args[0] for call in log_info.call_args_list] + self.assertEqual("ok", result) + self.assertTrue(any("开始执行子代理工具" in message for message in messages)) + self.assertTrue(any("子代理工具执行完成" in message for message in messages)) + async def test_control_tool_starts_tasks_concurrently_and_waits(self): """异步子代理管控工具应批量启动任务,并在 wait 时收集结果。""" model = FakeListChatModel(responses=["ok"])