From 1b2433f7c2d53cc85c53e6c8cd1fbe6e7526ba24 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 11 May 2026 08:54:34 +0800 Subject: [PATCH] feat: implement runtime dependency checks and recovery for plugin installations --- app/helper/plugin.py | 276 +++++++++++++++++++++++--- app/modules/wechatclawbot/__init__.py | 6 +- app/utils/http.py | 1 + docker/entrypoint.sh | 36 ++++ tests/test_plugin_helper.py | 106 ++++++++++ 5 files changed, 393 insertions(+), 32 deletions(-) diff --git a/app/helper/plugin.py b/app/helper/plugin.py index 11583afd..49690f35 100644 --- a/app/helper/plugin.py +++ b/app/helper/plugin.py @@ -5,6 +5,7 @@ import json import shutil import site import sys +import tempfile import threading import traceback import zipfile @@ -48,6 +49,21 @@ class PluginHelper(metaclass=WeakSingleton): _install_statistic = f"{settings.MP_SERVER_HOST}/plugin/statistic" # 串行化运行期依赖安装,避免多个 pip 子进程和导入缓存刷新互相踩踏。 _pip_install_lock = threading.Lock() + # 这些包一旦被插件覆盖,最容易直接拖垮主程序启动,因此冲突提示需要单独高亮。 + _protected_runtime_packages = frozenset({ + "alembic", + "fastapi", + "pydantic", + "pydantic_core", + "pydantic_settings", + "sqlalchemy", + "starlette", + "uvicorn", + }) + _runtime_import_probe = ( + "import alembic, fastapi, pydantic, pydantic_core, pydantic_settings, " + "sqlalchemy, starlette, uvicorn; from pydantic import BaseModel, Field" + ) def __init__(self): self.systemconfig = SystemConfigOper() @@ -830,7 +846,178 @@ class PluginHelper(metaclass=WeakSingleton): return list(dict.fromkeys(wheels_dirs)) @staticmethod - def pip_install_with_fallback(requirements_file: Path, + def __build_pip_install_strategies(base_cmd: List[str]) -> List[Tuple[str, List[str]]]: + """ + 为 pip 命令构建统一的网络降级策略,避免不同安装路径各自拼接参数。 + """ + strategies = [] + if settings.PIP_PROXY: + strategies.append(("镜像站", base_cmd + ["-i", settings.PIP_PROXY])) + if settings.PROXY_HOST: + strategies.append(("代理", base_cmd + ["--proxy", settings.PROXY_HOST])) + strategies.append(("直连", base_cmd)) + return strategies + + @staticmethod + def __format_pkg_name_for_pip(name: str) -> str: + """ + 将内部统一使用的下划线包名转回 pip 更常见的连字符写法,便于日志和约束文件阅读。 + """ + return name.replace("_", "-") + + @classmethod + def __validate_runtime_dependency_conflicts( + cls, + requirements_file: Path, + installed_packages: Dict[str, Version] + ) -> Tuple[bool, str]: + """ + 在真正执行 pip 前,先拦截插件对现有运行环境中已安装包的显式覆盖请求。 + + 共享 venv 场景下,允许插件新增依赖,但不允许它升级/降级已有包,否则不仅主程序, + 其他插件也会被一起污染。 + """ + conflicts = [] + try: + with open(requirements_file, "r", encoding="utf-8") as f: + for raw_line in f: + line = raw_line.strip() + if not line or line.startswith("#"): + continue + try: + requirement = Requirement(line) + except Exception as err: + logger.debug(f"无法解析依赖项 '{line}',跳过运行环境冲突预检:{err}") + continue + + if requirement.marker and not requirement.marker.evaluate(): + continue + + package_name = cls.__standardize_pkg_name(requirement.name) + installed_version = installed_packages.get(package_name) + if installed_version is None: + continue + + if requirement.url: + conflicts.append(( + package_name, + str(installed_version), + f"来自 {requirement.url} 的同名包", + package_name in cls._protected_runtime_packages, + )) + continue + + if requirement.specifier and not requirement.specifier.contains( + installed_version, + prereleases=True + ): + conflicts.append(( + package_name, + str(installed_version), + str(requirement.specifier), + package_name in cls._protected_runtime_packages, + )) + except Exception as e: + logger.error(f"执行运行环境依赖冲突预检时发生错误:{e}") + return False, f"插件依赖预检失败:{e}" + + if not conflicts: + return True, "" + + def sort_key(item: Tuple[str, str, str, bool]) -> Tuple[int, str]: + return 0 if item[3] else 1, item[0] + + details = [] + for package_name, installed_version, expected, _is_protected in sorted(conflicts, key=sort_key)[:5]: + details.append( + f"{cls.__format_pkg_name_for_pip(package_name)} 当前为 {installed_version}," + f"插件要求 {expected}" + ) + if len(conflicts) > 5: + details.append(f"其余 {len(conflicts) - 5} 项冲突已省略") + + scope = "主程序核心依赖" if any(item[3] for item in conflicts) else "已安装依赖" + return False, ( + f"插件依赖与当前运行环境的{scope}冲突:{';'.join(details)}。" + f"为避免共享运行环境被污染,已拒绝安装。" + ) + + @classmethod + def __create_runtime_constraints_file(cls, installed_packages: Dict[str, Version]) -> Path: + """ + 以“当前环境已安装版本”为准生成临时约束文件,确保插件只能新增依赖, + 不能悄悄升级或降级任何已安装包。 + """ + temp_dir = Path(settings.TEMP_PATH) / "plugin_dependencies" + temp_dir.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile( + mode="w", + encoding="utf-8", + dir=temp_dir, + prefix="runtime-constraints-", + suffix=".txt", + delete=False + ) as temp_file: + for package_name, version in sorted(installed_packages.items()): + temp_file.write( + f"{cls.__format_pkg_name_for_pip(package_name)}=={version}\n" + ) + return Path(temp_file.name) + + @staticmethod + def __refresh_import_system(): + """ + 依赖安装或修复后刷新当前解释器的导入缓存,保证后续动态导入能看到新状态。 + """ + importlib.reload(site) + importlib.invalidate_caches() + + @classmethod + def __run_runtime_healthcheck(cls) -> Tuple[bool, str]: + """ + 安装完成后立即执行运行环境自检,尽量在插件加载前发现依赖图已被污染。 + """ + checks = [ + ("pip check", [sys.executable, "-m", "pip", "check"]), + ("核心依赖导入检查", [sys.executable, "-c", cls._runtime_import_probe]), + ] + for check_name, command in checks: + success, message = SystemUtils.execute_with_subprocess(command) + if not success: + return False, f"{check_name}失败:{message}" + return True, "" + + @classmethod + def __repair_main_runtime_dependencies(cls, snapshot_file: Optional[Path] = None) -> Tuple[bool, str]: + """ + 依赖安装后如果发现主运行环境已异常,优先恢复安装前依赖快照; + 若快照不可用,再按主项目依赖重新安装进行自愈。 + """ + repair_target = snapshot_file + repair_desc = "安装前依赖快照" + if repair_target and not repair_target.exists(): + repair_target = None + if repair_target is None: + repair_target = settings.ROOT_PATH / "requirements.txt" + repair_desc = "主程序 requirements.txt" + if not repair_target.exists(): + return False, f"恢复依赖文件不存在:{repair_target}" + + last_error = "" + base_cmd = [sys.executable, "-m", "pip", "install", "-r", str(repair_target)] + for strategy_name, pip_command in cls.__build_pip_install_strategies(base_cmd): + logger.warning(f"[PIP] 运行环境异常,尝试使用策略:{strategy_name} 恢复{repair_desc}") + success, message = SystemUtils.execute_with_subprocess(pip_command) + if success: + cls.__refresh_import_system() + return True, message + last_error = message + logger.error(f"[PIP] 使用策略:{strategy_name} 恢复{repair_desc}失败:{message}") + return False, last_error or f"恢复{repair_desc}失败" + + @classmethod + def pip_install_with_fallback(cls, + requirements_file: Path, find_links_dirs: Optional[List[Path]] = None) -> Tuple[bool, str]: """ 使用自动降级策略安装依赖,并确保新安装的包可被动态导入 @@ -866,36 +1053,71 @@ class PluginHelper(metaclass=WeakSingleton): else: logger.debug(f"[PIP] 未发现可用的 wheels 目录,将仅使用在线源。") - base_cmd = [sys.executable, "-m", "pip", "install"] + find_links_option + ["-r", str(requirements_file)] - strategies = [] + installed_packages = cls.__get_installed_packages() + check_ok, check_message = cls.__validate_runtime_dependency_conflicts(requirements_file, installed_packages) + if not check_ok: + logger.error(f"[PIP] 运行环境冲突预检失败:{check_message}") + return False, check_message - # 添加策略到列表中 - if settings.PIP_PROXY: - strategies.append(("镜像站", base_cmd + ["-i", settings.PIP_PROXY])) - if settings.PROXY_HOST: - strategies.append(("代理", base_cmd + ["--proxy", settings.PROXY_HOST])) - strategies.append(("直连", base_cmd)) + constraints_file = None + try: + constraints_file = cls.__create_runtime_constraints_file(installed_packages) + except Exception as e: + logger.error(f"[PIP] 创建运行环境约束文件失败:{e}") + return False, f"创建运行环境约束文件失败:{e}" + + base_cmd = [sys.executable, "-m", "pip", "install"] + find_links_option + if constraints_file: + # 这里固定约束到当前运行环境的已安装版本,避免共享 venv 被插件重写。 + base_cmd.extend(["-c", str(constraints_file)]) + base_cmd.extend(["-r", str(requirements_file)]) + strategies = cls.__build_pip_install_strategies(base_cmd) + + try: + # pip 会修改当前解释器的 site-packages,安装与缓存刷新必须串行,避免运行态模块被并发安装窗口污染。 + with cls._pip_install_lock: + loaded_modules_before_install = set(sys.modules.keys()) + # 遍历策略进行安装 + for strategy_name, pip_command in strategies: + logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}") + success, message = SystemUtils.execute_with_subprocess(pip_command) + if success: + logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}") + health_ok, health_message = cls.__run_runtime_healthcheck() + if not health_ok: + logger.error(f"[PIP] 依赖安装后运行环境自检失败:{health_message}") + repair_ok, repair_message = cls.__repair_main_runtime_dependencies(constraints_file) + if repair_ok: + health_restored, restored_message = cls.__run_runtime_healthcheck() + if health_restored: + cls.__refresh_import_system() + return False, ( + f"依赖安装后运行环境自检失败,已自动恢复主程序依赖:{health_message}" + ) + logger.error( + f"[PIP] 主程序依赖恢复后仍未通过健康检查:{restored_message}" + ) + return False, ( + f"依赖安装后运行环境自检失败,恢复主程序依赖后仍异常:" + f"{restored_message}" + ) + return False, ( + f"依赖安装后运行环境自检失败,且自动恢复主程序依赖失败:" + f"{repair_message}" + ) + + cls.__refresh_import_system() + loaded_modules_after_install = set(sys.modules.keys()) + loaded_modules_during_install = loaded_modules_after_install - loaded_modules_before_install + logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {loaded_modules_during_install}") + return True, message - # pip 会修改当前解释器的 site-packages,安装与缓存刷新必须串行,避免运行态模块被并发安装窗口污染。 - with PluginHelper._pip_install_lock: - loaded_modules_before_install = set(sys.modules.keys()) - # 遍历策略进行安装 - for strategy_name, pip_command in strategies: - logger.debug(f"[PIP] 尝试使用策略:{strategy_name} 安装依赖,命令:{' '.join(pip_command)}") - success, message = SystemUtils.execute_with_subprocess(pip_command) - if success: - logger.debug(f"[PIP] 策略:{strategy_name} 安装依赖成功,输出:{message}") - # 刷新导入系统即可发现新安装依赖,同时保持安装窗口内的运行态模块缓存稳定。 - importlib.reload(site) - importlib.invalidate_caches() - loaded_modules_after_install = set(sys.modules.keys()) - loaded_modules_during_install = loaded_modules_after_install - loaded_modules_before_install - logger.debug(f"[PIP] 已刷新导入系统,新加载的模块: {loaded_modules_during_install}") - return True, message - else: logger.error(f"[PIP] 策略:{strategy_name} 安装依赖失败,错误信息:{message}") + finally: + if constraints_file: + constraints_file.unlink(missing_ok=True) - return False, "[PIP] 所有策略均安装依赖失败,请检查网络连接或 PIP 配置" + return False, "[PIP] 所有策略均安装依赖失败,请检查网络连接、PIP 配置或插件依赖约束" @staticmethod def __request_with_fallback(url: str, diff --git a/app/modules/wechatclawbot/__init__.py b/app/modules/wechatclawbot/__init__.py index b2daa3a6..f95e4763 100644 --- a/app/modules/wechatclawbot/__init__.py +++ b/app/modules/wechatclawbot/__init__.py @@ -1,5 +1,5 @@ import json -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, List, Optional, Tuple, Union from app.core.cache import TTLCache from app.core.context import Context, MediaInfo @@ -295,7 +295,3 @@ class WechatClawBotModule(_ModuleBase, _MessageBase[WechatClawBot]): title=message.title, link=message.link, ) - - def register_commands(self, commands: Dict[str, dict]): - """微信 ClawBot 不支持原生菜单命令,统一走文本交互。""" - logger.debug("微信 ClawBot 不支持原生菜单命令,跳过命令注册") diff --git a/app/utils/http.py b/app/utils/http.py index 166690c1..4f1b23b2 100644 --- a/app/utils/http.py +++ b/app/utils/http.py @@ -56,6 +56,7 @@ _SharedTransportKey = Tuple[ int, # max_connections int, # keepalive_expiry ] + # 共享底层 transport 桶,按事件循环和配置区分,支持 LRU 淘汰 _shared_async_transports: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, collections.OrderedDict[_SharedTransportKey, httpx.AsyncHTTPTransport]] = weakref.WeakKeyDictionary() # 不同线程各自驱动的事件循环并发首次写入外层弱字典时,需要互斥保护 diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 8582bce7..2a9d3ed7 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -216,6 +216,39 @@ function graceful_exit() { exit "$exit_code" } +# 启动前先检查后端核心依赖是否仍然可导入。 +# 插件依赖和主程序共用同一套 venv 时,历史安装记录可能已经污染环境, +# 这里优先在真正拉起后端前做一次自愈,避免容器反复起不来。 +function ensure_backend_runtime_dependencies() { + local probe_code="import alembic, fastapi, pydantic, pydantic_core, pydantic_settings, sqlalchemy, starlette, uvicorn; from pydantic import BaseModel, Field" + + INFO "→ 启动前检查后端核心依赖..." + if "${VENV_PATH}/bin/python3" -c "${probe_code}" >/dev/null 2>&1; then + INFO "→ 后端核心依赖检查通过。" + return 0 + fi + + WARN "→ 检测到后端核心依赖异常,开始尝试恢复主程序依赖..." + local -a pip_cmd=("${VENV_PATH}/bin/python3" "-m" "pip" "install" "-r" "/app/requirements.txt") + if [ -n "${PIP_PROXY}" ]; then + pip_cmd+=("-i" "${PIP_PROXY}") + elif [ -n "${PROXY_HOST}" ]; then + pip_cmd+=("--proxy" "${PROXY_HOST}") + fi + + if ! "${pip_cmd[@]}" > /dev/stdout 2> /dev/stderr; then + ERROR "→ 自动恢复主程序依赖失败,后端无法启动。" + exit 1 + fi + + if ! "${VENV_PATH}/bin/python3" -c "${probe_code}" >/dev/null 2>&1; then + ERROR "→ 主程序依赖恢复后仍然异常,后端无法启动。" + exit 1 + fi + + INFO "→ 已自动恢复主程序依赖,继续启动后端。" +} + # 使用env配置 load_config_from_app_env @@ -325,6 +358,9 @@ fi # 设置后端服务权限掩码 umask "${UMASK}" +# 启动前优先确认主运行环境仍然健康,避免插件依赖污染导致服务直接起不来。 +ensure_backend_runtime_dependencies + # 清除非系统环境导入的变量,保证转移到 dumb-init 的时候,不会带入不必要的环境变量 INFO "准备为 Python 应用清理的非系统环境导入的变量..." if [ ${#VARS_SET_BY_SCRIPT[@]} -gt 0 ]; then diff --git a/tests/test_plugin_helper.py b/tests/test_plugin_helper.py index 2c538741..aa31dabd 100644 --- a/tests/test_plugin_helper.py +++ b/tests/test_plugin_helper.py @@ -7,6 +7,8 @@ from types import ModuleType from unittest import TestCase from unittest.mock import patch +from packaging.version import Version + class PluginHelperTest(TestCase): @@ -117,3 +119,107 @@ class PluginHelperTest(TestCase): self.assertEqual([], errors) self.assertEqual(1, max_active_installs) + + def test_pip_install_rejects_conflicting_runtime_dependency(self): + """ + 验证插件如果试图覆盖主程序核心依赖,会在真正执行 pip 前被直接拒绝。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + with tempfile.TemporaryDirectory() as temp_dir: + requirements_file = Path(temp_dir) / "requirements.txt" + requirements_file.write_text("fastapi<0.1\n", encoding="utf-8") + with patch.object( + PluginHelper, + "_PluginHelper__get_installed_packages", + return_value={"fastapi": Version("0.115.14")} + ): + success, message = PluginHelper.pip_install_with_fallback(requirements_file) + + self.assertFalse(success) + self.assertIn("主程序核心依赖", message) + self.assertIn("fastapi", message) + + def test_pip_install_uses_runtime_constraints_file(self): + """ + 验证插件依赖安装会固定当前运行环境已安装版本,防止共享 venv 被升级或降级。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + seen_constraints = [] + + def fake_execute(cmd): + if cmd[:4] == [sys.executable, "-m", "pip", "install"]: + constraint_index = cmd.index("-c") + 1 + constraint_file = Path(cmd[constraint_index]) + seen_constraints.append(constraint_file) + self.assertTrue(constraint_file.exists()) + self.assertIn("fastapi==0.115.14", constraint_file.read_text(encoding="utf-8")) + return True, "ok" + return True, "ok" + + with tempfile.TemporaryDirectory() as temp_dir: + requirements_file = Path(temp_dir) / "requirements.txt" + requirements_file.write_text("demo-package\n", encoding="utf-8") + with patch.object( + PluginHelper, + "_PluginHelper__get_installed_packages", + return_value={"fastapi": Version("0.115.14")} + ): + with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute): + success, message = PluginHelper.pip_install_with_fallback(requirements_file) + + self.assertTrue(success) + self.assertEqual("ok", message) + self.assertEqual(1, len(seen_constraints)) + self.assertFalse(seen_constraints[0].exists()) + + def test_pip_install_repairs_runtime_when_healthcheck_fails(self): + """ + 验证插件依赖安装后若破坏运行环境,会先恢复主程序依赖,再向上层返回失败。 + """ + try: + from app.helper.plugin import PluginHelper + except ModuleNotFoundError as exc: + self.skipTest(f"missing dependency: {exc}") + + repair_commands = [] + healthcheck_failed = False + + def fake_execute(cmd): + nonlocal healthcheck_failed + if cmd[:4] == [sys.executable, "-m", "pip", "install"]: + if "-c" not in cmd: + repair_commands.append(cmd) + return True, "repaired" + return True, "installed" + if cmd[:4] == [sys.executable, "-m", "pip", "check"]: + if not healthcheck_failed: + healthcheck_failed = True + return False, "broken" + return True, "healthy" + if len(cmd) >= 3 and cmd[1] == "-c": + return True, "probe ok" + raise AssertionError(f"unexpected command: {cmd}") + + with tempfile.TemporaryDirectory() as temp_dir: + requirements_file = Path(temp_dir) / "requirements.txt" + requirements_file.write_text("demo-package\n", encoding="utf-8") + with patch.object( + PluginHelper, + "_PluginHelper__get_installed_packages", + return_value={"fastapi": Version("0.115.14")} + ): + with patch("app.helper.plugin.SystemUtils.execute_with_subprocess", side_effect=fake_execute): + success, message = PluginHelper.pip_install_with_fallback(requirements_file) + + self.assertFalse(success) + self.assertIn("已自动恢复主程序依赖", message) + self.assertEqual(1, len(repair_commands)) + self.assertIn("runtime-constraints-", repair_commands[0][-1])