Files
archived-MoviePilot-Plugins/plugins.v2/libraryduplicatecheck/__init__.py
2025-01-13 13:48:45 +08:00

719 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import shutil
import threading
from datetime import datetime, timedelta
import os
from collections import defaultdict
from pathlib import Path
import pytz
from app.core.config import settings
from app.helper.mediaserver import MediaServerHelper
from app.modules.emby import Emby
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple, Optional
from app.log import logger
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
from app.utils.system import SystemUtils
# Module-level lock; it is held (``with lock:``) while groups of duplicate
# files are evaluated and deleted in ``__find_duplicate_videos``.
lock = threading.Lock()
class LibraryDuplicateCheck(_PluginBase):
# Plugin name
plugin_name = "媒体库重复媒体检测"
# Plugin description
plugin_desc = "媒体库重复媒体检查,可选保留规则保留其一。"
# Plugin icon
plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/libraryduplicate.png"
# Plugin version
plugin_version = "2.0.2"
# Plugin author
plugin_author = "thsrite"
# Author homepage
author_url = "https://github.com/thsrite"
# Prefix for the plugin's configuration item IDs
plugin_config_prefix = "libraryduplicatecheck_"
# Load order
plugin_order = 15
# User level allowed to use the plugin
auth_level = 2
# Private attributes
_enabled = False
# Per-path lookup tables filled by init_plugin from the "path" setting:
# _paths maps a checked path to its library name (or None),
# _path_type maps it to its retention rule and
# _path_mediatpye maps it to its media type.
_paths = {}
_path_type = {}
_path_mediatpye = {}
_notify = False
_delete_softlink = False
_cron = None
_onlyonce = False
# Raw multi-line "path" setting as entered in the form
_path = None
# Plugin-wide retention rule; the value '仅检查' turns deletions into log-only previews
_retain_type = None
# Comma-separated list of file extensions treated as media files
_rmt_mediaext = ".mp4, .mkv, .ts, .iso,.rmvb, .avi, .mov, .mpeg,.mpg, .wmv, .3gp, .asf, .m4v, .flv, .m2ts, .strm,.tp, .f4v"
_mediaservers = None
mediaserver_helper = None
# Connection details of the Emby server currently being processed by check_duplicate
_EMBY_HOST = None
_EMBY_APIKEY = None
_scheduler: Optional[BackgroundScheduler] = None
def init_plugin(self, config: dict = None):
    """
    Load the plugin configuration and (re)start the background scheduler.

    Each non-empty line of the ``path`` setting has the form
    ``<path>[#<library name>][$<retention rule>][%<media type>]``.
    The suffixes are stripped from right to left (``%``, then ``$``, then
    ``#``) and stored in ``_path_mediatpye``, ``_path_type`` and ``_paths``.

    :param config: saved plugin configuration, or None when nothing is saved
    """
    # Stop a scheduler left over from a previous initialisation; otherwise
    # every configuration save would leave another set of jobs running.
    self.stop_service()

    self.mediaserver_helper = MediaServerHelper()
    if config:
        self._enabled = config.get("enabled")
        self._notify = config.get("notify")
        self._cron = config.get("cron")
        self._delete_softlink = config.get("delete_softlink")
        self._onlyonce = config.get("onlyonce")
        self._retain_type = config.get("retain_type")
        self._path = config.get("path")
        self._rmt_mediaext = config.get(
            "rmt_mediaext") or ".mp4, .mkv, .ts, .iso,.rmvb, .avi, .mov, .mpeg,.mpg, .wmv, .3gp, .asf, .m4v, .flv, .m2ts, .strm,.tp, .f4v"
        self._mediaservers = config.get("mediaservers") or []

        self._paths = {}
        self._path_type = {}
        self._path_mediatpye = {}
        if config.get("path"):
            for line in str(config.get("path")).split("\n"):
                # Ignore blank lines and stray whitespace / carriage returns
                # so they are not registered as paths to scan.
                path = line.strip()
                if not path:
                    continue

                # Media type defaults to movie unless "%<type>" is given
                path_mediatpye = '电影'
                if path.count("%") == 1:
                    path, path_mediatpye = path.split("%")

                # Retention rule defaults to the plugin-wide rule
                retain_type = self._retain_type
                if path.count("$") == 1:
                    path, retain_type = path.split("$")

                # Optional Emby library to refresh after deletions
                library_name = None
                if path.count("#") == 1:
                    path, library_name = path.split("#")

                self._paths[path] = library_name
                self._path_type[path] = retain_type
                self._path_mediatpye[path] = path_mediatpye

    if self._enabled or self._onlyonce:
        # Scheduler service
        self._scheduler = BackgroundScheduler(timezone=settings.TZ)

        # Run once right away
        if self._onlyonce:
            logger.info("媒体库重复媒体检测服务启动,立即运行一次")
            self._scheduler.add_job(self.check_duplicate, 'date',
                                    run_date=datetime.now(
                                        tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
                                    name="媒体库重复媒体检测")
            # Reset the one-shot switch and persist it
            self._onlyonce = False
            self.__update_config()

        # Periodic run
        if self._cron:
            try:
                self._scheduler.add_job(func=self.check_duplicate,
                                        trigger=CronTrigger.from_crontab(self._cron),
                                        name="媒体库重复媒体检测")
            except Exception as err:
                logger.error(f"定时任务配置错误:{err}")
                # Push a real-time system message
                self.systemmessage.put(f"执行周期配置错误:{err}")

        # Start the scheduler only when at least one job was registered
        if self._scheduler.get_jobs():
            self._scheduler.print_jobs()
            self._scheduler.start()
def check_duplicate(self):
    """
    Check every configured path for duplicate media on each selected Emby server.

    For each server and path the duplicates are located (and, depending on
    the retention rule, deleted) by ``__find_duplicate_videos``. When the
    path is bound to a library name and files were removed, the matching
    Emby library is refreshed. A summary is sent as a notification when
    notifications are enabled.
    """
    if not self._paths:
        logger.warning("媒体库重复媒体检测服务未配置路径")
        return

    emby_servers = self.mediaserver_helper.get_services(name_filters=self._mediaservers, type_filter="emby")
    if not emby_servers:
        logger.error("未配置Emby媒体服务器")
        return

    for emby_name, emby_server in emby_servers.items():
        logger.info(f"开始检查媒体服务器 {emby_name} 重复媒体")
        self._EMBY_APIKEY = emby_server.config.config.get("apikey")
        # A missing host must not crash the run; with an empty host the
        # refresh helper simply reports failure.
        host = emby_server.config.config.get("host") or ""
        if host:
            if not host.endswith("/"):
                host += "/"
            if not host.startswith("http"):
                host = "http://" + host
        self._EMBY_HOST = host

        msg = ""
        for path, library_name in self._paths.items():
            _retain_type = self._path_type.get(path)
            _path_mediatpye = self._path_mediatpye.get(path)
            logger.info(f"{emby_name} 开始检查路径:{path} {_retain_type}")
            duplicate_files, delete_duplicate_files, delete_cloud_files = self.__find_duplicate_videos(path,
                                                                                                       _retain_type,
                                                                                                       _path_mediatpye)
            logger.info(f"{emby_name} 路径 {path} 检查完毕")

            if library_name:
                logger.info(f"{emby_name} 开始刷新媒体库:{library_name}")
                # Fetch the Emby libraries.
                # NOTE(review): this uses a default-constructed Emby() rather
                # than the server object being iterated - confirm it targets
                # the intended server.
                librarys = Emby().get_librarys()
                if not librarys:
                    logger.error(f"{emby_name} 获取媒体库失败")
                    return
                # Refresh only when files were really removed: the plugin must
                # not be in check-only mode AND at least one file was deleted.
                # (The original expression lacked parentheses, so a non-zero
                # cloud counter triggered a refresh even in check-only mode.)
                files_removed = delete_duplicate_files > 0 or delete_cloud_files > 0
                if str(self._retain_type) != '仅检查' and files_removed:
                    for library in librarys:
                        if not library:
                            continue
                        if library.name == library_name:
                            # Report completion only after a successful refresh
                            if self.__refresh_emby_library_by_id(library.id):
                                logger.info(f"{emby_name} 媒体库:{library_name} 刷新完成")
                            break

            msg += (f"{path}{'#' + library_name if library_name else ''} 检查完成\n"
                    f"文件保留规则: {_retain_type}\n"
                    f"本地重复文件: {duplicate_files}\n"
                    f"删除本地文件: {delete_duplicate_files}\n"
                    f"删除云盘文件: {delete_cloud_files}\n\n")

        if self._notify:
            self.post_message(
                mtype=NotificationType.Plugin,
                title=f"{emby_name} 媒体库重复媒体检测",
                text=msg,
                link=settings.MP_DOMAIN('#/history')
            )
        logger.info(f"{emby_name} 媒体库重复媒体检测完成")
def __refresh_emby_library_by_id(self, item_id: str) -> bool:
"""
通知Emby刷新一个项目的媒体库
"""
if not self._EMBY_HOST or not self._EMBY_APIKEY:
return False
req_url = "%semby/Items/%s/Refresh?Recursive=true&api_key=%s" % (self._EMBY_HOST, item_id, self._EMBY_APIKEY)
try:
res = RequestUtils().post_res(req_url)
if res:
return True
else:
logger.info(f"刷新媒体库对象 {item_id} 失败无法连接Emby")
except Exception as e:
logger.error(f"连接Items/Id/Refresh出错" + str(e))
return False
return False
def __find_duplicate_videos(self, directory, retain_type, path_mediatpye):
    """
    Find duplicate video files below ``directory`` and remove the surplus copies.

    Files are grouped by the part of their stem before the first ``-``
    (plus the ``SxxEyy`` marker when the media type is '电视剧'). For every
    group with more than one file a keeper is chosen according to
    ``retain_type`` and the remaining files are handed to
    ``__delete_duplicate_file``.

    :param directory: root directory to scan recursively
    :param retain_type: retention rule used to choose the file to keep
    :param path_mediatpye: media type configured for this path
    :return: tuple (files that belong to a duplicate group,
             local files handed over for deletion,
             symlink source files handed over for deletion)
    """
    # Parse the extension list once instead of once per file
    media_exts = {ext.strip() for ext in self._rmt_mediaext.split(",")}
    is_tv = str(path_mediatpye) == '电视剧'

    def resolve_link(link_path):
        """Return the target of a symlink, or None when it cannot be read."""
        if not os.path.islink(link_path):
            return None
        try:
            target = os.readlink(link_path)
        except OSError:
            return None
        # A relative target is relative to the directory holding the link
        if not os.path.isabs(target):
            target = os.path.normpath(os.path.join(os.path.dirname(link_path), target))
        return target

    # Maps a video name to the list of files carrying that name
    video_files = defaultdict(list)
    duplicate_files = 0
    delete_duplicate_files = 0
    delete_cloud_files = 0

    # Traverse the directory and its subdirectories
    for root, _, files in os.walk(directory):
        for file in files:
            if Path(file).suffix.lower() not in media_exts:
                continue
            file_path = os.path.join(root, file)
            if not (Path(file_path).exists() or os.path.islink(file_path)):
                continue
            video_name = Path(file).stem.split('-')[0].rstrip()
            if is_tv:
                # Episodes of one show share a title; tell them apart by SxxEyy
                match = re.search(r"S\d+E\d+", Path(file).stem)
                if match:
                    video_name += f" {match.group(0)}"
            logger.info(f'Scan file -> {file} -> {video_name}')
            video_files[video_name].append(file_path)

    logger.info("\n================== RESULT ==================\n")

    # Hold the lock for the whole evaluation/deletion phase
    with lock:
        # Process groups ordered by the number of files they contain
        for name, paths in sorted(video_files.items(), key=lambda item: len(item[1])):
            if len(paths) <= 1:
                logger.info(f"'{name}' No Duplicate video files.")
                continue

            duplicate_files += len(paths)
            logger.info(f"Duplicate video files for '{name}':")
            for path in paths:
                try:
                    logger.info(f" {path} 文件大小:{os.path.getsize(path)},创建时间:{os.path.getmtime(path)}")
                except OSError:
                    # Missing file or dangling symlink
                    logger.info(f" {path} 文件已被删除")

            # Decide which file to keep based on the retention rule
            logger.info(f"文件保留规则:{str(retain_type)}")
            try:
                keep_file = self.__choose_file_to_keep(paths, retain_type)
            except OSError as err:
                logger.error(f"'{name}' 读取文件信息失败,跳过处理:{err}")
                continue
            logger.info(f"本地保留文件: {keep_file}")

            if keep_file is None:
                # No keeper was selected (check-only or unknown rule). Without
                # this guard every copy would count as "not the keeper" and
                # be deleted.
                logger.warning(f"'{name}' 未能按保留规则选出保留文件,跳过处理")
                continue

            keep_cloud_file = None
            if self._delete_softlink:
                keep_cloud_file = resolve_link(str(keep_file))
                logger.info(f"云盘保留文件: {keep_cloud_file}")

            # Delete the other duplicate files
            for path in paths:
                if str(path) == str(keep_file):
                    continue
                if not (Path(path).exists() or os.path.islink(path)):
                    continue

                # Read the link target BEFORE the link itself is removed
                cloud_file = resolve_link(path) if self._delete_softlink else None

                delete_duplicate_files += 1
                self.__delete_duplicate_file(duplicate_file=path,
                                             paths=paths,
                                             keep_file=str(keep_file),
                                             file_type="监控")

                # Also delete the symlink source, unless it is the very file
                # the kept link points to.
                if cloud_file and cloud_file != keep_cloud_file and Path(cloud_file).exists():
                    delete_cloud_files += 1
                    self.__delete_duplicate_file(duplicate_file=cloud_file,
                                                 paths=paths,
                                                 keep_file=keep_cloud_file,
                                                 file_type="云盘")

    return duplicate_files, delete_duplicate_files, delete_cloud_files
def __delete_duplicate_file(self, duplicate_file, paths, keep_file, file_type):
    """
    Delete a duplicate file together with its same-named companion files.

    All files in the duplicate's directory matching ``<stem>.*`` are
    candidates (media file, nfo, jpg, ...). When the number of media files
    among them equals the number of duplicates, the duplicates share one
    name in one directory; then only the non-kept media files are removed
    and the scraped companion files are left for the kept file.

    When the plugin-wide retention rule is '仅检查' nothing is deleted and
    the files that would be removed are only logged.

    :param duplicate_file: path of the duplicate to remove
    :param paths: all files of the duplicate group
    :param keep_file: path of the file that must be kept
    :param file_type: label used in log messages
    """
    check_only = str(self._retain_type) == "仅检查"
    media_exts = {ext.strip() for ext in self._rmt_mediaext.split(",")}
    target = Path(duplicate_file)

    def remove(file):
        """Delete ``file``, or only announce the deletion in check-only mode."""
        if check_only:
            logger.warning(f"{file_type}文件 {file} 将被删除")
            return
        Path(file).unlink()
        logger.info(f"{file_type}文件 {file} 已删除")

    # Square brackets would start a glob character class; match them loosely
    pattern = target.stem.replace('[', '?').replace(']', '?')
    files = list(target.parent.glob(f"{pattern}.*"))
    logger.info(f"筛选 {target.parent} 下同名文件 {pattern}.* {len(files)}")

    # Keep full paths (not stems): they are compared against keep_file and
    # unlinked below, and stems would collapse to a single set entry.
    media_files = [file for file in files if file.suffix.lower() in media_exts]

    shared_name = len(media_files) == len(paths)
    # Same-named duplicates: remove only media files and keep scraped files
    candidates = media_files if shared_name else files
    for file in candidates:
        if str(file) != str(keep_file):
            remove(file)

    if not shared_name:
        # Remove the thumb image that belongs to the duplicate
        thumb_file = target.parent / (target.stem + "-thumb.jpg")
        if thumb_file.exists():
            remove(thumb_file)

    self.__rmtree(Path(duplicate_file), file_type)
def __rmtree(self, path: Path, file_type: str):
    """
    Remove ancestor directories of ``path`` that no longer hold media files.

    Nothing happens while the directory containing ``path`` still has media
    files. Otherwise each ancestor whose own parent is not the filesystem
    root is removed when it exists and holds no media files. With the
    plugin-wide rule '仅检查' the removal is only logged.

    :param path: path of the file that was deleted
    :param file_type: label used in log messages
    """
    media_exts = [ext.strip() for ext in self._rmt_mediaext.split(",")]

    # Media files next to the deleted file: no need to walk the ancestors
    if SystemUtils.exits_files(path.parent, media_exts):
        return

    check_only = str(self._retain_type) == "仅检查"
    for parent_path in path.parents:
        # Never touch directories located directly below the root
        if str(parent_path.parent) == str(path.root):
            continue
        if SystemUtils.exits_files(parent_path, media_exts):
            continue
        if not parent_path.exists():
            continue
        if check_only:
            logger.warning(f"{file_type}目录 {parent_path} 将被删除")
        else:
            shutil.rmtree(parent_path)
            logger.warning(f"{file_type}目录 {parent_path} 已删除")
@staticmethod
def __choose_file_to_keep(paths, retain_type):
checked = None
checked_path = None
for path in paths:
if str(retain_type) == "保留体积最小":
selected = os.path.getsize(path)
if checked is None or selected < checked:
checked = selected
checked_path = path
elif str(retain_type) == "保留体积最大":
selected = os.path.getsize(path)
if checked is None or selected > checked:
checked = selected
checked_path = path
elif str(retain_type) == "保留创建最早":
selected = os.path.getmtime(path)
if checked is None or selected < checked:
checked = selected
checked_path = path
elif str(retain_type) == "保留创建最晚":
selected = os.path.getmtime(path)
if checked is None or selected > checked:
checked = selected
checked_path = path
return checked_path
def __update_config(self):
    """Persist the current in-memory settings through ``update_config``."""
    current = {
        "enabled": self._enabled,
        "onlyonce": self._onlyonce,
        "cron": self._cron,
        "delete_softlink": self._delete_softlink,
        "notify": self._notify,
        "path": self._path,
        "retain_type": self._retain_type,
        "rmt_mediaext": self._rmt_mediaext,
        "mediaservers": self._mediaservers,
    }
    self.update_config(current)
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
    """
    Define the remote-control commands of the plugin.

    :return: list of commands, each with keyword, event, description and data
    """
    command = {
        "cmd": "/libraryduplicatecheck",
        "event": EventType.PluginAction,
        "desc": "媒体库重复媒体检测",
        "category": "",
        "data": {"action": "libraryduplicatecheck"},
    }
    return [command]
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
    """
    Build the plugin configuration page.

    :return: tuple of (page layout as nested component dicts,
             default values of the form model)
    """
    default_exts = ".mp4, .mkv, .ts, .iso,.rmvb, .avi, .mov, .mpeg,.mpg, .wmv, .3gp, .asf, .m4v, .flv, .m2ts, .strm,.tp, .f4v"

    def widget(component, **props):
        """A leaf component carrying only props."""
        return {'component': component, 'props': props}

    def column(child, md=None):
        """A full-width VCol (optionally with an ``md`` width) around one child."""
        props = {'cols': 12}
        if md is not None:
            props['md'] = md
        return {'component': 'VCol', 'props': props, 'content': [child]}

    def row(*columns):
        """A VRow holding the given columns."""
        return {'component': 'VRow', 'content': list(columns)}

    def switch(model, label):
        """A quarter-width column with a VSwitch bound to ``model``."""
        return column(widget('VSwitch', model=model, label=label), md=3)

    def notice(text):
        """A row with a single informational VAlert."""
        return row(column(widget('VAlert', type='info', variant='tonal', text=text)))

    retain_rules = ('仅检查', '保留体积最小', '保留体积最大', '保留创建最早', '保留创建最晚')
    retain_items = [{'title': rule, 'value': rule} for rule in retain_rules]

    # Only Emby servers can be selected
    emby_items = [{"title": config.name, "value": config.name}
                  for config in self.mediaserver_helper.get_configs().values() if
                  config.type == "emby"]

    path_placeholder = ("检查的媒体路径\n"
                        "检查的媒体路径$保留规则\n"
                        "检查的媒体路径#媒体库名称\n"
                        "检查的媒体路径#媒体库名称$保留规则\n"
                        "检查的媒体路径#媒体库名称$保留规则%电视剧\n")

    form_rows = [
        # Switches
        row(
            switch('enabled', '启用插件'),
            switch('notify', '开启通知'),
            switch('delete_softlink', '删除软连接源文件'),
            switch('onlyonce', '立即运行一次'),
        ),
        # Schedule and retention rule
        row(
            column(widget('VCronField',
                          model='cron',
                          label='执行周期',
                          placeholder='5位cron表达式留空自动'),
                   md=3),
            column(widget('VSelect',
                          multiple=False,
                          chips=True,
                          model='retain_type',
                          label='保留规则',
                          items=retain_items),
                   md=3),
        ),
        # Paths to check
        row(
            column(widget('VTextarea',
                          model='path',
                          label='检查路径',
                          rows=2,
                          placeholder=path_placeholder)),
        ),
        # Media file extensions
        row(
            column(widget('VTextarea',
                          model='rmt_mediaext',
                          label='视频格式',
                          rows=2,
                          placeholder=default_exts)),
        ),
        # Media servers
        row(
            column(widget('VSelect',
                          multiple=True,
                          chips=True,
                          clearable=True,
                          model='mediaservers',
                          label='媒体服务器',
                          items=emby_items)),
        ),
        # Usage hints
        notice('检测指定路径下同一媒体文件是否有重复(不同扩展名视为同一媒体)。'),
        notice('检查路径配置`#媒体库`名称时会通知Emby刷新媒体库。检查路径配置`%电视剧`可指定处理媒体的格式。'),
        notice('检查路径自定义配置`$保留规则`且插件保留规则为`仅检查`时,将会预览操作而不删除重复文件。'),
    ]

    layout = [{'component': 'VForm', 'content': form_rows}]
    defaults = {
        "enabled": False,
        "onlyonce": False,
        "delete_softlink": False,
        "cron": "5 1 * * *",
        "path": "",
        "notify": False,
        "retain_type": "仅检查",
        "mediaservers": [],
        "rmt_mediaext": default_exts,
    }
    return layout, defaults
def get_page(self) -> List[dict]:
pass
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))