fix(plugin): avoid clearing runtime modules after dependency install

This commit is contained in:
InfinityPacer
2026-05-08 18:18:37 +08:00
committed by jxxghp
parent 7b6047accf
commit a59afe4cc9
2 changed files with 117 additions and 22 deletions

View File

@@ -5,6 +5,7 @@ import json
import shutil
import site
import sys
import threading
import traceback
import zipfile
from pathlib import Path
@@ -45,6 +46,8 @@ class PluginHelper(metaclass=WeakSingleton):
_install_reg = f"{settings.MP_SERVER_HOST}/plugin/install/{{pid}}"
_install_report = f"{settings.MP_SERVER_HOST}/plugin/install"
_install_statistic = f"{settings.MP_SERVER_HOST}/plugin/statistic"
# 串行化运行期依赖安装,避免多个 pip 子进程和导入缓存刷新互相踩踏。
_pip_install_lock = threading.Lock()
def __init__(self):
self.systemconfig = SystemConfigOper()
@@ -873,28 +876,24 @@ class PluginHelper(metaclass=WeakSingleton):
strategies.append(("代理", base_cmd + ["--proxy", settings.PROXY_HOST]))
strategies.append(("直连", base_cmd))
# 记录当前已安装的包,以便后续刷新
before_installation = set(sys.modules.keys())
# 遍历策略进行安装
for strategy_name, pip_command in strategies:
logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_command)
if success:
logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}")
# 安装成功后刷新Python的模块系统
importlib.reload(site)
# 获取新安装的模块
current_modules = set(sys.modules.keys())
new_modules = current_modules - before_installation
# 重新加载新安装的模块
for module in new_modules:
if module in sys.modules:
del sys.modules[module]
logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {new_modules}")
return True, message
else:
logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}")
# pip 会修改当前解释器的 site-packages安装与缓存刷新必须串行避免运行态模块被并发安装窗口污染。
with PluginHelper._pip_install_lock:
loaded_modules_before_install = set(sys.modules.keys())
# 遍历策略进行安装
for strategy_name, pip_command in strategies:
logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}")
success, message = SystemUtils.execute_with_subprocess(pip_command)
if success:
logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}")
# 刷新导入系统即可发现新安装依赖,同时保持安装窗口内的运行态模块缓存稳定。
importlib.reload(site)
importlib.invalidate_caches()
loaded_modules_after_install = set(sys.modules.keys())
loaded_modules_during_install = loaded_modules_after_install - loaded_modules_before_install
logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {loaded_modules_during_install}")
return True, message
else:
logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}")
return False, "[PIP] 所有策略均安装依赖失败,请检查网络连接或 PIP 配置"

View File

@@ -1,4 +1,11 @@
import sys
import tempfile
import threading
import time
from pathlib import Path
from types import ModuleType
from unittest import TestCase
from unittest.mock import patch
class PluginHelperTest(TestCase):
@@ -21,3 +28,92 @@ class PluginHelperTest(TestCase):
"local://TestPlugin?version=v2",
PluginHelper.sanitize_repo_url_for_statistic(repo_url)
)
def test_pip_install_keeps_modules_imported_during_install(self):
"""
验证依赖安装窗口内被其他任务导入的运行态模块不会被误删。
"""
try:
from app.helper.plugin import PluginHelper
except ModuleNotFoundError as exc:
self.skipTest(f"missing dependency: {exc}")
module_names = ["app.plugins.dynamicwechat.helper", "Crypto.Cipher._mode_cbc"]
previous_modules = {name: sys.modules.get(name) for name in module_names}
def fake_execute(_cmd):
for module_name in module_names:
sys.modules[module_name] = ModuleType(module_name)
return True, "ok"
try:
with tempfile.TemporaryDirectory() as temp_dir:
requirements_file = Path(temp_dir) / "requirements.txt"
requirements_file.write_text("demo-package\n", encoding="utf-8")
with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute):
success, message = PluginHelper.pip_install_with_fallback(requirements_file)
self.assertTrue(success)
self.assertEqual("ok", message)
for module_name in module_names:
self.assertIn(module_name, sys.modules)
finally:
for module_name, previous_module in previous_modules.items():
if previous_module is None:
sys.modules.pop(module_name, None)
else:
sys.modules[module_name] = previous_module
def test_pip_install_serializes_concurrent_calls(self):
"""
验证多个依赖安装请求会复用同一把锁串行执行 pip。
"""
try:
from app.helper.plugin import PluginHelper
except ModuleNotFoundError as exc:
self.skipTest(f"missing dependency: {exc}")
thread_count = 2
active_installs = 0
max_active_installs = 0
state_lock = threading.Lock()
start_event = threading.Event()
errors = []
def fake_execute(_cmd):
nonlocal active_installs, max_active_installs
with state_lock:
active_installs += 1
max_active_installs = max(max_active_installs, active_installs)
time.sleep(0.05)
with state_lock:
active_installs -= 1
return True, "ok"
def worker(requirements_file: Path):
try:
start_event.wait()
PluginHelper.pip_install_with_fallback(requirements_file)
except Exception as err: # pragma: no cover - 仅用于并发测试失败诊断
errors.append(err)
with tempfile.TemporaryDirectory() as temp_dir:
requirements_files = []
for index in range(thread_count):
requirements_file = Path(temp_dir) / f"requirements-{index}.txt"
requirements_file.write_text("demo-package\n", encoding="utf-8")
requirements_files.append(requirements_file)
threads = [
threading.Thread(target=worker, args=(requirements_file,))
for requirements_file in requirements_files
]
with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute):
for thread in threads:
thread.start()
start_event.set()
for thread in threads:
thread.join()
self.assertEqual([], errors)
self.assertEqual(1, max_active_installs)