Files
2024-03-08 10:08:48 +08:00

541 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import shutil
import time
from pathlib import Path
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple
from app.schemas.types import EventType, MediaImageType, NotificationType, MediaType
from app.utils.system import SystemUtils
class CloudDiskDel(_PluginBase):
    """
    Plugin that removes cloud-drive resources after the matching media
    entry (typically a strm file) was deleted from the media library.
    """

    # ---- plugin metadata ----
    # Display name
    plugin_name = "云盘文件删除"
    # Short description
    plugin_desc = "媒体库删除strm文件后同步删除云盘资源。"
    # Icon file
    plugin_icon = "clouddisk.png"
    # Version string
    plugin_version = "1.3"
    # Author
    plugin_author = "thsrite"
    # Author home page
    author_url = "https://github.com/thsrite"
    # Prefix used for this plugin's configuration item ids
    plugin_config_prefix = "clouddiskdel_"
    # Load order
    plugin_order = 26
    # User level allowed to use the plugin
    auth_level = 1

    # ---- private state ----
    # Whether the plugin is switched on
    _enabled = False
    # Media-library path prefix -> local cloud-mount path, filled from the "path" config entry
    _paths = {}
    # Whether a notification is posted after a deletion
    _notify = False
    # One-shot flag: clear the stored deletion history on the next init
    _del_history = False
    # Video extensions probed when no same-named file is found by globbing
    _video_formats = (
        '.mp4', '.avi', '.rmvb', '.wmv', '.mov', '.mkv',
        '.flv', '.ts', '.webm', '.iso', '.mpg',
    )
def init_plugin(self, config: dict = None):
    """
    Load the plugin settings from the saved configuration.

    The ``path`` entry holds one ``library_path:mount_path`` mapping per
    line. Blank lines are ignored and lines without both parts are skipped
    with a warning instead of raising. The mapping is rebuilt from scratch
    on every call so entries removed from the configuration do not linger
    (the class-level default dict is never mutated).

    When ``del_history`` is set, the stored history is cleared and the
    configuration is written back with the flag reset to False.

    :param config: saved plugin configuration; nothing happens when empty
    """
    if not config:
        return

    self._enabled = config.get("enabled")
    self._notify = config.get("notify")
    self._del_history = config.get("del_history")

    mappings = {}
    # "or ''" keeps a missing/None path from being parsed as the text "None"
    for line in str(config.get("path") or "").splitlines():
        line = line.strip()
        if not line:
            continue
        parts = line.split(":")
        library_path = parts[0].strip()
        mount_path = parts[1].strip() if len(parts) > 1 else ""
        if not library_path or not mount_path:
            logger.warn(f"云盘文件删除路径映射配置有误,已忽略 {line}")
            continue
        mappings[library_path] = mount_path
    # Assign a fresh dict on the instance rather than mutating the shared class attribute
    self._paths = mappings

    # Clear the plugin history once, then persist the flag as off
    if self._del_history:
        self.del_data(key="history")
        self.update_config({
            "enabled": self._enabled,
            "notify": self._notify,
            "path": config.get("path"),
            "del_history": False
        })
@eventmanager.register(EventType.PluginAction)
def clouddisk_del(self, event: Event):
    """
    Mirror a media-library deletion onto the mounted cloud drive.

    Only plugin-action events whose ``action`` is ``networkdisk_del`` are
    handled. The event's ``media_path`` is translated through the configured
    library -> mount mapping, the matching cloud file(s) or directory are
    removed, parent directories left without media files are cleaned up,
    an optional notification is posted and the deletion is appended to the
    plugin history.

    NOTE(review): the indentation of the original file was lost; the
    ``break`` is read as ending the mapping loop after the first matching
    library path - confirm against the upstream source.

    :param event: event whose ``event_data`` dict carries ``action``,
                  ``media_path``, ``media_name``, ``tmdb_id``, ``media_type``,
                  ``season_num`` and ``episode_num``
    """
    if not self._enabled or not event:
        return
    event_data = event.event_data
    if not event_data or event_data.get("action") != "networkdisk_del":
        return
    logger.info(f"获取到云盘删除请求 {event_data}")

    media_path = event_data.get("media_path")
    if not media_path:
        logger.error("未获取到删除路径")
        return
    media_path = str(media_path)
    media_name = event_data.get("media_name")
    tmdb_id = event_data.get("tmdb_id")
    media_type = event_data.get("media_type")
    season_num = event_data.get("season_num")
    episode_num = event_data.get("episode_num")

    # Stays False when the path is not under any configured library path
    cloud_file_flag = False
    for library_path, mount_path in list(self._paths.items()):
        if not library_path or not media_path.startswith(library_path):
            continue
        cloud_file_flag = True
        # Swap only the leading library prefix; str.replace would also rewrite
        # later occurrences of the same text inside the path
        media_path = f"{mount_path}{media_path[len(library_path):]}"
        logger.info(f"获取到moviepilot本地云盘挂载路径 {media_path}")
        path = Path(media_path)
        mount_root = Path(mount_path)
        if path.is_file() or media_path.endswith(".strm"):
            self.__delete_media_files(path)
        else:
            self.__delete_media_dir(path, mount_root)
        self.__remove_empty_parents(path, mount_root)
        # The first matching mapping wins
        break

    if not cloud_file_flag:
        return

    # Fallback picture when no artwork can be obtained
    image = 'https://emby.media/notificationicon.png'
    media_type = MediaType.MOVIE if media_type in ["Movie", "MOV"] else MediaType.TV
    now = time.strftime('%Y-%m-%d %H:%M:%S')

    if self._notify:
        backdrop_image = self.chain.obtain_specific_image(
            mediaid=tmdb_id,
            mtype=media_type,
            image_type=MediaImageType.Backdrop,
            season=season_num,
            episode=episode_num
        ) or image
        if media_type == MediaType.MOVIE:
            msg = f'电影 {media_name} {tmdb_id}'
        elif season_num and episode_num:
            # Single episode, e.g. S02E02
            msg = f'剧集 {media_name} S{season_num}E{episode_num} {tmdb_id}'
        elif season_num:
            # Whole season, e.g. S02
            msg = f'剧集 {media_name} S{season_num} {tmdb_id}'
        elif not episode_num:
            # Whole series
            msg = f'剧集 {media_name} {tmdb_id}'
        else:
            # Episode number without a season: fall back to the bare name
            msg = media_name
        self.post_message(
            mtype=NotificationType.MediaServer,
            title="云盘同步删除任务完成",
            image=backdrop_image,
            text=f"{msg}\n"
                 f"时间 {now}"
        )

    # Append the deletion to the stored history
    history = self.get_data('history') or []
    poster_image = self.chain.obtain_specific_image(
        mediaid=tmdb_id,
        mtype=media_type,
        image_type=MediaImageType.Poster,
    ) or image
    history.append({
        "type": media_type.value,
        "title": media_name,
        "path": media_path,
        "season": season_num if season_num and str(season_num).isdigit() else None,
        "episode": episode_num if episode_num and str(episode_num).isdigit() else None,
        "image": poster_image,
        "del_time": now
    })
    self.save_data("history", history)

def __delete_media_files(self, path: Path):
    """
    Delete every file next to ``path`` that shares its stem (video, nfo, jpg ...).

    :param path: cloud-side path of the media file (or of the strm file's counterpart)
    """
    # Square brackets are glob character classes; match them with "?" instead
    pattern = path.stem.replace('[', '?').replace(']', '?')
    logger.info(f"开始筛选同名文件 {pattern}")
    removed = False
    # Materialise the matches first so the directory is not modified while it is being scanned
    for file in list(path.parent.glob(f"{pattern}.*")):
        Path(file).unlink()
        logger.info(f"云盘文件 {file} 已删除")
        self.__remove_json(file)
        removed = True
    if removed:
        return
    # Nothing matched the glob: probe the known video extensions inside the
    # media directory (a bare file name would be resolved against the working directory)
    for ext in self._video_formats:
        candidate = path.with_name(path.stem + ext)
        if candidate.exists():
            candidate.unlink()
            logger.info(f"云盘文件 {candidate} 已删除")
            self.__remove_json(candidate)

def __delete_media_dir(self, path: Path, mount_root: Path):
    """
    Delete a cloud directory, refusing roots and paths that do not exist.

    :param path: cloud-side directory to remove
    :param mount_root: mount path of the matched mapping; it and its ancestors are never removed
    """
    if path == mount_root or path in mount_root.parents or str(path) == path.anchor:
        logger.warn(f"云盘目录 {path} 为挂载根目录,跳过删除")
        return
    if not path.is_dir():
        logger.warn(f"云盘目录 {path} 不存在,跳过删除")
        return
    shutil.rmtree(path)
    logger.warn(f"云盘目录 {path} 已删除")
    self.__remove_json(path)

def __remove_empty_parents(self, path: Path, mount_root: Path):
    """
    Remove parent directories of ``path`` that no longer hold media files.

    :param path: cloud-side path that was just deleted
    :param mount_root: mount path of the matched mapping; climbing stops there
    """
    # Media still present next to the deleted item: nothing to clean up
    if SystemUtils.exits_files(path.parent, settings.RMT_MEDIAEXT):
        return
    for parent_path in path.parents:
        # Never remove the mount root or anything above it; every remaining
        # parent is an ancestor as well, so stop climbing
        if parent_path == mount_root or parent_path in mount_root.parents:
            break
        # The filesystem root and its direct children are never removed
        if str(parent_path.parent) == str(path.root):
            continue
        if not parent_path.is_dir():
            continue
        if not SystemUtils.exits_files(parent_path, settings.RMT_MEDIAEXT):
            shutil.rmtree(parent_path)
            logger.warn(f"云盘目录 {parent_path} 已删除")
            self.__remove_json(parent_path)
def __remove_json(self, path):
    """
    Drop a deleted file or directory from the CloudStrm ``cloud_files.json`` cache.

    Entries equal to ``path`` and entries located underneath it are removed;
    the file is rewritten only when something actually changed. This is
    best-effort bookkeeping: any failure is logged and never propagated, so
    it cannot abort the deletion that triggered it.

    :param path: deleted file or directory (``Path`` or ``str``)
    """
    target = str(path)
    if not target:
        return
    try:
        cloud_files_json = os.path.join(settings.PLUGIN_DATA_PATH, "CloudStrm", "cloud_files.json")
        if not Path(cloud_files_json).exists():
            return
        # Read and close the file before it is reopened for writing
        with open(cloud_files_json, 'r', encoding='utf-8') as reader:
            content = reader.read()
        if not content:
            return
        cloud_files = json.loads(content)
        if not cloud_files:
            return
        if not isinstance(cloud_files, list):
            cloud_files = [cloud_files]

        # Require a separator after the directory name so "/a/b" does not match "/a/bc"
        child_prefixes = (target + "/", target + os.sep)
        # Build a new list instead of removing while iterating, which skips neighbours
        remaining = [
            entry for entry in cloud_files
            if str(entry) != target and not str(entry).startswith(child_prefixes)
        ]
        if len(remaining) != len(cloud_files):
            with open(cloud_files_json, 'w', encoding='utf-8') as writer:
                writer.write(json.dumps(remaining))
    except Exception as e:
        logger.error(f"清理云盘文件缓存失败 {target} {str(e)}")
def get_state(self) -> bool:
    """
    Report whether the plugin is enabled.

    :return: True only when the stored ``enabled`` setting is truthy; an
             unset (None) value is reported as False so callers always get a bool
    """
    return bool(self._enabled)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
    """
    Describe the remote-control command exposed by this plugin.

    :return: one-element list holding the command keyword, the event it
             fires, its description, category and the data attached to the event
    """
    payload = {"action": "networkdisk_del"}
    command = {
        "cmd": "/networkdisk_del",
        "event": EventType.PluginAction,
        "desc": "云盘文件删除",
        "category": "",
        "data": payload,
    }
    return [command]
