From 860bbf3556474722ca76d580b85e82f589c8d5f9 Mon Sep 17 00:00:00 2001 From: qqcomeup Date: Wed, 21 Jan 2026 22:57:12 +0800 Subject: [PATCH] Add files via upload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR 改动汇总 核心改动 将自定义多线程处理逻辑改为直接使用系统 TransferChain 队列 代码变化 新增: 导入 TransferTask __handle_file() 中创建 TransferTask 并推送到队列 删除: 自定义队列和线程池相关代码(_file_queue, _worker_threads, _queue_active, _thread_count) send_msg() 消息汇总方法 _medias 消息汇总逻辑 interval 和 thread_count 配置项 约 200 行冗余代码 修改: __handle_file() 从完整处理改为只识别+推送 event_handler() 和 sync_all() 改为直接调用处理 版本号 2.5.9 → 2.6.0 效果 插件只负责监控和识别,整理任务由系统多线程队列统一处理,代码更简洁,功能更完整。 --- plugins.v2/cloudlinkmonitor/__init__.py | 254 +++--------------------- 1 file changed, 25 insertions(+), 229 deletions(-) diff --git a/plugins.v2/cloudlinkmonitor/__init__.py b/plugins.v2/cloudlinkmonitor/__init__.py index 5d63409..f5dc333 100644 --- a/plugins.v2/cloudlinkmonitor/__init__.py +++ b/plugins.v2/cloudlinkmonitor/__init__.py @@ -28,7 +28,7 @@ from app.helper.directory import DirectoryHelper from app.log import logger from app.modules.filemanager import FileManagerModule from app.plugins import _PluginBase -from app.schemas import NotificationType, TransferInfo, TransferDirectoryConf +from app.schemas import NotificationType, TransferInfo, TransferDirectoryConf, TransferTask from app.schemas.types import EventType, MediaType, SystemConfigKey from app.utils.string import StringUtils from app.utils.system import SystemUtils @@ -63,7 +63,7 @@ class CloudLinkMonitor(_PluginBase): # 插件图标 plugin_icon = "Linkease_A.png" # 插件版本 - plugin_version = "2.5.9" + plugin_version = "2.6.0" # 插件作者 plugin_author = "thsrite" # 作者主页 @@ -138,7 +138,6 @@ class CloudLinkMonitor(_PluginBase): self._transfer_type = config.get("transfer_type") self._monitor_dirs = config.get("monitor_dirs") or "" self._exclude_keywords = config.get("exclude_keywords") or "" - self._interval = config.get("interval") or 10 self._cron = config.get("cron") self._size = config.get("size") or 0 self._softlink = config.get("softlink") @@ -150,9 +149,6 @@ class CloudLinkMonitor(_PluginBase): if self._enabled or self._onlyonce: # 定时服务管理器 self._scheduler = BackgroundScheduler(timezone=settings.TZ) - if self._notify: - # 追加入库消息统一发送服务 - self._scheduler.add_job(self.send_msg, trigger='interval', seconds=15) # 读取目录配置 monitor_dirs = self._monitor_dirs.split("\n") @@ -266,7 +262,6 @@ class CloudLinkMonitor(_PluginBase): "transfer_type": self._transfer_type, "monitor_dirs": self._monitor_dirs, "exclude_keywords": self._exclude_keywords, - "interval": self._interval, "history": self._history, "softlink": self._softlink, "cron": self._cron, @@ -455,214 +450,32 @@ class CloudLinkMonitor(_PluginBase): logger.error(f"未配置监控目录 {mon_path} 的目的目录") return - # 转移文件 - transferinfo: TransferInfo = self.chain.transfer(fileitem=file_item, - meta=file_meta, - mediainfo=mediainfo, - target_directory=target_dir, - episodes_info=episodes_info) - - if not transferinfo: - logger.error("文件转移模块运行失败") - return - - if not transferinfo.success: - # 转移失败 - logger.warn(f"{file_path.name} 入库失败:{transferinfo.message}") - - if self._history: - # 新增转移失败历史记录 - self.transferhis.add_fail( - fileitem=file_item, - mode=transfer_type, - meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo - ) - if self._notify: - self.post_message( - mtype=NotificationType.Manual, - title=f"{mediainfo.title_year}{file_meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", - image=mediainfo.get_message_image() - ) - return - - if self._history: - # 新增转移成功历史记录 - self.transferhis.add_success( - fileitem=file_item, - mode=transfer_type, - meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo - ) - - # 刮削 - if self._scrape: - self.mediaChain.scrape_metadata(fileitem=transferinfo.target_diritem, - meta=file_meta, - mediainfo=mediainfo) - """ - { - "title_year season": { - "files": [ - { - "path":, - "mediainfo":, - "file_meta":, - "transferinfo": - } - ], - "time": "2023-08-24 23:23:23.332" - } - } - """ - if self._notify: - # 发送消息汇总 - media_list = self._medias.get(mediainfo.title_year + " " + file_meta.season) or {} - if media_list: - media_files = media_list.get("files") or [] - if media_files: - file_exists = False - for file in media_files: - if str(file_path) == file.get("path"): - file_exists = True - break - if not file_exists: - media_files.append({ - "path": str(file_path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - }) - else: - media_files = [ - { - "path": str(file_path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ] - media_list = { - "files": media_files, - "time": datetime.datetime.now() - } - else: - media_list = { - "files": [ - { - "path": str(file_path), - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ], - "time": datetime.datetime.now() - } - self._medias[mediainfo.title_year + " " + file_meta.season] = media_list - - if self._refresh: - # 广播事件 - self.eventmanager.send_event(EventType.TransferComplete, { - 'meta': file_meta, - 'mediainfo': mediainfo, - 'transferinfo': transferinfo - }) - - if self._softlink: - # 通知实时软连接生成 - self.eventmanager.send_event(EventType.PluginAction, { - 'file_path': str(transferinfo.target_item.path), - 'action': 'softlink_file' - }) - - if self._strm: - # 通知Strm助手生成 - self.eventmanager.send_event(EventType.PluginAction, { - 'file_path': str(transferinfo.target_item.path), - 'action': 'cloudstrm_file' - }) - - # 移动模式删除空目录 - if transfer_type == "move": - for file_dir in file_path.parents: - if len(str(file_dir)) <= len(str(Path(mon_path))): - # 重要,删除到监控目录为止 - break - files = SystemUtils.list_files(file_dir, settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT) - if not files: - logger.warn(f"移动模式,删除空目录:{file_dir}") - shutil.rmtree(file_dir, ignore_errors=True) + # 创建转移任务并推送到TransferChain队列进行多线程处理 + transfer_task = TransferTask( + fileitem=file_item, + meta=file_meta, + mediainfo=mediainfo, + target_directory=target_dir, + episodes_info=episodes_info, + transfer_type=transfer_type, + background=True + ) + + # 推送到TransferChain的多线程队列 + logger.info(f"将 {file_path.name} 推送到整理队列进行多线程处理") + self.transferchian.put_to_queue(transfer_task) + + # 不再需要后续的转移、刮削等处理,由TransferChain统一处理 + # TransferChain会自动处理: + # 1. 文件转移 + # 2. 历史记录 + # 3. 刮削事件 + # 4. 消息通知 + # 5. 其他后续处理 except Exception as e: logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) - def send_msg(self): - """ - 定时检查是否有媒体处理完,发送统一消息 - """ - if not self._medias or not self._medias.keys(): - return - - # 遍历检查是否已刮削完,发送消息 - for medis_title_year_season in list(self._medias.keys()): - media_list = self._medias.get(medis_title_year_season) - logger.info(f"开始处理媒体 {medis_title_year_season} 消息") - - if not media_list: - continue - - # 获取最后更新时间 - last_update_time = media_list.get("time") - media_files = media_list.get("files") - if not last_update_time or not media_files: - continue - - transferinfo = media_files[0].get("transferinfo") - file_meta = media_files[0].get("file_meta") - mediainfo = media_files[0].get("mediainfo") - # 判断剧集最后更新时间距现在是已超过10秒或者电影,发送消息 - if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._interval) \ - or mediainfo.type == MediaType.MOVIE: - # 发送通知 - if self._notify: - - # 汇总处理文件总大小 - total_size = 0 - file_count = 0 - - # 剧集汇总 - episodes = [] - for file in media_files: - transferinfo = file.get("transferinfo") - total_size += transferinfo.total_size - file_count += 1 - - file_meta = file.get("file_meta") - if file_meta and file_meta.begin_episode: - episodes.append(file_meta.begin_episode) - - transferinfo.total_size = total_size - # 汇总处理文件数量 - transferinfo.file_count = file_count - - # 剧集季集信息 S01 E01-E04 || S01 E01、E02、E04 - season_episode = None - # 处理文件多,说明是剧集,显示季入库消息 - if mediainfo.type == MediaType.TV: - # 季集文本 - season_episode = f"{file_meta.season} {StringUtils.format_ep(episodes)}" - # 发送消息 - self.transferchian.send_transfer_message(meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo, - season_episode=season_episode) - # 发送完消息,移出key - del self._medias[medis_title_year_season] - continue - def get_state(self) -> bool: return self._enabled @@ -939,23 +752,6 @@ class CloudLinkMonitor(_PluginBase): } } ] - }, - { - 'component': 'VCol', - 'props': { - 'cols': 12, - 'md': 4 - }, - 'content': [ - { - 'component': 'VTextField', - 'props': { - 'model': 'interval', - 'label': '入库消息延迟', - 'placeholder': '10' - } - } - ] } ] }, @@ -1041,7 +837,7 @@ class CloudLinkMonitor(_PluginBase): 'props': { 'type': 'info', 'variant': 'tonal', - 'text': '入库消息延迟默认10s,如网络较慢可酌情调大,有助于发送统一入库消息。' + 'text': '文件识别后将推送到MoviePilot的多线程整理队列进行处理,线程数由系统配置决定。' } } ]