diff --git a/app/agent/tools/impl/_plugin_tool_utils.py b/app/agent/tools/impl/_plugin_tool_utils.py index e1ebd92e..5ed60b4d 100644 --- a/app/agent/tools/impl/_plugin_tool_utils.py +++ b/app/agent/tools/impl/_plugin_tool_utils.py @@ -1,5 +1,6 @@ """插件 Agent 工具共享辅助方法""" +import asyncio import json import shutil from typing import Any, Optional @@ -248,7 +249,7 @@ async def install_plugin_runtime( SystemConfigKey.UserInstalledPlugins, install_plugins ) - reload_plugin_runtime(plugin_id) + await asyncio.to_thread(reload_plugin_runtime, plugin_id) return True, message or "插件安装成功", refreshed_only diff --git a/app/helper/plugin.py b/app/helper/plugin.py index b5f60b60..234450e6 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -2117,10 +2117,23 @@ class PluginHelper(metaclass=WeakSingleton): async with aiofiles.open(requirements_file_path, "w", encoding="utf-8") as f: await f.write(requirements_txt) - return self.pip_install_with_fallback(Path(requirements_file_path)) + return await self.__async_pip_install_with_fallback(Path(requirements_file_path)) return True, "" # 如果 requirements.txt 为空,视作成功 + async def __async_pip_install_with_fallback( + self, + requirements_file: Path, + find_links_dirs: Optional[List[Path]] = None) -> Tuple[bool, str]: + """ + 在线程池中执行插件依赖安装,避免同步 pip 子进程阻塞事件循环。 + """ + return await asyncio.to_thread( + self.pip_install_with_fallback, + requirements_file, + find_links_dirs + ) + async def __async_backup_plugin(self, pid: str) -> str: """ 异步备份旧插件目录 @@ -2204,7 +2217,7 @@ class PluginHelper(metaclass=WeakSingleton): # 检查是否存在 requirements.txt 文件 if await requirements_file.exists(): logger.info(f"{pid} 存在依赖,开始尝试安装依赖") - success, error_message = self.pip_install_with_fallback(Path(requirements_file)) + success, error_message = await self.__async_pip_install_with_fallback(Path(requirements_file)) if success: return True, True, "" else: @@ -2234,7 +2247,7 @@ class PluginHelper(metaclass=WeakSingleton): try: # 使用自动降级策略安装依赖 wheels_dirs = self.__collect_plugin_wheels_dirs() - return self.pip_install_with_fallback(Path(requirements_temp_file), wheels_dirs) + return await self.__async_pip_install_with_fallback(Path(requirements_temp_file), wheels_dirs) finally: # 删除临时文件 await requirements_temp_file.unlink() diff --git a/tests/test_agent_plugin_tools.py b/tests/test_agent_plugin_tools.py index 1a3c35df..bab0ed14 100644 --- a/tests/test_agent_plugin_tools.py +++ b/tests/test_agent_plugin_tools.py @@ -5,6 +5,7 @@ from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch from app.agent.tools.impl.install_plugin import InstallPluginTool +from app.agent.tools.impl._plugin_tool_utils import install_plugin_runtime from app.agent.tools.impl.query_installed_plugins import QueryInstalledPluginsTool from app.agent.tools.impl.query_market_plugins import QueryMarketPluginsTool from app.agent.tools.impl.query_plugin_config import QueryPluginConfigTool @@ -170,6 +171,54 @@ class TestAgentPluginTools(unittest.TestCase): "DemoPlugin", "https://example.com/market", force=False ) + def test_install_plugin_runtime_reloads_in_threadpool(self): + plugin_manager = MagicMock() + plugin_manager.get_plugin_ids.return_value = ["DemoPlugin"] + plugin_helper = MagicMock() + plugin_helper.async_install_reg = AsyncMock(return_value=True) + config_oper = MagicMock() + config_oper.get.return_value = ["DemoPlugin"] + calls = [] + + async def fake_to_thread(func, *args, **kwargs): + calls.append((func, args, kwargs)) + return None + + with patch( + "app.agent.tools.impl._plugin_tool_utils.SystemConfigOper", + return_value=config_oper, + ), patch( + "app.agent.tools.impl._plugin_tool_utils.PluginManager", + return_value=plugin_manager, + ), patch( + "app.agent.tools.impl._plugin_tool_utils.PluginHelper", + return_value=plugin_helper, + ), patch( + "app.agent.tools.impl._plugin_tool_utils.reload_plugin_runtime", + ) as reload_runtime, patch( + "app.agent.tools.impl._plugin_tool_utils.asyncio.to_thread", + side_effect=fake_to_thread, + ): + success, message, refreshed_only = asyncio.run( + install_plugin_runtime( + "DemoPlugin", + "https://example.com/market", + force=False, + ) + ) + + self.assertTrue(success) + self.assertEqual("插件已存在,已刷新加载", message) + self.assertTrue(refreshed_only) + plugin_helper.async_install_reg.assert_awaited_once_with( + pid="DemoPlugin", + repo_url="https://example.com/market", + ) + self.assertEqual(1, len(calls)) + self.assertEqual(reload_runtime, calls[0][0]) + self.assertEqual(("DemoPlugin",), calls[0][1]) + self.assertEqual({}, calls[0][2]) + def test_uninstall_plugin_uninstalls_installed_candidate(self): tool = UninstallPluginTool(session_id="session-1", user_id="10001") installed_plugin = self._market_plugin( diff --git a/tests/test_plugin_helper.py b/tests/test_plugin_helper.py index 2b7d2a53..2a6d5a9d 100644 --- a/tests/test_plugin_helper.py +++ b/tests/test_plugin_helper.py @@ -1,3 +1,4 @@ +import asyncio import sys import tempfile import threading @@ -360,3 +361,37 @@ class PluginHelperTest(TestCase): self.assertIn("已自动恢复主程序依赖", message) self.assertEqual(1, len(repair_commands)) self.assertIn("runtime-constraints-", repair_commands[0][-1]) + + def test_async_pip_install_runs_in_threadpool(self): + """ + 验证异步安装路径会把同步 pip 安装派发到线程池,避免阻塞事件循环。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + helper = PluginHelper() + requirements_file = Path("/tmp/demo-requirements.txt") + find_links_dirs = [Path("/tmp/demo-wheels")] + calls = [] + + async def run_install(): + return await helper._PluginHelper__async_pip_install_with_fallback( + requirements_file, + find_links_dirs + ) + + async def fake_to_thread(func, *args, **kwargs): + calls.append((func, args, kwargs)) + return True, "ok" + + with patch("app.helper.plugin.asyncio.to_thread", side_effect=fake_to_thread): + success, message = asyncio.run(run_install()) + + self.assertTrue(success) + self.assertEqual("ok", message) + self.assertEqual(1, len(calls)) + self.assertEqual(helper.pip_install_with_fallback, calls[0][0]) + self.assertEqual((requirements_file, find_links_dirs), calls[0][1]) + self.assertEqual({}, calls[0][2])