fix CloudStrm3.0 定时扫描

This commit is contained in:
thsrite
2024-03-06 13:14:08 +08:00
parent 457b643acc
commit 0bc7e0ec30
4 changed files with 226 additions and 311 deletions

View File

@@ -11,7 +11,7 @@ MoviePilot三方插件市场https://github.com/thsrite/MoviePilot-Plugins/
- [站点数据统计 1.4](docs%2FSiteStatisticNoMsg.md) (无未读消息版本)(废弃)
- [站点未读消息 1.2](docs%2FSiteUnreadMsg.md)
- [云盘Strm生成 2.2](docs%2FCloudStrm.md)
- [云盘Strm生成 3.0](docs%2FCloudStrm.md)
- [Strm文件模式转换 1.0](docs%2FStrmConvert.md)
- [清理订阅缓存 1.0](docs%2FSubscribeClear.md)
- [添加种子下载 1.0](docs%2FDownloadTorrent.md)

View File

@@ -2,6 +2,7 @@
### 更新记录
- 3.0 实现改为定时扫描
- 2.2 fix#12
- 2.1 增加复制非媒体文件开关
- 2.0 修复重复执行的问题
@@ -18,14 +19,9 @@
目录监控格式:
- 1.监控方式#监控目录#目的目录#媒体服务器内源文件路径
- 2.监控方式#监控目录#目的目录#cd2#cd2挂载本地跟路径#cd2服务地址
- 3.监控方式#监控目录#目的目录#alist#alist挂载本地跟路径#alist服务地址
监控方式:
- fast:性能模式,内部处理系统操作类型选择最优解
- compatibility:兼容模式目录同步性能降低且NAS不能休眠但可以兼容挂载的远程共享目录如SMB (建议使用)
- 1.监控目录#目的目录#媒体服务器内源文件路径
- 2.监控目录#目的目录#cd2#cd2挂载本地跟路径#cd2服务地址
- 3.监控目录#目的目录#alist#alist挂载本地跟路径#alist服务地址
路径:
@@ -41,4 +37,4 @@
- 媒体服务器内源文件路径 /mount/cloud/aliyun/emby`/tvshow/爸爸去哪儿/Season 5/14.特别版.mp4`
- 监控配置为:compatibility#/mount/cloud/aliyun/emby#/mnt/link/aliyun#/mount/cloud/aliyun/emby
- 监控配置为:/mount/cloud/aliyun/emby#/mnt/link/aliyun#/mount/cloud/aliyun/emby

View File

@@ -2,7 +2,7 @@
"CloudStrm": {
"name": "云盘Strm生成",
"description": "监控文件创建生成Strm文件。",
"version": "2.2",
"version": "3.0",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/create.png",
"author": "thsrite",
"level": 1

View File

@@ -1,3 +1,4 @@
import json
import os
import shutil
import urllib.parse
@@ -8,43 +9,22 @@ import pytz
from typing import Any, List, Dict, Tuple, Optional
from apscheduler.schedulers.background import BackgroundScheduler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from apscheduler.triggers.cron import CronTrigger
from app.log import logger
from app.plugins import _PluginBase
from app.core.config import settings
class FileMonitorHandler(FileSystemEventHandler):
"""
目录监控响应类
"""
def __init__(self, watching_path: str, file_change: Any, **kwargs):
super(FileMonitorHandler, self).__init__(**kwargs)
self._watch_path = watching_path
self.file_change = file_change
# def on_any_event(self, event):
# logger.info(f"目录监控event_type {event.event_type} 路径 {event.src_path}")
def on_created(self, event):
self.file_change.event_handler(event=event, source_dir=self._watch_path, event_path=event.src_path)
def on_moved(self, event):
self.file_change.event_handler(event=event, source_dir=self._watch_path, event_path=event.dest_path)
class CloudStrm(_PluginBase):
# 插件名称
plugin_name = "云盘Strm生成"
# 插件描述
plugin_desc = "监控文件创建生成Strm文件。"
plugin_desc = "定时扫描云盘文件生成Strm文件。"
# 插件图标
plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/create.png"
# 插件版本
plugin_version = "2.2"
plugin_version = "3.0"
# 插件作者
plugin_author = "thsrite"
# 作者主页
@@ -58,19 +38,21 @@ class CloudStrm(_PluginBase):
# 私有属性
_enabled = False
_cron = None
_monitor_confs = None
_onlyonce = False
_copy_files = False
_relay = 3
_rebuild = False
_observer = []
_video_formats = ('.mp4', '.avi', '.rmvb', '.wmv', '.mov', '.mkv', '.flv', '.ts', '.webm', '.iso', '.mpg', '.m2ts')
__cloud_files_json = "cloud_files.json"
_dirconf = {}
_modeconf = {}
_libraryconf = {}
_cloudtypeconf = {}
_cloudurlconf = {}
_cloudpathconf = {}
__cloud_files = []
# 定时器
_scheduler: Optional[BackgroundScheduler] = None
@@ -78,18 +60,19 @@ class CloudStrm(_PluginBase):
def init_plugin(self, config: dict = None):
# 清空配置
self._dirconf = {}
self._modeconf = {}
self._libraryconf = {}
self._cloudtypeconf = {}
self._cloudurlconf = {}
self._cloudpathconf = {}
self.__cloud_files_json = os.path.join(self.get_data_path(), self.__cloud_files_json)
if config:
self._enabled = config.get("enabled")
self._cron = config.get("cron")
self._onlyonce = config.get("onlyonce")
self._rebuild = config.get("rebuild")
self._copy_files = config.get("copy_files")
self._monitor_confs = config.get("monitor_confs")
self._relay = config.get("relay") or 3
# 停止现有任务
self.stop_service()
@@ -106,19 +89,17 @@ class CloudStrm(_PluginBase):
# 格式 源目录:目的目录:媒体库内网盘路径:监控模式
if not monitor_conf:
continue
if str(monitor_conf).count("#") == 3:
mode = str(monitor_conf).split("#")[0]
source_dir = str(monitor_conf).split("#")[1]
target_dir = str(monitor_conf).split("#")[2]
library_dir = str(monitor_conf).split("#")[3]
if str(monitor_conf).count("#") == 2:
source_dir = str(monitor_conf).split("#")[0]
target_dir = str(monitor_conf).split("#")[1]
library_dir = str(monitor_conf).split("#")[2]
self._libraryconf[source_dir] = library_dir
elif str(monitor_conf).count("#") == 5:
mode = str(monitor_conf).split("#")[0]
source_dir = str(monitor_conf).split("#")[1]
target_dir = str(monitor_conf).split("#")[2]
cloud_type = str(monitor_conf).split("#")[3]
cloud_path = str(monitor_conf).split("#")[4]
cloud_url = str(monitor_conf).split("#")[5]
elif str(monitor_conf).count("#") == 4:
source_dir = str(monitor_conf).split("#")[0]
target_dir = str(monitor_conf).split("#")[1]
cloud_type = str(monitor_conf).split("#")[2]
cloud_path = str(monitor_conf).split("#")[3]
cloud_url = str(monitor_conf).split("#")[4]
self._cloudtypeconf[source_dir] = cloud_type
self._cloudpathconf[source_dir] = cloud_path
self._cloudurlconf[source_dir] = cloud_url
@@ -127,34 +108,21 @@ class CloudStrm(_PluginBase):
continue
# 存储目录监控配置
self._dirconf[source_dir] = target_dir
self._modeconf[source_dir] = mode
# 启用目录监控
if self._enabled:
# 检查媒体库目录是不是下载目录的子目录
try:
if target_dir and Path(target_dir).is_relative_to(Path(source_dir)):
logger.warn(f"{target_dir} 是下载目录 {source_dir} 的子目录,无法监控")
self.systemmessage.put(f"{target_dir} 是下载目录 {source_dir} 的子目录,无法监控")
continue
except Exception as e:
logger.debug(str(e))
pass
# 检查媒体库目录是不是下载目录的子目录
try:
if target_dir and Path(target_dir).is_relative_to(Path(source_dir)):
logger.warn(f"{target_dir} 是下载目录 {source_dir} 的子目录,无法监控")
self.systemmessage.put(f"{target_dir} 是下载目录 {source_dir} 的子目录,无法监控")
continue
except Exception as e:
logger.debug(str(e))
pass
# 异步开启云盘监控
logger.info(f"异步开启云盘监控 {source_dir} {mode}")
self._scheduler.add_job(func=self.start_monitor, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(
seconds=int(self._relay)),
name=f"云盘监控 {source_dir}",
kwargs={
"mode": mode,
"source_dir": source_dir
})
# 运行一次定时服务
if self._onlyonce:
logger.info("云盘监控服务启动,立即运行一次")
self._scheduler.add_job(func=self.sync_all, trigger='date',
logger.info("云盘监控全量执行服务启动,立即运行一次")
self._scheduler.add_job(func=self.__scan, trigger='date',
run_date=datetime.now(tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="云盘监控全量执行")
# 关闭一次性开关
@@ -162,193 +130,181 @@ class CloudStrm(_PluginBase):
# 保存配置
self.__update_config()
# 周期运行
if self._cron:
try:
self._scheduler.add_job(func=self.__scan,
trigger=CronTrigger.from_crontab(self._cron),
name="云盘监控")
except Exception as err:
logger.error(f"定时任务配置错误:{err}")
# 推送实时消息
self.systemmessage.put(f"执行周期配置错误:{err}")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def start_monitor(self, mode: str, source_dir: str):
def __scan(self):
"""
异步开启云盘监控
"""
try:
if str(mode) == "compatibility":
# 兼容模式目录同步性能降低且NAS不能休眠但可以兼容挂载的远程共享目录如SMB
observer = PollingObserver(timeout=10)
else:
# 内部处理系统操作类型选择最优解
observer = Observer(timeout=10)
self._observer.append(observer)
observer.schedule(FileMonitorHandler(source_dir, self), path=source_dir, recursive=True)
observer.daemon = True
observer.start()
logger.info(f"{source_dir} 的云盘监控服务启动")
except Exception as e:
err_msg = str(e)
if "inotify" in err_msg and "reached" in err_msg:
logger.warn(
f"云盘监控服务启动出现异常:{err_msg}请在宿主机上不是docker容器内执行以下命令并重启"
+ """
echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf
echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf
sudo sysctl -p
""")
else:
logger.error(f"{source_dir} 启动云盘监控失败:{err_msg}")
self.systemmessage.put(f"{source_dir} 启动云盘监控失败:{err_msg}")
def event_handler(self, event, source_dir: str, event_path: str):
"""
处理文件变化
:param event: 事件
:param source_dir: 监控目录
:param event_path: 事件文件路径
"""
# 回收站及隐藏的文件不处理
if (event_path.find("/@Recycle") != -1
or event_path.find("/#recycle") != -1
or event_path.find("/.") != -1
or event_path.find("/@eaDir") != -1):
logger.info(f"{event_path} 是回收站或隐藏的文件,跳过处理")
return
# 文件发生变化
logger.info(f"变动类型 {event.event_type} 变动路径 {event_path}")
self.__handle_file(event=event, event_path=event_path, source_dir=source_dir)
def __handle_file(self, event, event_path: str, source_dir: str):
"""
同步一个文件
:param event_path: 事件文件路径
:param source_dir: 监控目录
"""
try:
# 转移路径
dest_dir = self._dirconf.get(source_dir)
# 媒体库容器内挂载路径
library_dir = self._libraryconf.get(source_dir)
# 云服务类型
cloud_type = self._cloudtypeconf.get(source_dir)
# 云服务挂载本地跟路径
cloud_path = self._cloudpathconf.get(source_dir)
# 云服务地址
cloud_url = self._cloudurlconf.get(source_dir)
# 文件夹同步创建
if event.is_directory:
target_path = event_path.replace(source_dir, dest_dir)
# 目标文件夹不存在则创建
if not Path(target_path).exists():
logger.info(f"创建目标文件夹 {target_path}")
os.makedirs(target_path)
else:
# 文件nfo、图片、视频文件
dest_file = event_path.replace(source_dir, dest_dir)
if Path(dest_file).exists():
logger.debug(f"目标文件 {dest_file} 已存在")
return
# 目标文件夹不存在则创建
if not Path(dest_file).parent.exists():
logger.info(f"创建目标文件夹 {Path(dest_file).parent}")
os.makedirs(Path(dest_file).parent)
# 视频文件创建.strm文件
if event_path.lower().endswith(self._video_formats):
# 创建.strm文件
self.__create_strm_file(dest_file=dest_file,
dest_dir=dest_dir,
source_file=event_path,
library_dir=library_dir,
cloud_type=cloud_type,
cloud_path=cloud_path,
cloud_url=cloud_url)
else:
# 其他nfo、jpg等复制文件
shutil.copy2(event_path, dest_file)
logger.info(f"复制其他文件 {event_path}{dest_file}")
except Exception as e:
logger.error(f"event_handler_created error: {e}")
print(str(e))
def sync_all(self):
"""
同步所有文件
扫描
"""
if not self._dirconf or not self._dirconf.keys():
logger.error("未获取到可用目录监控配置,请检查")
return
# 首次扫描或者重建索引
__init_flag = False
if self._rebuild or not Path(self.__cloud_files_json).exists():
self.__init_cloud_files_json()
self._rebuild = False
self.__update_config()
__init_flag = True
else:
# 尝试加载本地
with open(self.__cloud_files_json, 'r') as file:
content = file.read()
if content:
self.__cloud_files = json.loads(content)
# 本地没加载到则重建索引
if not self.__cloud_files:
self.__init_cloud_files_json()
self._rebuild = False
self.__update_config()
__init_flag = True
# 不是首次索引,则重新扫描、判断是否有新文件
if not __init_flag:
__save_flag = False
for source_dir in self._dirconf.keys():
for root, dirs, files in os.walk(source_dir):
# 如果遇到名为'extrafanart'的文件夹,则跳过处理该文件夹,继续处理其他文件夹
if "extrafanart" in dirs:
dirs.remove("extrafanart")
# 处理文件
for file in files:
source_file = os.path.join(root, file)
# 回收站及隐藏的文件不处理
if (source_file.find("/@Recycle") != -1
or source_file.find("/#recycle") != -1
or source_file.find("/.") != -1
or source_file.find("/@eaDir") != -1):
logger.info(f"{source_file} 是回收站或隐藏的文件,跳过处理")
continue
if source_file not in self.__cloud_files:
logger.info(f"扫描到新文件 {source_file},正在开始处理")
# 云盘文件json新增
self.__cloud_files.append(source_file)
# 扫描云盘文件判断是否有对应strm
self.__strm(source_file)
__save_flag = True
# 重新保存json文件
if __save_flag:
self.__sava_json()
def __init_cloud_files_json(self):
"""
初始化云盘文件json
"""
# init
for source_dir in self._dirconf.keys():
# 转移路径
dest_dir = self._dirconf.get(source_dir)
# 媒体库容器内挂载路径
library_dir = self._libraryconf.get(source_dir)
# 云服务类型
cloud_type = self._cloudtypeconf.get(source_dir)
# 云服务挂载本地跟路径
cloud_path = self._cloudpathconf.get(source_dir)
# 云服务地址
cloud_url = self._cloudurlconf.get(source_dir)
for root, dirs, files in os.walk(source_dir):
# 如果遇到名为'extrafanart'的文件夹,则跳过处理该文件夹,继续处理其他文件夹
if "extrafanart" in dirs:
dirs.remove("extrafanart")
logger.info(f"开始初始化生成strm文件 {source_dir}")
self.__handle_all(source_dir=source_dir,
dest_dir=dest_dir,
library_dir=library_dir,
cloud_type=cloud_type,
cloud_path=cloud_path,
cloud_url=cloud_url)
logger.info(f"{source_dir} 初始化生成strm文件完成")
# 处理文件
for file in files:
source_file = os.path.join(root, file)
# 回收站及隐藏的文件不处理
if (source_file.find("/@Recycle") != -1
or source_file.find("/#recycle") != -1
or source_file.find("/.") != -1
or source_file.find("/@eaDir") != -1):
logger.info(f"{source_file} 是回收站或隐藏的文件,跳过处理")
continue
def __handle_all(self, source_dir, dest_dir, library_dir, cloud_type=None, cloud_path=None, cloud_url=None):
logger.info(f"扫描到新文件 {source_file},正在开始处理")
# 云盘文件json新增
self.__cloud_files.append(source_file)
# 扫描云盘文件判断是否有对应strm
self.__strm(source_file)
# 写入本地文件
if self.__cloud_files:
logger.info(f"开始写入本地文件 {self.__cloud_files_json}")
self.__sava_json()
else:
logger.warning(f"未获取到文件列表")
def __sava_json(self):
"""
遍历生成所有文件的strm
保存json文件
"""
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
file = open(self.__cloud_files_json, 'w')
file.write(json.dumps(self.__cloud_files))
file.close()
for root, dirs, files in os.walk(source_dir):
# 如果遇到名为'extrafanart'的文件夹,则跳过处理该文件夹,继续处理其他文件夹
if "extrafanart" in dirs:
dirs.remove("extrafanart")
def __strm(self, source_file):
"""
判断文件是否有对应strm
"""
try:
# 获取文件的转移路径
for source_dir in self._dirconf.keys():
if str(source_file).startswith(source_dir):
# 转移路径
dest_dir = self._dirconf.get(source_dir)
# 媒体库容器内挂载路径
library_dir = self._libraryconf.get(source_dir)
# 云服务类型
cloud_type = self._cloudtypeconf.get(source_dir)
# 云服务挂载本地跟路径
cloud_path = self._cloudpathconf.get(source_dir)
# 云服务地址
cloud_url = self._cloudurlconf.get(source_dir)
for file in files:
source_file = os.path.join(root, file)
logger.info(f"处理源文件::: {source_file}")
dest_file = os.path.join(dest_dir, os.path.relpath(source_file, source_dir))
if Path(dest_file).exists():
logger.debug(f"目标文件 {dest_file} 已存在")
continue
logger.info(f"开始生成目标文件::: {dest_file}")
# 创建目标目录中缺少的文件夹
if not os.path.exists(Path(dest_file).parent):
os.makedirs(Path(dest_file).parent)
# 如果目标文件已存在,跳过处理
if os.path.exists(dest_file):
logger.warn(f"文件已存在,跳过处理::: {dest_file}")
continue
if file.lower().endswith(self._video_formats):
# 如果视频文件小于1MB则直接复制不创建.strm文件
if os.path.getsize(source_file) < 1024 * 1024:
logger.info(f"视频文件小于1MB的视频文件到:::{dest_file}")
shutil.copy2(source_file, dest_file)
# 转移后文件
dest_file = source_file.replace(source_dir, dest_dir)
# 如果是文件夹
if Path(dest_file).is_dir():
if not Path(dest_file).exists():
logger.info(f"创建目标文件夹 {dest_file}")
os.makedirs(dest_file)
continue
else:
# 创建.strm文件
self.__create_strm_file(dest_file=dest_file,
dest_dir=dest_dir,
source_file=source_file,
library_dir=library_dir,
cloud_type=cloud_type,
cloud_path=cloud_path,
cloud_url=cloud_url)
else:
if self._copy_files:
# 复制文件
logger.info(f"复制其他文件到:::{dest_file}")
shutil.copy2(source_file, dest_file)
if Path(dest_file).exists():
logger.info(f"目标文件 {dest_file} 已存在")
continue
# 文件
if not Path(dest_file).parent.exists():
logger.info(f"创建目标文件夹 {Path(dest_file).parent}")
os.makedirs(Path(dest_file).parent)
# 视频文件创建.strm文件
if dest_file.lower().endswith(self._video_formats):
# 创建.strm文件
self.__create_strm_file(dest_file=dest_file,
dest_dir=dest_dir,
source_file=source_file,
library_dir=library_dir,
cloud_type=cloud_type,
cloud_path=cloud_path,
cloud_url=cloud_url)
else:
if self._copy_files:
# 其他nfo、jpg等复制文件
shutil.copy2(source_file, dest_file)
logger.info(f"复制其他文件 {source_file}{dest_file}")
except Exception as e:
logger.error(f"create strm file error: {e}")
print(str(e))
@staticmethod
def __create_strm_file(dest_file: str, dest_dir: str, source_file: str, library_dir: str = None,
@@ -411,8 +367,9 @@ class CloudStrm(_PluginBase):
self.update_config({
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"rebuild": self._rebuild,
"copy_files": self._copy_files,
"relay": self._relay,
"cron": self._cron,
"monitor_confs": self._monitor_confs
})
@@ -441,7 +398,7 @@ class CloudStrm(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
'md': 4
},
'content': [
{
@@ -457,7 +414,7 @@ class CloudStrm(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
'md': 4
},
'content': [
{
@@ -468,6 +425,22 @@ class CloudStrm(_PluginBase):
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'rebuild',
'label': '重建索引',
}
}
]
}
]
},
@@ -483,9 +456,9 @@ class CloudStrm(_PluginBase):
{
'component': 'VTextField',
'props': {
'model': 'relay',
'label': '监控延迟',
'placeholder': '3'
'model': 'cron',
'label': '定时周期',
'placeholder': '0 0 * * *'
}
}
]
@@ -550,9 +523,9 @@ class CloudStrm(_PluginBase):
'type': 'info',
'variant': 'tonal',
'text': '目录监控格式:'
'1.监控方式#监控目录#目的目录#媒体服务器内源文件路径;'
'2.监控方式#监控目录#目的目录#cd2#cd2挂载本地跟路径#cd2服务地址'
'3.监控方式#监控目录#目的目录#alist#alist挂载本地跟路径#alist服务地址。'
'1.监控目录#目的目录#媒体服务器内源文件路径;'
'2.监控目录#目的目录#cd2#cd2挂载本地跟路径#cd2服务地址'
'3.监控目录#目的目录#alist#alist挂载本地跟路径#alist服务地址。'
}
}
]
@@ -573,31 +546,7 @@ class CloudStrm(_PluginBase):
'props': {
'type': 'info',
'variant': 'tonal',
'text': '媒体服务器内源文件路径:'
'源文件目录即云盘挂载到媒体服务器的路径。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '监控方式:'
'fast:性能模式(快);'
'compatibility:兼容模式(稳,推荐)'
'text': '媒体服务器内源文件路径:源文件目录即云盘挂载到媒体服务器的路径。'
}
}
]
@@ -626,28 +575,6 @@ class CloudStrm(_PluginBase):
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '由于unraid开启云盘监控很慢所以采取异步方式开启磁盘监控'
'具体开启情况可稍等3-5分钟查看日志。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
@@ -674,8 +601,9 @@ class CloudStrm(_PluginBase):
}
], {
"enabled": False,
"relay": 3,
"cron": "",
"onlyonce": False,
"rebuild": False,
"copy_files": False,
"monitor_confs": ""
}
@@ -695,12 +623,3 @@ class CloudStrm(_PluginBase):
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))
if self._observer:
for observer in self._observer:
try:
observer.stop()
observer.join()
except Exception as e:
print(str(e))
self._observer = []