Files
archived-MoviePilot-Plugins/plugins/cloudsyncdel/__init__.py
thsrite 160780a9df fix
2024-11-11 15:31:55 +08:00

700 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import shutil
import time
from pathlib import Path
from typing import Any, List, Dict, Tuple
from app import schemas
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, MediaImageType, NotificationType, MediaType
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
class CloudSyncDel(_PluginBase):
# 插件名称
plugin_name = "云盘同步删除"
# 插件描述
plugin_desc = "媒体库删除软连接/strm文件后同步删除云盘文件。"
# 插件图标
plugin_icon = "clouddisk.png"
# 插件版本
plugin_version = "1.5.6"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "cloudsyncdel_"
# 加载顺序
plugin_order = 9
# 可使用的用户级别
auth_level = 2
# 私有属性
_enabled = False
# 任务执行间隔
_paths = {}
_cloud_paths = {}
_local_paths = {}
_notify = False
_url = None
_del_history = False
_video_formats = ('.mp4', '.avi', '.rmvb', '.wmv', '.mov', '.mkv', '.flv', '.ts', '.webm', '.iso', '.mpg')
def init_plugin(self, config: dict = None):
if config:
self._enabled = config.get("enabled")
self._notify = config.get("notify")
self._url = config.get("url")
self._del_history = config.get("del_history")
if config.get("path"):
for path in str(config.get("path")).split("\n"):
paths = path.split("#")[0]
cloud_path = path.split("#")[1]
self._paths[paths.split(":")[0]] = paths.split(":")[1]
self._cloud_paths[paths.split(":")[1]] = cloud_path
if config.get("local_path"):
for path in str(config.get("local_path")).split("\n"):
self._local_paths[path.split(":")[0]] = path.split(":")[1]
# 清理插件历史
if self._del_history:
self.del_data(key="history")
self.update_config({
"enabled": self._enabled,
"notify": self._notify,
"path": config.get("path"),
"url": self._url,
"del_history": False
})
@eventmanager.register(EventType.PluginAction)
def clouddisk_del(self, event: Event = None):
if not self._enabled:
return
if not event:
return
event_data = event.event_data
if not event_data or (
event_data.get("action") != "networkdisk_del" and event_data.get("action") != "cloudsyncdel"):
return
logger.info(f"接收到云盘删除请求 {event_data}")
media_path = event_data.get("media_path")
if not media_path:
logger.error("未获取到删除媒体路径,跳过处理")
return
media_name = event_data.get("media_name")
tmdb_id = event_data.get("tmdb_id")
media_type = event_data.get("media_type")
season_num = event_data.get("season_num")
episode_num = event_data.get("episode_num")
# 本地路径替换
local_path = self.__get_path(self._local_paths, media_path)
logger.info(f"获取到 {self._local_paths} 替换后本地文件路径 {local_path}")
if Path(local_path).exists() and (
Path(local_path).is_dir() or (Path(local_path).is_file() and not Path(local_path).is_symlink())):
if Path(local_path).is_dir():
shutil.rmtree(local_path)
elif Path(local_path).is_file():
Path(local_path).unlink() # 删除文件
logger.info(f"获取到本地路径 {local_path}, 通知媒体库同步删除插件删除")
eventItem = schemas.WebhookEventInfo(event="media_del", channel="emby")
eventItem.item_type = media_type
eventItem.item_name = media_name
eventItem.item_path = local_path
eventItem.tmdb_id = tmdb_id
eventItem.season_id = season_num
eventItem.episode_id = episode_num
eventItem.item_isvirtual = "False"
self.eventmanager.send_event(EventType.WebhookMessage, eventItem)
else:
# 检索相同目录下同名的媒体文件
pattern = Path(local_path).stem.replace('[', '?').replace(']', '?')
logger.info(f"开始筛选 {Path(local_path).parent} 下同名文件 {pattern}")
files = Path(local_path).parent.glob(f"{pattern}.*")
if not files:
logger.info(f"未找到本地同名文件 {pattern},开始删除云盘")
else:
for file in files:
Path(file).unlink()
logger.info(f"本地文件 {file} 已删除")
if Path(file).suffix in settings.RMT_MEDIAEXT:
logger.info(f"获取到本地路径 {local_path}, 通知媒体库同步删除插件删除")
eventItem = schemas.WebhookEventInfo(event="media_del", channel="emby")
eventItem.item_type = media_type
eventItem.item_name = media_name
eventItem.item_path = local_path
eventItem.tmdb_id = tmdb_id
eventItem.season_id = season_num
eventItem.episode_id = episode_num
eventItem.item_isvirtual = "False"
self.eventmanager.send_event(EventType.WebhookMessage, eventItem)
# 删除thumb图片
thumb_file = Path(local_path).parent / (Path(local_path).stem + "-thumb.jpg")
if thumb_file.exists():
thumb_file.unlink()
logger.info(f"本地文件 {thumb_file} 已删除")
media_path = self.__get_path(self._paths, media_path)
if not media_path:
return
logger.info(f"获取到 {self._paths} 替换后本地路径 {media_path}")
# 判断文件是否存在
cloud_file_flag = False
media_path = Path(media_path)
cloud_path = None
if media_path.suffix:
# 删除云盘文件
cloud_file = self.__get_path(self._cloud_paths, str(media_path))
logger.info(f"获取到 {self._cloud_paths} 替换后云盘文件路径 {cloud_file}")
cloud_file_path = Path(cloud_file)
# 删除文件、nfo、jpg等同名文件
pattern = cloud_file_path.stem.replace('[', '?').replace(']', '?')
logger.info(f"开始筛选 {cloud_file_path.parent} 下同名文件 {pattern}")
files = cloud_file_path.parent.glob(f"{pattern}.*")
for file in files:
Path(file).unlink()
logger.info(f"云盘文件 {file} 已删除")
if Path(file).suffix in settings.RMT_MEDIAEXT:
cloud_path = file
cloud_file_flag = True
# 删除thumb图片
thumb_file = cloud_file_path.parent / (cloud_file_path.stem + "-thumb.jpg")
if thumb_file.exists():
thumb_file.unlink()
logger.info(f"云盘文件 {thumb_file} 已删除")
# 删除空目录
# 判断当前媒体父路径下是否有媒体文件,如有则无需遍历父级
if not SystemUtils.exits_files(cloud_file_path.parent, settings.RMT_MEDIAEXT):
# 判断父目录是否为空, 为空则删除
for parent_path in cloud_file_path.parents:
if str(parent_path.parent) != str(cloud_file_path.root):
# 父目录非根目录,才删除父目录
if not SystemUtils.exits_files(parent_path, settings.RMT_MEDIAEXT):
# 当前路径下没有媒体文件则删除
shutil.rmtree(parent_path)
logger.warn(f"云盘目录 {parent_path} 已删除")
cloud_file_flag = True
else:
# 删除云盘文件
cloud_path = self.__get_path(self._cloud_paths, str(media_path))
if Path(cloud_path).exists():
shutil.rmtree(cloud_path)
logger.warn(f"云盘目录 {cloud_path} 已删除")
cloud_file_flag = True
# 发送消息
image = 'https://emby.media/notificationicon.png'
media_type = MediaType.MOVIE if media_type in ["Movie", "MOV"] else MediaType.TV
if cloud_file_flag:
if self._url:
if not Path(cloud_path).suffix or Path(cloud_path).suffix in settings.RMT_MEDIAEXT:
RequestUtils(content_type="application/json").post(url=self._url, json={
"path": str(cloud_path),
"type": "del"
})
if self._notify:
backrop_image = self.chain.obtain_specific_image(
mediaid=tmdb_id,
mtype=media_type,
image_type=MediaImageType.Backdrop,
season=season_num,
episode=episode_num
) or image
# 类型
if media_type == MediaType.MOVIE:
msg = f'电影 {media_name} {tmdb_id}'
# 删除电视剧
elif media_type == MediaType.TV and not season_num and not episode_num:
msg = f'剧集 {media_name} {tmdb_id}'
# 删除季 S02
elif media_type == MediaType.TV and season_num and (not episode_num or not str(episode_num).isdigit()):
msg = f'剧集 {media_name} S{season_num} {tmdb_id}'
# 删除剧集S02E02
elif media_type == MediaType.TV and season_num and episode_num and str(episode_num).isdigit():
msg = f'剧集 {media_name} S{season_num}E{episode_num} {tmdb_id}'
else:
msg = media_name
# 发送通知
self.post_message(
mtype=NotificationType.Plugin,
title="云盘同步删除任务完成",
image=backrop_image,
text=f"{msg}\n"
f"时间 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}"
)
# 读取历史记录
history = self.get_data('history') or []
# 获取poster
poster_image = self.chain.obtain_specific_image(
mediaid=tmdb_id,
mtype=media_type,
image_type=MediaImageType.Poster,
) or image
history.append({
"type": media_type.value,
"title": media_name,
"path": str(media_path),
"season": season_num if season_num and str(season_num).isdigit() else None,
"episode": episode_num if episode_num and str(episode_num).isdigit() else None,
"image": poster_image,
"del_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"unique": f"{media_name} {tmdb_id}"
})
# 保存历史
self.save_data("history", history)
def __get_path(self, paths, file_path: str):
"""
路径转换
"""
if paths and paths.keys():
for library_path in paths.keys():
if str(file_path).startswith(str(library_path)):
# 替换网盘路径
return str(file_path).replace(str(library_path), str(paths.get(str(library_path))))
# 未匹配到路径,返回原路径
return file_path
def delete_history(self, key: str, apikey: str):
"""
删除同步历史记录
"""
if apikey != settings.API_TOKEN:
return schemas.Response(success=False, message="API密钥错误")
# 历史记录
historys = self.get_data('history')
if not historys:
return schemas.Response(success=False, message="未找到历史记录")
# 删除指定记录
historys = [h for h in historys if h.get("unique") != key]
self.save_data('history', historys)
return schemas.Response(success=True, message="删除成功")
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
"""
定义远程控制命令
:return: 命令关键字、事件、描述、附带数据
"""
return [{
"cmd": "/cloudsyncdel",
"event": EventType.PluginAction,
"desc": "云盘同步删除",
"category": "",
"data": {
"action": "cloudsyncdel"
}
}]
def get_api(self) -> List[Dict[str, Any]]:
return [
{
"path": "/delete_history",
"endpoint": self.delete_history,
"methods": ["GET"],
"summary": "删除订阅历史记录"
}
]
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '开启通知',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'del_history',
'label': '清空历史',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'path',
'rows': '2',
'label': '媒体库路径映射(删除云盘文件)',
'placeholder': '媒体服务器软连接/strm路径:MoviePilot软连接/strm路径#MoviePilot云盘路径一行一个'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'local_path',
'rows': '2',
'label': '本地路径映射(回调【媒体文件同步删除】插件删除本地文件)',
'placeholder': '媒体服务器软连接/strm路径:MoviePilot本地文件路径一行一个'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'url',
'label': '任务推送url',
'placeholder': 'post请求json方式推送path和type(del)字段'
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '需要开启媒体库删除插件且正确配置排除路径。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '关于路径映射:'
'emby软连接路径:/data/series/A.mp4,'
'MoviePilot软连接路径:/mnt/link/series/A.mp4。'
'MoviePilot云盘路径:/mnt/cloud/series/A.mp4。'
'路径映射填/data:/mnt/link#/mnt/cloud'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"path": "",
"url": "",
"local_path": "",
"notify": False,
"del_history": False
}
def get_page(self) -> List[dict]:
"""
拼装插件详情页面,需要返回页面配置,同时附带数据
"""
# 查询同步详情
historys = self.get_data('history')
if not historys:
return [
{
'component': 'div',
'text': '暂无数据',
'props': {
'class': 'text-center',
}
}
]
# 数据按时间降序排序
historys = sorted(historys, key=lambda x: x.get('del_time'), reverse=True)
# 拼装页面
contents = []
for history in historys:
htype = history.get("type")
title = history.get("title")
season = history.get("season")
episode = history.get("episode")
image = history.get("image")
del_time = history.get("del_time")
unique = history.get("unique")
if season:
if episode:
sub_contents = [
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'类型:{htype}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'标题:{title}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'季:{season}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'集:{episode}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'时间:{del_time}'
}
]
else:
sub_contents = [
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'类型:{htype}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'标题:{title}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'季:{season}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'时间:{del_time}'
}
]
else:
sub_contents = [
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'类型:{htype}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'标题:{title}'
},
{
'component': 'VCardText',
'props': {
'class': 'pa-0 px-2'
},
'text': f'时间:{del_time}'
}
]
contents.append(
{
'component': 'VCard',
'content': [
{
"component": "VDialogCloseBtn",
"props": {
'innerClass': 'absolute top-0 right-0',
},
'events': {
'click': {
'api': 'plugin/CloudSyncDel/delete_history',
'method': 'get',
'params': {
'key': unique,
'apikey': settings.API_TOKEN
}
}
},
},
{
'component': 'div',
'props': {
'class': 'd-flex justify-space-start flex-nowrap flex-row',
},
'content': [
{
'component': 'div',
'content': [
{
'component': 'VImg',
'props': {
'src': image,
'height': 120,
'width': 80,
'aspect-ratio': '2/3',
'class': 'object-cover shadow ring-gray-500',
'cover': True
}
}
]
},
{
'component': 'div',
'content': sub_contents
}
]
}
]
}
)
return [
{
'component': 'div',
'props': {
'class': 'grid gap-3 grid-info-card',
},
'content': contents
}
]
def stop_service(self):
"""
退出插件
"""
pass