diff --git a/package.json b/package.json index e719740..f2db2d8 100644 --- a/package.json +++ b/package.json @@ -324,6 +324,7 @@ "author": "pixel@qingwa", "level": 2, "history": { + "v1.8.1": "重构插件,测试版", "v1.8": "增加了元数据刮削开关,升级后需要手动打开,否则默认不刮削", "v1.7.1": "修复偶尔安装失败问题" } diff --git a/plugins/vcbanimemonitor/__init__.py b/plugins/vcbanimemonitor/__init__.py index de3edd1..84c5f86 100644 --- a/plugins/vcbanimemonitor/__init__.py +++ b/plugins/vcbanimemonitor/__init__.py @@ -5,7 +5,6 @@ import threading import time import traceback from pathlib import Path -from time import sleep from typing import List, Tuple, Dict, Any, Optional import pytz import qbittorrentapi @@ -19,8 +18,6 @@ from app.chain.tmdb import TmdbChain from app.chain.transfer import TransferChain from app.core.config import settings from app.core.context import MediaInfo -from app.core.event import eventmanager, Event -from app.core.metainfo import MetaInfoPath from app.db.downloadhistory_oper import DownloadHistoryOper from app.db.transferhistory_oper import TransferHistoryOper from app.log import logger @@ -77,7 +74,7 @@ class VCBAnimeMonitor(_PluginBase): # 插件图标 plugin_icon = "vcbmonitor.png" # 插件版本 - plugin_version = "1.8" + plugin_version = "1.8.1" # 插件作者 plugin_author = "pixel@qingwa" # 作者主页 @@ -224,7 +221,8 @@ class VCBAnimeMonitor(_PluginBase): try: if target_path and target_path.is_relative_to(Path(mon_path)): logger.warn(f"{target_path} 是监控目录 {mon_path} 的子目录,无法监控") - self.systemmessage.put(f"{target_path} 是下载目录 {mon_path} 的子目录,无法监控", title="整理VCB动漫压制组作品") + self.systemmessage.put(f"{target_path} 是下载目录 {mon_path} 的子目录,无法监控", + title="整理VCB动漫压制组作品") continue except Exception as e: logger.debug(str(e)) @@ -382,27 +380,49 @@ class VCBAnimeMonitor(_PluginBase): return # 元数据 - if file_path.parent.name == "SPs": - logger.warn("位于SPs目录下,跳过处理") + if file_path.parent.name in ["SPs", "Scans", "CDs"]: + logger.warn("位于特典等其他特殊目录下,跳过处理") return - remeta = ReMeta(ova_switch=self._switch_ova, high_performance=self._high_mode) + + if 'VCB-Studio' not in file_path.stem.strip(): + logger.warn("不属于VCB的作品,不处理!") + return + + remeta = ReMeta(ova_switch=self._switch_ova, ) file_meta = remeta.handel_file(file_path=file_path) if file_meta: if not file_meta.name: logger.error(f"{file_path.name} 无法识别有效信息") return - if remeta.is_special and not self._switch_ova: + if remeta.is_ova and not self._switch_ova: logger.warn(f"{file_path.name} 为OVA资源,未开启OVA开关,不处理") return - if remeta.is_special and self._switch_ova: - logger.info(f"{file_path.name} 为OVA资源,开始处理") - if self.get_data(key=f"OVA_{file_meta.title}") is not None: - ova_history_ep = int(self.get_data(key=f"OVA_{file_meta.title}")) + 1 - file_meta.begin_episode = ova_history_ep - self.save_data(key=f"OVA_{file_meta.title}", value=ova_history_ep) + # if remeta.is_ova and self._switch_ova: + # logger.info(f"{file_path.name} 为OVA资源,开始处理") + # if self.get_data(key=f"OVA_{file_meta.title}") is not None: + # ova_history_ep = int(self.get_data(key=f"OVA_{file_meta.title}")) + 1 + # file_meta.begin_episode = ova_history_ep + # self.save_data(key=f"OVA_{file_meta.title}", value=ova_history_ep) + # else: + # file_meta.begin_episode = 1 + # self.save_data(key=f"OVA_{file_meta.title}", value=1) + if remeta.is_ova and self._switch_ova: + logger.info(f"{file_path.name} 为OVA资源,开始历史记录处理") + ova_history_ep_list = self.plugindata.get(file_meta.title, []) + if ova_history_ep_list: + ep = file_meta.begin_episode + if ep in ova_history_ep_list: + for i in range(1, 100): + if ep + i not in ova_history_ep_list: + ova_history_ep_list.append(ep + i) + file_meta.begin_episode = ep + i + break + else: + ova_history_ep_list.append(ep) + self.plugindata.put(file_meta.title, ova_history_ep_list) else: - file_meta.begin_episode = 1 - self.save_data(key=f"OVA_{file_meta.title}", value=1) + self.plugindata.put(file_meta.title, [file_meta.begin_episode]) + else: return diff --git a/plugins/vcbanimemonitor/remeta.py b/plugins/vcbanimemonitor/remeta.py index 4624cc7..5b8a659 100644 --- a/plugins/vcbanimemonitor/remeta.py +++ b/plugins/vcbanimemonitor/remeta.py @@ -1,203 +1,270 @@ import concurrent import re +from dataclasses import dataclass from pathlib import Path from typing import List from app.chain.media import MediaChain -from app.chain.tmdb import TmdbChain from app.core.metainfo import MetaInfoPath from app.log import logger from app.schemas import MediaType +season_patterns = [ + {"pattern": re.compile(r"S(\d+)$", re.IGNORECASE), "group": 1}, + {"pattern": re.compile(r"(\d+)$", re.IGNORECASE), "group": 1}, + {"pattern": re.compile(r"(\d+)(st|nd|rd|th)?\s*season", re.IGNORECASE), "group": 1}, + {"pattern": re.compile(r"(.*) ?\s*season (\d+)", re.IGNORECASE), "group": 2}, + {"pattern": re.compile(r"\s(II|III|IV|V|VI|VII|VIII|IX|X)$", re.IGNORECASE), "group": "1"} +] +episode_patterns = [ + {"pattern": re.compile(r"(\d+)\((\d+)\)", re.IGNORECASE), "group": 2}, + {"pattern": re.compile(r"(\d+)", re.IGNORECASE), "group": 1}, + {"pattern": re.compile(r'(\d+)v\d+', re.IGNORECASE), "group": 1}, +] -def roman_to_int(s) -> int: - """ - :param s: 罗马数字字符串 - 罗马数字转整数 - """ - roman_dict = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000} - total = 0 - prev_value = 0 +ova_patterns = [ + re.compile(r".*?(OVA|OAD).*?", re.IGNORECASE), + re.compile(r"\d+\.5"), + re.compile(r"00") +] - for char in reversed(s): # 反向遍历罗马数字字符串 - current_value = roman_dict[char] - if current_value >= prev_value: - total += current_value # 如果当前值大于等于前一个值,加上当前值 - else: - total -= current_value # 如果当前值小于前一个值,减去当前值 - prev_value = current_value +final_season_patterns = [ + re.compile('final season', re.IGNORECASE), + re.compile('The Final', re.IGNORECASE), + re.compile(r'\sFinal') +] - return total + +@dataclass +class VCBMetaBase: + # 转化为小写后的原始文件名称 (不含后缀) + original_title: str = "" + # 解析后不包含季度和集数的标题 + title: str = "" + # 类型:TV / Movie (默认TV) + type: str = "TV" + # 可能含有季度的标题,一级解析后的标题 + season_title: str = "" + # 可能含有集数的字符串列表 + ep_title: List[str] = None + # 识别出来的季度 + season: int = None + # 识别出来的集数 + ep: int = None + # 是否是OVA/OAD + is_ova: bool = False + + +blocked_words = ["vcb-studio", "360p", "480p", "720p", "1080p", "2160p", "hdr", "x265", "x264", "aac", "flac"] class ReMeta: - # 解析之后的标题: - title: str = None - # 识别出来的集数 - ep: int = None - # 识别出来的季度 - season: int = None - # 特殊季识别开关 - is_special = False - # OVA/OAD识别开关 - ova_switch: bool = False - # 高性能处理开关 - high_performance = False - season_patterns = [ - {"pattern": re.compile(r"S(\d+)$"), "group": 1}, - {"pattern": re.compile(r"(\d+)$"), "group": 1}, - {"pattern": re.compile(r"(\d+)(st|nd|rd|th)?\s*[Ss][Ee][Aa][Ss][Oo][Nn]"), "group": 1}, - {"pattern": re.compile(r"(.*) ?\s*[Ss][Ee][Aa][Ss][Oo][Nn] (\d+)"), "group": 2}, - {"pattern": re.compile(r"\s(II|III|IV|V|VI|VII|VIII|IX|X)$"), "group": "1"} - ] - episode_patterns = [ - {"pattern": re.compile(r"\[(\d+)\((\d+)\)]"), "group": 2}, - {"pattern": re.compile(r"\[(\d+)]"), "group": 1}, - {"pattern": re.compile(r'\[(\d+)v\d+]'), "group": 1}, - - ] - _ova_patterns = [re.compile(r"\[.*?(OVA|OAD).*?]"), - re.compile(r"\[\d+\.5]"), - re.compile(r"\[00\]")] - - final_season_patterns = [re.compile('final season', re.IGNORECASE), - re.compile('The Final', re.IGNORECASE), - re.compile(r'\sFinal') - ] - # 自定义添加的季度正则表达式 - _custom_season_patterns = [] - - def __init__(self, ova_switch: bool = False, high_performance: bool = False): + def __init__(self, ova_switch: bool = False, custom_season_patterns: list[dict] = None): + self.meta = None + # TODO:自定义季度匹配规则 + self.custom_season_patterns = custom_season_patterns + self.season_patterns = season_patterns self.ova_switch = ova_switch - self.high_performance = high_performance + self.vcb_meta = VCBMetaBase() + self.is_ova = False + + def is_tv(self, title: str) -> bool: + """ + 判断是否是TV + """ + if title.count("[") != 4 and title.count("]") != 4: + self.vcb_meta.type = "Movie" + self.vcb_meta.title = re.sub(r'\[.*?\]', '', title).strip() + return False + return True def handel_file(self, file_path: Path): + file_name = file_path.stem.strip().lower() + self.vcb_meta.original_title = file_name + if not self.is_tv(file_name): + logger.warn( + "不符合VCB-Studio的剧集命名规范,归类为电影,跳过剧集模块处理。注意:年份较为久远的作品可能会判断错误") + else: + self.tv_mode() + self.is_ova = self.vcb_meta.is_ova meta = MetaInfoPath(file_path) - self.title = meta.title - self.title = Path(self.title).stem.strip() - if 'VCB-Studio' not in meta.title: - logger.warn("不属于VCB的作品,不处理!") - return None - if meta.title.count("[") != 4 and meta.title.count("]") != 4: - # 可能是电影,电影只有三组[],因此去除所有[]后只剩下电影名 - logger.warn("不符合VCB-Studio的剧集命名规范,跳过剧集模块处理!交给默认处理逻辑") - meta.title = re.sub(r'\[.*?\]', '', meta.title).strip() - meta.en_name = meta.title - return meta - split_title: List[str] | None = self.split_season_ep(self.title) - if split_title: - self.handle_season_ep(split_title) - if self.season is not None: - meta.begin_season = self.season - else: - logger.warn("未识别出季度,默认处理逻辑返回第一季") - if self.ep is not None: - meta.begin_episode = self.ep - else: - logger.warn("未识别出集数,默认处理逻辑返回第一集") - meta.title = self.title - meta.en_name = self.title - logger.info(f"识别出季度为{self.season},集数为{self.ep},标题为:{self.title}") - + meta.title = self.vcb_meta.title + meta.en_name = self.vcb_meta.title + meta.begin_season = self.vcb_meta.season + if self.vcb_meta.ep: + meta.begin_episode = self.vcb_meta.ep + if self.vcb_meta.type == "Movie": + meta.type = MediaType.MOVIE return meta - # 分离季度部分和集数部分 - def split_season_ep(self, pre_title: str): - split_ep = re.findall(r"(\[.*?])", pre_title)[1] - if not split_ep: - logger.warn("未识别出集数位置信息,结束识别!") - return None - split_title = re.sub(r"\[.*?\]", "", pre_title).strip() - logger.info(f"分离出包含季度的部分:{split_title} \n 分离出包含集数的部分: {split_ep}") - return [split_title, split_ep] + def split_season_ep(self): + # 把所有的[] 里面的内容获取出来,不需要[]本身 + self.vcb_meta.ep_title = re.findall(r'\[(.*?)\]', self.vcb_meta.original_title) + # 去除所有[]后只剩下剧名 + self.vcb_meta.season_title = re.sub(r"\[.*?\]", "", self.vcb_meta.original_title).strip() + if self.vcb_meta.ep_title: + self.culling_blocked_words() + logger.info( + f"分离出包含可能季度的内容部分:{self.vcb_meta.season_title} | 可能包含集数的内容部分: {self.vcb_meta.ep_title}") + self.vcb_meta.title = self.vcb_meta.season_title + if not self.vcb_meta.ep_title: + self.vcb_meta.title = self.vcb_meta.season_title + logger.warn("未识别出可能存在集数位置的信息,跳过剩余识别步骤!") - def handle_season_ep(self, title: List[str]): - if self.high_performance: - with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor: - title_season_result = executor.submit(self.handle_season, title[0]) - ep_result = executor.submit(self.re_ep, title[1], ) - try: - title_season_result = title_season_result.result() # Blocks until the task is complete. - ep_result = ep_result.result() # Blocks until the task is complete. - except Exception as exc: - print('Generated an exception: %s' % exc) - else: - title_season_result = self.handle_season(title[0]) - ep_result = self.re_ep(title[1]) - self.title = title_season_result["title"] - is_ova = ep_result["is_ova"] - if ep_result["ep"] is not None: - self.ep = ep_result["ep"] - if title_season_result["season"]: - self.season = title_season_result["season"] - if is_ova: - self.season = 0 - self.is_special = True + def tv_mode(self): + logger.info("开始分离季度和集数部分") + self.split_season_ep() + if not self.vcb_meta.ep_title: + return + self.parse_season() + self.parse_episode() - # 处理季度 - def handle_season(self, pre_title: str) -> dict: - title_season = {"title": pre_title, "season": 1} - for season_pattern in self.season_patterns: - pattern = season_pattern["pattern"] - group = season_pattern["group"] - match = pattern.search(pre_title) + def parse_season(self): + """ + 从标题中解析季度 + """ + flag = False + for pattern in season_patterns: + match = pattern["pattern"].search(self.vcb_meta.season_title) if match: - if type(group) == str: - title_season["season"] = roman_to_int(match.group(int(group))) - title_season["title"] = re.sub(pattern, "", pre_title).strip() + if isinstance(pattern["group"], int): + self.vcb_meta.season = int(match.group(pattern["group"])) else: - title_season["season"] = int(match.group(group)) - title_season["title"] = re.sub(pattern, "", pre_title).strip() - return title_season - for final_season_pattern in self.final_season_patterns: - match = final_season_pattern.search(pre_title) - if match: - logger.info("识别出最终季度,开始处理!") - title_season["title"] = re.sub(final_season_pattern, "", pre_title).strip() - title_season["season"] = self.handle_final_season(title=pre_title) - break - return title_season + self.vcb_meta.season = self.roman_to_int(match.group(pattern["group"])) + # 匹配成功后,标题中去除季度信息 + self.vcb_meta.title = pattern["pattern"].sub("", self.vcb_meta.season_title).strip + logger.info(f"识别出季度为{self.vcb_meta.season}") + return + logger.info(f"正常匹配季度失败,开始匹配ova/oad/最终季度") + if not flag: + # 匹配是否为最终季 + for pattern in final_season_patterns: + if pattern.search(self.vcb_meta.season_title): + logger.info("命中到最终季匹配规则") + self.vcb_meta.title = pattern.sub("", self.vcb_meta.season_title).strip() + self.handle_final_season() + return + logger.info("未识别出最终季度,开始匹配OVA/OAD") + # 匹配是否为OVA/OAD + if "ova" in self.vcb_meta.season_title or "oad" in self.vcb_meta.season_title: + logger.info("季度部分命中到OVA/OAD匹配规则") + if self.ova_switch: + logger.info("开启OVA/OAD处理逻辑") + self.vcb_meta.is_ova = True + for pattern in ova_patterns: + if pattern.search(self.vcb_meta.season_title): + self.vcb_meta.title = pattern.sub("", self.vcb_meta.season_title).strip() + self.vcb_meta.title = re.sub("ova|oad", "", self.vcb_meta.season_title).strip() + self.vcb_meta.season = 0 + return + logger.warn("未识别出季度,默认处理逻辑返回第一季") + self.vcb_meta.title = self.vcb_meta.season_title + self.vcb_meta.season = 1 - # 处理存在“Final”字样命名的季度 - def handle_final_season(self, title: str) -> int | None: - medias = MediaChain().search(title=title)[1] - if not medias: - logger.warn("没有找到对应的媒体信息!") - return - # 根据类型进行过滤,只取类型是电视剧和动漫的media - medias = [media for media in medias if media.type == MediaType.TV] - if not medias: - logger.warn("没有找到动漫或电视剧的媒体信息!") - return - media = sorted(medias, key=lambda x: x.popularity, reverse=True)[0] - media_tmdb_id = media.tmdb_id - seasons_info = TmdbChain().tmdb_seasons(tmdbid=media_tmdb_id) - if seasons_info is None: - logger.warn("无法获取最终季") - else: - logger.info(f"获取到最终季,季度为{len(seasons_info)}") - return len(seasons_info) + def parse_episode(self): + """ + 从标题中解析集数 + """ + # 从ep_title中剔除不相关的内容之后只剩下存在集数的字符串 + ep = self.vcb_meta.ep_title[0] + for pattern in episode_patterns: + match = pattern["pattern"].search(ep) + if match: + self.vcb_meta.ep = int(match.group(pattern["group"])) + logger.info(f"识别出集数为{self.vcb_meta.ep}") + return + # 直接进入判断是否为OVA/OAD + for pattern in ova_patterns: + if pattern.search(ep): + self.vcb_meta.is_ova = True + # 直接获取数字 + self.vcb_meta.ep = int(re.search(r"\d+", ep).group()) or 1 + logger.info(f"识别出集数为{self.vcb_meta.ep}") + return - def re_ep(self, ep_title: str, ) -> dict: + def culling_blocked_words(self): """ - # 集数匹配处理模块 - :param ep_title: 从title解析出的集数,ep_title固定格式[集数] - 1.先判断是否存在OVA/OAD,形如:[OVA],[12(OVA)],[12.5]这种形式都是属于OVA/OAD,交给处理OVA模块处理 - 2.集数通常有两种情况一种:[12]直接性,另一种:[12(24)],这一种应该去括号内的为集数 - :return: 集数(int) + 从ep_title中剔除不相关的内容 """ - ep_ova = {"ep": None, "is_ova": False} - for ova_pattern in self._ova_patterns: - match = ova_pattern.search(ep_title) - if match: - ep_ova["is_ova"] = True - ep_ova["ep"] = 1 - return ep_ova - for ep_pattern in self.episode_patterns: - pattern = ep_pattern["pattern"] - group = ep_pattern["group"] - match = pattern.search(ep_title) - if match: - ep_ova["ep"] = int(match.group(group)) - return ep_ova - return ep_ova + blocked_set = set(blocked_words) # 将阻止词列表转换为集合 + result = [ep for ep in self.vcb_meta.ep_title if not any(word in ep for word in blocked_set)] + self.vcb_meta.ep_title = result + + def handle_final_season(self): + + meta, medias = MediaChain().search(title=self.vcb_meta.title) + if not medias: + logger.warning("匹配到最终季时无法找到对应的媒体信息!季度返回默认值:1") + self.vcb_meta.season = 1 + return + + max_season_number = 1 + # 当没有季度参考时用评分来决定 + vote_average = 0 + season_info = False + for media in medias: + if media.type != MediaType.TV: + logger.info(f"搜索到的: {media.title}, 媒体类型为 {media.type},跳过") + continue + if media.season_info: + season_info = True + last_season_number = int(media.season_info[-1].get("season_number", 1)) + if last_season_number > max_season_number: + max_season_number = last_season_number + else: + logger.info(f"媒体: {media.title} 没有季信息,跳过") + if not season_info: + # 备用方案 + for media in medias: + if media.seasons: + seasons: dict + # 获取最大的键,即最大季度 + last_season_number = max(media.seasons.keys()) + if last_season_number > max_season_number: + max_season_number = last_season_number + logger.info(f"获取到最终季,季度为 {max_season_number},标题为 {media.title},年份为 {media.year}") + else: + logger.info(f"媒体: {media.title} 没有季信息,跳过") + + self.vcb_meta.season = max_season_number + logger.info(f"获取到最终季,季度为 {self.vcb_meta.season}") + + @staticmethod + def roman_to_int(s) -> int: + """ + :param s: 罗马数字字符串 + 罗马数字转整数 + """ + roman_dict = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 'C': 100, 'D': 500, 'M': 1000} + total = 0 + prev_value = 0 + + for char in reversed(s): # 反向遍历罗马数字字符串 + current_value = roman_dict[char] + if current_value >= prev_value: + total += current_value # 如果当前值大于等于前一个值,加上当前值 + else: + total -= current_value # 如果当前值小于前一个值,减去当前值 + prev_value = current_value + + return total + + +def test(title: str): + # 示例文件名 + pre_title = title + + # 提取方括号内的内容,不包括方括号 + content = re.findall(r'\[(.*?)\]', pre_title) + + print(content) + + +if __name__ == '__main__': + # title = "[BeanSub&VCB-Studio] Jujutsu Kaisen [26][Ma10p_1080p][x265_flac].mkv " + # test(title) + + ReMeta( + ova_switch=True, + ).handel_file(Path( + r"[Nekomoe kissaten&VCB-Studio] Fruits Basket The Final [08][Ma10p_1080p][x265_flac].mkv"))