Files
archived-MoviePilot-Plugins/plugins/linktosrc/__init__.py
thsrite 29471ee394 revert
2024-06-24 15:06:28 +08:00

205 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
from pathlib import Path
from typing import List, Tuple, Dict, Any
from app.core.config import Settings
from app.log import logger
from app.plugins import _PluginBase
class LinkToSrc(_PluginBase):
# 插件名称
plugin_name = "源文件恢复"
# 插件描述
plugin_desc = "根据MoviePilot的转移记录中的硬链文件恢复源文件"
# 插件图标
plugin_icon = "Time_machine_A.png"
# 插件版本
plugin_version = "1.2"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "linktosrc_"
# 加载顺序
plugin_order = 32
# 可使用的用户级别
auth_level = 1
_onlyonce: bool = False
_link_dirs: str = None
def init_plugin(self, config: dict = None):
if config:
self._onlyonce = config.get("onlyonce")
self._link_dirs = config.get("link_dirs")
if self._onlyonce:
# 执行替换
self._task()
self._onlyonce = False
self.__update_config()
def _task(self):
db_path = Settings().CONFIG_PATH / 'user.db'
try:
gradedb = sqlite3.connect(db_path)
except Exception as e:
logger.error(f"无法打开数据库文件 {db_path},请检查路径是否正确:{str(e)}")
return
transfer_history = []
# 创建游标cursor来执行execute语句
cursor = gradedb.cursor()
if self._link_dirs:
link_dirs = self._link_dirs.split("\n")
for link_dir in link_dirs:
sql = f'''
SELECT
src,
dest
FROM
transferhistory
WHERE
src IS NOT NULL and dest IS NOT NULL and dest like '{link_dir}%';
'''
cursor.execute(sql)
transfer_history += cursor.fetchall()
else:
sql = '''
SELECT
src,
dest
FROM
transferhistory
WHERE
src IS NOT NULL and dest IS NOT NULL;
'''
cursor.execute(sql)
transfer_history = cursor.fetchall()
logger.info(f"查询到历史记录{len(transfer_history)}")
cursor.close()
if not transfer_history:
logger.error("未获取到历史记录,停止处理")
return
for history in transfer_history:
src = history[0]
dest = history[1]
# 判断源文件是否存在
if Path(src).exists():
logger.warn(f"源文件{src}已存在,跳过处理")
continue
# 源文件不存在,目标文件也不存在,跳过
if not Path(dest).exists():
logger.warn(f"源文件{src}不存在且硬链文件{dest}不存在,跳过处理")
continue
# 创建源文件目录,防止目录不存在无法执行
Path(src).parent.mkdir(parents=True, exist_ok=True)
# 目标文件硬链回源文件
Path(src).hardlink_to(dest)
logger.info(f"硬链文件{dest}重新链接回源文件{src}")
logger.info("全部处理完成")
def __update_config(self):
self.update_config({
"onlyonce": self._onlyonce,
"link_dirs": self._link_dirs
})
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '立即运行一次',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'link_dirs',
'label': '需要恢复的硬链接目录',
'rows': 5,
'placeholder': '硬链接目录 (一行一个)'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '根据转移记录中的硬链接恢复源文件',
'style': 'white-space: pre-line;'
}
}
]
}
]
}
]
}
], {
"onlyonce": False,
"link_dirs": ""
}
def get_page(self) -> List[dict]:
pass
def get_state(self) -> bool:
return self._onlyonce
def stop_service(self):
pass