diff --git a/app/modules/feishu/feishu.py b/app/modules/feishu/feishu.py index 2b943db8..993591ff 100644 --- a/app/modules/feishu/feishu.py +++ b/app/modules/feishu/feishu.py @@ -1,9 +1,11 @@ +import asyncio import json import threading import uuid from typing import Any, Dict, List, Optional, Tuple import lark_oapi as lark +import lark_oapi.ws.client as lark_ws_client_module from lark_oapi.api.im.v1 import ( CreateMessageRequest, CreateMessageRequestBody, @@ -56,6 +58,7 @@ class Feishu: self._ready = threading.Event() self._stop_event = threading.Event() self._ws_thread: Optional[threading.Thread] = None + self._ws_loop: Optional[asyncio.AbstractEventLoop] = None self._user_chat_mapping: Dict[str, str] = {} self._user_receive_id_type_mapping: Dict[str, str] = {} self._chat_open_mapping: Dict[str, str] = {} @@ -100,6 +103,18 @@ class Feishu: def _run_ws_client(self) -> None: """在后台线程中运行飞书长连接客户端。""" + original_select = lark_ws_client_module._select + original_loop = lark_ws_client_module.loop + loop = asyncio.new_event_loop() + self._ws_loop = loop + asyncio.set_event_loop(loop) + lark_ws_client_module.loop = loop + + async def _wait_for_stop() -> None: + while not self._stop_event.is_set(): + await asyncio.sleep(1) + + lark_ws_client_module._select = _wait_for_stop try: self._ws_client = lark.ws.Client( self._app_id, @@ -116,6 +131,23 @@ class Feishu: self._ready.clear() if not self._stop_event.is_set(): logger.error(f"飞书长连接服务启动失败:{err}") + finally: + lark_ws_client_module._select = original_select + lark_ws_client_module.loop = original_loop + pending_tasks = [ + task + for task in asyncio.all_tasks(loop) + if not task.done() + ] + for task in pending_tasks: + task.cancel() + if pending_tasks: + loop.run_until_complete( + asyncio.gather(*pending_tasks, return_exceptions=True) + ) + loop.close() + asyncio.set_event_loop(None) + self._ws_loop = None def _forward_to_message_chain(self, payload: dict) -> None: """将飞书入站消息转发到统一消息入口,复用现有交互主链。""" @@ -262,14 +294,16 @@ class Feishu: self._stop_event.set() self._ready.clear() ws_client = self._ws_client + ws_loop = self._ws_loop if ws_client: try: ws_client._auto_reconnect = False - if ws_client._conn is not None: - try: - ws_client._conn.close() - except Exception as err: - logger.debug(f"关闭飞书连接失败:{err}") + if ws_loop and ws_loop.is_running(): + disconnect_future = asyncio.run_coroutine_threadsafe( + ws_client._disconnect(), + ws_loop, + ) + disconnect_future.result(timeout=5) except Exception as err: logger.debug(f"停止飞书客户端失败:{err}") if self._ws_thread and self._ws_thread.is_alive(): diff --git a/docker/Dockerfile b/docker/Dockerfile index 974cce11..6bb9a39a 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -66,10 +66,18 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # 安装 Python 构建依赖并创建虚拟环境 WORKDIR /app COPY requirements.in requirements.in +COPY scripts/uv-pip-compat.sh /usr/local/bin/uv-pip-compat RUN python3 -m venv ${VENV_PATH} \ - && pip install --upgrade "pip" \ - && pip install "Cython~=3.1.2" "pip-tools" \ - && pip-compile requirements.in \ + && env UV_INSTALL_DIR=/usr/local/bin sh -c "$(curl -LsSf https://astral.sh/uv/install.sh)" \ + && chmod +x /usr/local/bin/uv-pip-compat \ + && ln -sf /usr/local/bin/uv ${VENV_PATH}/bin/uv \ + && ln -sf /usr/local/bin/uv-pip-compat ${VENV_PATH}/bin/pip \ + && ln -sf /usr/local/bin/uv-pip-compat ${VENV_PATH}/bin/pip3 \ + && ln -sf /usr/local/bin/uv-pip-compat ${VENV_PATH}/bin/pip3.12 \ + && ln -sf /usr/local/bin/uv-pip-compat ${VENV_PATH}/bin/pip-compile \ + && ln -sf /usr/local/bin/uv-pip-compat ${VENV_PATH}/bin/pip-sync \ + && pip install "Cython~=3.1.2" \ + && pip-compile requirements.in -o requirements.txt \ && pip install -r requirements.txt # 下载准备代码 @@ -98,6 +106,8 @@ ENV LD_PRELOAD="/usr/local/lib/libjemalloc.so" # python 环境 COPY --from=prepare_venv --chmod=777 ${VENV_PATH} ${VENV_PATH} +COPY --from=prepare_venv /usr/local/bin/uv /usr/local/bin/uv +COPY --from=prepare_venv /usr/local/bin/uv-pip-compat /usr/local/bin/uv-pip-compat # playwright 环境 RUN playwright install-deps chromium \ diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 2f787f83..23e2a2b3 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -229,7 +229,7 @@ function ensure_backend_runtime_dependencies() { fi WARN "→ 检测到后端核心依赖异常,开始尝试恢复主程序依赖..." - local -a pip_cmd=("${VENV_PATH}/bin/python3" "-m" "pip" "install" "-r" "/app/requirements.txt") + local -a pip_cmd=("${VENV_PATH}/bin/pip" "install" "-r" "/app/requirements.txt") if [ -n "${PIP_PROXY}" ]; then pip_cmd+=("-i" "${PIP_PROXY}") elif [ -n "${PROXY_HOST}" ]; then diff --git a/docker/update.sh b/docker/update.sh index 55fa2be7..fb9fc1bc 100644 --- a/docker/update.sh +++ b/docker/update.sh @@ -69,13 +69,13 @@ function install_backend_and_download_resources() { # 复制新的requirements.in cp "${TMP_PATH}/App/requirements.in" /app/requirements.in # 重新编译依赖 - if ! ${VENV_PATH}/bin/pip-compile /app/requirements.in; then + if ! ${VENV_PATH}/bin/pip-compile /app/requirements.in -o /app/requirements.txt; then ERROR "依赖编译失败,恢复原依赖" cp /tmp/requirements.txt.backup /app/requirements.txt return 1 fi # 安装新依赖 - if ! ${VENV_PATH}/bin/pip install ${PIP_OPTIONS} --root-user-action=ignore -r /app/requirements.txt; then + if ! ${VENV_PATH}/bin/pip install ${PIP_OPTIONS} -r /app/requirements.txt; then ERROR "依赖安装失败,恢复原依赖" cp /tmp/requirements.txt.backup /app/requirements.txt return 1 diff --git a/scripts/bootstrap-local.sh b/scripts/bootstrap-local.sh index 5f6556f5..b97cfac2 100755 --- a/scripts/bootstrap-local.sh +++ b/scripts/bootstrap-local.sh @@ -427,7 +427,7 @@ ensure_prereqs() { exit 1 fi - if ! ensure_base_tools || ! ensure_python; then + if ! ensure_base_tools || ! ensure_python || ! ensure_uv; then python_install_hint exit 1 fi diff --git a/scripts/local_setup.py b/scripts/local_setup.py index d36d4c01..02ddb428 100644 --- a/scripts/local_setup.py +++ b/scripts/local_setup.py @@ -541,6 +541,71 @@ def get_venv_python(venv_dir: Path) -> Path: return venv_dir / "bin" / "python" +def get_venv_bin_dir(venv_dir: Path) -> Path: + if os.name == "nt": + return venv_dir / "Scripts" + return venv_dir / "bin" + + +def get_venv_pip(venv_dir: Path) -> Path: + if os.name == "nt": + return get_venv_bin_dir(venv_dir) / "pip.exe" + return get_venv_bin_dir(venv_dir) / "pip" + + +def _ensure_uv_available_for_venv(venv_dir: Path, venv_python: Path) -> Optional[Path]: + if os.name == "nt": + return None + + venv_bin = get_venv_bin_dir(venv_dir) + uv_bin = venv_bin / "uv" + if uv_bin.exists(): + return uv_bin + + system_uv = shutil.which("uv") + if system_uv: + uv_target = Path(system_uv).expanduser().resolve() + print_step(f"复用系统 uv:{uv_target}") + if uv_bin.exists() or uv_bin.is_symlink(): + uv_bin.unlink() + uv_bin.symlink_to(uv_target) + return uv_bin + + print_step("当前未检测到 uv,先在虚拟环境内安装 uv") + run([str(venv_python), "-m", "pip", "install", "--upgrade", "pip", "uv"]) + if uv_bin.exists(): + return uv_bin + raise RuntimeError("uv 安装完成,但虚拟环境中未找到 uv 可执行文件") + + +def configure_venv_pip_compat(venv_dir: Path, venv_python: Path) -> Path: + if os.name == "nt": + return get_venv_pip(venv_dir) + + _ensure_uv_available_for_venv(venv_dir, venv_python) + venv_bin = get_venv_bin_dir(venv_dir) + wrapper_src = ROOT / "scripts" / "uv-pip-compat.sh" + wrapper_dst = venv_bin / "uv-pip-compat" + shutil.copy2(wrapper_src, wrapper_dst) + wrapper_dst.chmod(0o755) + + python_version = get_python_version(str(venv_python)) + compat_links = { + "pip", + "pip3", + f"pip{python_version[0]}", + f"pip{python_version[0]}.{python_version[1]}", + "pip-compile", + "pip-sync", + } + for link_name in compat_links: + link_path = venv_bin / link_name + if link_path.exists() or link_path.is_symlink(): + link_path.unlink() + link_path.symlink_to(wrapper_dst.name) + return get_venv_pip(venv_dir) + + def ensure_supported_python(python_bin: str) -> None: version = get_python_version(python_bin) if version < MIN_PYTHON_VERSION: @@ -2566,6 +2631,7 @@ def install_deps(*, python_bin: str, venv_dir: Path, recreate: bool) -> Path: ensure_supported_python(python_bin) venv_dir = venv_dir.expanduser().resolve() venv_python = get_venv_python(venv_dir) + venv_pip = get_venv_pip(venv_dir) print_step(f"使用 Python 解释器:{python_bin}") if recreate and venv_dir.exists(): @@ -2578,13 +2644,15 @@ def install_deps(*, python_bin: str, venv_dir: Path, recreate: bool) -> Path: else: print_step(f"复用已有虚拟环境:{venv_dir}") - print_step("升级 pip") - run([str(venv_python), "-m", "pip", "install", "--upgrade", "pip"]) + if os.name == "nt": + print_step("升级 pip") + run([str(venv_python), "-m", "pip", "install", "--upgrade", "pip"]) + else: + print_step("为虚拟环境配置 uv 兼容 pip 命令") + venv_pip = configure_venv_pip_compat(venv_dir, venv_python) print_step("安装项目依赖") - run( - [str(venv_python), "-m", "pip", "install", "-r", str(ROOT / "requirements.txt")] - ) + run([str(venv_pip), "install", "-r", str(ROOT / "requirements.txt")]) return venv_python diff --git a/scripts/uv-pip-compat.sh b/scripts/uv-pip-compat.sh new file mode 100644 index 00000000..b3158219 --- /dev/null +++ b/scripts/uv-pip-compat.sh @@ -0,0 +1,56 @@ +#!/bin/sh + +set -eu + +SCRIPT_PATH="$0" +SCRIPT_DIR=$(CDPATH= cd -- "$(dirname -- "$SCRIPT_PATH")" && pwd) +COMMAND_NAME=$(basename -- "$SCRIPT_PATH") + +if [ "${COMMAND_NAME}" = "uv-pip-compat" ] || [ "${COMMAND_NAME}" = "uv-pip-compat.sh" ]; then + if [ "$#" -eq 0 ]; then + echo "用法: uv-pip-compat [args...]" >&2 + exit 2 + fi + COMMAND_NAME="$1" + shift +fi + +if [ -x "${SCRIPT_DIR}/uv" ]; then + UV_BIN="${SCRIPT_DIR}/uv" +elif command -v uv >/dev/null 2>&1; then + UV_BIN=$(command -v uv) +else + echo "未找到 uv,可执行 pip 兼容层无法继续运行。" >&2 + exit 127 +fi + +case "${COMMAND_NAME}" in + pip|pip3|pip3.*) + if [ "$#" -eq 0 ]; then + exec "${UV_BIN}" pip --help + fi + + case "$1" in + -V|--version|version) + exec "${UV_BIN}" --version + ;; + help) + shift + exec "${UV_BIN}" help pip "$@" + ;; + *) + exec "${UV_BIN}" pip "$@" + ;; + esac + ;; + pip-compile) + exec "${UV_BIN}" pip compile "$@" + ;; + pip-sync) + exec "${UV_BIN}" pip sync "$@" + ;; + *) + echo "不支持的 pip 兼容命令入口:${COMMAND_NAME}" >&2 + exit 2 + ;; +esac diff --git a/tests/test_feishu.py b/tests/test_feishu.py index baf11d8d..488a3242 100644 --- a/tests/test_feishu.py +++ b/tests/test_feishu.py @@ -1,4 +1,5 @@ import sys +import asyncio import json import unittest from types import ModuleType, SimpleNamespace @@ -195,6 +196,51 @@ class TestFeishu(unittest.TestCase): self.assertEqual(response.message_id, "om_789") self.assertEqual(response.chat_id, "oc_789") + def test_run_ws_client_binds_thread_local_event_loop(self): + client = self._build_client() + original_loop = object() + fake_ws_client = MagicMock() + created_loops = [] + real_new_event_loop = asyncio.new_event_loop + + def _new_loop(): + loop = real_new_event_loop() + created_loops.append(loop) + return loop + + with patch("app.modules.feishu.feishu.lark_ws_client_module.loop", original_loop), patch( + "app.modules.feishu.feishu.lark_ws_client_module._select", + new=MagicMock(return_value=None), + ), patch("app.modules.feishu.feishu.asyncio.new_event_loop", side_effect=_new_loop), patch( + "app.modules.feishu.feishu.lark.ws.Client", return_value=fake_ws_client + ), patch.object( + fake_ws_client, "start", side_effect=lambda: None + ) as mock_start: + client._run_ws_client() + + self.assertIsNone(client._ws_loop) + mock_start.assert_called_once() + self.assertEqual(len(created_loops), 1) + self.assertTrue(created_loops[0].is_closed()) + + def test_stop_disconnects_ws_client_via_threadsafe_loop(self): + client = self._build_client() + stop_loop = MagicMock() + stop_loop.is_running.return_value = True + client._ws_loop = stop_loop + client._ws_client = MagicMock() + client._ws_thread = MagicMock() + client._ws_thread.is_alive.return_value = False + + future = MagicMock() + future.result.return_value = None + + with patch("app.modules.feishu.feishu.asyncio.run_coroutine_threadsafe", return_value=future) as runner: + client.stop() + + runner.assert_called_once() + future.result.assert_called_once_with(timeout=5) + if __name__ == "__main__": unittest.main()