From a59afe4cc9ca53ed9fd58e6357a6f20b37fbcba4 Mon Sep 17 00:00:00 2001 From: InfinityPacer Date: Fri, 8 May 2026 18:18:37 +0800 Subject: [PATCH] fix(plugin): avoid clearing runtime modules after dependency install --- app/helper/plugin.py | 43 ++++++++--------- tests/test_plugin_helper.py | 96 +++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 22 deletions(-) diff --git a/app/helper/plugin.py b/app/helper/plugin.py index ffcc11cc..11583afd 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -5,6 +5,7 @@ import json import shutil import site import sys +import threading import traceback import zipfile from pathlib import Path @@ -45,6 +46,8 @@ class PluginHelper(metaclass=WeakSingleton): _install_reg = f"{settings.MP_SERVER_HOST}/plugin/install/{{pid}}" _install_report = f"{settings.MP_SERVER_HOST}/plugin/install" _install_statistic = f"{settings.MP_SERVER_HOST}/plugin/statistic" + # 串行化运行期依赖安装,避免多个 pip 子进程和导入缓存刷新互相踩踏。 + _pip_install_lock = threading.Lock() def __init__(self): self.systemconfig = SystemConfigOper() @@ -873,28 +876,24 @@ class PluginHelper(metaclass=WeakSingleton): strategies.append(("代理", base_cmd + ["--proxy", settings.PROXY_HOST])) strategies.append(("直连", base_cmd)) - # 记录当前已安装的包,以便后续刷新 - before_installation = set(sys.modules.keys()) - - # 遍历策略进行安装 - for strategy_name, pip_command in strategies: - logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}") - success, message = SystemUtils.execute_with_subprocess(pip_command) - if success: - logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}") - # 安装成功后刷新Python的模块系统 - importlib.reload(site) - # 获取新安装的模块 - current_modules = set(sys.modules.keys()) - new_modules = current_modules - before_installation - # 重新加载新安装的模块 - for module in new_modules: - if module in sys.modules: - del sys.modules[module] - logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {new_modules}") - return True, message - else: - logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}") + # pip 会修改当前解释器的 site-packages,安装与缓存刷新必须串行,避免运行态模块被并发安装窗口污染。 + with PluginHelper._pip_install_lock: + loaded_modules_before_install = set(sys.modules.keys()) + # 遍历策略进行安装 + for strategy_name, pip_command in strategies: + logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}") + success, message = SystemUtils.execute_with_subprocess(pip_command) + if success: + logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}") + # 刷新导入系统即可发现新安装依赖,同时保持安装窗口内的运行态模块缓存稳定。 + importlib.reload(site) + importlib.invalidate_caches() + loaded_modules_after_install = set(sys.modules.keys()) + loaded_modules_during_install = loaded_modules_after_install - loaded_modules_before_install + logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {loaded_modules_during_install}") + return True, message + else: + logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}") return False, "[PIP] 所有策略均安装依赖失败,请检查网络连接或 PIP 配置" diff --git a/tests/test_plugin_helper.py b/tests/test_plugin_helper.py index 6a25be0c..2c538741 100644 --- a/tests/test_plugin_helper.py +++ b/tests/test_plugin_helper.py @@ -1,4 +1,11 @@ +import sys +import tempfile +import threading +import time +from pathlib import Path +from types import ModuleType from unittest import TestCase +from unittest.mock import patch class PluginHelperTest(TestCase): @@ -21,3 +28,92 @@ class PluginHelperTest(TestCase): "local://TestPlugin?version=v2", PluginHelper.sanitize_repo_url_for_statistic(repo_url) ) + + def test_pip_install_keeps_modules_imported_during_install(self): + """ + 验证依赖安装窗口内被其他任务导入的运行态模块不会被误删。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + module_names = ["app.plugins.dynamicwechat.helper", "Crypto.Cipher._mode_cbc"] + previous_modules = {name: sys.modules.get(name) for name in module_names} + + def fake_execute(_cmd): + for module_name in module_names: + sys.modules[module_name] = ModuleType(module_name) + return True, "ok" + + try: + with tempfile.TemporaryDirectory() as temp_dir: + requirements_file = Path(temp_dir) / "requirements.txt" + requirements_file.write_text("demo-package\n", encoding="utf-8") + with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute): + success, message = PluginHelper.pip_install_with_fallback(requirements_file) + + self.assertTrue(success) + self.assertEqual("ok", message) + for module_name in module_names: + self.assertIn(module_name, sys.modules) + finally: + for module_name, previous_module in previous_modules.items(): + if previous_module is None: + sys.modules.pop(module_name, None) + else: + sys.modules[module_name] = previous_module + + def test_pip_install_serializes_concurrent_calls(self): + """ + 验证多个依赖安装请求会复用同一把锁串行执行 pip。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + thread_count = 2 + active_installs = 0 + max_active_installs = 0 + state_lock = threading.Lock() + start_event = threading.Event() + errors = [] + + def fake_execute(_cmd): + nonlocal active_installs, max_active_installs + with state_lock: + active_installs += 1 + max_active_installs = max(max_active_installs, active_installs) + time.sleep(0.05) + with state_lock: + active_installs -= 1 + return True, "ok" + + def worker(requirements_file: Path): + try: + start_event.wait() + PluginHelper.pip_install_with_fallback(requirements_file) + except Exception as err: # pragma: no cover - 仅用于并发测试失败诊断 + errors.append(err) + + with tempfile.TemporaryDirectory() as temp_dir: + requirements_files = [] + for index in range(thread_count): + requirements_file = Path(temp_dir) / f"requirements-{index}.txt" + requirements_file.write_text("demo-package\n", encoding="utf-8") + requirements_files.append(requirements_file) + + threads = [ + threading.Thread(target=worker, args=(requirements_file,)) + for requirements_file in requirements_files + ] + with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute): + for thread in threads: + thread.start() + start_event.set() + for thread in threads: + thread.join() + + self.assertEqual([], errors) + self.assertEqual(1, max_active_installs)