From 51d8beebeee936562d9489203263708d7598d6b3 Mon Sep 17 00:00:00 2001 From: thsrite Date: Tue, 19 Mar 2024 22:09:22 +0800 Subject: [PATCH] =?UTF-8?q?fix=201.3=E4=BA=91=E7=9B=98=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 2 +- plugins/cloudlinkmonitor/__init__.py | 544 +++++++++++++++++---------- 2 files changed, 349 insertions(+), 197 deletions(-) diff --git a/package.json b/package.json index 42cc5ec..0e5f2c5 100644 --- a/package.json +++ b/package.json @@ -122,7 +122,7 @@ "CloudLinkMonitor": { "name": "云盘实时链接", "description": "监控云盘目录文件变化,自动转移链接(不刮削)。", - "version": "1.2", + "version": "1.3", "icon": "Linkease_A.png", "author": "thsrite", "level": 1 diff --git a/plugins/cloudlinkmonitor/__init__.py b/plugins/cloudlinkmonitor/__init__.py index 125a097..5c84cb8 100644 --- a/plugins/cloudlinkmonitor/__init__.py +++ b/plugins/cloudlinkmonitor/__init__.py @@ -1,5 +1,6 @@ import datetime import re +import shutil import threading import traceback from pathlib import Path @@ -16,14 +17,16 @@ from app import schemas from app.chain.tmdb import TmdbChain from app.chain.transfer import TransferChain from app.core.config import settings +from app.core.context import MediaInfo from app.core.event import eventmanager, Event from app.core.metainfo import MetaInfoPath +from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.transferhistory_oper import TransferHistoryOper from app.log import logger +from app.modules.filetransfer import FileTransferModule from app.plugins import _PluginBase -from app.core.context import MediaInfo from app.schemas import Notification, NotificationType, TransferInfo -from app.schemas.types import EventType, MediaType +from app.schemas.types import EventType, MediaType, SystemConfigKey from app.utils.string import StringUtils from app.utils.system import SystemUtils @@ -57,7 +60,7 @@ class CloudLinkMonitor(_PluginBase): # 插件图标 plugin_icon = "Linkease_A.png" # 插件版本 - plugin_version = "1.2" + plugin_version = "1.3" # 插件作者 plugin_author = "thsrite" # 作者主页 @@ -65,43 +68,47 @@ class CloudLinkMonitor(_PluginBase): # 插件配置项ID前缀 plugin_config_prefix = "cloudlinkmonitor_" # 加载顺序 - plugin_order = 5 + plugin_order = 4 # 可使用的用户级别 auth_level = 1 # 私有属性 _scheduler = None + transferhis = None + downloadhis = None + transferchian = None + tmdbchain = None _observer = [] _enabled = False _notify = False _onlyonce = False - tmdbchain = None - transferhis = None - transferchian = None _cron = None + filetransfer = None _size = 0 - # 转移方式 - _monitor_dirs = "" - _exclude_keywords = "" - _cloud_medias = {} - # 模式 compatibility/fast _mode = "compatibility" - _transfer_type = "link" + # 转移方式 + _transfer_type = settings.TRANSFER_TYPE + _monitor_dirs = "" + _exclude_keywords = "" + _interval: int = 10 # 存储源目录与目的目录关系 _dirconf: Dict[str, Optional[Path]] = {} # 存储源目录转移方式 _transferconf: Dict[str, Optional[str]] = {} + _medias = {} # 退出事件 _event = threading.Event() def init_plugin(self, config: dict = None): + self.transferhis = TransferHistoryOper() + self.downloadhis = DownloadHistoryOper() + self.transferchian = TransferChain() + self.tmdbchain = TmdbChain() + self.filetransfer = FileTransferModule() # 清空配置 self._dirconf = {} self._transferconf = {} - self.tmdbchain = TmdbChain() - self.transferhis = TransferHistoryOper() - self.transferchian = TransferChain() # 读取配置 if config: @@ -112,6 +119,7 @@ class CloudLinkMonitor(_PluginBase): self._transfer_type = config.get("transfer_type") self._monitor_dirs = config.get("monitor_dirs") or "" self._exclude_keywords = config.get("exclude_keywords") or "" + self._interval = config.get("interval") or 10 self._cron = config.get("cron") self._size = config.get("size") or 0 @@ -133,6 +141,12 @@ class CloudLinkMonitor(_PluginBase): if not mon_path: continue + # 自定义转移方式 + _transfer_type = self._transfer_type + if mon_path.count("#") == 1: + _transfer_type = mon_path.split("#")[1] + mon_path = mon_path.split("#")[0] + # 存储目的目录 if SystemUtils.is_windows(): if mon_path.count(":") > 1: @@ -144,14 +158,16 @@ class CloudLinkMonitor(_PluginBase): paths = mon_path.split(":") # 目的目录 + target_path = None if len(paths) > 1: mon_path = paths[0] target_path = Path(paths[1]) self._dirconf[mon_path] = target_path else: - logger.warn(f"{mon_path} 未配置目的目录,将不会进行连接") - self.systemmessage.put(f"{mon_path} 未配置目的目录,将不会进行连接!") - continue + self._dirconf[mon_path] = None + + # 转移方式 + self._transferconf[mon_path] = _transfer_type # 启用目录监控 if self._enabled: @@ -203,17 +219,6 @@ class CloudLinkMonitor(_PluginBase): # 保存配置 self.__update_config() - # 全量同步定时 - if self._enabled and self._cron: - try: - self._scheduler.add_job(func=self.sync_all, - trigger=CronTrigger.from_crontab(self._cron), - name="云盘实时链接") - except Exception as err: - logger.error(f"定时任务配置错误:{str(err)}") - # 推送实时消息 - self.systemmessage.put(f"执行周期配置错误:{str(err)}") - # 启动定时服务 if self._scheduler.get_jobs(): self._scheduler.print_jobs() @@ -231,6 +236,7 @@ class CloudLinkMonitor(_PluginBase): "transfer_type": self._transfer_type, "monitor_dirs": self._monitor_dirs, "exclude_keywords": self._exclude_keywords, + "interval": self._interval, "cron": self._cron, "size": self._size }) @@ -242,27 +248,27 @@ class CloudLinkMonitor(_PluginBase): """ if event: event_data = event.event_data - if not event_data or event_data.get("action") != "realtime_link": + if not event_data or event_data.get("action") != "directory_sync": return self.post_message(channel=event.event_data.get("channel"), - title="开始实时连接 ...", + title="开始同步监控目录 ...", userid=event.event_data.get("user")) self.sync_all() if event: self.post_message(channel=event.event_data.get("channel"), - title="实时连接完成!", userid=event.event_data.get("user")) + title="监控目录同步完成!", userid=event.event_data.get("user")) def sync_all(self): """ 立即运行一次,全量同步目录中所有文件 """ - logger.info("开始全量实时连接 ...") + logger.info("开始全量同步监控目录 ...") # 遍历所有监控目录 for mon_path in self._dirconf.keys(): # 遍历目录下所有文件 - for file_path in SystemUtils.list_files(Path(mon_path), ['.*']): + for file_path in SystemUtils.list_files(Path(mon_path), settings.RMT_MEDIAEXT): self.__handle_file(event_path=str(file_path), mon_path=mon_path) - logger.info("全量实时连接完成!") + logger.info("全量同步监控目录完成!") def event_handler(self, event, mon_path: str, text: str, event_path: str): """ @@ -277,131 +283,6 @@ class CloudLinkMonitor(_PluginBase): logger.debug("文件%s:%s" % (text, event_path)) self.__handle_file(event_path=event_path, mon_path=mon_path) - def _link_file(self, src_path: Path, mon_path: str, - target_path: Path, transfer_type: str = "softlink") -> Tuple[bool, str]: - """ - 对文件做纯链接处理,不做识别重命名,则监控模块调用 - :param : 来源渠道 - :param src_path: 源文件 - :param target_path: 目标目录 - :param transfer_type: 转移方式 - """ - new_file = str(src_path).replace(mon_path, str(target_path)) - new_path = Path(new_file) - if new_path.exists(): - return True, "目标路径文件已存在" - else: - # 创建目标目录 - if not new_path.parent.exists(): - new_path.parent.mkdir(parents=True, exist_ok=True) - # 转移 - if transfer_type == "copy": - code, errmsg = SystemUtils.copy(src_path, new_path) - return True if code == 0 else False, errmsg - else: - # 元数据 - file_meta = MetaInfoPath(src_path) - # 识别媒体信息 - mediainfo: MediaInfo = self.chain.recognize_media(meta=file_meta) - # 获取集数据 - if mediainfo.type == MediaType.TV: - episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id, - season=file_meta.begin_season or 1) - else: - episodes_info = None - # 转移 - transferinfo: TransferInfo = self.chain.transfer(mediainfo=mediainfo, - path=src_path, - transfer_type=transfer_type, - target=target_path, - meta=file_meta, - episodes_info=episodes_info) - if not transferinfo: - logger.error("文件转移模块运行失败") - return False, "文件转移模块运行失败" - if not transferinfo.success: - if not transferinfo.success: - # 转移失败 - logger.warn(f"{src_path.name} 入库失败:{transferinfo.message}") - # 新增转移失败历史记录 - self.transferhis.add_fail( - src_path=src_path, - mode=transfer_type, - meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo - ) - if self._notify: - self.chain.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{mediainfo.title_year}{file_meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", - image=mediainfo.get_message_image() - )) - return False, "文件转移模块运行失败" - - # 新增转移成功历史记录 - self.transferhis.add_success( - src_path=src_path, - mode=transfer_type, - meta=file_meta, - mediainfo=mediainfo, - transferinfo=transferinfo - ) - - # 发送消息汇总 - media_list = self._cloud_medias.get(mediainfo.title_year + " " + file_meta.season) or {} - if media_list: - media_files = media_list.get("files") or [] - if media_files: - file_exists = False - for file in media_files: - if str(src_path) == file.get("path"): - file_exists = True - break - if not file_exists: - media_files.append({ - "path": src_path, - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - }) - else: - media_files = [ - { - "path": src_path, - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ] - media_list = { - "files": media_files, - "time": datetime.datetime.now() - } - else: - media_list = { - "files": [ - { - "path": src_path, - "mediainfo": mediainfo, - "file_meta": file_meta, - "transferinfo": transferinfo - } - ], - "time": datetime.datetime.now() - } - self._cloud_medias[mediainfo.title_year + " " + file_meta.season] = media_list - - # 广播事件 - self.eventmanager.send_event(EventType.TransferComplete, { - 'meta': file_meta, - 'mediainfo': mediainfo, - 'transferinfo': transferinfo - }) - - return True, "文件转移模块运行成功" - def __handle_file(self, event_path: str, mon_path: str): """ 同步一个文件 @@ -414,6 +295,10 @@ class CloudLinkMonitor(_PluginBase): return # 全程加锁 with lock: + transfer_history = self.transferhis.get_by_src(event_path) + if transfer_history: + logger.debug("文件已处理过:%s" % event_path) + return # 回收站及隐藏的文件不处理 if event_path.find('/@Recycle/') != -1 \ @@ -430,39 +315,214 @@ class CloudLinkMonitor(_PluginBase): logger.info(f"{event_path} 命中过滤关键字 {keyword},不处理") return + # 整理屏蔽词不处理 + transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords) + if transfer_exclude_words: + for keyword in transfer_exclude_words: + if not keyword: + continue + if keyword and re.search(r"%s" % keyword, event_path, re.IGNORECASE): + logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理") + return + + # 不是媒体文件不处理 + if file_path.suffix not in settings.RMT_MEDIAEXT: + logger.debug(f"{event_path} 不是媒体文件") + return + + # 判断是不是蓝光目录 + bluray_flag = False + if re.search(r"BDMV[/\\]STREAM", event_path, re.IGNORECASE): + bluray_flag = True + # 截取BDMV前面的路径 + blurray_dir = event_path[:event_path.find("BDMV")] + file_path = Path(blurray_dir) + logger.info(f"{event_path} 是蓝光目录,更正文件路径为:{str(file_path)}") + + # 查询历史记录,已转移的不处理 + if self.transferhis.get_by_src(str(file_path)): + logger.info(f"{file_path} 已整理过") + return + + # 元数据 + file_meta = MetaInfoPath(file_path) + if not file_meta.name: + logger.error(f"{file_path.name} 无法识别有效信息") + return + # 判断文件大小 - if self._size and float(self._size) > 0 and file_path.stat().st_size < float(self._size) * 1024: - logger.info(f"{event_path} 文件大小小于最小文件大小,复制...") - _transfer_type = "copy" - else: - _transfer_type = self._transfer_type + if self._size and float(self._size) > 0 and file_path.stat().st_size < float(self._size) * 1024 ** 3: + logger.info(f"{file_path} 文件大小小于监控文件大小,不处理") + return # 查询转移目的目录 target: Path = self._dirconf.get(mon_path) - if not target: - logger.warn(f"{mon_path} 未配置目的目录,将不会进行软连接") + # 查询转移方式 + transfer_type = self._transferconf.get(mon_path) + + # 识别媒体信息 + mediainfo: MediaInfo = self.chain.recognize_media(meta=file_meta) + if not mediainfo: + logger.warn(f'未识别到媒体信息,标题:{file_meta.name}') + # 新增转移成功历史记录 + his = self.transferhis.add_fail( + src_path=file_path, + mode=transfer_type, + meta=file_meta + ) + if self._notify: + self.post_message( + mtype=NotificationType.Manual, + title=f"{file_path.name} 未识别到媒体信息,无法入库!\n" + f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别转移。" + ) return - # 开始连接 - self._link_file(src_path=file_path, mon_path=mon_path, - target_path=target, transfer_type=_transfer_type) + # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title + if not settings.SCRAP_FOLLOW_TMDB: + transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, + mtype=mediainfo.type.value) + if transfer_history: + mediainfo.title = transfer_history.title + logger.info(f"{file_path.name} 识别为:{mediainfo.type.value} {mediainfo.title_year}") + + # 获取集数据 + if mediainfo.type == MediaType.TV: + episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id, + season=file_meta.begin_season or 1) + else: + episodes_info = None + + # 转移 + transferinfo: TransferInfo = self.filetransfer.transfer_media(in_path=file_path, + in_meta=file_meta, + mediainfo=mediainfo, + transfer_type=transfer_type, + target_dir=target, + episodes_info=episodes_info) + if not transferinfo: + logger.error("文件转移模块运行失败") + return + + if not transferinfo.success: + # 转移失败 + logger.warn(f"{file_path.name} 入库失败:{transferinfo.message}") + # 新增转移失败历史记录 + self.transferhis.add_fail( + src_path=file_path, + mode=transfer_type, + meta=file_meta, + mediainfo=mediainfo, + transferinfo=transferinfo + ) + if self._notify: + self.post_message( + mtype=NotificationType.Manual, + title=f"{mediainfo.title_year}{file_meta.season_episode} 入库失败!", + text=f"原因:{transferinfo.message or '未知'}", + image=mediainfo.get_message_image() + ) + return + + # 新增转移成功历史记录 + self.transferhis.add_success( + src_path=file_path, + mode=transfer_type, + meta=file_meta, + mediainfo=mediainfo, + transferinfo=transferinfo + ) + + """ + { + "title_year season": { + "files": [ + { + "path":, + "mediainfo":, + "file_meta":, + "transferinfo": + } + ], + "time": "2023-08-24 23:23:23.332" + } + } + """ + # 发送消息汇总 + media_list = self._medias.get(mediainfo.title_year + " " + file_meta.season) or {} + if media_list: + media_files = media_list.get("files") or [] + if media_files: + file_exists = False + for file in media_files: + if str(file_path) == file.get("path"): + file_exists = True + break + if not file_exists: + media_files.append({ + "path": str(file_path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + }) + else: + media_files = [ + { + "path": str(file_path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + } + ] + media_list = { + "files": media_files, + "time": datetime.datetime.now() + } + else: + media_list = { + "files": [ + { + "path": str(file_path), + "mediainfo": mediainfo, + "file_meta": file_meta, + "transferinfo": transferinfo + } + ], + "time": datetime.datetime.now() + } + self._medias[mediainfo.title_year + " " + file_meta.season] = media_list + + # 广播事件 + self.eventmanager.send_event(EventType.TransferComplete, { + 'meta': file_meta, + 'mediainfo': mediainfo, + 'transferinfo': transferinfo + }) + + # 移动模式删除空目录 + if transfer_type == "move": + for file_dir in file_path.parents: + if len(str(file_dir)) <= len(str(Path(mon_path))): + # 重要,删除到监控目录为止 + break + files = SystemUtils.list_files(file_dir, settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT) + if not files: + logger.warn(f"移动模式,删除空目录:{file_dir}") + shutil.rmtree(file_dir, ignore_errors=True) except Exception as e: logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) - def get_state(self) -> bool: - return self._enabled - def send_msg(self): """ 定时检查是否有媒体处理完,发送统一消息 """ - if not self._cloud_medias or not self._cloud_medias.keys(): + if not self._medias or not self._medias.keys(): return # 遍历检查是否已刮削完,发送消息 - for medis_title_year_season in list(self._cloud_medias.keys()): - media_list = self._cloud_medias.get(medis_title_year_season) + for medis_title_year_season in list(self._medias.keys()): + media_list = self._medias.get(medis_title_year_season) logger.info(f"开始处理媒体 {medis_title_year_season} 消息") if not media_list: @@ -478,10 +538,11 @@ class CloudLinkMonitor(_PluginBase): file_meta = media_files[0].get("file_meta") mediainfo = media_files[0].get("mediainfo") # 判断剧集最后更新时间距现在是已超过10秒或者电影,发送消息 - if (datetime.datetime.now() - last_update_time).total_seconds() > int(10) \ + if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._interval) \ or mediainfo.type == MediaType.MOVIE: # 发送通知 if self._notify: + # 汇总处理文件总大小 total_size = 0 file_count = 0 @@ -513,9 +574,12 @@ class CloudLinkMonitor(_PluginBase): transferinfo=transferinfo, season_episode=season_episode) # 发送完消息,移出key - del self._cloud_medias[medis_title_year_season] + del self._medias[medis_title_year_season] continue + def get_state(self) -> bool: + return self._enabled + @staticmethod def get_command() -> List[Dict[str, Any]]: """ @@ -523,24 +587,45 @@ class CloudLinkMonitor(_PluginBase): :return: 命令关键字、事件、描述、附带数据 """ return [{ - "cmd": "/realtime_link", + "cmd": "/directory_sync", "event": EventType.PluginAction, - "desc": "实时软连接", + "desc": "目录监控同步", "category": "管理", "data": { - "action": "realtime_link" + "action": "directory_sync" } }] def get_api(self) -> List[Dict[str, Any]]: return [{ - "path": "/realtime_link", + "path": "/directory_sync", "endpoint": self.sync, "methods": ["GET"], - "summary": "实时软连接", - "description": "实时软连接", + "summary": "目录监控同步", + "description": "目录监控同步", }] + def get_service(self) -> List[Dict[str, Any]]: + """ + 注册插件公共服务 + [{ + "id": "服务ID", + "name": "服务名称", + "trigger": "触发器:cron/interval/date/CronTrigger.from_crontab()", + "func": self.xxx, + "kwargs": {} # 定时器参数 + }] + """ + if self._enabled and self._cron: + return [{ + "id": "DirMonitor", + "name": "目录监控全量同步服务", + "trigger": CronTrigger.from_crontab(self._cron), + "func": self.sync_all, + "kwargs": {} + }] + return [] + def sync(self) -> schemas.Response: """ API调用目录同步 @@ -613,7 +698,7 @@ class CloudLinkMonitor(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 3 + 'md': 4 }, 'content': [ { @@ -633,7 +718,7 @@ class CloudLinkMonitor(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 3 + 'md': 4 }, 'content': [ { @@ -657,7 +742,29 @@ class CloudLinkMonitor(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 3 + 'md': 4 + }, + 'content': [ + { + 'component': 'VTextField', + 'props': { + 'model': 'interval', + 'label': '入库消息延迟', + 'placeholder': '10' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 4 }, 'content': [ { @@ -674,15 +781,15 @@ class CloudLinkMonitor(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 3 + 'md': 4 }, 'content': [ { 'component': 'VTextField', 'props': { 'model': 'size', - 'label': '最小文件大小(KB)', - 'placeholder': '' + 'label': '监控文件大小(GB)', + 'placeholder': '0' } } ] @@ -704,9 +811,11 @@ class CloudLinkMonitor(_PluginBase): 'model': 'monitor_dirs', 'label': '监控目录', 'rows': 5, - 'placeholder': '每一行一个目录,支持以下几种配置方式:\n' + 'placeholder': '每一行一个目录,支持以下几种配置方式,转移方式支持 move、copy、link、softlink、rclone_copy、rclone_move:\n' '监控目录\n' + '监控目录#转移方式\n' '监控目录:转移目的目录\n' + '监控目录:转移目的目录#转移方式' } } ] @@ -749,7 +858,49 @@ class CloudLinkMonitor(_PluginBase): 'props': { 'type': 'info', 'variant': 'tonal', - 'text': '最小文件大小:小于最小文件大小的文件将直接复制,其余则软链接。' + 'text': '监控目录不指定目的目录时,将转移到媒体库目录,并自动创建一级分类目录,同时按配置创建二级分类目录;监控目录指定了目的目录时,不会自动创建一级目录,但会根据配置创建二级分类目录。' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'info', + 'variant': 'tonal', + 'text': '入库消息延迟默认10s,如网络较慢可酌情调大,有助于发送统一入库消息。' + } + } + ] + } + ] + }, + { + 'component': 'VRow', + 'content': [ + { + 'component': 'VCol', + 'props': { + 'cols': 12, + }, + 'content': [ + { + 'component': 'VAlert', + 'props': { + 'type': 'info', + 'variant': 'tonal', + 'text': '监控文件大小:单位GB,0为不开启,低于监控文件大小的文件不会被监控转移。' } } ] @@ -762,12 +913,13 @@ class CloudLinkMonitor(_PluginBase): "enabled": False, "notify": False, "onlyonce": False, - "mode": "compatibility", - "transfer_type": "link", + "mode": "fast", + "transfer_type": settings.TRANSFER_TYPE, "monitor_dirs": "", "exclude_keywords": "", + "interval": 10, "cron": "", - "size": "" + "size": 0 } def get_page(self) -> List[dict]: