diff --git a/package.v2.json b/package.v2.json index b44fc48..0bf3f8c 100644 --- a/package.v2.json +++ b/package.v2.json @@ -435,11 +435,12 @@ "name": "IMDb源", "description": "让探索,推荐和媒体识别支持IMDb数据源。", "labels": "探索", - "version": "1.5.2", + "version": "1.5.3", "icon": "IMDb_IOS-OSX_App.png", "author": "wumode", "level": 1, "history": { + "v1.5.3": "异步执行; 修复 bugs (主程序版本需要高于 2.6.8)", "v1.5.2": "修复一些bugs", "v1.5.1": "改进媒体id转换; 支持二级分类和自定义推荐", "v1.5.0": "支持媒体识别", diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index c0a4ccd..22847fc 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -1,4 +1,4 @@ -from typing import Optional, Any, List, Dict, Tuple +from typing import Any, Callable, Coroutine, Dict, Optional, List, Tuple from datetime import datetime import re import urllib.parse @@ -14,7 +14,7 @@ from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, from app.schemas.types import ChainEventType from app.plugins.imdbsource.imdbhelper import ImdbHelper from app import schemas -from app.utils.http import RequestUtils +from app.utils.http import AsyncRequestUtils, RequestUtils from app.schemas.types import MediaType from app.core.meta import MetaBase from app.core.context import MediaInfo @@ -29,7 +29,7 @@ class ImdbSource(_PluginBase): # 插件图标 plugin_icon = "IMDb_IOS-OSX_App.png" # 插件版本 - plugin_version = "1.5.2" + plugin_version = "1.5.3" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -48,14 +48,14 @@ class ImdbSource(_PluginBase): _recognize_media: bool = False _interests: List[str] = [] _component_size: str = 'medium' - _recognition_mode = 'auxiliary' + _recognition_mode: str = 'auxiliary' # 私有属性 - _imdb_helper = None - _cache = {"discover": [], "trending": [], "imdb_top_250": [], "staff_picks": {}} - _img_proxy_prefix = '' + _imdb_helper: Optional[ImdbHelper] = None + _cache: Dict[str, Any] = {"discover": [], "trending": [], "imdb_top_250": [], "staff_picks": {}} + _img_proxy_prefix: str = '' _scheduler: Optional[BackgroundScheduler] = None - _original_method = None + _original_method: Optional[Callable[..., Coroutine[Any, Any, Optional[MediaInfo]]]] = None def init_plugin(self, config: dict = None): @@ -78,6 +78,23 @@ class ImdbSource(_PluginBase): return plugin_instance.recognize_media(meta, mtype) return result + async def patched_async_recognize_media(chain_self, meta: MetaBase = None, + mtype: Optional[MediaType] = None, + tmdbid: Optional[int] = None, + doubanid: Optional[str] = None, + bangumiid: Optional[int] = None, + episode_group: Optional[str] = None, + cache: bool = True): + # 调用原始方法 + if not plugin_instance._original_method: + return None + result = await plugin_instance._original_method(chain_self, meta, mtype, tmdbid, doubanid, bangumiid, + episode_group, cache) + if result is None and plugin_instance._enabled and plugin_instance._recognize_media: + logger.info(f"通过插件 {plugin_instance.plugin_name} 执行:async_recognize_media ...") + return await plugin_instance.async_recognize_media(meta, mtype) + return result + # 给 patch 函数加唯一标记 patched_recognize_media._patched_by = id(self) # 保存原始方法 @@ -85,6 +102,12 @@ class ImdbSource(_PluginBase): ChainBase.recognize_media._patched_by == id(self)): self._original_method = getattr(ChainBase, "recognize_media", None) + patched_async_recognize_media._patched_by = id(self) + # 保存原始方法 + if not (hasattr(ChainBase.async_recognize_media, "_patched_by") and + ChainBase.async_recognize_media._patched_by == id(self)): + self._original_method = getattr(ChainBase, "async_recognize_media", None) + if config: self._enabled = config.get("enabled") self._proxy = config.get("proxy") @@ -113,12 +136,21 @@ class ImdbSource(_PluginBase): if not (hasattr(ChainBase.recognize_media, "_patched_by") and ChainBase.recognize_media._patched_by == id(self)): ChainBase.recognize_media = patched_recognize_media + # 替换 ChainBase.async_recognize_media + if not (hasattr(ChainBase.async_recognize_media, "_patched_by") and + ChainBase.async_recognize_media._patched_by == id(self)): + ChainBase.async_recognize_media = patched_async_recognize_media else: # 恢复 ChainBase.recognize_media if (hasattr(ChainBase.recognize_media, "_patched_by") and ChainBase.recognize_media._patched_by == id(self) and self._original_method): ChainBase.recognize_media = self._original_method + # 恢复 ChainBase.async_recognize_media + if (hasattr(ChainBase.async_recognize_media, "_patched_by") and + ChainBase.async_recognize_media._patched_by == id(self) and + self._original_method): + ChainBase.async_recognize_media = self._original_method else: self.stop_service() @@ -712,6 +744,10 @@ class ImdbSource(_PluginBase): ChainBase.recognize_media._patched_by == id(self) and self._original_method): ChainBase.recognize_media = self._original_method + if (hasattr(ChainBase.async_recognize_media, "_patched_by") and + ChainBase.async_recognize_media._patched_by == id(self) and + self._original_method): + ChainBase.async_recognize_media = self._original_method def get_module(self) -> Dict[str, Any]: """ @@ -723,6 +759,7 @@ class ImdbSource(_PluginBase): """ modules = {} if self._recognize_media and self._recognition_mode == 'hijacking': + modules['async_recognize_media'] = self.async_recognize_media modules['recognize_media'] = self.recognize_media return modules @@ -745,7 +782,7 @@ class ImdbSource(_PluginBase): if entries: imdb_items = self._imdb_helper.vertical_list_page_items( titles=[entry.get('ttconst', '') for entry in entries], - names=[item for entry in entries for item in entry.get("relatedconst", [])], + names=[item for entry in entries for item in entry.get("relatedconst", []) if item.startswith("nm")], images=[entry.get('rmconst', '') for entry in entries], ) if not entries or not imdb_items: @@ -756,9 +793,11 @@ class ImdbSource(_PluginBase): title = "" if movie_info.get("titleText"): title = movie_info.get("titleText", {}).get("text", "") - release_year = 0 + release_year = '' if movie_info.get("releaseYear"): release_year = movie_info.get("releaseYear", {}).get("year") + if not release_year and movie_info.get("releaseDate"): + release_year = movie_info.get("releaseDate", {}).get("year") poster_path = None if movie_info.get("primaryImage"): primary_image = movie_info.get("primaryImage").get("url") @@ -773,12 +812,13 @@ class ImdbSource(_PluginBase): runtime = movie_info.get("runtime").get("seconds") overview = '' if movie_info.get("plot"): - overview = movie_info.get("plot").get("plotText").get("plainText") + if movie_info.get("plot", {}).get("plotText"): + overview = movie_info.get("plot").get("plotText").get("plainText") return schemas.MediaInfo( type="电影", title=title, year=f'{release_year}', - title_year=f"{title} ({release_year})", + title_year=f"{title} ({release_year})" if release_year else title, mediaid_prefix="imdb", media_id=str(movie_info.get("id")), poster_path=poster_path, @@ -792,9 +832,11 @@ class ImdbSource(_PluginBase): title = "" if series_info.get("titleText"): title = series_info.get("titleText", {}).get("text", "") - release_year = 0 + release_year = '' if series_info.get("releaseYear"): release_year = series_info.get("releaseYear", {}).get("year") + if not release_year and series_info.get("releaseDate"): + release_year = series_info.get("releaseDate", {}).get("year") poster_path = None if series_info.get("primaryImage"): primary_image = series_info.get("primaryImage").get("url") @@ -809,7 +851,7 @@ class ImdbSource(_PluginBase): runtime = series_info.get("runtime").get("seconds") overview = '' if series_info.get("plot"): - if series_info.get("plot").get("plotText"): + if series_info.get("plot", {}).get("plotText"): overview = series_info.get("plot").get("plotText").get("plainText") release_date_str = '0000-00-00' if series_info.get("releaseDate"): @@ -819,7 +861,7 @@ class ImdbSource(_PluginBase): type="电视剧", title=title, year=f'{release_year}', - title_year=f"{title} ({release_year})", + title_year=f"{title} ({release_year})" if release_year else title, mediaid_prefix="imdb", media_id=str(series_info.get("id")), release_date=release_date_str, @@ -840,7 +882,7 @@ class ImdbSource(_PluginBase): return True return False - def trending(self, interest: str, page: int = 1) -> List[schemas.MediaInfo]: + async def trending(self, interest: str, page: int = 1) -> List[schemas.MediaInfo]: if not self._imdb_helper: return [] if interest not in self._imdb_helper.interest_id: @@ -859,7 +901,7 @@ class ImdbSource(_PluginBase): results.extend(self._cache[interest]) remaining = count - len(results) self._cache[interest] = [] # 清空缓存 - data = self._imdb_helper.advanced_title_search(first_page=first_page, + data = await self._imdb_helper.async_advanced_title_search(first_page=first_page, title_types=title_types, sort_by="POPULARITY", sort_order="ASC", @@ -882,7 +924,7 @@ class ImdbSource(_PluginBase): res.append(self.__series_to_media(item.get('node').get("title"))) return res - def imdb_top_250(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + async def imdb_top_250(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: if not self._imdb_helper: return [] title_types = ("movie",) @@ -898,12 +940,13 @@ class ImdbSource(_PluginBase): results.extend(self._cache["imdb_top_250"]) remaining = count - len(results) self._cache["imdb_top_250"] = [] # 清空缓存 - data = self._imdb_helper.advanced_title_search(first_page=first_page, - title_types=title_types, - sort_by="USER_RATING", - sort_order="DESC", - ranked=("TOP_RATED_MOVIES-250",) - ) + data = await self._imdb_helper.async_advanced_title_search( + first_page=first_page, + title_types=title_types, + sort_by="USER_RATING", + sort_order="DESC", + ranked=("TOP_RATED_MOVIES-250",) + ) if not data: new_results = [] else: @@ -919,7 +962,7 @@ class ImdbSource(_PluginBase): res.append(self.__movie_to_media(item.get('node').get("title"))) return res - def imdb_trending(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + async def imdb_trending(self, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: if not self._imdb_helper: return [] title_types = ("tvSeries", "tvMiniSeries", "tvShort", 'movie') @@ -935,11 +978,12 @@ class ImdbSource(_PluginBase): results.extend(self._cache["discover"]) remaining = count - len(results) self._cache["discover"] = [] # 清空缓存 - data = self._imdb_helper.advanced_title_search(first_page=first_page, - title_types=title_types, - sort_by="POPULARITY", - sort_order="ASC", - ) + data = await self._imdb_helper.async_advanced_title_search( + first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + ) if not data: new_results = [] else: @@ -957,18 +1001,18 @@ class ImdbSource(_PluginBase): res.append(self.__series_to_media(item.get('node').get("title"))) return res - def imdb_discover(self, mtype: str = "series", - country: str = None, - lang: str = None, - genre: str = None, - sort_by: str = 'POPULARITY', - sort_order: str = 'DESC', - using_rating: bool = False, - user_rating: str = None, - year: str = None, - award: str = None, - ranked_list: str = None, - page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + async def imdb_discover(self, mtype: str = "series", + country: str = None, + lang: str = None, + genre: str = None, + sort_by: str = 'POPULARITY', + sort_order: str = 'DESC', + using_rating: bool = False, + user_rating: str = None, + year: str = None, + award: str = None, + ranked_list: str = None, + page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: if not self._imdb_helper: return [] @@ -1037,19 +1081,20 @@ class ImdbSource(_PluginBase): else: results.extend(self._cache["discover"]) remaining = count - len(results) - self._cache["discover"] = [] # 清空缓存 - data = self._imdb_helper.advanced_title_search(first_page=first_page, - title_types=title_type, - genres=genres, - sort_by=sort_by, - sort_order=sort_order, - rating_min=user_rating, - countries=countries, - languages=languages, - release_date_end=release_date_end, - release_date_start=release_date_start, - award_constraint=awards, - ranked=ranked_lists) + self._cache["discover"] = [] + data = await self._imdb_helper.async_advanced_title_search( + first_page=first_page, + title_types=title_type, + genres=genres, + sort_by=sort_by, + sort_order=sort_order, + rating_min=user_rating, + countries=countries, + languages=languages, + release_date_end=release_date_end, + release_date_start=release_date_start, + award_constraint=awards, + ranked=ranked_lists) if not data: new_results = [] else: @@ -1682,7 +1727,7 @@ class ImdbSource(_PluginBase): event_data.extra_sources.append(imdb_source) @eventmanager.register(ChainEventType.MediaRecognizeConvert) - def media_recognize_covert(self, event: Event) -> Optional[dict]: + async def async_media_recognize_covert(self, event: Event) -> Optional[dict]: if not self._enabled: return event_data: MediaRecognizeConvertEventData = event.event_data @@ -1693,7 +1738,7 @@ class ImdbSource(_PluginBase): if not event_data.mediaid.startswith("imdb"): return imdb_id = event_data.mediaid[5:] - tmdb_id = ImdbSource.imdb_to_tmdb(imdb_id) + tmdb_id = await self.async_imdb_to_tmdb(imdb_id) if tmdb_id is not None: event_data.media_dict["id"] = tmdb_id @@ -1783,7 +1828,68 @@ class ImdbSource(_PluginBase): if info: info = self._imdb_helper.update_info(info.get('id'), info=info) or info mediainfo = ImdbSource._convert_mediainfo(info) - mediainfo.tmdb_id = ImdbSource.imdb_to_tmdb(info.get('id'), mediainfo) + mediainfo.tmdb_id = self.imdb_to_tmdb(info.get('id'), mediainfo) + cat = ImdbHelper.get_category(info.get('media_type'), info) + mediainfo.set_category(cat) + logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} " + f"{mediainfo.title_year} " + f"{mediainfo.imdb_id}") + return mediainfo + return None + + async def async_recognize_media(self, meta: MetaBase = None, + mtype: MediaType = None, + **kwargs) -> Optional[MediaInfo]: + """ + 异步识别媒体信息 + :param meta: 识别的元数据 + :param mtype: 识别的媒体类型 + :return: 识别的媒体信息,包括剧集信息 + """ + if not self._enabled: + return None + if not meta: + return None + elif not meta.name: + logger.warn("识别媒体信息时未提供元数据名称") + return None + else: + if mtype: + meta.type = mtype + info = {} + # 简体名称 + zh_name = zhconv.convert(meta.cn_name, 'zh-hans') if meta.cn_name else None + names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) + for name in names: + if meta.begin_season: + logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") + else: + logger.info(f"正在识别 {name} ...") + if meta.type == MediaType.UNKNOWN and not meta.year: + info = await self._imdb_helper.async_match_by(name) + else: + if meta.type == MediaType.TV: + info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=meta.type, + season_year=meta.year, + season_number=meta.begin_season) + if not info: + # 去掉年份再查一次 + info = await self._imdb_helper.async_match(name=name, mtype=meta.type) + else: + # 有年份先按电影查 + info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=MediaType.MOVIE) + # 没有再按电视剧查 + if not info: + info = await self._imdb_helper.async_match(name=name, year=meta.year, mtype=MediaType.TV) + if not info: + # 去掉年份和类型再查一次 + info = await self._imdb_helper.async_match_by(name=name) + if info: + break + if info: + info = await self._imdb_helper.async_update_info(info.get('id'), info=info) or info + mediainfo = ImdbSource._convert_mediainfo(info) + mediainfo.tmdb_id = await self.async_imdb_to_tmdb(info.get('id'), mediainfo) cat = ImdbHelper.get_category(info.get('media_type'), info) mediainfo.set_category(cat) logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} " @@ -1832,16 +1938,7 @@ class ImdbSource(_PluginBase): return mediainfo @staticmethod - def imdb_to_tmdb(imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]: - api_key = settings.TMDB_API_KEY - api_url = ( - f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}" - f"?api_key={api_key}&external_source=imdb_id" - ) - ret = RequestUtils(accept_type="application/json").get_res(api_url) - if not ret: - return None - data = ret.json() + def _match_results(data: dict, media_info: Optional[MediaInfo] = None) -> Optional[int]: # 合并两种结果 all_results = [] for key in ["movie_results", "tv_results"]: @@ -1899,3 +1996,27 @@ class ImdbSource(_PluginBase): # 最终按人气返回 most_popular = pick_most_popular(filtered) return most_popular.get("id") if most_popular else None + + def imdb_to_tmdb(self, imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]: + api_key = settings.TMDB_API_KEY + api_url = ( + f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}" + f"?api_key={api_key}&external_source=imdb_id" + ) + data = RequestUtils(accept_type="application/json", proxies=settings.PROXY if self._proxy else None + ).get_json(api_url) + if not data: + return None + return self._match_results(data, media_info) + + async def async_imdb_to_tmdb(self, imdb_id: str, media_info: Optional[MediaInfo] = None) -> Optional[int]: + api_key = settings.TMDB_API_KEY + api_url = ( + f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}" + f"?api_key={api_key}&external_source=imdb_id" + ) + data = await AsyncRequestUtils(accept_type="application/json", proxies=settings.PROXY if self._proxy else None + ).get_json(api_url) + if not data: + return None + return self._match_results(data, media_info) diff --git a/plugins.v2/imdbsource/imdbhelper.py b/plugins.v2/imdbsource/imdbhelper.py index 172173b..42f5eed 100644 --- a/plugins.v2/imdbsource/imdbhelper.py +++ b/plugins.v2/imdbsource/imdbhelper.py @@ -1,3 +1,4 @@ +import asyncio import re from json import JSONDecodeError from typing import Optional, Any, Dict, Tuple, List, Union @@ -6,11 +7,12 @@ from dataclasses import dataclass import json import base64 +import httpx import requests from app.core.config import settings from app.log import logger -from app.utils.http import RequestUtils +from app.utils.http import RequestUtils, AsyncRequestUtils from app.utils.string import StringUtils from app.utils.common import retry from app.core.cache import cached @@ -19,6 +21,7 @@ from app.schemas.types import MediaType @dataclass(frozen=True) class SearchParams: + title_types: Optional[Tuple[str, ...]] = None genres: Optional[Tuple[str, ...]] = None sort_by: str = 'POPULARITY' @@ -35,6 +38,7 @@ class SearchParams: class SearchState: + def __init__(self, pageinfo: dict, total: int): self.pageinfo = pageinfo self.total = total @@ -47,14 +51,18 @@ class ImdbHelper: interests = {'Action': {'Action': 'in0000001', 'Action Epic': 'in0000002', 'B-Action': 'in0000003', 'Car Action': 'in0000004', 'Disaster': 'in0000005', 'Gun Fu': 'in0000197', 'Kung Fu': 'in0000198', 'Martial Arts': 'in0000006', 'One-Person Army Action': 'in0000007', 'Samurai': 'in0000199', 'Superhero': 'in0000008', 'Sword & Sandal': 'in0000009', 'War': 'in0000010', 'War Epic': 'in0000011', 'Wuxia': 'in0000200'}, 'Adventure': {'Adventure': 'in0000012', 'Adventure Epic': 'in0000015', 'Desert Adventure': 'in0000013', 'Dinosaur Adventure': 'in0000014', 'Globetrotting Adventure': 'in0000016', 'Jungle Adventure': 'in0000017', 'Mountain Adventure': 'in0000018', 'Quest': 'in0000019', 'Road Trip': 'in0000020', 'Sea Adventure': 'in0000021', 'Swashbuckler': 'in0000022', 'Teen Adventure': 'in0000023', 'Urban Adventure': 'in0000024'}, 'Animation': {'Adult Animation': 'in0000025', 'Animation': 'in0000026', 'Computer Animation': 'in0000028', 'Hand-Drawn Animation': 'in0000029', 'Stop Motion Animation': 'in0000030'}, 'Anime': {'Anime': 'in0000027', 'Isekai': 'in0000201', 'Iyashikei': 'in0000202', 'Josei': 'in0000203', 'Mecha': 'in0000204', 'Seinen': 'in0000205', 'Shōjo': 'in0000207', 'Shōnen': 'in0000206', 'Slice of Life': 'in0000208'}, 'Comedy': {'Body Swap Comedy': 'in0000031', 'Buddy Comedy': 'in0000032', 'Buddy Cop': 'in0000033', 'Comedy': 'in0000034', 'Dark Comedy': 'in0000035', 'Farce': 'in0000036', 'High-Concept Comedy': 'in0000037', 'Mockumentary': 'in0000038', 'Parody': 'in0000039', 'Quirky Comedy': 'in0000040', 'Raunchy Comedy': 'in0000041', 'Satire': 'in0000042', 'Screwball Comedy': 'in0000043', 'Sitcom': 'in0000044', 'Sketch Comedy': 'in0000045', 'Slapstick': 'in0000046', 'Stand-Up': 'in0000047', 'Stoner Comedy': 'in0000048', 'Teen Comedy': 'in0000049'}, 'Crime': {'Caper': 'in0000050', 'Cop Drama': 'in0000051', 'Crime': 'in0000052', 'Drug Crime': 'in0000053', 'Film Noir': 'in0000054', 'Gangster': 'in0000055', 'Heist': 'in0000056', 'Police Procedural': 'in0000057', 'True Crime': 'in0000058'}, 'Documentary': {'Crime Documentary': 'in0000059', 'Documentary': 'in0000060', 'Docuseries': 'in0000061', 'Faith & Spirituality Documentary': 'in0000062', 'Food Documentary': 'in0000063', 'History Documentary': 'in0000064', 'Military Documentary': 'in0000065', 'Music Documentary': 'in0000066', 'Nature Documentary': 'in0000067', 'Political Documentary': 'in0000068', 'Science & Technology Documentary': 'in0000069', 'Sports Documentary': 'in0000070', 'Travel Documentary': 'in0000071'}, 'Drama': {'Biography': 'in0000072', 'Coming-of-Age': 'in0000073', 'Costume Drama': 'in0000074', 'Docudrama': 'in0000075', 'Drama': 'in0000076', 'Epic': 'in0000077', 'Financial Drama': 'in0000078', 'Historical Epic': 'in0000079', 'History': 'in0000080', 'Korean Drama': 'in0000209', 'Legal Drama': 'in0000081', 'Medical Drama': 'in0000082', 'Period Drama': 'in0000083', 'Political Drama': 'in0000084', 'Prison Drama': 'in0000085', 'Psychological Drama': 'in0000086', 'Showbiz Drama': 'in0000087', 'Soap Opera': 'in0000088', 'Teen Drama': 'in0000089', 'Telenovela': 'in0000210', 'Tragedy': 'in0000090', 'Workplace Drama': 'in0000091'}, 'Family': {'Animal Adventure': 'in0000092', 'Family': 'in0000093'}, 'Fantasy': {'Dark Fantasy': 'in0000095', 'Fairy Tale': 'in0000097', 'Fantasy': 'in0000098', 'Fantasy Epic': 'in0000096', 'Supernatural Fantasy': 'in0000099', 'Sword & Sorcery': 'in0000100', 'Teen Fantasy': 'in0000101'}, 'Game Show': {'Beauty Competition': 'in0000102', 'Cooking Competition': 'in0000103', 'Game Show': 'in0000105', 'Quiz Show': 'in0000104', 'Survival Competition': 'in0000106', 'Talent Competition': 'in0000107'}, 'Horror': {'B-Horror': 'in0000108', 'Body Horror': 'in0000109', 'Folk Horror': 'in0000110', 'Found Footage Horror': 'in0000111', 'Horror': 'in0000112', 'Monster Horror': 'in0000113', 'Psychological Horror': 'in0000114', 'Slasher Horror': 'in0000115', 'Splatter Horror': 'in0000116', 'Supernatural Horror': 'in0000117', 'Teen Horror': 'in0000118', 'Vampire Horror': 'in0000119', 'Werewolf Horror': 'in0000120', 'Witch Horror': 'in0000121', 'Zombie Horror': 'in0000122'}, 'Lifestyle': {'Beauty Makeover': 'in0000123', 'Cooking & Food': 'in0000124', 'Home Improvement': 'in0000125', 'Lifestyle': 'in0000126', 'News': 'in0000211', 'Talk Show': 'in0000127', 'Travel': 'in0000128'}, 'Music': {'Concert': 'in0000129', 'Music': 'in0000130'}, 'Musical': {'Classic Musical': 'in0000131', 'Jukebox Musical': 'in0000132', 'Musical': 'in0000133', 'Pop Musical': 'in0000134', 'Rock Musical': 'in0000135'}, 'Mystery': {'Bumbling Detective': 'in0000136', 'Cozy Mystery': 'in0000137', 'Hard-boiled Detective': 'in0000138', 'Mystery': 'in0000139', 'Suspense Mystery': 'in0000140', 'Whodunnit': 'in0000141'}, 'Reality TV': {'Business Reality TV': 'in0000142', 'Crime Reality TV': 'in0000143', 'Dating Reality TV': 'in0000144', 'Docusoap Reality TV': 'in0000145', 'Hidden Camera': 'in0000146', 'Paranormal Reality TV': 'in0000147', 'Reality TV': 'in0000148'}, 'Romance': {'Dark Romance': 'in0000149', 'Feel-Good Romance': 'in0000151', 'Romance': 'in0000152', 'Romantic Comedy': 'in0000153', 'Romantic Epic': 'in0000150', 'Steamy Romance': 'in0000154', 'Teen Romance': 'in0000155', 'Tragic Romance': 'in0000156'}, 'Sci-Fi': {'Alien Invasion': 'in0000157', 'Artificial Intelligence': 'in0000158', 'Cyberpunk': 'in0000159', 'Dystopian Sci-Fi': 'in0000160', 'Kaiju': 'in0000161', 'Sci-Fi': 'in0000162', 'Sci-Fi Epic': 'in0000163', 'Space Sci-Fi': 'in0000164', 'Steampunk': 'in0000165', 'Time Travel': 'in0000166'}, 'Seasonal': {'Holiday': 'in0000192', 'Holiday Animation': 'in0000193', 'Holiday Comedy': 'in0000194', 'Holiday Family': 'in0000195', 'Holiday Romance': 'in0000196'}, 'Short': {'Short': 'in0000212'}, 'Sport': {'Baseball': 'in0000167', 'Basketball': 'in0000168', 'Boxing': 'in0000169', 'Extreme Sport': 'in0000170', 'Football': 'in0000171', 'Motorsport': 'in0000172', 'Soccer': 'in0000173', 'Sport': 'in0000174', 'Water Sport': 'in0000175'}, 'Thriller': {'Conspiracy Thriller': 'in0000176', 'Cyber Thriller': 'in0000177', 'Erotic Thriller': 'in0000178', 'Giallo': 'in0000179', 'Legal Thriller': 'in0000180', 'Political Thriller': 'in0000181', 'Psychological Thriller': 'in0000182', 'Serial Killer': 'in0000183', 'Spy': 'in0000184', 'Survival': 'in0000185', 'Thriller': 'in0000186'}, 'Western': {'Classical Western': 'in0000187', 'Contemporary Western': 'in0000188', 'Spaghetti Western': 'in0000190', 'Western': 'in0000191', 'Western Epic': 'in0000189'}} _official_endpoint = "https://caching.graphql.imdb.com/" _imdb_headers = { - "Accept": "text/html,application/json,text/plain,*/*", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36", - "Referer": "https://www.imdb.com/", + "Accept": "text/html,application/json", + "User-Agent": settings.NORMAL_USER_AGENT, } _free_api = "https://api.imdbapi.dev" def __init__(self, proxies=None): self._proxies = proxies + + proxy_url = None + if proxies: + proxy_url = proxies.get("https") or proxies.get("http") + self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", headers=self._imdb_headers, @@ -62,8 +70,22 @@ class ImdbHelper: proxies=proxies, session=requests.Session()) self._free_imdb_req = RequestUtils(accept_type="application/json", proxies=proxies, session=requests.Session()) - self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None} - self.hash_status = {"AdvancedTitleSearch": False, "TitleAkasPaginated": False} + + self._imdb_client = httpx.AsyncClient(timeout=10, proxy=proxy_url, headers=self._imdb_headers) + self._async_imdb_req = AsyncRequestUtils( + accept_type="application/json", + content_type="application/json", + client=self._imdb_client + ) + + self._free_api_client = httpx.AsyncClient(timeout=10, proxy=proxy_url) + self._async_free_api_req = AsyncRequestUtils( + accept_type="application/json", + client=self._free_api_client + ) + + self._imdb_api_hash = {"AdvancedTitleSearch": None} + self.hash_status = {"AdvancedTitleSearch": False} self._search_states = OrderedDict() self._max_states = 30 self.interest_id = {} @@ -72,31 +94,42 @@ class ImdbHelper: self.interest_id[name] = in_id @retry(Exception, logger=logger) - @cached(maxsize=32, ttl=1800) - def __query_graphql (self, query: str, variables: Dict[str, Any]) -> Optional[Dict]: + @cached(maxsize=128, ttl=1800) + def _query_graphql(self, query: str, variables: Dict[str, Any]) -> Optional[Dict]: params = {'query': query, 'variables': variables} - ret = self._imdb_req.post_res(f"{self._official_endpoint}", json=params, raise_exception=True) - if not ret: + data = RequestUtils(proxies=self._proxies, headers=self._imdb_headers, timeout=10).post_json( + f"{self._official_endpoint}", json=params, raise_exception=True) + if not data: return None - data = ret.json() if "errors" in data: error = data.get("errors")[0] if data.get("errors") else {} return {'error': error} return data.get("data") @retry(Exception, logger=logger) - @cached(maxsize=32, ttl=1800) - def __request(self, params: Dict, sha256) -> Optional[Dict]: - params["extensions"] = {"persistedQuery": {"sha256Hash": sha256, "version": 1}} - ret = self._imdb_req.post_res(f"{self._official_endpoint}", json=params, raise_exception=True) - if not ret: + @cached(maxsize=128, ttl=1800) + async def _async_query_graphql(self, query: str, variables: Dict[str, Any]) -> Optional[Dict]: + params = {'query': query, 'variables': variables} + data = await self._async_imdb_req.post_json(f"{self._official_endpoint}", json=params, raise_exception=True) + if not data: + return None + if "errors" in data: + error = data.get("errors")[0] if data.get("errors") else {} + return {'error': error} + return data.get("data") + + @retry(Exception, logger=logger) + @cached(maxsize=128, ttl=1800) + async def _async_request(self, params: Dict, sha256) -> Optional[Dict]: + params["extensions"] = {"persistedQuery": {"sha256Hash": sha256, "version": 1}} + data = await self._async_imdb_req.post_json(f"{self._official_endpoint}", json=params, raise_exception=True) + if not data: return None - data = ret.json() if "errors" in data: error = data.get("errors")[0] if data.get("errors") else {} if error and error.get("message") == 'PersistedQueryNotFound': logger.warn(f"PersistedQuery hash has expired, trying to update...") - self.__get_hash.cache_clear() + self._async_fetch_hash.cache_clear() return {'error': error} return data.get("data") @@ -127,12 +160,36 @@ class ImdbHelper: return None return json_text + async def async_fetch_github_file(self, repo: str, owner: str, file_path: str, branch: str = None) -> Optional[str]: + """ + 异步从GitHub仓库获取指定文本文件内容 + :param repo: 仓库名称 + :param owner: 仓库所有者 + :param file_path: 文件路径(相对于仓库根目录) + :param branch: 分支名称,默认为 None(使用默认分支) + :return: 文件内容字符串,若获取失败则返回 None + """ + api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}" + if branch: + api_url = f"{api_url}?ref={branch}" + response = await AsyncRequestUtils(headers=settings.GITHUB_HEADERS, proxies=self._proxies).get_res(api_url) + if not response or response.status_code != 200: + return None + try: + data = response.json() + content_base64 = data['content'] + json_bytes = base64.b64decode(content_base64) + json_text = json_bytes.decode('utf-8') + except (TypeError, ValueError, KeyError, UnicodeDecodeError): + return None + return json_text + @cached(maxsize=1, ttl=6 * 3600) - def __get_hash(self) -> Optional[dict]: + async def _async_fetch_hash(self) -> Optional[dict]: """ - 获取 IMDb hash + 异步获取 IMDb hash """ - res = self.get_github_file( + res = await self.async_fetch_github_file( 'MoviePilot-Plugins', 'wumode', 'plugins.v2/imdbsource/imdb_hash.json', @@ -167,13 +224,12 @@ class ImdbHelper: return None return json_data - def __update_hash(self, force: bool = False) -> None: + async def _async_update_hash(self, force: bool = False): if force: - self.__get_hash.cache_clear() - imdb_hash = self.__get_hash() + self._async_fetch_hash.cache_clear() + imdb_hash = await self._async_fetch_hash() if imdb_hash: self._imdb_api_hash["AdvancedTitleSearch"] = imdb_hash.get("AdvancedTitleSearch") - self._imdb_api_hash["TitleAkasPaginated"] = imdb_hash.get("TitleAkasPaginated") @staticmethod def __award_to_constraint(award: str) -> Optional[Dict]: @@ -213,7 +269,7 @@ class ImdbHelper: return None @staticmethod - def compare_names(file_name: str, names: Union[list,str]) -> bool: + def compare_names(file_name: str, names: Union[list, str]) -> bool: """ 比较文件名是否匹配,忽略大小写和特殊字符 :param file_name: 识别的文件名或者种子名 @@ -295,22 +351,22 @@ class ImdbHelper: return key return "" - def advanced_title_search(self, - first_page: bool = True, - title_types: Optional[Tuple[str, ...]] = None, - genres: Optional[Tuple[str, ...]] = None, - sort_by: str = 'POPULARITY', - sort_order: str = 'ASC', - rating_min: Optional[float] = None, - rating_max: Optional[float] = None, - countries: Optional[Tuple[str, ...]] = None, - languages: Optional[Tuple[str, ...]] = None, - release_date_end: Optional[str] = None, - release_date_start: Optional[str] = None, - award_constraint: Optional[Tuple[str, ...]] = None, - ranked: Optional[Tuple[str, ...]] = None, - interests: Optional[Tuple[str, ...]] = None - )->Optional[Dict]: + async def async_advanced_title_search(self, + first_page: bool = True, + title_types: Optional[Tuple[str, ...]] = None, + genres: Optional[Tuple[str, ...]] = None, + sort_by: str = 'POPULARITY', + sort_order: str = 'ASC', + rating_min: Optional[float] = None, + rating_max: Optional[float] = None, + countries: Optional[Tuple[str, ...]] = None, + languages: Optional[Tuple[str, ...]] = None, + release_date_end: Optional[str] = None, + release_date_start: Optional[str] = None, + award_constraint: Optional[Tuple[str, ...]] = None, + ranked: Optional[Tuple[str, ...]] = None, + interests: Optional[Tuple[str, ...]] = None + ) -> Optional[Dict]: # 创建参数对象 params = SearchParams( title_types=title_types, @@ -328,7 +384,7 @@ class ImdbHelper: interests=interests ) sha256 = '81b46290a78cc1e8b3d713e6a43c191c55b4dccf3e1945d6b46668945846d832' - self.__update_hash() + await self._async_update_hash() if self._imdb_api_hash.get("AdvancedTitleSearch"): sha256 = self._imdb_api_hash["AdvancedTitleSearch"] # 获取或创建搜索状态 @@ -350,28 +406,30 @@ class ImdbHelper: first_page = True else: first_page = True - result = self.__advanced_title_search(params, sha256, first_page, last_cursor) + result = await self._async_advanced_title_search(params, sha256, first_page, last_cursor) if result: page_info = result.get("pageInfo", {}) total = result.get("total", 0) search_state = SearchState(page_info, total) self._search_states[params] = search_state + if not page_info.get("hasNextPage"): + logger.debug('There seems to be no more results') if len(self._search_states) > self._max_states: self._search_states.popitem(last=False) # 移除最旧的条目 return result - def __advanced_title_search(self, - params: SearchParams, - sha256: str, - first_page: bool = True, - last_cursor: Optional[str] = None, - ) -> Optional[Dict]: + async def _async_advanced_title_search(self, + params: SearchParams, + sha256: str, + first_page: bool = True, + last_cursor: Optional[str] = None, + ) -> Optional[Dict]: variables: Dict[str, Any] = {"first": 50, - "locale": "en-US", - "sortBy": params.sort_by, - "sortOrder": params.sort_order, - } + "locale": "en-US", + "sortBy": params.sort_by, + "sortOrder": params.sort_order, + } operation_name = 'AdvancedTitleSearch' if params.title_types: title_type_ids = [] @@ -427,7 +485,7 @@ class ImdbHelper: params = {"operationName": operation_name, "variables": variables} - data = self.__request(params, sha256) + data = await self._async_request(params, sha256) if not data: return None if 'error' in data: @@ -465,6 +523,30 @@ class ImdbHelper: videos: Optional[List[str]] = None, is_registered: bool = False ) -> Optional[Dict[str, Any]]: + + query = "query VerticalListPageItems( $titles: [ID!]! $names: [ID!]! $images: [ID!]! $videos: [ID!]!) {\n titles(ids: $titles) { ...TitleParts meterRanking { currentRank meterType rankChange {changeDirection difference} } ratingsSummary { aggregateRating } }\n names(ids: $names) { ...NameParts }\n videos(ids: $videos) { ...VideoParts }\n images(ids: $images) { ...ImageParts }\n}\nfragment TitleParts on Title {\n id\n titleText { text }\n titleType { id }\n releaseYear { year }\n akas(first: 50) { edges { node { text country { id text } language { text text } } } }\n plot { plotText {plainText}}\n primaryImage { id url width height }\n}\nfragment NameParts on Name {\n id\n nameText { text }\n primaryImage { id url width height }\n}\nfragment ImageParts on Image {\n id\n height\n width\n url\n}\nfragment VideoParts on Video {\n id\n name { value }\n contentType { displayName { value } id }\n previewURLs { displayName { value } url videoDefinition videoMimeType }\n playbackURLs { displayName { value } url videoDefinition videoMimeType }\n thumbnail { height url width }\n}" + variables = {'images': images or [], + 'titles': titles or [], + 'names': names or [], + 'videos': videos or [], + 'isRegistered': is_registered, + } + data = self._query_graphql(query, variables) + if 'error' in data: + error = data['error'] + if error: + logger.error(f"Error querying VerticalListPageItems: {error}") + return None + return data + + @cached(maxsize=128, ttl=3600) + async def async_vertical_list_page_items(self, + titles: Optional[List[str]] = None, + names: Optional[List[str]] = None, + images: Optional[List[str]] = None, + videos: Optional[List[str]] = None, + is_registered: bool = False + ) -> Optional[Dict[str, Any]]: """ { 'titles': [ @@ -521,7 +603,7 @@ class ImdbHelper: 'videos': videos or [], 'isRegistered': is_registered, } - data = self.__query_graphql(query, variables) + data = await self._async_query_graphql(query, variables) if 'error' in data: error = data['error'] if error: @@ -543,40 +625,59 @@ class ImdbHelper: return None return r.json() - def advanced_search(self, query: str, media_types: Optional[List[str]] = None, start_year: Optional[int] = None, - end_year: Optional[int] = None, country_code: Optional[str] = None) -> Optional[list]: + @retry(Exception, logger=logger) + @cached(ttl=6 * 3600) + async def _async_free_imdb_api(self, path: str, params: Optional[dict] = None) -> Optional[dict]: + r = await self._async_free_api_req.get_res(url=f"{self._free_api}{path}", params=params, raise_exception=True) + if r is None: + return None + if r.status_code != 200: + try: + logger.warn(f"{path}: {r.json().get('message')}") + except requests.exceptions.JSONDecodeError: + return None + return None + return r.json() + + def search_titles(self, query: str, limit: Optional[int] = None) -> Optional[list]: """ Perform an advanced search for titles using a query string with additional filters. :param query: The search query for titles. - :param media_types: The type of titles to filter by. - MOVIE: Represents a movie title. - TV_SERIES: Represents a TV series title. - TV_MINI_SERIES: Represents a TV mini-series title. - TV_SPECIAL: Represents a TV special title. - TV_MOVIE: Represents a TV movie title. - SHORT: Represents a short title. - VIDEO: Represents a video title. - :param start_year: The start year for filtering titles. - :param end_year: The end year for filtering titles. - :param country_code: The country code for filtering titles. + :param limit: The maximum number of results to return. :return: Search results. See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` """ - endpoint = '/advancedSearch/titles' + endpoint = '/search/titles' params: Dict[str, Any] = {'query': query} - if media_types: - params['types'] = media_types - if start_year: - params['startYear'] = start_year - if end_year: - params['endYear'] = end_year - if country_code: - params['countryCode'] = country_code + if limit: + params['limit'] = limit r = self.__free_imdb_api(path=endpoint, params=params) if r is None: return None return r.get('titles') + def advanced_search(self, query: str, limit: Optional[int] = None, + media_types: Optional[List[str]] = None, + year: Optional[int] = None) -> Optional[list]: + """ + Perform an advanced search for titles using a query string with additional filters. + :param query: The search query for titles. + :param limit: The maximum number of results to return. + :param media_types: The type of titles to filter by. + :param year: The start year for filtering titles. + :return: Search results. + See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` + """ + + data = self.search_titles(query=query, limit=limit) + if data is None: + return None + if year: + data = [title for title in data if title.get('startYear') == year] + if media_types: + data = [title for title in data if title.get('type') in media_types] + return data + def details(self, title_id: str) -> Optional[dict]: """ Retrieve a title's details using its IMDb ID. @@ -588,7 +689,7 @@ class ImdbHelper: r = self.__free_imdb_api(path=endpoint % title_id) return r - def episodes(self, title_id: str, season: Optional[str]=None, + def episodes(self, title_id: str, season: Optional[str] = None, page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]: """ Retrieve the episodes associated with a specific title. @@ -711,11 +812,12 @@ class ImdbHelper: mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype] search_types = [] if MediaType.TV in mtypes: - search_types.extend(['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL']) + search_types.extend(["tvSeries", "tvMiniSeries", "tvSpecial"]) if MediaType.MOVIE in mtypes: - search_types.extend(['MOVIE', 'TV_MOVIE']) + search_types.extend(['movie', 'tvMovie']) if year: - multi_res = self.advanced_search(query=name, start_year=int(year), end_year=int(year), media_types=search_types) + multi_res = self.advanced_search(query=name, year=int(year), + media_types=search_types) else: multi_res = self.advanced_search(query=name, media_types=search_types) ret_info = {} @@ -728,7 +830,7 @@ class ImdbHelper: key=lambda x: ('1' if x.get('type') in ['movie', 'tvMovie'] else '0') + (f"{x.get('startYear')}" or '0000'), reverse=True ) - items = self.vertical_list_page_items([ x.get('id') for x in multi_res]) + items = self.vertical_list_page_items([x.get('id') for x in multi_res]) titles = items.get('titles') if items else [] titles_dict = {} for title in titles: @@ -773,12 +875,12 @@ class ImdbHelper: return True return False - search_types = ['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL'] + search_types = ['tvSeries', 'tvMiniSeries', 'tvSpecial'] res = self.advanced_search(query=name, media_types=search_types) if not res: logger.debug(f"{name} 未找到季{season_number}相关信息!") return None - tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV] + tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV] tvs = sorted(tvs, key=lambda x: x.get('startYear') or 0, reverse=True) items = self.vertical_list_page_items([x.get('id') for x in tvs]) titles = items.get('titles') if items else [] @@ -853,4 +955,334 @@ class ImdbHelper: info['credits'] = self.credits(title_id, page_size=30) if info.get('media_type') == MediaType.TV and info.get('seasons') is None: info['seasons'] = self.__get_tv_seasons(info.get('id')) or {} - return info \ No newline at end of file + return info + + async def async_search_titles(self, query: str, limit: Optional[int] = None) -> Optional[list]: + """ + Perform an advanced search for titles using a query string with additional filters. + :param query: The search query for titles. + :param limit: The maximum number of results to return. + :return: Search results. + See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` + """ + endpoint = '/search/titles' + params: Dict[str, Any] = {'query': query} + if limit: + params['limit'] = limit + r = await self._async_free_imdb_api(path=endpoint, params=params) + if r is None: + return None + return r.get('titles') + + async def async_details(self, title_id: str) -> Optional[dict]: + """ + Retrieve a title's details using its IMDb ID. + :param title_id: IMDb title ID in the format "tt1234567". + :return: Details. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947' -H 'accept: application/json'` + """ + endpoint = '/titles/%s' + r = await self._async_free_imdb_api(path=endpoint % title_id) + return r + + async def async_episodes(self, title_id: str, season: Optional[str] = None, + page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]: + """ + Retrieve the episodes associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :param season: The season number to filter episodes by. + :param page_size: The maximum number of episodes to return per page. + The value must be between 1 and 50. Default is 20. + :param page_token: Token for pagination, if applicable. + :return: Episodes. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/episodes?season=1&pageSize=5' \ + -H 'accept: application/json'` + """ + endpoint = '/titles/%s/episodes' + param: Dict[str, Any] = {} + if season is not None: + param['season'] = season + if page_size is not None: + param['pageSize'] = page_size + if page_token is not None: + param['pageToken'] = page_token + r = await self._async_free_imdb_api(path=endpoint % title_id, params=param) + return r + + async def async_seasons(self, title_id: str) -> Optional[List[dict]]: + """ + Retrieve the seasons associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :return: Seasons. + """ + """ + {[{"season": "1", "episodeCount": 11}]} + """ + endpoint = '/titles/%s/seasons' + r = await self._async_free_imdb_api(path=endpoint % title_id) + if r is None: + return None + return r.get('seasons') + + async def async_credits(self, title_id: str, categories: Optional[List[str]] = None, + page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]: + """ + Retrieve the credits associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :param categories: The categories to filter credits by. + DIRECTOR: The director category. + WRITER: The writer category. + CAST: The cast category, which includes all actors and actresses. + ACTOR: The actor category. + ACTRESS: The actress category. + :param page_size: The maximum number of episodes to return per page. + The value must be between 1 and 50. Default is 20. + :param page_token: Token for pagination, if applicable. + :return: Credits. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/credits?categories=CAST' \ + -H 'accept: application/json'` + """ + endpoint = '/titles/%s/credits' + param: Dict[str, Any] = {} + if categories: + param['categories'] = categories + if page_size is not None: + param['pageSize'] = page_size + if page_token is not None: + param['pageToken'] = page_token + r = await self._async_free_imdb_api(path=endpoint % title_id, params=param) or {} + return r.get('credits') + + async def async_akas(self, title_id: str) -> Optional[list]: + """ + Retrieve the alternative titles (AKAs) associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :return: AKAs. + [{ + "text": "Kite Festival of Love", + "country": { + "code": "CA", + "name": "Canada" + }, + "language": { + "code": "fra", + "name": "French" + } + },] + """ + endpoint = '/titles/%s/akas' + r = await self._async_free_imdb_api(path=endpoint % title_id) + if r is None: + return None + return r.get('akas') + + async def async_get_tv_seasons(self, title_id: str) -> Optional[Dict[str, Any]]: + seasons = await self.async_seasons(title_id) + if not seasons: + return None + seasons_dict = {season.get('season'): {**season, 'episode_count': 0, 'air_date': '0000-00-00'} + for season in seasons} + page_token = None + while True: + episodes = await self.async_episodes(title_id, page_size=50, page_token=page_token) or {} + for episode in episodes.get('episodes', []): + s = episode.get('season') + seasons_dict[s]['episode_count'] += 1 + if not seasons_dict[s].get('release_date'): + seasons_dict[s]['air_date'] = ImdbHelper.release_date_string(episode.get('releaseDate', {})) + seasons_dict[s]['release_date'] = episode.get('releaseDate') + page_token = episodes.get('nextPageToken') + if not page_token: + break + return seasons_dict + + async def async_match_by(self, name: str, mtype: Optional[MediaType] = None, year: Optional[str] = None + ) -> Optional[dict]: + """ + 根据名称同时查询电影和电视剧,没有类型也没有年份时使用 + :param name: 识别的文件名或种子名 + :param mtype: 类型:电影、电视剧 + :param year: 年份,如要是季集需要是首播年份 + :return: 匹配的媒体信息 + """ + + mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype] + search_types = [] + if MediaType.TV in mtypes: + search_types.extend(["tvSeries", "tvMiniSeries", "tvSpecial"]) + if MediaType.MOVIE in mtypes: + search_types.extend(['movie', 'tvMovie']) + if year: + multi_res = await self.async_advanced_search(query=name, year=int(year), + media_types=search_types) + else: + multi_res = await self.async_advanced_search(query=name, media_types=search_types) + ret_info = {} + if multi_res is None or len(multi_res) == 0: + logger.debug(f"{name} 未找到相关媒体息!") + return None + multi_res = [r for r in multi_res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) in mtypes] + multi_res = sorted( + multi_res, + key=lambda x: ('1' if x.get('type') in ['movie', 'tvMovie'] else '0') + (f"{x.get('startYear')}" or '0000'), + reverse=True + ) + items = await self.async_vertical_list_page_items([x.get('id') for x in multi_res]) + titles = items.get('titles') if items else [] + titles_dict = {} + for title in titles: + titles_dict[title.get('id')] = title + for result in multi_res: + title = titles_dict.get(result.get('id'), {}) + start_year = result.get('startYear') + if year and str(start_year) != year: + continue + if ImdbHelper.compare_names(name, [result.get('primaryTitle', ''), result.get('originalTitle', '')]): + ret_info = result + break + names = [edge.get('node', {}).get('text', '') for edge in title.get('akas', {}).get('edges', [])] + if ImdbHelper.compare_names(name, names): + ret_info = result + break + if ret_info: + title = titles_dict.get(ret_info.get('id'), {}) + ret_info['akas'] = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])] + ret_info['rating'] = title.get('ratingsSummary') or {} + ret_info['media_type'] = ImdbHelper.type_to_mtype(ret_info.get('type')) + return ret_info + + async def async_match_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]: + """ + 根据电视剧的名称和季的年份及序号匹配 IMDb + :param name: 识别的文件名或者种子名 + :param season_year: 季的年份 + :param season_number: 季序号 + :return: 匹配的媒体信息 + """ + + async def __season_match(_tv_info: dict, _season_year: str) -> bool: + if not _tv_info: + return False + seasons = await self.async_get_tv_seasons(_tv_info.get('id')) or {} + for season, season_info in seasons.items(): + if season_info.get("air_date"): + if season_info.get("air_date")[0:4] == str(_season_year) \ + and season == str(season_number): + _tv_info['seasons'] = seasons + return True + return False + + search_types = ['tvSeries', 'tvMiniSeries', 'tvSpecial'] + res = await self.async_advanced_search(query=name, media_types=search_types) + if not res: + logger.debug(f"{name} 未找到季{season_number}相关信息!") + return None + tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV] + tvs = sorted(tvs, key=lambda x: x.get('startYear') or 0, reverse=True) + items = await self.async_vertical_list_page_items([x.get('id') for x in tvs]) + titles = items.get('titles') if items else [] + titles_dict = {} + for title in titles: + titles_dict[title.get('id')] = title + for tv in tvs: + # 年份 + title = titles_dict.get(tv.get('id'), {}) + akas = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])] + tv_year = tv.get('startYear') + if self.compare_names(name, [tv.get('primaryTitle', ''), tv.get('originalTitle', '')]) and \ + str(tv_year) == season_year: + tv['akas'] = akas + tv['rating'] = title.get('ratingsSummary') or {} + return tv + names = [aka.get('text', '') for aka in akas] + if not tv or not self.compare_names(name, names): + continue + if await __season_match(_tv_info=tv, _season_year=season_year): + tv['akas'] = akas + tv['rating'] = title.get('ratingsSummary') or {} + return tv + return None + + async def async_match(self, name: str, + mtype: MediaType, + year: Optional[str] = None, + season_year: Optional[str] = None, + season_number: Optional[int] = None, + ) -> Optional[dict]: + """ + 异步搜索 IMDb 中的媒体信息,匹配返回一条尽可能正确的信息 + :param name: 检索的名称 + :param mtype: 类型:电影、电视剧 + :param year: 年份,如要是季集需要是首播年份 + :param season_year: 当前季集年份 + :param season_number: 季集,整数 + :return: 匹配的媒体信息 + """ + if not name: + return None + info = {} + if mtype == MediaType.TV: + # 有当前季和当前季集年份,使用精确匹配 + if season_year and season_number: + logger.debug(f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...") + info = await self.async_match_by_season(name, season_year, season_number) + if info: + info['media_type'] = MediaType.TV + return info + year_range = [year, str(int(year) + 1), str(int(year) - 1)] if year else [None] + for year in year_range: + logger.debug(f"正在识别{mtype.value}:{name}, 年份={year} ...") + info = await self.async_match_by(name, mtype, year) + if info: + break + return info + + async def async_update_info(self, title_id: str, info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """ + Update media information. + :param title_id: IMDb ID. + :param info: Media information to be updated. + :return: IMDb info. + """ + info = info or {} + details_task = asyncio.create_task(self.async_details(title_id)) + credits_task = asyncio.create_task(self.async_credits(title_id, page_size=30)) + details = await details_task or {} + info.update(details) + + akas_task = None + seasons_task = None + + if info.get("akas") is None: + akas_task = asyncio.create_task(self.async_akas(title_id)) + if info.get('media_type') == MediaType.TV and info.get('seasons') is None: + seasons_task = asyncio.create_task(self.async_get_tv_seasons(info.get('id'))) + + info['credits'] = await credits_task + if akas_task: + info['akas'] = await akas_task or [] + if seasons_task: + info['seasons'] = await seasons_task or {} + return info + + async def async_advanced_search(self, query: str, limit: Optional[int]=None, + media_types: Optional[List[str]]=None, + year: Optional[int]=None) -> Optional[list]: + """ + Perform an advanced search for titles using a query string with additional filters. + :param query: The search query for titles. + :param limit: The maximum number of results to return. + :param media_types: The type of titles to filter by. + :param year: The start year for filtering titles. + :return: Search results. + See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` + """ + + data = await self.async_search_titles(query=query, limit=limit) + if data is None: + return None + if year: + data = [title for title in data if title.get('startYear') == year] + if media_types: + data = [title for title in data if title.get('type') in media_types] + return data