feat(plugin): enhance dependency management by protecting main program dependencies and refining runtime constraints

This commit is contained in:
jxxghp
2026-05-12 12:38:17 +08:00
parent 1c17c0b07e
commit ac090af606
2 changed files with 303 additions and 25 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
from collections import deque
import importlib
import io
import json
@@ -17,6 +18,7 @@ import aiofiles
import aioshutil
import httpx
from anyio import Path as AsyncPath
from packaging.markers import default_environment
from packaging.requirements import Requirement
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from packaging.version import Version, InvalidVersion
@@ -865,17 +867,214 @@ class PluginHelper(metaclass=WeakSingleton):
"""
return name.replace("_", "-")
@staticmethod
def __marker_matches(marker, extra: str = "") -> bool:
"""
使用当前运行环境和可选 extra 上下文判断 marker 是否生效。
"""
if not marker:
return True
try:
env = default_environment()
env["extra"] = extra
return marker.evaluate(env)
except Exception as err:
logger.debug(f"依赖 marker 计算失败,按不匹配处理:{err}")
return False
@classmethod
def __parse_project_requirement_roots(
cls,
requirements_file: Path,
visited_files: Optional[Set[Path]] = None
) -> Dict[str, Set[str]]:
"""
解析主项目 requirements 文件,收集根依赖及其启用的 extras。
支持递归处理 -r/--requirement忽略索引、约束等 pip 选项。
"""
roots = {}
if visited_files is None:
visited_files = set()
try:
requirements_file = requirements_file.resolve()
except Exception:
requirements_file = Path(requirements_file)
if requirements_file in visited_files:
return roots
visited_files.add(requirements_file)
if not requirements_file.exists():
logger.warning(f"主项目依赖文件不存在:{requirements_file}")
return roots
try:
with open(requirements_file, "r", encoding="utf-8") as f:
for raw_line in f:
line = raw_line.strip()
if not line or line.startswith("#"):
continue
include_path = None
if line.startswith("-r"):
include_path = line[2:].strip() if line != "-r" else ""
elif line.startswith("--requirement"):
include_path = line[len("--requirement"):].strip()
if include_path is not None:
if include_path.startswith("="):
include_path = include_path[1:].strip()
if not include_path:
logger.debug(f"忽略无法识别的 requirements 引用:{line}")
continue
included_roots = cls.__parse_project_requirement_roots(
requirements_file.parent / include_path,
visited_files
)
for package_name, extras in included_roots.items():
roots.setdefault(package_name, set()).update(extras)
continue
if line.startswith((
"-c", "--constraint", "-i", "--index-url", "--extra-index-url",
"-f", "--find-links", "--trusted-host", "--no-index"
)):
continue
try:
requirement = Requirement(line)
except Exception as err:
logger.debug(f"无法解析主项目依赖项 '{line}'{err}")
continue
if not cls.__marker_matches(requirement.marker):
continue
package_name = cls.__standardize_pkg_name(requirement.name)
roots.setdefault(package_name, set()).update(
extra.lower() for extra in requirement.extras
)
return roots
except Exception as e:
logger.error(f"解析主项目依赖文件失败:{requirements_file} - {e}")
return {}
@classmethod
def __get_installed_distribution_requirements(cls) -> Dict[str, Tuple[Version, List[Requirement]]]:
"""
获取当前环境中每个已安装包的依赖声明,用于展开主程序依赖图。
"""
requirement_graph = {}
try:
for dist in distributions():
name = dist.metadata.get("Name")
if not name:
continue
package_name = cls.__standardize_pkg_name(name)
version_str = dist.metadata.get("Version") or getattr(dist, "version", None)
if not version_str:
continue
try:
version = Version(version_str)
except InvalidVersion:
logger.debug(f"无法解析已安装包 '{package_name}' 的版本:{version_str}")
continue
requirements = []
for raw_requirement in dist.requires or []:
try:
requirements.append(Requirement(raw_requirement))
except Exception as err:
logger.debug(f"无法解析已安装包 '{package_name}' 的依赖项 '{raw_requirement}'{err}")
if package_name not in requirement_graph or version > requirement_graph[package_name][0]:
requirement_graph[package_name] = (version, requirements)
return requirement_graph
except Exception as e:
logger.error(f"收集已安装包依赖图时发生错误:{e}")
return {}
@classmethod
def __get_protected_runtime_packages(
cls,
installed_packages: Optional[Dict[str, Version]] = None
) -> Dict[str, Version]:
"""
仅收集主程序依赖图中的已安装包版本。
主项目 requirements 中声明的根依赖及其当前已安装的传递依赖都会被冻结,
未被主程序依赖图引用的插件自带包允许后续插件按需升级或降级。
"""
if installed_packages is None:
installed_packages = cls.__get_installed_packages()
protected_packages = {
package_name: version
for package_name, version in installed_packages.items()
if package_name in cls._protected_runtime_packages
}
root_requirements_file = settings.ROOT_PATH / "requirements.txt"
if not root_requirements_file.exists():
root_requirements_file = settings.ROOT_PATH / "requirements.in"
root_requirements = cls.__parse_project_requirement_roots(root_requirements_file)
if not root_requirements:
return protected_packages
requirement_graph = cls.__get_installed_distribution_requirements()
active_extras = {
package_name: set(extras)
for package_name, extras in root_requirements.items()
}
pending_packages = deque(active_extras.keys())
processed_extras: Dict[str, Set[str]] = {}
while pending_packages:
package_name = pending_packages.popleft()
selected_extras = active_extras.get(package_name, set())
previous_extras = processed_extras.get(package_name)
if previous_extras is not None and selected_extras.issubset(previous_extras):
continue
processed_extras[package_name] = set(selected_extras)
if package_name in installed_packages:
protected_packages[package_name] = installed_packages[package_name]
_, requirements = requirement_graph.get(package_name, (None, []))
if not requirements:
continue
active_extra_values = [""] + sorted(selected_extras)
for requirement in requirements:
if requirement.marker and not any(
cls.__marker_matches(requirement.marker, extra)
for extra in active_extra_values
):
continue
dep_name = cls.__standardize_pkg_name(requirement.name)
known_extras = active_extras.setdefault(dep_name, set())
before_len = len(known_extras)
known_extras.update(extra.lower() for extra in requirement.extras)
if dep_name not in processed_extras or len(known_extras) != before_len:
pending_packages.append(dep_name)
return protected_packages
@classmethod
def __validate_runtime_dependency_conflicts(
cls,
requirements_file: Path,
installed_packages: Dict[str, Version]
protected_packages: Dict[str, Version]
) -> Tuple[bool, str]:
"""
在真正执行 pip 前,先拦截插件对现有运行环境中已安装包的显式覆盖请求。
在真正执行 pip 前,先拦截插件对主程序依赖的显式覆盖请求。
共享 venv 场景下,允许插件新增依赖,但不允许它升级/降级已有包,否则不仅主程序
其他插件也会被一起污染
共享 venv 场景下,仅冻结主程序依赖;插件新增依赖、以及插件之间共享的额外依赖
允许后续安装继续调整版本
"""
conflicts = []
try:
@@ -890,11 +1089,11 @@ class PluginHelper(metaclass=WeakSingleton):
logger.debug(f"无法解析依赖项 '{line}',跳过运行环境冲突预检:{err}")
continue
if requirement.marker and not requirement.marker.evaluate():
if not cls.__marker_matches(requirement.marker):
continue
package_name = cls.__standardize_pkg_name(requirement.name)
installed_version = installed_packages.get(package_name)
installed_version = protected_packages.get(package_name)
if installed_version is None:
continue
@@ -936,17 +1135,16 @@ class PluginHelper(metaclass=WeakSingleton):
if len(conflicts) > 5:
details.append(f"其余 {len(conflicts) - 5} 项冲突已省略")
scope = "主程序核心依赖" if any(item[3] for item in conflicts) else "已安装依赖"
scope = "主程序核心依赖" if any(item[3] for item in conflicts) else "主程序依赖"
return False, (
f"插件依赖与当前运行环境的{scope}冲突:{''.join(details)}"
f"为避免共享运行环境被污染,已拒绝安装。"
)
@classmethod
def __create_runtime_constraints_file(cls, installed_packages: Dict[str, Version]) -> Path:
def __create_runtime_constraints_file(cls, protected_packages: Dict[str, Version]) -> Path:
"""
“当前环境已安装版本”为准生成临时约束文件,确保插件只能新增依赖
不能悄悄升级或降级任何已安装包。
主程序依赖的当前已安装版本生成临时约束文件,确保插件安装不会改写主程序依赖
"""
temp_dir = Path(settings.TEMP_PATH) / "plugin_dependencies"
temp_dir.mkdir(parents=True, exist_ok=True)
@@ -958,7 +1156,7 @@ class PluginHelper(metaclass=WeakSingleton):
suffix=".txt",
delete=False
) as temp_file:
for package_name, version in sorted(installed_packages.items()):
for package_name, version in sorted(protected_packages.items()):
temp_file.write(
f"{cls.__format_pkg_name_for_pip(package_name)}=={version}\n"
)
@@ -990,11 +1188,11 @@ class PluginHelper(metaclass=WeakSingleton):
@classmethod
def __repair_main_runtime_dependencies(cls, snapshot_file: Optional[Path] = None) -> Tuple[bool, str]:
"""
依赖安装后如果发现主运行环境已异常,优先恢复安装前依赖快照;
依赖安装后如果发现主运行环境已异常,优先恢复主程序依赖快照;
若快照不可用,再按主项目依赖重新安装进行自愈。
"""
repair_target = snapshot_file
repair_desc = "安装前依赖快照"
repair_desc = "主程序依赖快照"
if repair_target and not repair_target.exists():
repair_target = None
if repair_target is None:
@@ -1054,21 +1252,23 @@ class PluginHelper(metaclass=WeakSingleton):
logger.debug(f"[PIP] 未发现可用的 wheels 目录,将仅使用在线源。")
installed_packages = cls.__get_installed_packages()
check_ok, check_message = cls.__validate_runtime_dependency_conflicts(requirements_file, installed_packages)
protected_packages = cls.__get_protected_runtime_packages(installed_packages)
check_ok, check_message = cls.__validate_runtime_dependency_conflicts(requirements_file, protected_packages)
if not check_ok:
logger.error(f"[PIP] 运行环境冲突预检失败:{check_message}")
return False, check_message
constraints_file = None
try:
constraints_file = cls.__create_runtime_constraints_file(installed_packages)
except Exception as e:
logger.error(f"[PIP] 创建运行环境约束文件失败:{e}")
return False, f"创建运行环境约束文件失败:{e}"
if protected_packages:
try:
constraints_file = cls.__create_runtime_constraints_file(protected_packages)
except Exception as e:
logger.error(f"[PIP] 创建运行环境约束文件失败:{e}")
return False, f"创建运行环境约束文件失败:{e}"
base_cmd = [sys.executable, "-m", "pip", "install"] + find_links_option
if constraints_file:
# 这里固定约束到当前运行环境的已安装版本,避免共享 venv 被插件重写
# 这里固定约束到主程序依赖的当前版本,避免共享 venv 被插件改写核心运行环境
base_cmd.extend(["-c", str(constraints_file)])
base_cmd.extend(["-r", str(requirements_file)])
strategies = cls.__build_pip_install_strategies(base_cmd)
@@ -1086,7 +1286,9 @@ class PluginHelper(metaclass=WeakSingleton):
health_ok, health_message = cls.__run_runtime_healthcheck()
if not health_ok:
logger.error(f"[PIP] 依赖安装后运行环境自检失败:{health_message}")
repair_ok, repair_message = cls.__repair_main_runtime_dependencies(constraints_file)
repair_ok, repair_message = cls.__repair_main_runtime_dependencies(
constraints_file if protected_packages else None
)
if repair_ok:
health_restored, restored_message = cls.__run_runtime_healthcheck()
if health_restored: