diff --git a/package.v2.json b/package.v2.json index 96250d2..007aa9b 100644 --- a/package.v2.json +++ b/package.v2.json @@ -430,11 +430,12 @@ "name": "IMDb源", "description": "让探索支持IMDb数据源。", "labels": "探索", - "version": "1.3.1", + "version": "1.3.2", "icon": "IMDb_IOS-OSX_App.png", "author": "wumode", "level": 1, "history": { + "v1.3.2": "更新 API query hash", "v1.3.1": "修复按日期排序错误", "v1.3": "优化网络连接", "v1.2": "推荐热门纪录片", @@ -451,8 +452,8 @@ "author": "wumode", "level": 1, "history": { - "v0.1.0": "新增ClashRuleProvider", - "v1.0.0": "支持: 规则分页; 导入规则; 代理组; 附加出站代理; 按区域分组" + "v1.0.0": "支持: 规则分页; 导入规则; 代理组; 附加出站代理; 按区域分组", + "v0.1.0": "新增ClashRuleProvider" } }, "LexiAnnot": { diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index f55e148..076474c 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -1,13 +1,18 @@ -from datetime import datetime +import re +import json from typing import Optional, Any, List, Dict, Tuple +from datetime import datetime -from app import schemas from app.core.config import settings from app.core.event import eventmanager, Event +from app.log import logger from app.plugins import _PluginBase -from app.plugins.imdbsource.imdb_helper import ImdbHelper from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData from app.schemas.types import ChainEventType, MediaType +from app.core.meta import MetaBase +from app.core.context import MediaInfo +from app.plugins.imdbsource.imdb_helper import ImdbHelper +from app import schemas from app.utils.http import RequestUtils @@ -17,10 +22,9 @@ class ImdbSource(_PluginBase): # 插件描述 plugin_desc = "让探索和推荐支持IMDb数据源。" # 插件图标 - plugin_icon = ("https://raw.githubusercontent.com/jxxghp/" - "MoviePilot-Plugins/refs/heads/main/icons/IMDb_IOS-OSX_App.png") + plugin_icon = "IMDb_IOS-OSX_App.png" # 插件版本 - plugin_version = "1.3.1" + plugin_version = "1.3.2" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -123,8 +127,135 @@ class ImdbSource(_PluginBase): "id2": self.xxx2, } """ + # return {"recognize_media": (self.recognize_media, ModuleExecutionType.Hijack)} pass + @staticmethod + # @MediaInfo.source_processor("imdb") + def process_imdb_info(mediainfo: MediaInfo, info: dict): + """处理 IMDB 信息""" + mediainfo.source_info["imdb"] = info + if isinstance(info.get('media_type'), MediaType): + mediainfo.type = info.get('media_type') + elif info.get('media_type'): + mediainfo.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV + mediainfo.title = info.get("title") + mediainfo.release_date = info.get('release_date') + if info.get("id"): + mediainfo.source_id["imdb"] = info.get("id") + mediainfo.imdb_id = info.get('id') + if not mediainfo.source_id: + return + mediainfo.vote_average = round(float(info.get("rating").get("aggregate_rating")), 1) if info.get("rating") else 0 + mediainfo.overview = info.get('plot') + mediainfo.genre_ids = info.get('genre') or [] + # 风格 + if not mediainfo.genres: + mediainfo.genres = [{"id": genre, "name": genre} for genre in info.get("genres") or []] + if info.get('spoken_languages', []): + mediainfo.original_language = info.get('spoken_languages', [])[0].get("name") + mediainfo.en_title = info.get('primary_title') + mediainfo.title = info.get('primary_title') + mediainfo.original_title = info.get('original_title') + # mediainfo.release_date = info.get('start_year') + mediainfo.year = info.get('start_year') + if info.get('posters', []): + mediainfo.poster_path = info.get("posters", [])[0].get("url") + directors = [] + if info.get('directors', []): + for dn in info.get('directors', []): + director = dn.get("name") + if not director: + continue + d_ = {"name": director.get("display_name"), "id": director.get("id"), "avatars": director.get("avatars")} + directors.append(d_) + if info.get('writers', []): + for wn in info.get('writers', []): + writer = wn.get("name") + d_ = {"name": writer.get("display_name"), "id": writer.get("id"), "avatars": writer.get("avatars")} + directors.append(d_) + mediainfo.directors = directors + actors = [] + if info.get('casts', []): + for cast in info.get('casts', []): + cn = cast.get("name", {}) + character_name = cast.get("characters")[0] if cast.get("characters") else '' + d_ = {"name": cn.get("display_name"), "id": cn.get("id"), + "avatars": cn.get("avatars"), "character": character_name} + actors.append(d_) + + def recognize_media(self, meta: MetaBase = None, + mtype: MediaType = None, + imdbid: Optional[str] = None, + episode_group: Optional[str] = None, + cache: Optional[bool] = True, + **kwargs) -> Optional[MediaInfo]: + logger.warn(f"IMDb Source: {MetaBase.title}") + if not self._imdb_helper: + return None + if not imdbid and not meta: + return None + if not meta: + # 未提供元数据时,直接使用imdbid查询,不使用缓存 + cache_info = {} + elif not meta.name: + logger.warn("识别媒体信息时未提供元数据名称") + return None + cache_info = {} + if not cache_info or not cache: + info = None + if imdbid: + info = self._imdb_helper.get_info(mtype=mtype, imdbid=imdbid) + if not info and meta: + info = {} + names = list(dict.fromkeys([k for k in [meta.cn_name, meta.en_name] if k])) + for name in names: + if meta.begin_season: + logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") + else: + logger.info(f"正在识别 {name} ...") + if meta.type == MediaType.UNKNOWN and not meta.year: + info = self._imdb_helper.match_multi(name) + else: + if meta.type == MediaType.TV: + # 确定是电视 + info = self._imdb_helper.match(name=name, + year=meta.year, + mtype=meta.type,season_year=meta.year, + season_number=meta.begin_season) + if not info: + # 去掉年份再查一次 + info = self._imdb_helper.match(name=name, mtype=meta.type) + else: + # 有年份先按电影查 + info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE) + # 没有再按电视剧查 + if not info: + info = self._imdb_helper.match(name=name, + year=meta.year, + mtype=MediaType.TV) + if not info: + # 去掉年份和类型再查一次 + info = self._imdb_helper.match_multi(name=name) + if info: + break + else: + info = None + if info: + # mediainfo = MediaInfo(source_info={"imdb": info}) + mediainfo = MediaInfo() + if meta: + logger.info(f"{meta.name} IMDB识别结果:{mediainfo.type.value} " + f"{mediainfo.title_year} " + f"{mediainfo.imdb_id}") + else: + logger.info(f"{imdbid} IMDB识别结果:{mediainfo.type.value} " + f"{mediainfo.title_year}") + return mediainfo + + logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息") + return None + @staticmethod def __movie_to_media(movie_info: dict) -> schemas.MediaInfo: title = "" @@ -150,7 +281,7 @@ class ImdbSource(_PluginBase): return schemas.MediaInfo( type="电影", title=title, - year=release_year, + year=f'{release_year}', title_year=f"{title} ({release_year})", mediaid_prefix="imdb", media_id=str(movie_info.get("id")), @@ -191,7 +322,7 @@ class ImdbSource(_PluginBase): return schemas.MediaInfo( type="电视剧", title=title, - year=release_year, + year=f'{release_year}', title_year=f"{title} ({release_year})", mediaid_prefix="imdb", media_id=str(series_info.get("id")), diff --git a/plugins.v2/imdbsource/imdb_helper.py b/plugins.v2/imdbsource/imdb_helper.py index 12c1c09..b0031f7 100644 --- a/plugins.v2/imdbsource/imdb_helper.py +++ b/plugins.v2/imdbsource/imdb_helper.py @@ -1,15 +1,11 @@ import re from typing import Optional, Any, Dict, List, Tuple -from io import StringIO from collections import OrderedDict from dataclasses import dataclass -import graphene import requests from requests_html import HTMLSession import ijson -import json -import base64 from app.log import logger from app.utils.http import RequestUtils @@ -149,8 +145,7 @@ class ImdbHelper: def __init__(self, proxies=None): self._proxies = proxies - self._session = HTMLSession() - self._req_utils = RequestUtils(headers=self._imdb_headers, session=self._session, timeout=10, proxies=proxies) + self._req_utils = RequestUtils(headers=self._imdb_headers, session=HTMLSession(), timeout=10, proxies=proxies) self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", headers=self._imdb_headers, @@ -158,6 +153,7 @@ class ImdbHelper: proxies=proxies, session=requests.Session()) self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None} + self.hash_status = {"AdvancedTitleSearch": False, "TitleAkasPaginated": False} self._search_states = OrderedDict() self._max_states = 30 @@ -185,7 +181,7 @@ class ImdbHelper: f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}") response = self._req_utils.get_res(url) if not response or response.status_code != 200: - return + return None json_content = response.text try: section = next(ijson.items(json_content, prefix)) @@ -247,11 +243,14 @@ class ImdbHelper: return None data = ret.json() if "errors" in data: - logger.error(f"Imdb query errors") - return None + error = data.get("errors")[0] if data.get("errors") else {} + if error and error.get("message") == 'PersistedQueryNotFound': + logger.warn(f"PersistedQuery hash has expired, trying to update...") + self.__get_hash.cache_clear() + return {'error': error} return data.get("data") - @cached(maxsize=1, ttl=30 * 24 * 3600) + @cached(maxsize=1, ttl=6 * 3600) def __get_hash(self) -> Optional[dict]: """ 根据IMDb hash使用 @@ -264,11 +263,13 @@ class ImdbHelper: proxies=self._proxies ) if not res: - logger.error("获取IMDb hash") + logger.error("Error getting hash") return None return res.json() - def __update_hash(self): + def __update_hash(self, force: bool = False) -> None: + if force: + self.__get_hash.cache_clear() imdb_hash = self.__get_hash() if imdb_hash: self._imdb_api_hash["AdvancedTitleSearch"] = imdb_hash.get("AdvancedTitleSearch") @@ -325,7 +326,8 @@ class ImdbHelper: release_date_start: Optional[str] = None, award_constraint: Optional[Tuple[str, ...]] = None, ranked: Optional[Tuple[str, ...]] = None, - interests: Optional[Tuple[str, ...]] = None): + interests: Optional[Tuple[str, ...]] = None + )->Optional[Dict]: # 创建参数对象 params = SearchParams( title_types=title_types, @@ -342,7 +344,7 @@ class ImdbHelper: ranked=ranked, interests=interests ) - sha256 = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4' + sha256 = '81b46290a78cc1e8b3d713e6a43c191c55b4dccf3e1945d6b46668945846d832' self.__update_hash() if self._imdb_api_hash.get("AdvancedTitleSearch"): sha256 = self._imdb_api_hash["AdvancedTitleSearch"] @@ -359,7 +361,7 @@ class ImdbHelper: 'titleTypes': [], 'jobCategories': []} if search_state.pageinfo.get('endCursor'): last_cursor = search_state.pageinfo.get('endCursor') - # 这里实现基于上次结果的逻辑 + # 实现基于上次结果的逻辑 else: # 重新搜索 first_page = True @@ -382,11 +384,12 @@ class ImdbHelper: last_cursor: Optional[str] = None, ) -> Optional[Dict]: - variables = {"first": 50, + variables: Dict[str, Any] = {"first": 50, "locale": "en-US", "sortBy": params.sort_by, "sortOrder": params.sort_order, } + operation_name = 'AdvancedTitleSearch' if params.title_types: title_type_ids = [] for title_type in params.title_types: @@ -439,12 +442,20 @@ class ImdbHelper: if not first_page and last_cursor: variables["after"] = last_cursor - params = {"operationName": "AdvancedTitleSearch", + params = {"operationName": operation_name, "variables": variables} data = self.__request(params, sha256) if not data: return None - return data.get("advancedTitleSearch") + if 'error' in data: + error = data['error'] + if error: + logger.error(f"Error querying {operation_name}: {error.get('message')}") + if error.get('message') == 'PersistedQueryNotFound': + self.hash_status[operation_name] = False + return None + self.hash_status[operation_name] = True + return data.get('advancedTitleSearch') def __known_as(self, imdbid: str, sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]: @@ -453,14 +464,23 @@ class ImdbHelper: :param imdbid: IMBd id :return: 别名列表 """ + operation_name = "TitleAkasPaginated" self.__update_hash() - if self._imdb_api_hash.get("TitleAkasPaginated"): - sha256 = self._imdb_api_hash["TitleAkasPaginated"] - params = {"operationName": "TitleAkasPaginated", + if self._imdb_api_hash.get(operation_name): + sha256 = self._imdb_api_hash[operation_name] + params = {"operationName": operation_name, "variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}} data = self.__request(params=params, sha256=sha256) if not data: return None + if 'error' in data: + error = data['error'] + if error: + logger.error(f"Error querying {operation_name} API: {error.get('message')}") + if error.get('message') == 'PersistedQueryNotFound': + self.hash_status[operation_name] = False + return None + self.hash_status[operation_name] = True if not data.get("data", {}).get("title", {}).get("akas", {}).get("total"): return None akas = [] @@ -633,6 +653,7 @@ class ImdbHelper: _tv_info["seasons"] = tv_extra_info["seasons"] _tv_info["episodes"] = tv_extra_info["episodes"] return True + return False tvs = self.search_tvs(title=name) if (tvs is None) or (len(tvs) == 0): @@ -660,6 +681,7 @@ class ImdbHelper: continue if __season_match(_tv_info=tv_info, _season_year=season_year): return tv_info + return None def get_info(self, mtype: MediaType,