feat 云盘助手插件(未完成)

This commit is contained in:
thsrite
2024-06-21 16:45:51 +08:00
parent 0fa48862ba
commit 7fa1506814
2 changed files with 924 additions and 0 deletions

BIN
icons/cloudassistant.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

View File

@@ -0,0 +1,924 @@
import datetime
import os
import re
import shutil
import threading
import time
import traceback
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from app import schemas
from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, SystemConfigKey
from app.utils.system import SystemUtils
lock = threading.Lock()
class CloudFileMonitorHandler(FileSystemEventHandler):
"""
目录监控响应类
"""
def __init__(self, monpath: str, sync: Any, **kwargs):
super(CloudFileMonitorHandler, self).__init__(**kwargs)
self._watch_path = monpath
self.sync = sync
def on_created(self, event):
self.sync.event_handler(event=event, text="创建",
mon_path=self._watch_path, event_path=event.src_path)
def on_moved(self, event):
self.sync.event_handler(event=event, text="移动",
mon_path=self._watch_path, event_path=event.dest_path)
class CloudAssistant(_PluginBase):
# 插件名称
plugin_name = "云盘助手"
# 插件描述
plugin_desc = "定时移动到云盘,软连接回本地,定时清理无效软连接"
# 插件图标
plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/cloudassistant.png"
# 插件版本
plugin_version = "1.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "cloudassistant_"
# 加载顺序
plugin_order = 4
# 可使用的用户级别
auth_level = 2
# 私有属性
_scheduler = None
transferhis = None
downloadhis = None
transferchian = None
tmdbchain = None
_observer = []
_enabled = False
_notify = False
_onlyonce = False
_cron = None
_clean = False
_transfer = False
# 模式 compatibility/fast
_mode = "fast"
# 转移方式
_transfer_type = "link"
_monitor_dirs = ""
_exclude_keywords = ""
# 存储源目录与目的目录关系
_dirconf: Dict[str, Optional[Path]] = {}
# 存储源目录转移方式
_transferconf: Dict[str, Optional[str]] = {}
_softdirconf: Dict[str, Optional[str]] = {}
_historyconf: Dict[str, Optional[bool]] = {}
# 退出事件
_event = threading.Event()
def init_plugin(self, config: dict = None):
self.transferhis = TransferHistoryOper()
self.downloadhis = DownloadHistoryOper()
self.transferchian = TransferChain()
self.tmdbchain = TmdbChain()
# 清空配置
self._dirconf = {}
self._softdirconf = {}
self._transferconf = {}
self._historyconf = {}
# 读取配置
if config:
self._enabled = config.get("enabled")
self._notify = config.get("notify")
self._onlyonce = config.get("onlyonce")
self._clean = config.get("clean")
self._transfer = config.get("transfer")
self._mode = config.get("mode")
self._transfer_type = config.get("transfer_type")
self._monitor_dirs = config.get("monitor_dirs") or ""
self._exclude_keywords = config.get("exclude_keywords") or ""
self._cron = config.get("cron")
# 清理插件历史
if self._clean:
self.del_data(key="history")
self._clean = False
self.__update_config()
# 停止现有任务
self.stop_service()
if self._enabled or self._onlyonce:
# 定时服务管理器
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 读取目录配置
monitor_dirs = self._monitor_dirs.split("\n")
if not monitor_dirs:
return
for mon_path in monitor_dirs:
# 格式 本地媒体路径:云盘挂载本地路径$软连接回本地路径%True/False#转移方式
# /mnt/meida:/mnt/cloud/115/emby$/mnt/softlink%True
if not mon_path:
continue
# 自定义转移方式
_transfer_type = self._transfer_type
if mon_path.count("#") == 1:
_transfer_type = mon_path.split("#")[1]
mon_path = mon_path.split("#")[0]
# 软连接回本地路径
_soft_path = None
if mon_path.count("$") == 1:
_soft_path = mon_path.split("$")[1]
mon_path = mon_path.split("$")[0]
# 转移完是否删除历史记录
_history = False
if mon_path.count("%") == 1:
_history = mon_path.split("%")[1]
_history = True if _history == "True" else False
mon_path = mon_path.split("%")[0]
# 存储目的目录
if SystemUtils.is_windows():
if mon_path.count(":") > 1:
paths = [mon_path.split(":")[0] + ":" + mon_path.split(":")[1],
mon_path.split(":")[2] + ":" + mon_path.split(":")[3]]
else:
paths = [mon_path]
else:
paths = mon_path.split(":")
# 目的目录
target_path = None
if len(paths) > 1:
mon_path = paths[0]
target_path = Path(paths[1])
self._dirconf[mon_path] = target_path
else:
self._dirconf[mon_path] = None
# 转移方式
self._transferconf[mon_path] = _transfer_type
# 软连接回本地路径
self._softdirconf[mon_path] = _soft_path
# 是否删除历史记录
self._historyconf[mon_path] = _history
# 启用目录监控
if self._enabled:
# 检查媒体库目录是不是下载目录的子目录
try:
if target_path and target_path.is_relative_to(Path(mon_path)):
logger.warn(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控")
self.systemmessage.put(f"{target_path} 是下载目录 {mon_path} 的子目录,无法监控",
title="目录监控")
continue
except Exception as e:
logger.debug(str(e))
pass
try:
if self._mode == "compatibility":
# 兼容模式目录同步性能降低且NAS不能休眠但可以兼容挂载的远程共享目录如SMB
observer = PollingObserver(timeout=10)
else:
# 内部处理系统操作类型选择最优解
observer = Observer(timeout=10)
self._observer.append(observer)
observer.schedule(CloudFileMonitorHandler(mon_path, self), path=mon_path, recursive=True)
observer.daemon = True
observer.start()
logger.info(f"{mon_path} 的目录监控服务启动")
except Exception as e:
err_msg = str(e)
if "inotify" in err_msg and "reached" in err_msg:
logger.warn(
f"目录监控服务启动出现异常:{err_msg}请在宿主机上不是docker容器内执行以下命令并重启"
+ """
echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf
echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
""")
else:
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
self.systemmessage.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控")
# 运行一次定时服务
if self._onlyonce:
logger.info("目录监控服务启动,立即运行一次")
self._scheduler.add_job(func=self.sync_all, trigger='date',
run_date=datetime.datetime.now(
tz=pytz.timezone(settings.TZ)) + datetime.timedelta(seconds=3)
)
# 关闭一次性开关
self._onlyonce = False
# 保存配置
self.__update_config()
# 启动定时服务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def __update_config(self):
"""
更新配置
"""
self.update_config({
"enabled": self._enabled,
"notify": self._notify,
"onlyonce": self._onlyonce,
"clean": self._clean,
"mode": self._mode,
"transfer": self._transfer,
"transfer_type": self._transfer_type,
"monitor_dirs": self._monitor_dirs,
"exclude_keywords": self._exclude_keywords,
"cron": self._cron
})
@eventmanager.register(EventType.PluginAction)
def remote_sync(self, event: Event):
"""
远程全量同步
"""
if event:
event_data = event.event_data
if not event_data or event_data.get("action") != "cloudassistant":
return
self.post_message(channel=event.event_data.get("channel"),
title="云盘助手开始同步监控目录 ...",
userid=event.event_data.get("user"))
self.sync_all()
if event:
self.post_message(channel=event.event_data.get("channel"),
title="云盘助手监控目录同步完成!", userid=event.event_data.get("user"))
def sync_all(self):
"""
立即运行一次,全量同步目录中所有文件
"""
logger.info("云盘助手全量同步监控目录 ...")
# 遍历所有监控目录
for mon_path in self._dirconf.keys():
# 遍历目录下所有文件
for file_path in SystemUtils.list_files(Path(mon_path), [".*"]):
self.__handle_file(event_path=str(file_path), mon_path=mon_path)
logger.info("云盘助手全量同步监控目录完成!")
def event_handler(self, event, mon_path: str, text: str, event_path: str):
"""
处理文件变化
:param event: 事件
:param mon_path: 监控目录
:param text: 事件描述
:param event_path: 事件文件路径
"""
if not event.is_directory:
# 文件发生变化
logger.debug("文件%s%s" % (text, event_path))
self.__handle_file(event_path=event_path, mon_path=mon_path)
def __handle_file(self, event_path: str, mon_path: str):
"""
同步一个文件
:param event_path: 事件文件路径
:param mon_path: 监控目录
"""
file_path = Path(event_path)
try:
if not file_path.exists():
return
# 全程加锁
with lock:
# 回收站及隐藏的文件不处理
if event_path.find('/@Recycle/') != -1 \
or event_path.find('/#recycle/') != -1 \
or event_path.find('/.') != -1 \
or event_path.find('/@eaDir') != -1:
logger.debug(f"{event_path} 是回收站或隐藏的文件")
return
# 命中过滤关键字不处理
if self._exclude_keywords:
for keyword in self._exclude_keywords.split("\n"):
if keyword and re.findall(keyword, event_path):
logger.info(f"{event_path} 命中过滤关键字 {keyword},不处理")
return
# 整理屏蔽词不处理
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
if transfer_exclude_words:
for keyword in transfer_exclude_words:
if not keyword:
continue
if keyword and re.search(r"%s" % keyword, event_path, re.IGNORECASE):
logger.info(f"{event_path} 命中整理屏蔽词 {keyword},不处理")
return
# 判断是不是蓝光目录
if re.search(r"BDMV[/\\]STREAM", event_path, re.IGNORECASE):
# 截取BDMV前面的路径
blurray_dir = event_path[:event_path.find("BDMV")]
file_path = Path(blurray_dir)
logger.info(f"{event_path} 是蓝光目录,更正文件路径为:{str(file_path)}")
# 查询转移目的目录
target: Path = self._dirconf.get(mon_path)
# 查询转移方式
transfer_type = self._transferconf.get(mon_path)
# 软连接回本地路径
soft_path = self._softdirconf.get(mon_path)
# 是否删除历史记录
history_type = self._historyconf.get(mon_path)
# 1、转移到云盘挂载路径
target_cloud_file = str(file_path).replace(str(mon_path), str(target))
retcode = 1
if self._transfer:
retcode = self.__transfer_file(file_path=file_path, target_file=target_cloud_file,
transfer_type=transfer_type)
# 2、软连接回本地路径
if not self._transfer or retcode == 0:
target_soft_file = str(target_cloud_file).replace(str(target), str(soft_path))
retcode = self.__transfer_file(file_path=target_cloud_file, target_file=target_soft_file,
transfer_type="softlink")
if retcode == 0:
# 是否删除本地历史
if history_type:
transferhis = self.transferhis.get_by_src(str(file_path))
if transferhis:
self.transferhis.delete(transferhis.id)
logger.info(f"删除本地历史记录:{transferhis.id}")
# 3、存操作记录
history = self.get_data('history') or []
history.append({
"file_path": file_path,
"transfer_type": transfer_type,
"target_cloud_file": target_cloud_file,
"target_soft_file": target_soft_file,
"delete_history": history_type,
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
})
# 保存历史
self.save_data(key="history", value=history)
# 移动模式删除空目录
if transfer_type == "move":
for file_dir in file_path.parents:
if len(str(file_dir)) <= len(str(Path(mon_path))):
# 重要,删除到监控目录为止
break
files = SystemUtils.list_files(file_dir, settings.RMT_MEDIAEXT + settings.DOWNLOAD_TMPEXT)
if not files:
logger.warn(f"移动模式,删除空目录:{file_dir}")
shutil.rmtree(file_dir, ignore_errors=True)
except Exception as e:
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
def __transfer_file(self, file_path, target_file, transfer_type):
"""
转移文件
"""
# 如果是文件夹
if Path(target_file).is_dir():
if not Path(target_file).exists():
logger.info(f"创建目标文件夹 {target_file}")
os.makedirs(target_file)
return 1
else:
# 文件
if Path(target_file).exists():
logger.info(f"目标文件 {target_file} 已存在")
return 1
if not Path(target_file).parent.exists():
logger.info(f"创建目标文件夹 {Path(target_file).parent}")
os.makedirs(Path(target_file).parent)
# 媒体文件软连接
retcode, retmsg = self.__transfer_command(file_path, Path(target_file), transfer_type)
logger.info(
f"媒体文件{str(file_path)} {transfer_type}{target_file} {retcode} {retmsg}")
return retcode
@staticmethod
def __transfer_command(file_item: Path, target_file: Path, transfer_type: str) -> int:
"""
使用系统命令处理单个文件
:param file_item: 文件路径
:param target_file: 目标文件路径
:param transfer_type: RmtMode转移方式
"""
with lock:
# 转移
if transfer_type == 'link':
# 硬链接
retcode, retmsg = SystemUtils.link(file_item, target_file)
elif transfer_type == 'softlink':
# 软链接
retcode, retmsg = SystemUtils.softlink(file_item, target_file)
elif transfer_type == 'move':
# 移动
retcode, retmsg = SystemUtils.move(file_item, target_file)
elif transfer_type == 'rclone_move':
# Rclone 移动
retcode, retmsg = SystemUtils.rclone_move(file_item, target_file)
elif transfer_type == 'rclone_copy':
# Rclone 复制
retcode, retmsg = SystemUtils.rclone_copy(file_item, target_file)
else:
# 复制
retcode, retmsg = SystemUtils.copy(file_item, target_file)
if retcode != 0:
logger.error(retmsg)
return retcode
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
"""
定义远程控制命令
:return: 命令关键字、事件、描述、附带数据
"""
return [{
"cmd": "/cloud_assistant",
"event": EventType.PluginAction,
"desc": "云盘助手同步",
"category": "",
"data": {
"action": "cloud_assistant"
}
}]
def get_api(self) -> List[Dict[str, Any]]:
return [{
"path": "/cloud_assistant",
"endpoint": self.sync,
"methods": ["GET"],
"summary": "云盘助手同步",
"description": "云盘助手同步",
}]
def get_service(self) -> List[Dict[str, Any]]:
"""
注册插件公共服务
[{
"id": "服务ID",
"name": "服务名称",
"trigger": "触发器cron/interval/date/CronTrigger.from_crontab()",
"func": self.xxx,
"kwargs": {} # 定时器参数
}]
"""
if self._enabled and self._cron:
return [{
"id": "CloudAssistantSyncAll",
"name": "云盘助手全量同步服务",
"trigger": CronTrigger.from_crontab(self._cron),
"func": self.sync_all,
"kwargs": {}
}]
return []
def sync(self, apikey: str) -> schemas.Response:
"""
API调用目录同步
"""
if apikey != settings.API_TOKEN:
return schemas.Response(success=False, message="API密钥错误")
self.sync_all()
return schemas.Response(success=True)
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '发送通知',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '立即运行一次',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'clean',
'label': '清空插件历史',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'transfer',
'label': '是否转移',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'mode',
'label': '监控模式',
'items': [
{'title': '兼容模式', 'value': 'compatibility'},
{'title': '性能模式', 'value': 'fast'}
]
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'transfer_type',
'label': '整理方式',
'items': [
{'title': '移动', 'value': 'move'},
{'title': '复制', 'value': 'copy'},
{'title': '软链接', 'value': 'softlink'},
{'title': 'Rclone复制', 'value': 'rclone_copy'},
{'title': 'Rclone移动', 'value': 'rclone_move'}
]
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cron',
'label': '定时全量同步周期',
'placeholder': '5位cron表达式留空关闭'
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'monitor_dirs',
'label': '监控目录',
'rows': 5,
'placeholder': '本地媒体路径:云盘挂载本地路径$软连接回本地路径%是否删除转移历史记录True/False#转移方式'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'exclude_keywords',
'label': '排除关键词',
'rows': 2,
'placeholder': '每一行一个关键词'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '如未开启转移,则不会从本地媒体路径转移到云盘挂载本地路径,仅会进行软连接操作。'
}
}
]
}
]
}
]
}
], {
"enabled": False,
"notify": False,
"onlyonce": False,
"clean": False,
"transfer": False,
"mode": "fast",
"transfer_type": "link",
"monitor_dirs": "",
"exclude_keywords": "",
"cron": ""
}
def get_page(self) -> List[dict]:
# 查询同步详情
historys = self.get_data('history')
if not historys:
return [
{
'component': 'div',
'text': '暂无数据',
'props': {
'class': 'text-center',
}
}
]
if not isinstance(historys, list):
historys = [historys]
# 按照时间倒序
historys = sorted(historys, key=lambda x: x.get("time") or 0, reverse=True)
msgs = [
{
'component': 'tr',
'props': {
'class': 'text-sm'
},
'content': [
{
'component': 'td',
'props': {
'class': 'whitespace-nowrap break-keep text-high-emphasis'
},
'text': history.get("time")
},
{
'component': 'td',
'text': history.get("file_path")
},
{
'component': 'td',
'text': history.get("transfer_type")
},
{
'component': 'td',
'text': history.get("target_cloud_file")
},
{
'component': 'td',
'text': history.get("target_soft_file")
},
{
'component': 'td',
'text': history.get("delete_history")
}
]
} for history in historys
]
# 拼装页面
return [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTable',
'props': {
'hover': True
},
'content': [
{
'component': 'thead',
'content': [
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': 'time'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '本地文件'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '转移方式'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '云盘文件'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '软连接文件'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '是否删除历史记录'
},
]
},
{
'component': 'tbody',
'content': msgs
}
]
}
]
}
]
}
]
def stop_service(self):
"""
退出插件
"""
if self._observer:
for observer in self._observer:
try:
observer.stop()
observer.join()
except Exception as e:
print(str(e))
self._observer = []
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._event.set()
self._scheduler.shutdown()
self._event.clear()
self._scheduler = None