Update:VCB辅助整理插件功能重构

This commit is contained in:
Pixel-LH
2024-09-01 06:04:09 +08:00
parent 0c2a48ef63
commit f72f92ab37
3 changed files with 278 additions and 190 deletions

View File

@@ -324,6 +324,7 @@
"author": "pixel@qingwa",
"level": 2,
"history": {
"v1.8.1": "重构插件,测试版",
"v1.8": "增加了元数据刮削开关,升级后需要手动打开,否则默认不刮削",
"v1.7.1": "修复偶尔安装失败问题"
}

View File

@@ -5,7 +5,6 @@ import threading
import time
import traceback
from pathlib import Path
from time import sleep
from typing import List, Tuple, Dict, Any, Optional
import pytz
import qbittorrentapi
@@ -19,8 +18,6 @@ from app.chain.tmdb import TmdbChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.metainfo import MetaInfoPath
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.transferhistory_oper import TransferHistoryOper
from app.log import logger
@@ -77,7 +74,7 @@ class VCBAnimeMonitor(_PluginBase):
# 插件图标
plugin_icon = "vcbmonitor.png"
# 插件版本
plugin_version = "1.8"
plugin_version = "1.8.1"
# 插件作者
plugin_author = "pixel@qingwa"
# 作者主页
@@ -224,7 +221,8 @@ class VCBAnimeMonitor(_PluginBase):
try:
if target_path and target_path.is_relative_to(Path(mon_path)):
logger.warn(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控")
self.systemmessage.put(f"{target_path} 是下载目录 {mon_path} 的子目录,无法监控", title="整理VCB动漫压制组作品")
self.systemmessage.put(f"{target_path} 是下载目录 {mon_path} 的子目录,无法监控",
title="整理VCB动漫压制组作品")
continue
except Exception as e:
logger.debug(str(e))
@@ -382,27 +380,49 @@ class VCBAnimeMonitor(_PluginBase):
return
# 元数据
if file_path.parent.name == "SPs":
logger.warn("位于SPs目录下,跳过处理")
if file_path.parent.name in ["SPs", "Scans", "CDs"]:
logger.warn("位于特典等其他特殊目录下,跳过处理")
return
remeta = ReMeta(ova_switch=self._switch_ova, high_performance=self._high_mode)
if 'VCB-Studio' not in file_path.stem.strip():
logger.warn("不属于VCB的作品不处理")
return
remeta = ReMeta(ova_switch=self._switch_ova, )
file_meta = remeta.handel_file(file_path=file_path)
if file_meta:
if not file_meta.name:
logger.error(f"{file_path.name} 无法识别有效信息")
return
if remeta.is_special and not self._switch_ova:
if remeta.is_ova and not self._switch_ova:
logger.warn(f"{file_path.name} 为OVA资源未开启OVA开关不处理")
return
if remeta.is_special and self._switch_ova:
logger.info(f"{file_path.name} 为OVA资源,开始处理")
if self.get_data(key=f"OVA_{file_meta.title}") is not None:
ova_history_ep = int(self.get_data(key=f"OVA_{file_meta.title}")) + 1
file_meta.begin_episode = ova_history_ep
self.save_data(key=f"OVA_{file_meta.title}", value=ova_history_ep)
# if remeta.is_ova and self._switch_ova:
# logger.info(f"{file_path.name} 为OVA资源,开始处理")
# if self.get_data(key=f"OVA_{file_meta.title}") is not None:
# ova_history_ep = int(self.get_data(key=f"OVA_{file_meta.title}")) + 1
# file_meta.begin_episode = ova_history_ep
# self.save_data(key=f"OVA_{file_meta.title}", value=ova_history_ep)
# else:
# file_meta.begin_episode = 1
# self.save_data(key=f"OVA_{file_meta.title}", value=1)
if remeta.is_ova and self._switch_ova:
logger.info(f"{file_path.name} 为OVA资源,开始历史记录处理")
ova_history_ep_list = self.plugindata.get(file_meta.title, [])
if ova_history_ep_list:
ep = file_meta.begin_episode
if ep in ova_history_ep_list:
for i in range(1, 100):
if ep + i not in ova_history_ep_list:
ova_history_ep_list.append(ep + i)
file_meta.begin_episode = ep + i
break
else:
ova_history_ep_list.append(ep)
self.plugindata.put(file_meta.title, ova_history_ep_list)
else:
file_meta.begin_episode = 1
self.save_data(key=f"OVA_{file_meta.title}", value=1)
self.plugindata.put(file_meta.title, [file_meta.begin_episode])
else:
return

View File

@@ -1,203 +1,270 @@
import concurrent
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List
from app.chain.media import MediaChain
from app.chain.tmdb import TmdbChain
from app.core.metainfo import MetaInfoPath
from app.log import logger
from app.schemas import MediaType
# Season-detection rules, tried in list order. "group" identifies the capture group
# that holds the season: an int means a decimal number, while the string "1" marks
# the Roman-numeral rule (the consumer branches on isinstance(group, int)).
# NOTE(review): Match.group() treats a string argument as a group *name*, so the
# consumer must convert "1" to int before using it - confirm.
season_patterns = [
# Trailing "S<digits>", e.g. "title s2"
{"pattern": re.compile(r"S(\d+)$", re.IGNORECASE), "group": 1},
# Trailing bare number, e.g. "title 2"
{"pattern": re.compile(r"(\d+)$", re.IGNORECASE), "group": 1},
# "<digits>[st|nd|rd|th] season", e.g. "2nd season"
{"pattern": re.compile(r"(\d+)(st|nd|rd|th)?\s*season", re.IGNORECASE), "group": 1},
# "season <digits>"; the number is the second capture group
{"pattern": re.compile(r"(.*) ?\s*season (\d+)", re.IGNORECASE), "group": 2},
# Trailing Roman numeral II..X preceded by whitespace
{"pattern": re.compile(r"\s(II|III|IV|V|VI|VII|VIII|IX|X)$", re.IGNORECASE), "group": "1"}
]
# Episode-detection rules, tried in list order; "group" is the capture group to use.
episode_patterns = [
# "<digits>(<digits>)": the number inside the parentheses is used
{"pattern": re.compile(r"(\d+)\((\d+)\)", re.IGNORECASE), "group": 2},
# First run of digits anywhere in the text
{"pattern": re.compile(r"(\d+)", re.IGNORECASE), "group": 1},
# Versioned episode such as "12v2".
# NOTE(review): looks unreachable - the bare-digits rule above already matches
# every string this one matches and yields the same number; confirm ordering.
{"pattern": re.compile(r'(\d+)v\d+', re.IGNORECASE), "group": 1},
]
def roman_to_int(s) -> int:
"""
:param s: 罗马数字字符串
罗马数字转整数
"""
roman_dict = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
total = 0
prev_value = 0
ova_patterns = [
re.compile(r".*?(OVA|OAD).*?", re.IGNORECASE),
re.compile(r"\d+\.5"),
re.compile(r"00")
]
for char in reversed(s): # 反向遍历罗马数字字符串
current_value = roman_dict[char]
if current_value >= prev_value:
total += current_value # 如果当前值大于等于前一个值,加上当前值
else:
total -= current_value # 如果当前值小于前一个值,减去当前值
prev_value = current_value
final_season_patterns = [
re.compile('final season', re.IGNORECASE),
re.compile('The Final', re.IGNORECASE),
re.compile(r'\sFinal')
]
return total
@dataclass
class VCBMetaBase:
# Holds the values parsed from a single file name.
# Original file name converted to lower case (without the extension)
original_title: str = ""
# Parsed title with season and episode information removed
title: str = ""
# Media type: TV / Movie (defaults to TV)
type: str = "TV"
# Title after the first parsing pass; may still contain the season
season_title: str = ""
# List of strings that may contain the episode number
ep_title: List[str] = None
# Detected season number
season: int = None
# Detected episode number
ep: int = None
# Whether the file is an OVA/OAD
is_ova: bool = False
blocked_words = ["vcb-studio", "360p", "480p", "720p", "1080p", "2160p", "hdr", "x265", "x264", "aac", "flac"]
class ReMeta:
# 解析之后的标题:
title: str = None
# 识别出来的集数
ep: int = None
# 识别出来的季度
season: int = None
# 特殊季识别开关
is_special = False
# OVA/OAD识别开关
ova_switch: bool = False
# 高性能处理开关
high_performance = False
season_patterns = [
{"pattern": re.compile(r"S(\d+)$"), "group": 1},
{"pattern": re.compile(r"(\d+)$"), "group": 1},
{"pattern": re.compile(r"(\d+)(st|nd|rd|th)?\s*[Ss][Ee][Aa][Ss][Oo][Nn]"), "group": 1},
{"pattern": re.compile(r"(.*) ?\s*[Ss][Ee][Aa][Ss][Oo][Nn] (\d+)"), "group": 2},
{"pattern": re.compile(r"\s(II|III|IV|V|VI|VII|VIII|IX|X)$"), "group": "1"}
]
episode_patterns = [
{"pattern": re.compile(r"\[(\d+)\((\d+)\)]"), "group": 2},
{"pattern": re.compile(r"\[(\d+)]"), "group": 1},
{"pattern": re.compile(r'\[(\d+)v\d+]'), "group": 1},
]
_ova_patterns = [re.compile(r"\[.*?(OVA|OAD).*?]"),
re.compile(r"\[\d+\.5]"),
re.compile(r"\[00\]")]
final_season_patterns = [re.compile('final season', re.IGNORECASE),
re.compile('The Final', re.IGNORECASE),
re.compile(r'\sFinal')
]
# 自定义添加的季度正则表达式
_custom_season_patterns = []
def __init__(self, ova_switch: bool = False, high_performance: bool = False):
def __init__(self, ova_switch: bool = False, custom_season_patterns: list[dict] = None):
self.meta = None
# TODO:自定义季度匹配规则
self.custom_season_patterns = custom_season_patterns
self.season_patterns = season_patterns
self.ova_switch = ova_switch
self.high_performance = high_performance
self.vcb_meta = VCBMetaBase()
self.is_ova = False
def is_tv(self, title: str) -> bool:
    """
    Decide whether *title* follows the TV-episode naming layout.

    A title counts as TV when it contains exactly four "[" or exactly four "]".
    Otherwise the metadata is marked as a movie, the title is set to *title*
    with every bracket group removed, and False is returned.

    NOTE(review): a name with four "[" but a different number of "]" (or the
    reverse) is still accepted as TV - confirm that this is intended.
    """
    opening_count = title.count("[")
    closing_count = title.count("]")
    if opening_count == 4 or closing_count == 4:
        return True
    self.vcb_meta.type = "Movie"
    self.vcb_meta.title = re.sub(r'\[.*?\]', '', title).strip()
    return False
def handel_file(self, file_path: Path):
file_name = file_path.stem.strip().lower()
self.vcb_meta.original_title = file_name
if not self.is_tv(file_name):
logger.warn(
"不符合VCB-Studio的剧集命名规范归类为电影,跳过剧集模块处理。注意:年份较为久远的作品可能会判断错误")
else:
self.tv_mode()
self.is_ova = self.vcb_meta.is_ova
meta = MetaInfoPath(file_path)
self.title = meta.title
self.title = Path(self.title).stem.strip()
if 'VCB-Studio' not in meta.title:
logger.warn("不属于VCB的作品不处理")
return None
if meta.title.count("[") != 4 and meta.title.count("]") != 4:
# 可能是电影,电影只有三组[],因此去除所有[]后只剩下电影名
logger.warn("不符合VCB-Studio的剧集命名规范跳过剧集模块处理交给默认处理逻辑")
meta.title = re.sub(r'\[.*?\]', '', meta.title).strip()
meta.en_name = meta.title
return meta
split_title: List[str] | None = self.split_season_ep(self.title)
if split_title:
self.handle_season_ep(split_title)
if self.season is not None:
meta.begin_season = self.season
else:
logger.warn("未识别出季度,默认处理逻辑返回第一季")
if self.ep is not None:
meta.begin_episode = self.ep
else:
logger.warn("未识别出集数,默认处理逻辑返回第一集")
meta.title = self.title
meta.en_name = self.title
logger.info(f"识别出季度为{self.season},集数为{self.ep},标题为:{self.title}")
meta.title = self.vcb_meta.title
meta.en_name = self.vcb_meta.title
meta.begin_season = self.vcb_meta.season
if self.vcb_meta.ep:
meta.begin_episode = self.vcb_meta.ep
if self.vcb_meta.type == "Movie":
meta.type = MediaType.MOVIE
return meta
# 分离季度部分和集数部分
def split_season_ep(self, pre_title: str):
split_ep = re.findall(r"(\[.*?])", pre_title)[1]
if not split_ep:
logger.warn("未识别出集数位置信息,结束识别!")
return None
split_title = re.sub(r"\[.*?\]", "", pre_title).strip()
logger.info(f"分离出包含季度的部分:{split_title} \n 分离出包含集数的部分: {split_ep}")
return [split_title, split_ep]
def split_season_ep(self):
    """
    Split the original file name into its bracket groups and the remaining title.

    The text inside every "[...]" group is stored in ``ep_title`` and the name
    with all bracket groups removed is stored in ``season_title`` and ``title``.
    When bracket groups exist they are filtered through
    ``culling_blocked_words``; if none are left afterwards a warning is logged.
    """
    source_name = self.vcb_meta.original_title
    # Text inside each [] group, without the brackets themselves
    self.vcb_meta.ep_title = re.findall(r'\[(.*?)\]', source_name)
    # Everything outside the [] groups is the (possibly season-bearing) title
    self.vcb_meta.season_title = re.sub(r"\[.*?\]", "", source_name).strip()
    # Every path ends with title equal to season_title, so assign it once up front
    self.vcb_meta.title = self.vcb_meta.season_title
    if self.vcb_meta.ep_title:
        self.culling_blocked_words()
        logger.info(
            f"分离出包含可能季度的内容部分:{self.vcb_meta.season_title} | 可能包含集数的内容部分: {self.vcb_meta.ep_title}")
    # Checked separately because the filtering step may have emptied the list
    if not self.vcb_meta.ep_title:
        logger.warn("未识别出可能存在集数位置的信息,跳过剩余识别步骤!")
def handle_season_ep(self, title: List[str]):
if self.high_performance:
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
title_season_result = executor.submit(self.handle_season, title[0])
ep_result = executor.submit(self.re_ep, title[1], )
try:
title_season_result = title_season_result.result() # Blocks until the task is complete.
ep_result = ep_result.result() # Blocks until the task is complete.
except Exception as exc:
print('Generated an exception: %s' % exc)
else:
title_season_result = self.handle_season(title[0])
ep_result = self.re_ep(title[1])
self.title = title_season_result["title"]
is_ova = ep_result["is_ova"]
if ep_result["ep"] is not None:
self.ep = ep_result["ep"]
if title_season_result["season"]:
self.season = title_season_result["season"]
if is_ova:
self.season = 0
self.is_special = True
def tv_mode(self):
    """
    Run the TV pipeline: split the file name, then parse season and episode.

    Season and episode parsing only run when the split left at least one
    candidate bracket group in ``ep_title``.
    """
    logger.info("开始分离季度和集数部分")
    self.split_season_ep()
    if self.vcb_meta.ep_title:
        self.parse_season()
        self.parse_episode()
# 处理季度
def handle_season(self, pre_title: str) -> dict:
title_season = {"title": pre_title, "season": 1}
for season_pattern in self.season_patterns:
pattern = season_pattern["pattern"]
group = season_pattern["group"]
match = pattern.search(pre_title)
def parse_season(self):
    """
    Parse the season number from ``self.vcb_meta.season_title``.

    Rules are tried in this order, stopping at the first hit:

    1. ``season_patterns`` - an explicit season marker; the marker is removed
       from the title.
    2. ``final_season_patterns`` - a "Final" marker; the marker is removed and
       the season number is resolved by ``handle_final_season``.
    3. An "ova"/"oad" marker while ``ova_switch`` is enabled - the file is
       flagged as OVA and filed under season 0.

    If nothing matches, the title is left unchanged and the season defaults to 1.
    """
    season_title = self.vcb_meta.season_title

    for pattern in season_patterns:
        match = pattern["pattern"].search(season_title)
        if not match:
            continue
        group = pattern["group"]
        if isinstance(group, int):
            self.vcb_meta.season = int(match.group(group))
        else:
            # A string group marks the Roman-numeral rule. Match.group() treats a
            # string as a group *name*, so convert it to an index first. The title
            # is lowercased upstream, hence the upper() before conversion.
            numeral = match.group(int(group)).upper()
            self.vcb_meta.season = self.roman_to_int(numeral)
        # Remove the season marker from the title once it has been recognised
        self.vcb_meta.title = pattern["pattern"].sub("", season_title).strip()
        logger.info(f"识别出季度为{self.vcb_meta.season}")
        return

    logger.info(f"正常匹配季度失败开始匹配ova/oad/最终季度")

    # Final-season markers
    for pattern in final_season_patterns:
        if pattern.search(season_title):
            logger.info("命中到最终季匹配规则")
            self.vcb_meta.title = pattern.sub("", season_title).strip()
            self.handle_final_season()
            return

    logger.info("未识别出最终季度开始匹配OVA/OAD")

    # OVA/OAD markers
    if "ova" in season_title or "oad" in season_title:
        logger.info("季度部分命中到OVA/OAD匹配规则")
        # NOTE(review): with the switch off this falls through to the season-1
        # default below and keeps the marker in the title - confirm intended.
        if self.ova_switch:
            logger.info("开启OVA/OAD处理逻辑")
            self.vcb_meta.is_ova = True
            self.vcb_meta.title = re.sub("ova|oad", "", season_title).strip()
            self.vcb_meta.season = 0
            return

    logger.warn("未识别出季度,默认处理逻辑返回第一季")
    self.vcb_meta.title = season_title
    self.vcb_meta.season = 1
# 处理存在“Final”字样命名的季度
def handle_final_season(self, title: str) -> int | None:
medias = MediaChain().search(title=title)[1]
if not medias:
logger.warn("没有找到对应的媒体信息!")
return
# 根据类型进行过滤只取类型是电视剧和动漫的media
medias = [media for media in medias if media.type == MediaType.TV]
if not medias:
logger.warn("没有找到动漫或电视剧的媒体信息!")
return
media = sorted(medias, key=lambda x: x.popularity, reverse=True)[0]
media_tmdb_id = media.tmdb_id
seasons_info = TmdbChain().tmdb_seasons(tmdbid=media_tmdb_id)
if seasons_info is None:
logger.warn("无法获取最终季")
else:
logger.info(f"获取到最终季,季度为{len(seasons_info)}")
return len(seasons_info)
def parse_episode(self):
    """
    Parse the episode number from the first entry of ``self.vcb_meta.ep_title``.

    ``episode_patterns`` are tried first; the first match sets the episode.
    Otherwise ``ova_patterns`` are tried; a match flags the file as OVA/OAD and
    uses the first number in the text as the episode, or 1 when the text has no
    number or the number is 0. Nothing is changed when no rule matches or when
    ``ep_title`` is empty.
    """
    if not self.vcb_meta.ep_title:
        return
    # After unrelated groups were filtered out, the first entry is the episode text
    ep = self.vcb_meta.ep_title[0]
    for pattern in episode_patterns:
        match = pattern["pattern"].search(ep)
        if match:
            self.vcb_meta.ep = int(match.group(pattern["group"]))
            logger.info(f"识别出集数为{self.vcb_meta.ep}")
            return
    # No plain episode number found: check for an OVA/OAD marker
    for pattern in ova_patterns:
        if pattern.search(ep):
            self.vcb_meta.is_ova = True
            # A bare marker such as "ova" carries no digits, so re.search may
            # return None; fall back to episode 1 instead of failing on .group()
            digits = re.search(r"\d+", ep)
            self.vcb_meta.ep = (int(digits.group()) if digits else 0) or 1
            logger.info(f"识别出集数为{self.vcb_meta.ep}")
            return
def re_ep(self, ep_title: str, ) -> dict:
def culling_blocked_words(self):
"""
# 集数匹配处理模块
:param ep_title: 从title解析出的集数,ep_title固定格式[集数]
1.先判断是否存在OVA/OAD,形如:[OVA],[12(OVA)],[12.5]这种形式都是属于OVA/OAD交给处理OVA模块处理
2.集数通常有两种情况一种:[12]直接性,另一种:[12(24)],这一种应该去括号内的为集数
:return: 集数(int)
从ep_title中剔除不相关的内容
"""
ep_ova = {"ep": None, "is_ova": False}
for ova_pattern in self._ova_patterns:
match = ova_pattern.search(ep_title)
if match:
ep_ova["is_ova"] = True
ep_ova["ep"] = 1
return ep_ova
for ep_pattern in self.episode_patterns:
pattern = ep_pattern["pattern"]
group = ep_pattern["group"]
match = pattern.search(ep_title)
if match:
ep_ova["ep"] = int(match.group(group))
return ep_ova
return ep_ova
blocked_set = set(blocked_words) # 将阻止词列表转换为集合
result = [ep for ep in self.vcb_meta.ep_title if not any(word in ep for word in blocked_set)]
self.vcb_meta.ep_title = result
def handle_final_season(self):
"""
Resolve the season number for a title that carries a "Final" marker.

Searches for media by ``self.vcb_meta.title`` and stores the highest season
number found in ``self.vcb_meta.season``. The season is 1 when the search
returns nothing or no larger season number is found.
"""
meta, medias = MediaChain().search(title=self.vcb_meta.title)
if not medias:
logger.warning("匹配到最终季时无法找到对应的媒体信息季度返回默认值1")
self.vcb_meta.season = 1
return
max_season_number = 1
# Meant to decide by rating when there is no season reference
# NOTE(review): vote_average is assigned here but never read in this method
vote_average = 0
season_info = False
for media in medias:
# Only TV results are considered in this pass
if media.type != MediaType.TV:
logger.info(f"搜索到的: {media.title}, 媒体类型为 {media.type},跳过")
continue
if media.season_info:
season_info = True
# Uses the last season_info entry; assumes entries are ordered by season - TODO confirm
last_season_number = int(media.season_info[-1].get("season_number", 1))
if last_season_number > max_season_number:
max_season_number = last_season_number
else:
logger.info(f"媒体: {media.title} 没有季信息,跳过")
if not season_info:
# Fallback: no result carried season_info, so read media.seasons instead
for media in medias:
if media.seasons:
# Bare annotation; it has no runtime effect
seasons: dict
# Take the largest key, i.e. the highest season number
last_season_number = max(media.seasons.keys())
if last_season_number > max_season_number:
max_season_number = last_season_number
logger.info(f"获取到最终季,季度为 {max_season_number},标题为 {media.title},年份为 {media.year}")
else:
logger.info(f"媒体: {media.title} 没有季信息,跳过")
self.vcb_meta.season = max_season_number
logger.info(f"获取到最终季,季度为 {self.vcb_meta.season}")
@staticmethod
def roman_to_int(s) -> int:
    """
    Convert a Roman numeral string to an integer.

    The input is matched case-insensitively and surrounding whitespace is
    ignored, so numerals taken from a lowercased file name ("ii", "iv") are
    accepted as well as "II" or "IV". An empty string yields 0.

    :param s: Roman numeral string
    :return: integer value of the numeral
    :raises KeyError: if *s* contains a character that is not a Roman digit
    """
    roman_dict = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000}
    total = 0
    prev_value = 0
    # Walk right to left: a digit smaller than the one to its right is subtractive
    for char in reversed(s.strip().upper()):
        current_value = roman_dict[char]
        if current_value >= prev_value:
            total += current_value
        else:
            total -= current_value
        prev_value = current_value
    return total
def test(title: str):
    """Debug helper: print the text inside every "[...]" group of *title*, brackets excluded."""
    bracket_content = re.compile(r'\[(.*?)\]')
    print(bracket_content.findall(title))
if __name__ == '__main__':
    # Manual smoke run: parse one sample VCB-Studio file name with OVA handling enabled
    sample_file = Path(
        r"[Nekomoe kissaten&VCB-Studio] Fruits Basket The Final [08][Ma10p_1080p][x265_flac].mkv")
    ReMeta(ova_switch=True).handel_file(sample_file)