增加子代理操作日志

This commit is contained in:
jxxghp
2026-06-01 11:31:18 +08:00
parent b7c78da214
commit e4242058e2
2 changed files with 141 additions and 27 deletions

View File

@@ -459,6 +459,9 @@ class _SubAgentAgentProvider:
return profile.name, cached_agent
subagent_tools = _select_tools(self._tools, profile)
logger.info(
f"创建子代理图: subagent_type={profile.name}, tools={len(subagent_tools)}"
)
agent = create_agent(
model=self._model,
tools=subagent_tools,
@@ -478,20 +481,35 @@ class _SubAgentAgentProvider:
"""调用指定子代理并只返回供主代理读取的结果。"""
agent_name, agent = self.get_agent(subagent_type)
thread_suffix = task_id or uuid.uuid4().hex
result = await agent.ainvoke(
{"messages": [HumanMessage(content=description)]},
config={
"configurable": {
"thread_id": f"subagent-{agent_name}-{thread_suffix}",
SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE,
},
"metadata": {
"lc_agent_name": agent_name,
SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE,
},
},
log_task_id = task_id or "-"
logger.info(
f"开始调用子代理: subagent_type={agent_name}, task_id={log_task_id}"
)
try:
result = await agent.ainvoke(
{"messages": [HumanMessage(content=description)]},
config={
"configurable": {
"thread_id": f"subagent-{agent_name}-{thread_suffix}",
SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE,
},
"metadata": {
"lc_agent_name": agent_name,
SUBAGENT_STREAM_MARKER_KEY: SUBAGENT_STREAM_MARKER_VALUE,
},
},
)
except Exception as err:
logger.error(
f"子代理调用失败: subagent_type={agent_name}, "
f"task_id={log_task_id}, error={err}"
)
raise
final_text = _extract_final_text(result)
logger.info(
f"子代理调用完成: subagent_type={agent_name}, "
f"task_id={log_task_id}, result_chars={len(final_text)}"
)
return final_text or "The subagent did not return a usable result."
@@ -713,13 +731,26 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
"""执行受调度器管理的子代理任务。"""
async with self._semaphore:
record.started_at = datetime.now()
logger.info(
f"异步子代理任务开始执行: task_id={record.task_id}, "
f"subagent_type={record.subagent_type}"
)
try:
return await self._provider.run_task(
result = await self._provider.run_task(
description=record.description,
subagent_type=record.subagent_type,
task_id=record.task_id,
)
logger.info(
f"异步子代理任务执行完成: task_id={record.task_id}, "
f"subagent_type={record.subagent_type}, result_chars={len(result)}"
)
return result
except asyncio.CancelledError:
logger.info(
f"异步子代理任务已取消: task_id={record.task_id}, "
f"subagent_type={record.subagent_type}"
)
raise
except Exception as err:
logger.error(f"子代理任务执行失败: task_id={record.task_id}, error={err}")
@@ -762,6 +793,10 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
)
self._tasks[task_id] = record
records.append(record)
logger.info(
f"已启动子代理任务: task_id={task_id}, "
f"subagent_type={record.subagent_type}"
)
return records
async def _wait_records(
@@ -774,28 +809,48 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
"""按等待模式等待一组任务完成。"""
pending_tasks = [record.task for record in records if not record.task.done()]
if not pending_tasks:
if records:
logger.info(f"子代理任务无需等待: tasks={len(records)}")
return
timeout = self._normalize_timeout_ms(timeout_ms) / 1000
normalized_timeout_ms = self._normalize_timeout_ms(timeout_ms)
timeout = normalized_timeout_ms / 1000
if timeout <= 0:
logger.info(
f"子代理任务等待超时时间为 0跳过等待: tasks={len(pending_tasks)}"
)
return
return_when = asyncio.FIRST_COMPLETED if wait_mode == "any" else asyncio.ALL_COMPLETED
return_when = (
asyncio.FIRST_COMPLETED if wait_mode == "any" else asyncio.ALL_COMPLETED
)
logger.info(
f"开始等待子代理任务: tasks={len(pending_tasks)}, "
f"wait_mode={wait_mode}, timeout_ms={normalized_timeout_ms}"
)
await asyncio.wait(
pending_tasks,
timeout=timeout,
return_when=return_when,
)
finished_count = sum(1 for task in pending_tasks if task.done())
logger.info(
f"子代理任务等待结束: finished={finished_count}, "
f"pending={len(pending_tasks) - finished_count}"
)
async def _cancel_records(self, records: list[_SubAgentRuntimeTask]) -> None:
"""取消一组尚未完成的任务。"""
cancellable_tasks = [
record.task for record in records if not record.task.done()
]
if cancellable_tasks:
logger.info(f"开始取消子代理任务: tasks={len(cancellable_tasks)}")
for task in cancellable_tasks:
task.cancel()
if cancellable_tasks:
await asyncio.gather(*cancellable_tasks, return_exceptions=True)
logger.info(f"子代理任务取消完成: tasks={len(cancellable_tasks)}")
async def _control_task(
self,
@@ -809,6 +864,7 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
timeout_ms: Optional[int] = SUBAGENT_DEFAULT_WAIT_TIMEOUT_MS,
) -> str:
"""管理异步子代理任务。"""
logger.info(f"收到子代理管控操作: action={action}")
if action in {"start", "run"}:
specs, error = self._normalize_specs(
description=description,
@@ -816,8 +872,10 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
tasks=tasks,
)
if error:
logger.info(f"子代理管控操作未启动任务: action={action}, error={error}")
return self._json_response({"success": False, "error": error})
logger.info(f"准备启动子代理任务: action={action}, tasks={len(specs)}")
records = self._start_tasks(specs)
if action == "run":
await self._wait_records(
@@ -842,13 +900,23 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
)
if action == "wait":
logger.info(
f"准备等待子代理任务: selected={len(records)}, missing={len(missing_ids)}"
)
await self._wait_records(
records=records,
wait_mode=wait_mode,
timeout_ms=timeout_ms,
)
elif action == "cancel":
logger.info(
f"准备取消子代理任务: selected={len(records)}, missing={len(missing_ids)}"
)
await self._cancel_records(records)
elif action == "status":
logger.info(
f"查询子代理任务状态: selected={len(records)}, missing={len(missing_ids)}"
)
return self._json_response(
{
@@ -865,6 +933,8 @@ class SubAgentTaskControlMiddleware(AgentMiddleware):
unfinished_records = [
record for record in self._tasks.values() if not record.task.done()
]
if unfinished_records:
logger.info(f"Agent 结束,取消未完成子代理任务: tasks={len(unfinished_records)}")
await self._cancel_records(unfinished_records)
@@ -882,20 +952,39 @@ class SubAgentCallSummaryMiddleware(AgentMiddleware):
) -> Any:
"""在子代理任务工具执行时记录聚合摘要。"""
tool = request.tool
if (
tool
and getattr(tool, "name", None)
in {SUBAGENT_TASK_TOOL_NAME, SUBAGENT_CONTROL_TOOL_NAME}
and self.stream_handler
and getattr(self.stream_handler, "is_streaming", False)
):
tool_name = getattr(tool, "name", None)
is_subagent_tool = tool_name in {
SUBAGENT_TASK_TOOL_NAME,
SUBAGENT_CONTROL_TOOL_NAME,
}
if is_subagent_tool:
tool_call = request.tool_call or {}
self.stream_handler.record_tool_call(
tool_name=getattr(tool, "name", SUBAGENT_TASK_TOOL_NAME),
tool_message="Subagent invoked",
tool_kwargs=tool_call.get("args") or {},
tool_args = tool_call.get("args") or {}
if not isinstance(tool_args, dict):
tool_args = {}
logger.info(
f"开始执行子代理工具: tool_name={tool_name}, "
f"action={tool_args.get('action') or '-'}, "
f"subagent_type={tool_args.get('subagent_type') or '-'}"
)
return await handler(request)
if (
self.stream_handler
and getattr(self.stream_handler, "is_streaming", False)
):
self.stream_handler.record_tool_call(
tool_name=tool_name or SUBAGENT_TASK_TOOL_NAME,
tool_message="Subagent invoked",
tool_kwargs=tool_args,
)
try:
result = await handler(request)
except Exception as err:
if is_subagent_tool:
logger.error(f"子代理工具执行失败: tool_name={tool_name}, error={err}")
raise
if is_subagent_tool:
logger.info(f"子代理工具执行完成: tool_name={tool_name}")
return result
def _deepagents_spec(

View File

@@ -12,6 +12,7 @@ from app.agent.middleware.subagents import (
MoviePilotSubAgentMiddleware,
SUBAGENT_CONTROL_TOOL_NAME,
SUBAGENT_TASK_TOOL_NAME,
SubAgentCallSummaryMiddleware,
SubAgentTaskControlMiddleware,
create_subagent_middlewares,
)
@@ -93,6 +94,30 @@ class TestAgentSubagents(unittest.TestCase):
class TestSubAgentTaskControlMiddleware(unittest.IsolatedAsyncioTestCase):
async def test_call_summary_middleware_logs_subagent_tool_operations(self):
"""子代理工具包装层应记录工具执行开始和完成日志。"""
middleware = SubAgentCallSummaryMiddleware()
request = SimpleNamespace(
tool=SimpleNamespace(name=SUBAGENT_CONTROL_TOOL_NAME),
tool_call={
"args": {
"action": "status",
"subagent_type": "general-purpose",
}
},
)
async def _fake_handler(_request):
return "ok"
with patch.object(subagent_module.logger, "info") as log_info:
result = await middleware.awrap_tool_call(request, _fake_handler)
messages = [call.args[0] for call in log_info.call_args_list]
self.assertEqual("ok", result)
self.assertTrue(any("开始执行子代理工具" in message for message in messages))
self.assertTrue(any("子代理工具执行完成" in message for message in messages))
async def test_control_tool_starts_tasks_concurrently_and_waits(self):
"""异步子代理管控工具应批量启动任务,并在 wait 时收集结果。"""
model = FakeListChatModel(responses=["ok"])