def get_api(self) -> List[Dict[str, Any]]:
    """
    This plugin defines no API endpoints; nothing is returned.
    """
    return None
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
    """
    Build the plugin configuration page.

    :return: two items - the page layout (a single VForm with four rows:
             three switches, the path-mapping textarea and two info alerts)
             and the default values of the form model
    """

    def switch_column(model: str, label: str) -> dict:
        # One third-width column holding a single switch
        return {
            'component': 'VCol',
            'props': {'cols': 12, 'md': 4},
            'content': [
                {
                    'component': 'VSwitch',
                    'props': {'model': model, 'label': label},
                }
            ],
        }

    def full_width_row(component: str, props: dict) -> dict:
        # A row with one full-width column wrapping a single component
        return {
            'component': 'VRow',
            'content': [
                {
                    'component': 'VCol',
                    'props': {'cols': 12},
                    'content': [
                        {'component': component, 'props': props}
                    ],
                }
            ],
        }

    def info_alert(text: str) -> dict:
        return full_width_row('VAlert', {
            'type': 'info',
            'variant': 'tonal',
            'text': text,
        })

    switches_row = {
        'component': 'VRow',
        'content': [
            switch_column('enabled', '启用插件'),
            switch_column('notify', '开启通知'),
            switch_column('del_history', '删除历史'),
        ],
    }
    mapping_row = full_width_row('VTextarea', {
        'model': 'path',
        'rows': '2',
        'label': '媒体库路径映射',
        'placeholder': '媒体服务器路径:moviepilot内云盘挂载路径一行一个',
    })
    usage_row = info_alert(
        '需要开启媒体库删除插件且正确配置排除路径。'
        '主要针对于strm文件删除后同步删除云盘资源。'
        '如遇删除失败,请检查文件权限问题。'
    )
    mapping_help_row = info_alert(
        '关于路径映射:'
        'emby:/data/series/A.mp4,'
        'moviepilot内云盘挂载路径:/mnt/link/series/A.mp4。'
        '路径映射填/data:/mnt/link'
    )

    layout = [
        {
            'component': 'VForm',
            'content': [switches_row, mapping_row, usage_row, mapping_help_row],
        }
    ]
    defaults = {
        "enabled": False,
        "path": "",
        "notify": False,
        "del_history": False,
    }
    return layout, defaults
def get_page(self) -> List[dict]:
    """
    Build the plugin detail page listing the deletion history.

    Records are shown newest first, one card each with the poster image and
    the type, title, season/episode (when recorded) and deletion time. The
    episode line is only added when an episode was stored, so a season-level
    deletion no longer shows a literal "None". Records without ``del_time``
    sort last instead of breaking the sort.

    :return: page layout; a centred placeholder when there is no history
    """
    records = self.get_data('history')
    if not records:
        return [
            {
                'component': 'div',
                'text': '暂无数据',
                'props': {
                    'class': 'text-center',
                }
            }
        ]

    # Newest first; a missing del_time is treated as the empty string so
    # None is never compared with a str
    records = sorted(records, key=lambda item: item.get('del_time') or '', reverse=True)

    def text_line(text: str) -> dict:
        # One compact text row inside a history card
        return {
            'component': 'VCardText',
            'props': {
                'class': 'pa-0 px-2'
            },
            'text': text
        }

    cards = []
    for record in records:
        htype = record.get("type")
        title = record.get("title")
        season = record.get("season")
        episode = record.get("episode")
        image = record.get("image")
        del_time = record.get("del_time")

        lines = [
            text_line(f'类型:{htype}'),
            text_line(f'标题:{title}'),
        ]
        if season:
            lines.append(text_line(f'季:{season}'))
            if episode:
                lines.append(text_line(f'集:{episode}'))
        lines.append(text_line(f'时间:{del_time}'))

        cards.append(
            {
                'component': 'VCard',
                'content': [
                    {
                        'component': 'div',
                        'props': {
                            'class': 'd-flex justify-space-start flex-nowrap flex-row',
                        },
                        'content': [
                            {
                                'component': 'div',
                                'content': [
                                    {
                                        'component': 'VImg',
                                        'props': {
                                            'src': image,
                                            'height': 120,
                                            'width': 80,
                                            'aspect-ratio': '2/3',
                                            'class': 'object-cover shadow ring-gray-500',
                                            'cover': True
                                        }
                                    }
                                ]
                            },
                            {
                                'component': 'div',
                                'content': lines
                            }
                        ]
                    }
                ]
            }
        )

    return [
        {
            'component': 'div',
            'props': {
                'class': 'grid gap-3 grid-info-card',
            },
            'content': cards
        }
    ]
def stop_service(self):
    """
    Shut the plugin down; there is nothing to release, so this does nothing.
    """
    return None