From 9319b47fadda0cfa7d26b04a1dd810b682a35d03 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 22 May 2026 09:14:42 +0800 Subject: [PATCH] refactor: use watchfiles for directory monitor --- app/monitor.py | 326 ++++++++++++++++++++----------- requirements.in | 1 - tests/test_monitor_watchfiles.py | 139 +++++++++++++ 3 files changed, 352 insertions(+), 114 deletions(-) create mode 100644 tests/test_monitor_watchfiles.py diff --git a/app/monitor.py b/app/monitor.py index aeadcaea..9300a70b 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -4,13 +4,13 @@ import re import threading import time import traceback +from dataclasses import dataclass from pathlib import Path from threading import Lock from typing import Any, Optional, Dict, List from apscheduler.schedulers.background import BackgroundScheduler -from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent -from watchdog.observers.polling import PollingObserver +from watchfiles import Change, DefaultFilter, watch from app.chain import ChainBase from app.chain.storage import StorageChain @@ -34,29 +34,195 @@ class MonitorChain(ChainBase): pass -class FileMonitorHandler(FileSystemEventHandler): +@dataclass(frozen=True) +class DirectoryChangeEvent: """ - 目录监控响应类 + 目录文件变化事件,隔离底层 watchfiles 事件结构。 """ + change_type: Change + src_path: str + is_directory: bool - def __init__(self, mon_path: Path, callback: Any, **kwargs): - super(FileMonitorHandler, self).__init__(**kwargs) + +class LocalDirectoryWatcher: + """ + 基于 watchfiles 的本地目录监控线程。 + """ + _HANDLE_CHANGES = {Change.added, Change.modified} + + def __init__(self, mon_path: Path, callback: Any, force_polling: Optional[bool] = None): + """ + 初始化本地目录监控。 + :param mon_path: 监控目录 + :param callback: 目录变化回调对象 + :param force_polling: 是否强制使用轮询模式,None 表示由 watchfiles 自动选择 + """ self._watch_path = mon_path - self.callback = callback + self._callback = callback + self._force_polling = force_polling + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + self._watch_filter = DefaultFilter() - def on_created(self, event: FileSystemEvent): - try: - self.callback.event_handler(event=event, text="创建", event_path=event.src_path, - file_size=Path(event.src_path).stat().st_size) - except Exception as e: - logger.error(f"on_created 异常: {e}") + @property + def watch_path(self) -> Path: + """ + 获取监控目录。 + :return: 监控目录 + """ + return self._watch_path - def on_moved(self, event: FileSystemMovedEvent): + def start(self): + """ + 启动本地目录监控线程。 + """ + if not self._watch_path.exists(): + raise FileNotFoundError(f"监控目录不存在: {self._watch_path}") + if not self._watch_path.is_dir(): + raise NotADirectoryError(f"监控路径不是目录: {self._watch_path}") + if self.is_alive(): + logger.info(f"本地目录监控已在运行中: {self._watch_path}") + return + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, + name=f"MoviePilot-DirectoryWatcher-{self._watch_path.name}", + daemon=True + ) + self._thread.start() + + def stop(self): + """ + 请求停止本地目录监控线程。 + """ + self._stop_event.set() + + def join(self, timeout: Optional[float] = None): + """ + 等待本地目录监控线程退出。 + :param timeout: 最长等待秒数 + """ + if self._thread: + self._thread.join(timeout=timeout) + + def is_alive(self) -> bool: + """ + 判断监控线程是否仍在运行。 + :return: 线程存活状态 + """ + return bool(self._thread and self._thread.is_alive()) + + def _run(self): + """ + 运行 watchfiles 主循环,并在快速模式不可用时回退到轮询。 + """ try: - self.callback.event_handler(event=event, text="移动", event_path=event.dest_path, - file_size=Path(event.dest_path).stat().st_size) - except Exception as e: - logger.error(f"on_moved 异常: {e}") + self._run_watch(force_polling=self._force_polling) + except Exception as err: + if self._stop_event.is_set(): + return + if self._force_polling is True: + logger.error(f"本地目录监控发生错误: {self._watch_path} - {err}") + logger.debug(traceback.format_exc()) + return + logger.warn(f"快速模式监控 {self._watch_path} 失败,将自动切换到兼容模式: {err}") + try: + self._run_watch(force_polling=True) + except Exception as fallback_err: + if not self._stop_event.is_set(): + logger.error(f"兼容模式监控 {self._watch_path} 仍然失败: {fallback_err}") + logger.debug(traceback.format_exc()) + + def _run_watch(self, force_polling: Optional[bool]): + """ + 执行一次 watchfiles 监控循环。 + :param force_polling: 是否强制轮询 + """ + for changes in watch( + str(self._watch_path), + watch_filter=self._watch_filter, + stop_event=self._stop_event, + rust_timeout=1000, + yield_on_timeout=True, + force_polling=force_polling, + recursive=True, + ignore_permission_denied=True): + if self._stop_event.is_set(): + break + if not changes: + continue + self._handle_changes(changes) + + def _handle_changes(self, changes: set[tuple[Change, str]]): + """ + 将 watchfiles 原始变更转换为目录监控事件。 + :param changes: watchfiles 返回的变更集合 + """ + for change_type, path_str in sorted(changes, key=lambda item: item[1]): + if change_type not in self._HANDLE_CHANGES: + continue + event_path = Path(path_str) + event = self._build_event(change_type=change_type, event_path=event_path) + if not event or event.is_directory: + continue + file_size = self._get_file_size(event_path) + if file_size is None: + continue + text = self._change_text(change_type) + try: + self._callback.event_handler( + event=event, + text=text, + event_path=path_str, + file_size=file_size + ) + except Exception as err: + logger.error(f"处理本地目录监控事件失败: {path_str} - {err}") + + @staticmethod + def _build_event(change_type: Change, event_path: Path) -> Optional[DirectoryChangeEvent]: + """ + 构建目录变化事件,路径已不存在时忽略。 + :param change_type: watchfiles 变化类型 + :param event_path: 变化路径 + :return: 目录变化事件 + """ + try: + is_directory = event_path.is_dir() + except OSError as err: + logger.debug(f"读取目录监控事件路径失败: {event_path} - {err}") + return None + if not event_path.exists(): + return None + return DirectoryChangeEvent( + change_type=change_type, + src_path=event_path.as_posix(), + is_directory=is_directory + ) + + @staticmethod + def _get_file_size(event_path: Path) -> Optional[int]: + """ + 读取事件文件大小,文件已消失时返回 None。 + :param event_path: 事件文件路径 + :return: 文件大小 + """ + try: + return event_path.stat().st_size + except OSError as err: + logger.debug(f"读取目录监控文件大小失败: {event_path} - {err}") + return None + + @staticmethod + def _change_text(change_type: Change) -> str: + """ + 转换 watchfiles 事件类型为日志文案。 + :param change_type: watchfiles 变化类型 + :return: 事件描述 + """ + if change_type == Change.modified: + return "修改" + return "新增" class Monitor(ConfigReloadMixin, metaclass=SingletonClass): @@ -67,10 +233,8 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): def __init__(self): super().__init__() - # 退出事件 - self._event = threading.Event() - # 监控服务 - self._observers = [] + # 本地目录监控服务 + self._watchers = [] # 定时服务 self._scheduler = None # 存储过照间隔(分钟) @@ -435,32 +599,25 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): limits=limits) logger.info(f"监控模式决策: {reason}") - if use_polling: - observer = PollingObserver() - logger.info(f"使用兼容模式(轮询)监控 {mon_path}") - else: - observer = self.__choose_observer() - if observer is None: - logger.warn(f"快速模式不可用,自动切换到兼容模式监控 {mon_path}") - observer = PollingObserver() - else: - logger.info(f"使用快速模式监控 {mon_path}") - if limits['warnings']: - for warning in limits['warnings']: - logger.warn(f"系统限制警告: {warning}") - if limits['max_user_watches'] > 0: - usage_percent = (file_count / limits['max_user_watches']) * 100 - logger.info( - f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})") - - self._observers.append(observer) - observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self), - path=str(mon_path), - recursive=True) - observer.daemon = True - observer.start() - mode_name = "兼容模式(轮询)" if use_polling else "快速模式" + logger.info(f"使用{mode_name}监控 {mon_path}") + if not use_polling: + if limits['warnings']: + for warning in limits['warnings']: + logger.warn(f"系统限制警告: {warning}") + if limits['max_user_watches'] > 0: + usage_percent = (file_count / limits['max_user_watches']) * 100 + logger.info( + f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})") + + watcher = LocalDirectoryWatcher( + mon_path=mon_path, + callback=self, + force_polling=True if use_polling else None + ) + self._watchers.append(watcher) + watcher.start() + logger.info(f"✓ 本地目录监控已启动: {mon_path} [{mode_name}]") except Exception as e: @@ -521,64 +678,6 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): remote_count = len([d for d in monitor_dirs if d.storage != "local" and d.monitor_type == "monitor"]) logger.info(f"目录监控启动完成: 本地监控 {local_count} 个,远程监控 {remote_count} 个") - def __choose_observer(self) -> Optional[Any]: - """ - 选择最优的监控模式(带错误处理和自动回退) - """ - system = platform.system() - - observers_to_try = [] - - try: - if system == 'Linux': - observers_to_try = [ - ('InotifyObserver', - lambda: self.__try_import_observer('watchdog.observers.inotify', 'InotifyObserver')), - ] - elif system == 'Darwin': - observers_to_try = [ - ('FSEventsObserver', - lambda: self.__try_import_observer('watchdog.observers.fsevents', 'FSEventsObserver')), - ] - elif system == 'Windows': - observers_to_try = [ - ('WindowsApiObserver', - lambda: self.__try_import_observer('watchdog.observers.read_directory_changes', - 'WindowsApiObserver')), - ] - - # 尝试每个观察者 - for observer_name, observer_func in observers_to_try: - try: - observer_class = observer_func() - if observer_class: - # 尝试创建实例以验证是否可用 - test_observer = observer_class() - test_observer.stop() # 立即停止测试实例 - logger.debug(f"成功初始化 {observer_name}") - return observer_class() - except Exception as e: - logger.debug(f"初始化 {observer_name} 失败: {e}") - continue - - except Exception as e: - logger.debug(f"选择观察者时出错: {e}") - - logger.debug("所有快速监控模式都不可用,将使用兼容模式") - return None - - @staticmethod - def __try_import_observer(module_name: str, class_name: str): - """ - 尝试导入观察者类 - """ - try: - module = __import__(module_name, fromlist=[class_name]) - return getattr(module, class_name) - except (ImportError, AttributeError) as e: - logger.debug(f"导入 {module_name}.{class_name} 失败: {e}") - return None - def polling_observer(self, storage: str, mon_paths: List[Path]): """ 轮询监控(改进版) @@ -738,17 +837,19 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): """ 退出监控 """ - self._event.set() - if self._observers: + if self._watchers: logger.info("正在停止本地目录监控服务...") - for observer in self._observers: + for watcher in self._watchers: try: - observer.stop() - observer.join() - logger.debug(f"已停止监控服务: {observer}") + watcher.stop() + watcher.join(timeout=5) + if watcher.is_alive(): + logger.warning(f"本地目录监控线程在5秒内未能停止: {watcher.watch_path}") + else: + logger.debug(f"已停止本地目录监控服务: {watcher.watch_path}") except Exception as e: logger.error(f"停止目录监控服务出现了错误:{e}") - self._observers = [] + self._watchers = [] logger.info("本地目录监控服务已停止") if self._scheduler: self._scheduler.remove_all_jobs() @@ -763,4 +864,3 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): self._cache.close() if self._snapshot_cache: self._snapshot_cache.close() - self._event.clear() diff --git a/requirements.in b/requirements.in index ad15d638..d04d7029 100644 --- a/requirements.in +++ b/requirements.in @@ -48,7 +48,6 @@ starlette~=0.46.2 PyVirtualDisplay~=3.0 psutil~=7.0.0 python-dotenv~=1.1.1 -watchdog~=6.0.0 watchfiles~=1.1.0 click~=8.2.1 parse~=1.20.2 diff --git a/tests/test_monitor_watchfiles.py b/tests/test_monitor_watchfiles.py new file mode 100644 index 00000000..01945a0d --- /dev/null +++ b/tests/test_monitor_watchfiles.py @@ -0,0 +1,139 @@ +import tempfile +import unittest +from pathlib import Path +from unittest.mock import MagicMock + +from watchfiles import Change + +from app.monitor import DirectoryChangeEvent, LocalDirectoryWatcher, Monitor + + +class CallbackRecorder: + """ + 测试用目录监控回调记录器。 + """ + + def __init__(self): + """ + 初始化事件记录列表。 + """ + self.events = [] + + def event_handler(self, event, text: str, event_path: str, file_size: int = None): + """ + 记录目录监控分发出来的事件。 + :param event: 目录监控事件 + :param text: 事件描述 + :param event_path: 事件路径 + :param file_size: 文件大小 + """ + self.events.append((event, text, event_path, file_size)) + + +class LocalDirectoryWatcherTest(unittest.TestCase): + """ + watchfiles 本地目录监控测试。 + """ + + def test_handle_changes_dispatches_added_and_modified_files(self): + """ + 新增和修改文件应转换成目录监控整理回调。 + """ + with tempfile.TemporaryDirectory() as temp_dir: + watch_dir = Path(temp_dir) + added_file = watch_dir / "a_added.mkv" + modified_file = watch_dir / "b_modified.mkv" + skipped_dir = watch_dir / "c_dir" + added_file.write_bytes(b"added") + modified_file.write_bytes(b"modified") + skipped_dir.mkdir() + + callback = CallbackRecorder() + watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True) + watcher._handle_changes({ + (Change.added, added_file.as_posix()), + (Change.modified, modified_file.as_posix()), + (Change.deleted, added_file.as_posix()), + (Change.added, skipped_dir.as_posix()), + }) + + self.assertEqual(2, len(callback.events)) + self.assertEqual((Change.added, "新增", added_file.as_posix(), 5), + (callback.events[0][0].change_type, + callback.events[0][1], + callback.events[0][2], + callback.events[0][3])) + self.assertEqual((Change.modified, "修改", modified_file.as_posix(), 8), + (callback.events[1][0].change_type, + callback.events[1][1], + callback.events[1][2], + callback.events[1][3])) + + def test_handle_changes_skips_missing_paths(self): + """ + 事件到达时已经消失的路径不应触发整理。 + """ + with tempfile.TemporaryDirectory() as temp_dir: + watch_dir = Path(temp_dir) + missing_file = watch_dir / "missing.mkv" + + callback = CallbackRecorder() + watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True) + watcher._handle_changes({(Change.added, missing_file.as_posix())}) + + self.assertEqual([], callback.events) + + +class MonitorWatchfilesEventTest(unittest.TestCase): + """ + Monitor 对 watchfiles 事件的兼容处理测试。 + """ + + def test_event_handler_routes_file_events_to_transfer_handler(self): + """ + 文件事件应继续按 local 存储交给整理流程。 + """ + monitor = object.__new__(Monitor) + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/movie.mkv") + event = DirectoryChangeEvent( + change_type=Change.added, + src_path=event_path.as_posix(), + is_directory=False + ) + + monitor.event_handler( + event=event, + text="新增", + event_path=event_path.as_posix(), + file_size=1024 + ) + + handle_file.assert_called_once_with( + storage="local", + event_path=event_path, + file_size=1024 + ) + + def test_event_handler_ignores_directory_events(self): + """ + 目录事件不应进入文件整理流程。 + """ + monitor = object.__new__(Monitor) + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/folder") + event = DirectoryChangeEvent( + change_type=Change.added, + src_path=event_path.as_posix(), + is_directory=True + ) + + monitor.event_handler( + event=event, + text="新增", + event_path=event_path.as_posix() + ) + + handle_file.assert_not_called()