fix 1.3云盘监控

This commit is contained in:
thsrite
2024-03-19 22:09:22 +08:00
parent 2e820bfcf9
commit 51d8beebee
2 changed files with 349 additions and 197 deletions

View File

@@ -122,7 +122,7 @@
"CloudLinkMonitor": {
"name": "云盘实时链接",
"description": "监控云盘目录文件变化,自动转移链接(不刮削)。",
"version": "1.2",
"version": "1.3",
"icon": "Linkease_A.png",
"author": "thsrite",
"level": 1

View File

@@ -1,5 +1,6 @@
import datetime
import re
import shutil
import threading
import traceback
from pathlib import Path
@@ -16,14 +17,16 @@ from app import schemas
from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.log import logger
from app.modules.filetransfer import FileTransferModule
from app.plugins import _PluginBase
from app.core.context import MediaInfo
from app.schemas import Notification, NotificationType, TransferInfo
from app.schemas.types import EventType, MediaType
from app.schemas.types import EventType, MediaType, SystemConfigKey
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
@@ -57,7 +60,7 @@ class CloudLinkMonitor(_PluginBase):
# 插件图标
plugin_icon = "Linkease_A.png"
# 插件版本
plugin_version = "1.2"
plugin_version = "1.3"
# 插件作者
plugin_author = "thsrite"
# 作者主页
@@ -65,43 +68,47 @@ class CloudLinkMonitor(_PluginBase):
# 插件配置项ID前缀
plugin_config_prefix = "cloudlinkmonitor_"
# 加载顺序
plugin_order = 5
plugin_order = 4
# 可使用的用户级别
auth_level = 1
# 私有属性
_scheduler = None
transferhis = None
downloadhis = None
transferchian = None
tmdbchain = None
_observer = []
_enabled = False
_notify = False
_onlyonce = False
tmdbchain = None
transferhis = None
transferchian = None
_cron = None
filetransfer = None
_size = 0
# 转移方式
_monitor_dirs = ""
_exclude_keywords = ""
_cloud_medias = {}
# 模式 compatibility/fast
_mode = "compatibility"
_transfer_type = "link"
# 转移方式
_transfer_type = settings.TRANSFER_TYPE
_monitor_dirs = ""
_exclude_keywords = ""
_interval: int = 10
# 存储源目录与目的目录关系
_dirconf: Dict[str, Optional[Path]] = {}
# 存储源目录转移方式
_transferconf: Dict[str, Optional[str]] = {}
_medias = {}
# 退出事件
_event = threading.Event()
def init_plugin(self, config: dict = None):
self.transferhis = TransferHistoryOper()
self.downloadhis = DownloadHistoryOper()
self.transferchian = TransferChain()
self.tmdbchain = TmdbChain()
self.filetransfer = FileTransferModule()
# 清空配置
self._dirconf = {}
self._transferconf = {}
self.tmdbchain = TmdbChain()
self.transferhis = TransferHistoryOper()
self.transferchian = TransferChain()
# 读取配置
if config:
@@ -112,6 +119,7 @@ class CloudLinkMonitor(_PluginBase):
self._transfer_type = config.get("transfer_type")
self._monitor_dirs = config.get("monitor_dirs") or ""
self._exclude_keywords = config.get("exclude_keywords") or ""
self._interval = config.get("interval") or 10
self._cron = config.get("cron")
self._size = config.get("size") or 0
@@ -133,6 +141,12 @@ class CloudLinkMonitor(_PluginBase):
if not mon_path:
continue
# 自定义转移方式
_transfer_type = self._transfer_type
if mon_path.count("#") == 1:
_transfer_type = mon_path.split("#")[1]
mon_path = mon_path.split("#")[0]
# 存储目的目录
if SystemUtils.is_windows():
if mon_path.count(":") > 1:
@@ -144,14 +158,16 @@ class CloudLinkMonitor(_PluginBase):
paths = mon_path.split(":")
# 目的目录
target_path = None
if len(paths) > 1:
mon_path = paths[0]
target_path = Path(paths[1])
self._dirconf[mon_path] = target_path
else:
logger.warn(f"{mon_path} 未配置目的目录,将不会进行连接")
self.systemmessage.put(f"{mon_path} 未配置目的目录,将不会进行连接!")
continue
self._dirconf[mon_path] = None
# 转移方式
self._transferconf[mon_path] = _transfer_type
# 启用目录监控
if self._enabled:
@@ -203,17 +219,6 @@ class CloudLinkMonitor(_PluginBase):
# 保存配置
self.__update_config()
# 全量同步定时
if self._enabled and self._cron:
try:
self._scheduler.add_job(func=self.sync_all,
trigger=CronTrigger.from_crontab(self._cron),
name="云盘实时链接")
except Exception as err:
logger.error(f"定时任务配置错误:{str(err)}")
# 推送实时消息
self.systemmessage.put(f"执行周期配置错误:{str(err)}")
# 启动定时服务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
@@ -231,6 +236,7 @@ class CloudLinkMonitor(_PluginBase):
"transfer_type": self._transfer_type,
"monitor_dirs": self._monitor_dirs,
"exclude_keywords": self._exclude_keywords,
"interval": self._interval,
"cron": self._cron,
"size": self._size
})
@@ -242,27 +248,27 @@ class CloudLinkMonitor(_PluginBase):
"""
if event:
event_data = event.event_data
if not event_data or event_data.get("action") != "realtime_link":
if not event_data or event_data.get("action") != "directory_sync":
return
self.post_message(channel=event.event_data.get("channel"),
title="开始实时连接 ...",
title="开始同步监控目录 ...",
userid=event.event_data.get("user"))
self.sync_all()
if event:
self.post_message(channel=event.event_data.get("channel"),
title="实时连接完成!", userid=event.event_data.get("user"))
title="监控目录同步完成!", userid=event.event_data.get("user"))
def sync_all(self):
"""
立即运行一次,全量同步目录中所有文件
"""
logger.info("开始全量实时连接 ...")
logger.info("开始全量同步监控目录 ...")
# 遍历所有监控目录
for mon_path in self._dirconf.keys():
# 遍历目录下所有文件
for file_path in SystemUtils.list_files(Path(mon_path), ['.*']):
for file_path in SystemUtils.list_files(Path(mon_path), settings.RMT_MEDIAEXT):
self.__handle_file(event_path=str(file_path), mon_path=mon_path)
logger.info("全量实时连接完成!")
logger.info("全量同步监控目录完成!")
def event_handler(self, event, mon_path: str, text: str, event_path: str):
"""
@@ -277,131 +283,6 @@ class CloudLinkMonitor(_PluginBase):
logger.debug("文件%s%s" % (text, event_path))
self.__handle_file(event_path=event_path, mon_path=mon_path)
def _link_file(self, src_path: Path, mon_path: str,
target_path: Path, transfer_type: str = "softlink") -> Tuple[bool, str]:
"""
对文件做纯链接处理,不做识别重命名,则监控模块调用
:param : 来源渠道
:param src_path: 源文件
:param target_path: 目标目录
:param transfer_type: 转移方式
"""
new_file = str(src_path).replace(mon_path, str(target_path))
new_path = Path(new_file)
if new_path.exists():
return True, "目标路径文件已存在"
else:
# 创建目标目录
if not new_path.parent.exists():
new_path.parent.mkdir(parents=True, exist_ok=True)
# 转移
if transfer_type == "copy":
code, errmsg = SystemUtils.copy(src_path, new_path)
return True if code == 0 else False, errmsg
else:
# 元数据
file_meta = MetaInfoPath(src_path)
# 识别媒体信息
mediainfo: MediaInfo = self.chain.recognize_media(meta=file_meta)
# 获取集数据
if mediainfo.type == MediaType.TV:
episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id,
season=file_meta.begin_season or 1)
else:
episodes_info = None
# 转移
transferinfo: TransferInfo = self.chain.transfer(mediainfo=mediainfo,
path=src_path,
transfer_type=transfer_type,
target=target_path,
meta=file_meta,
episodes_info=episodes_info)
if not transferinfo:
logger.error("文件转移模块运行失败")
return False, "文件转移模块运行失败"
if not transferinfo.success:
if not transferinfo.success:
# 转移失败
logger.warn(f"{src_path.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
src_path=src_path,
mode=transfer_type,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
)
if self._notify:
self.chain.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{mediainfo.title_year}{file_meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=mediainfo.get_message_image()
))
return False, "文件转移模块运行失败"
# 新增转移成功历史记录
self.transferhis.add_success(
src_path=src_path,
mode=transfer_type,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
)
# 发送消息汇总
media_list = self._cloud_medias.get(mediainfo.title_year + " " + file_meta.season) or {}
if media_list:
media_files = media_list.get("files") or []
if media_files:
file_exists = False
for file in media_files:
if str(src_path) == file.get("path"):
file_exists = True
break
if not file_exists:
media_files.append({
"path": src_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
})
else:
media_files = [
{
"path": src_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
]
media_list = {
"files": media_files,
"time": datetime.datetime.now()
}
else:
media_list = {
"files": [
{
"path": src_path,
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
],
"time": datetime.datetime.now()
}
self._cloud_medias[mediainfo.title_year + " " + file_meta.season] = media_list
# 广播事件
self.eventmanager.send_event(EventType.TransferComplete, {
'meta': file_meta,
'mediainfo': mediainfo,
'transferinfo': transferinfo
})
return True, "文件转移模块运行成功"
def __handle_file(self, event_path: str, mon_path: str):
"""
同步一个文件
@@ -414,6 +295,10 @@ class CloudLinkMonitor(_PluginBase):
return
# 全程加锁
with lock:
transfer_history = self.transferhis.get_by_src(event_path)
if transfer_history:
logger.debug("文件已处理过:%s" % event_path)
return
# 回收站及隐藏的文件不处理
if event_path.find('/@Recycle/') != -1 \
@@ -430,39 +315,214 @@ class CloudLinkMonitor(_PluginBase):
logger.info(f"{event_path} 命中过滤关键字 {keyword},不处理")
return
# 整理屏蔽词不处理
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
if transfer_exclude_words:
for keyword in transfer_exclude_words:
if not keyword:
continue
if keyword and re.search(r"%s" % keyword, event_path, re.IGNORECASE):
logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理")
return
# 不是媒体文件不处理
if file_path.suffix not in settings.RMT_MEDIAEXT:
logger.debug(f"{event_path} 不是媒体文件")
return
# 判断是不是蓝光目录
bluray_flag = False
if re.search(r"BDMV[/\\]STREAM", event_path, re.IGNORECASE):
bluray_flag = True
# 截取BDMV前面的路径
blurray_dir = event_path[:event_path.find("BDMV")]
file_path = Path(blurray_dir)
logger.info(f"{event_path} 是蓝光目录,更正文件路径为:{str(file_path)}")
# 查询历史记录,已转移的不处理
if self.transferhis.get_by_src(str(file_path)):
logger.info(f"{file_path} 已整理过")
return
# 元数据
file_meta = MetaInfoPath(file_path)
if not file_meta.name:
logger.error(f"{file_path.name} 无法识别有效信息")
return
# 判断文件大小
if self._size and float(self._size) > 0 and file_path.stat().st_size < float(self._size) * 1024:
logger.info(f"{event_path} 文件大小小于最小文件大小,复制...")
_transfer_type = "copy"
else:
_transfer_type = self._transfer_type
if self._size and float(self._size) > 0 and file_path.stat().st_size < float(self._size) * 1024 ** 3:
logger.info(f"{file_path} 文件大小小于监控文件大小,不处理")
return
# 查询转移目的目录
target: Path = self._dirconf.get(mon_path)
if not target:
logger.warn(f"{mon_path} 未配置目的目录,将不会进行软连接")
# 查询转移方式
transfer_type = self._transferconf.get(mon_path)
# 识别媒体信息
mediainfo: MediaInfo = self.chain.recognize_media(meta=file_meta)
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{file_meta.name}')
# 新增转移成功历史记录
his = self.transferhis.add_fail(
src_path=file_path,
mode=transfer_type,
meta=file_meta
)
if self._notify:
self.post_message(
mtype=NotificationType.Manual,
title=f"{file_path.name} 未识别到媒体信息,无法入库!\n"
f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别转移。"
)
return
# 开始连接
self._link_file(src_path=file_path, mon_path=mon_path,
target_path=target, transfer_type=_transfer_type)
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history:
mediainfo.title = transfer_history.title
logger.info(f"{file_path.name} 识别为:{mediainfo.type.value} {mediainfo.title_year}")
# 获取集数据
if mediainfo.type == MediaType.TV:
episodes_info = self.tmdbchain.tmdb_episodes(tmdbid=mediainfo.tmdb_id,
season=file_meta.begin_season or 1)
else:
episodes_info = None
# 转移
transferinfo: TransferInfo = self.filetransfer.transfer_media(in_path=file_path,
in_meta=file_meta,
mediainfo=mediainfo,
transfer_type=transfer_type,
target_dir=target,
episodes_info=episodes_info)
if not transferinfo:
logger.error("文件转移模块运行失败")
return
if not transferinfo.success:
# 转移失败
logger.warn(f"{file_path.name} 入库失败:{transferinfo.message}")
# 新增转移失败历史记录
self.transferhis.add_fail(
src_path=file_path,
mode=transfer_type,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
)
if self._notify:
self.post_message(
mtype=NotificationType.Manual,
title=f"{mediainfo.title_year}{file_meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
image=mediainfo.get_message_image()
)
return
# 新增转移成功历史记录
self.transferhis.add_success(
src_path=file_path,
mode=transfer_type,
meta=file_meta,
mediainfo=mediainfo,
transferinfo=transferinfo
)
"""
{
"title_year season": {
"files": [
{
"path":,
"mediainfo":,
"file_meta":,
"transferinfo":
}
],
"time": "2023-08-24 23:23:23.332"
}
}
"""
# 发送消息汇总
media_list = self._medias.get(mediainfo.title_year + " " + file_meta.season) or {}
if media_list:
media_files = media_list.get("files") or []
if media_files:
file_exists = False
for file in media_files:
if str(file_path) == file.get("path"):
file_exists = True
break
if not file_exists:
media_files.append({
"path": str(file_path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
})
else:
media_files = [
{
"path": str(file_path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
]
media_list = {
"files": media_files,
"time": datetime.datetime.now()
}
else:
media_list = {
"files": [
{
"path": str(file_path),
"mediainfo": mediainfo,
"file_meta": file_meta,
"transferinfo": transferinfo
}
],
"time": datetime.datetime.now()
}
self._medias[mediainfo.title_year + " " + file_meta.season] = media_list
# 广播事件
self.eventmanager.send_event(EventType.TransferComplete, {
'meta': file_meta,
'mediainfo': mediainfo,
'transferinfo': transferinfo
})
# 移动模式删除空目录
if transfer_type == "move":
for file_dir in file_path.parents:
if len(str(file_dir)) <= len(str(Path(mon_path))):
# 重要,删除到监控目录为止
break
files = SystemUtils.list_files(file_dir, settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT)
if not files:
logger.warn(f"移动模式,删除空目录:{file_dir}")
shutil.rmtree(file_dir, ignore_errors=True)
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
def get_state(self) -> bool:
return self._enabled
def send_msg(self):
"""
定时检查是否有媒体处理完,发送统一消息
"""
if not self._cloud_medias or not self._cloud_medias.keys():
if not self._medias or not self._medias.keys():
return
# 遍历检查是否已刮削完,发送消息
for medis_title_year_season in list(self._cloud_medias.keys()):
media_list = self._cloud_medias.get(medis_title_year_season)
for medis_title_year_season in list(self._medias.keys()):
media_list = self._medias.get(medis_title_year_season)
logger.info(f"开始处理媒体 {medis_title_year_season} 消息")
if not media_list:
@@ -478,10 +538,11 @@ class CloudLinkMonitor(_PluginBase):
file_meta = media_files[0].get("file_meta")
mediainfo = media_files[0].get("mediainfo")
# 判断剧集最后更新时间距现在是已超过10秒或者电影发送消息
if (datetime.datetime.now() - last_update_time).total_seconds() > int(10) \
if (datetime.datetime.now() - last_update_time).total_seconds() > int(self._interval) \
or mediainfo.type == MediaType.MOVIE:
# 发送通知
if self._notify:
# 汇总处理文件总大小
total_size = 0
file_count = 0
@@ -513,9 +574,12 @@ class CloudLinkMonitor(_PluginBase):
transferinfo=transferinfo,
season_episode=season_episode)
# 发送完消息移出key
del self._cloud_medias[medis_title_year_season]
del self._medias[medis_title_year_season]
continue
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
"""
@@ -523,24 +587,45 @@ class CloudLinkMonitor(_PluginBase):
:return: 命令关键字、事件、描述、附带数据
"""
return [{
"cmd": "/realtime_link",
"cmd": "/directory_sync",
"event": EventType.PluginAction,
"desc": "实时软连接",
"desc": "目录监控同步",
"category": "管理",
"data": {
"action": "realtime_link"
"action": "directory_sync"
}
}]
def get_api(self) -> List[Dict[str, Any]]:
return [{
"path": "/realtime_link",
"path": "/directory_sync",
"endpoint": self.sync,
"methods": ["GET"],
"summary": "实时软连接",
"description": "实时软连接",
"summary": "目录监控同步",
"description": "目录监控同步",
}]
def get_service(self) -> List[Dict[str, Any]]:
"""
注册插件公共服务
[{
"id": "服务ID",
"name": "服务名称",
"trigger": "触发器cron/interval/date/CronTrigger.from_crontab()",
"func": self.xxx,
"kwargs": {} # 定时器参数
}]
"""
if self._enabled and self._cron:
return [{
"id": "DirMonitor",
"name": "目录监控全量同步服务",
"trigger": CronTrigger.from_crontab(self._cron),
"func": self.sync_all,
"kwargs": {}
}]
return []
def sync(self) -> schemas.Response:
"""
API调用目录同步
@@ -613,7 +698,7 @@ class CloudLinkMonitor(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
'md': 4
},
'content': [
{
@@ -633,7 +718,7 @@ class CloudLinkMonitor(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
'md': 4
},
'content': [
{
@@ -657,7 +742,29 @@ class CloudLinkMonitor(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'interval',
'label': '入库消息延迟',
'placeholder': '10'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
@@ -674,15 +781,15 @@ class CloudLinkMonitor(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'size',
'label': '最小文件大小(KB',
'placeholder': ''
'label': '监控文件大小(GB',
'placeholder': '0'
}
}
]
@@ -704,9 +811,11 @@ class CloudLinkMonitor(_PluginBase):
'model': 'monitor_dirs',
'label': '监控目录',
'rows': 5,
'placeholder': '每一行一个目录,支持以下几种配置方式:\n'
'placeholder': '每一行一个目录,支持以下几种配置方式,转移方式支持 move、copy、link、softlink、rclone_copy、rclone_move\n'
'监控目录\n'
'监控目录#转移方式\n'
'监控目录:转移目的目录\n'
'监控目录:转移目的目录#转移方式'
}
}
]
@@ -749,7 +858,49 @@ class CloudLinkMonitor(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '最小文件大小:小于最小文件大小的文件将直接复制,其余则软链接'
'text': '监控目录不指定目的目录时,将转移到媒体库目录,并自动创建一级分类目录,同时按配置创建二级分类目录;监控目录指定了目的目录时,不会自动创建一级目录,但会根据配置创建二级分类目录'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '入库消息延迟默认10s如网络较慢可酌情调大有助于发送统一入库消息。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '监控文件大小单位GB0为不开启低于监控文件大小的文件不会被监控转移。'
}
}
]
@@ -762,12 +913,13 @@ class CloudLinkMonitor(_PluginBase):
"enabled": False,
"notify": False,
"onlyonce": False,
"mode": "compatibility",
"transfer_type": "link",
"mode": "fast",
"transfer_type": settings.TRANSFER_TYPE,
"monitor_dirs": "",
"exclude_keywords": "",
"interval": 10,
"cron": "",
"size": ""
"size": 0
}
def get_page(self) -> List[dict]: