diff --git a/package.v2.json b/package.v2.json index 29714c7..8abacc5 100644 --- a/package.v2.json +++ b/package.v2.json @@ -424,11 +424,13 @@ "name": "IMDb源", "description": "让探索支持IMDb数据源。", "labels": "探索", - "version": "1.0", + "version": "1.2", "icon": "IMDb_IOS-OSX_App.png", "author": "wumode", "level": 1, "history": { + "v1.2": "推荐热门纪录片", + "v1.1": "推荐支持IMDB数据源; 优化海报尺寸,减少卡顿", "v1.0": "探索支持IMDb数据源" } } diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index 902a13c..2466cd2 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -6,24 +6,25 @@ from app.core.config import settings from app.core.event import eventmanager, Event from app.log import logger from app.plugins import _PluginBase -from app.schemas import DiscoverSourceEventData -from app.schemas.types import EventType, ChainEventType, MediaType +from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData +from app.schemas.types import ChainEventType, MediaType from app.core.meta import MetaBase from app.core.context import MediaInfo from app.plugins.imdbsource.imdb_helper import ImdbHelper from app import schemas +from app.utils.http import RequestUtils class ImdbSource(_PluginBase): # 插件名称 plugin_name = "IMDb源" # 插件描述 - plugin_desc = "让探索支持IMDb数据源。" + plugin_desc = "让探索和推荐支持IMDb数据源。" # 插件图标 - plugin_icon = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/refs/heads/imdbsource_assets/icons/" - "IMDb_IOS-OSX_App.png") + plugin_icon = ("https://raw.githubusercontent.com/jxxghp/" + "MoviePilot-Plugins/refs/heads/main/icons/IMDb_IOS-OSX_App.png") # 插件版本 - plugin_version = "1.0" + plugin_version = "1.2" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -40,7 +41,8 @@ class ImdbSource(_PluginBase): _proxy = False _imdb_helper = None - _discover_cache = [] + _cache = {"discover": [], "trending": [], "trending_in_anime": [], "trending_in_sitcom": [], + "trending_in_documentary": [], "imdb_top_250": []} def init_plugin(self, config: dict = None): if config: @@ -254,6 +256,292 @@ class ImdbSource(_PluginBase): logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息") return None + @staticmethod + def __movie_to_media(movie_info: dict) -> schemas.MediaInfo: + title = "" + if movie_info.get("titleText"): + title = movie_info.get("titleText", {}).get("text", "") + release_year = 0 + if movie_info.get("releaseYear"): + release_year = movie_info.get("releaseYear", {}).get("year") + poster_path = None + if movie_info.get("primaryImage"): + primary_image = movie_info.get("primaryImage").get("url") + if primary_image: + poster_path = primary_image.replace('@._V1', '@._V1_QL75_UY414_CR6,0,280,414_') + vote_average = 0 + if movie_info.get("ratingsSummary"): + vote_average = movie_info.get("ratingsSummary").get("aggregateRating") + runtime = 0 + if movie_info.get("runtime"): + runtime = movie_info.get("runtime").get("seconds") + overview = '' + if movie_info.get("plot"): + overview = movie_info.get("plot").get("plotText").get("plainText") + return schemas.MediaInfo( + type="电影", + title=title, + year=release_year, + title_year=f"{title} ({release_year})", + mediaid_prefix="imdb", + media_id=str(movie_info.get("id")), + poster_path=poster_path, + vote_average=vote_average, + runtime=runtime, + overview=overview, + imdb_id=movie_info.get("id") + ) + + @staticmethod + def __series_to_media(series_info: dict) -> schemas.MediaInfo: + title = "" + if series_info.get("titleText"): + title = series_info.get("titleText", {}).get("text", "") + release_year = 0 + if series_info.get("releaseYear"): + release_year = series_info.get("releaseYear", {}).get("year") + poster_path = None + if series_info.get("primaryImage"): + primary_image = series_info.get("primaryImage").get("url") + if primary_image: + poster_path = primary_image.replace('@._V1', '@._V1_QL75_UY414_CR6,0,280,414_') + vote_average = 0 + if series_info.get("ratingsSummary"): + vote_average = series_info.get("ratingsSummary").get("aggregateRating") + runtime = 0 + if series_info.get("runtime"): + runtime = series_info.get("runtime").get("seconds") + overview = '' + if series_info.get("plot"): + if series_info.get("plot").get("plotText"): + overview = series_info.get("plot").get("plotText").get("plainText") + release_date_str = '0000-00-00' + if series_info.get("releaseDate"): + release_date = series_info.get('releaseDate') + release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}" + return schemas.MediaInfo( + type="电视剧", + title=title, + year=release_year, + title_year=f"{title} ({release_year})", + mediaid_prefix="imdb", + media_id=str(series_info.get("id")), + release_date=release_date_str, + poster_path=poster_path, + vote_average=vote_average, + runtime=runtime, + overview=overview, + imdb_id=series_info.get("id") + ) + + @staticmethod + def title_id_to_mtype(title_id: str) -> MediaType: + if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]: + return MediaType.TV + elif title_id in ["movie", "tvMovie"]: + return MediaType.MOVIE + return MediaType.UNKNOWN + + def trending_in_documentary(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + if apikey != settings.API_TOKEN: + return [] + if not self._imdb_helper: + return [] + title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode", 'movie') + first_page = False + if page == 1: + first_page = True + self._cache["trending_in_documentary"] = [] # 清空缓存 + results = [] + if len(self._cache["trending_in_documentary"]) >= count: + results = self._cache["trending_in_documentary"][:count] + self._cache["trending_in_documentary"] = self._cache["trending_in_documentary"][count:] + else: + results.extend(self._cache["trending_in_documentary"]) + remaining = count - len(results) + self._cache["trending_in_documentary"] = [] # 清空缓存 + data = self._imdb_helper.advanced_title_search(first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + interests=("Documentary",) + ) + if not data: + new_results = [] + else: + new_results = data.get("edges") + if new_results: + results.extend(new_results[:remaining]) + self._cache["trending_in_documentary"] = new_results[remaining:] + res = [] + for item in results: + title_type_id = item.get('node').get("title").get("titleType", {}).get("id") + mtype = self.title_id_to_mtype(title_type_id) + if mtype == MediaType.MOVIE: + res.append(self.__movie_to_media(item.get('node').get("title"))) + elif mtype == MediaType.TV: + res.append(self.__series_to_media(item.get('node').get("title"))) + return res + + def imdb_top_250(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + if apikey != settings.API_TOKEN: + return [] + if not self._imdb_helper: + return [] + title_types = ("movie",) + first_page = False + if page == 1: + first_page = True + self._cache["imdb_top_250"] = [] # 清空缓存 + results = [] + if len(self._cache["imdb_top_250"]) >= count: + results = self._cache["imdb_top_250"][:count] + self._cache["imdb_top_250"] = self._cache["imdb_top_250"][count:] + else: + results.extend(self._cache["imdb_top_250"]) + remaining = count - len(results) + self._cache["imdb_top_250"] = [] # 清空缓存 + data = self._imdb_helper.advanced_title_search(first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + ranked=("TOP_RATED_MOVIES-250",) + ) + if not data: + new_results = [] + else: + new_results = data.get("edges") + if new_results: + results.extend(new_results[:remaining]) + self._cache["imdb_top_250"] = new_results[remaining:] + res = [] + for item in results: + title_type_id = item.get('node').get("title").get("titleType", {}).get("id") + mtype = self.title_id_to_mtype(title_type_id) + if mtype == MediaType.MOVIE: + res.append(self.__movie_to_media(item.get('node').get("title"))) + return res + + def trending_in_sitcom(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + if apikey != settings.API_TOKEN: + return [] + if not self._imdb_helper: + return [] + title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode") + first_page = False + if page == 1: + first_page = True + self._cache["trending_in_sitcom"] = [] # 清空缓存 + results = [] + if len(self._cache["trending_in_sitcom"]) >= count: + results = self._cache["trending_in_sitcom"][:count] + self._cache["trending_in_sitcom"] = self._cache["trending_in_sitcom"][count:] + else: + results.extend(self._cache["trending_in_sitcom"]) + remaining = count - len(results) + self._cache["trending_in_sitcom"] = [] # 清空缓存 + data = self._imdb_helper.advanced_title_search(first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + interests=("Sitcom",) + ) + if not data: + new_results = [] + else: + new_results = data.get("edges") + if new_results: + results.extend(new_results[:remaining]) + self._cache["trending_in_sitcom"] = new_results[remaining:] + res = [] + for item in results: + title_type_id = item.get('node').get("title").get("titleType", {}).get("id") + mtype = self.title_id_to_mtype(title_type_id) + if mtype == MediaType.TV: + res.append(self.__series_to_media(item.get('node').get("title"))) + return res + + def trending_in_anime(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + if apikey != settings.API_TOKEN: + return [] + if not self._imdb_helper: + return [] + title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode", 'movie') + first_page = False + if page == 1: + first_page = True + self._cache["trending_in_anime"] = [] # 清空缓存 + results = [] + if len(self._cache["trending_in_anime"]) >= count: + results = self._cache["trending_in_anime"][:count] + self._cache["trending_in_anime"] = self._cache["trending_in_anime"][count:] + else: + results.extend(self._cache["trending_in_anime"]) + remaining = count - len(results) + self._cache["trending_in_anime"] = [] # 清空缓存 + data = self._imdb_helper.advanced_title_search(first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + interests=("Anime",) + ) + if not data: + new_results = [] + else: + new_results = data.get("edges") + if new_results: + results.extend(new_results[:remaining]) + self._cache["trending_in_anime"] = new_results[remaining:] + res = [] + for item in results: + title_type_id = item.get('node').get("title").get("titleType", {}).get("id") + mtype = self.title_id_to_mtype(title_type_id) + if mtype == MediaType.MOVIE: + res.append(self.__movie_to_media(item.get('node').get("title"))) + elif mtype == MediaType.TV: + res.append(self.__series_to_media(item.get('node').get("title"))) + return res + + def imdb_trending(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: + if apikey != settings.API_TOKEN: + return [] + if not self._imdb_helper: + return [] + title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode", 'movie') + first_page = False + if page == 1: + first_page = True + self._cache["discover"] = [] # 清空缓存 + results = [] + if len(self._cache["discover"]) >= count: + results = self._cache["discover"][:count] + self._cache["discover"] = self._cache["discover"][count:] + else: + results.extend(self._cache["discover"]) + remaining = count - len(results) + self._cache["discover"] = [] # 清空缓存 + data = self._imdb_helper.advanced_title_search(first_page=first_page, + title_types=title_types, + sort_by="POPULARITY", + sort_order="ASC", + ) + if not data: + new_results = [] + else: + new_results = data.get("edges") + if new_results: + results.extend(new_results[:remaining]) + self._cache["discover"] = new_results[remaining:] + res = [] + for item in results: + title_type_id = item.get('node').get("title").get("titleType", {}).get("id") + mtype = self.title_id_to_mtype(title_type_id) + if mtype == MediaType.MOVIE: + res.append(self.__movie_to_media(item.get('node').get("title"))) + elif mtype == MediaType.TV: + res.append(self.__series_to_media(item.get('node').get("title"))) + return res + def imdb_discover(self, apikey: str, mtype: str = "series", country: str = None, lang: str = None, @@ -264,89 +552,23 @@ class ImdbSource(_PluginBase): user_rating: str = None, year: str = None, award: str = None, + ranked_list: str = None, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]: - def __movie_to_media(movie_info: dict) -> schemas.MediaInfo: - title = "" - if movie_info.get("titleText"): - title = movie_info.get("titleText", {}).get("text", "") - release_year = 0 - if movie_info.get("releaseYear"): - release_year = movie_info.get("releaseYear", {}).get("year") - poster_path = None - if movie_info.get("primaryImage"): - poster_path = movie_info.get("primaryImage").get("url") - vote_average = 0 - if movie_info.get("ratingsSummary"): - vote_average = movie_info.get("ratingsSummary").get("aggregateRating") - runtime = 0 - if movie_info.get("runtime"): - runtime = movie_info.get("runtime").get("seconds") - overview = '' - if movie_info.get("plot"): - overview = movie_info.get("plot").get("plotText").get("plainText") - return schemas.MediaInfo( - type="电影", - title=title, - year=release_year, - title_year=f"{title} ({release_year})", - mediaid_prefix="imdb", - media_id=str(movie_info.get("id")), - poster_path=poster_path, - vote_average=vote_average, - runtime=runtime, - overview=overview - ) - - def __series_to_media(series_info: dict) -> schemas.MediaInfo: - title = "" - if series_info.get("titleText"): - title = series_info.get("titleText", {}).get("text", "") - release_year = 0 - if series_info.get("releaseYear"): - release_year = series_info.get("releaseYear", {}).get("year") - poster_path = None - if series_info.get("primaryImage"): - poster_path = series_info.get("primaryImage").get("url") - vote_average = 0 - if series_info.get("ratingsSummary"): - vote_average = series_info.get("ratingsSummary").get("aggregateRating") - runtime = 0 - if series_info.get("runtime"): - runtime = series_info.get("runtime").get("seconds") - overview = '' - if series_info.get("plot"): - if series_info.get("plot").get("plotText"): - overview = series_info.get("plot").get("plotText").get("plainText") - release_date_str = '0000-00-00' - if series_info.get("releaseDate"): - release_date = series_info.get('releaseDate') - release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}" - return schemas.MediaInfo( - type="电视剧", - title=title, - year=release_year, - title_year=f"{title} ({release_year})", - mediaid_prefix="imdb", - media_id=str(series_info.get("id")), - release_date=release_date_str, - poster_path=poster_path, - vote_average=vote_average, - runtime=runtime, - overview=overview - ) + if apikey != settings.API_TOKEN: + return [] if not self._imdb_helper: return [] - title_type: MediaType = MediaType.TV + title_type = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode") if mtype == 'movies': - title_type = MediaType.MOVIE + title_type = ("movie",) if user_rating and using_rating: user_rating = float(user_rating) else: user_rating = None - genres = [genre] if genre else None - countries = [country] if country else None - languages = [lang] if lang else None + genres = (genre,) if genre else None + countries = (country,) if country else None + languages = (lang,) if lang else None release_date_start = None release_date_end = None if year: @@ -385,21 +607,22 @@ class ImdbSource(_PluginBase): elif year == "1970s": release_date_start = "1970-01-01" release_date_end = "1979-12-31" - awards = [award] if award else None + awards = (award,) if award else None + ranked_lists = (ranked_list,) if ranked_list else None first_page = False if page == 1: first_page = True - self._discover_cache = [] # 清空缓存 + self._cache["discover"] = [] # 清空缓存 results = [] - if len(self._discover_cache) >= count: - results = self._discover_cache[:30] - self._discover_cache = self._discover_cache[30:] + if len(self._cache["discover"]) >= count: + results = self._cache["discover"][:count] + self._cache["discover"] = self._cache["discover"][count:] else: - results.extend(self._discover_cache) - remaining = 30 - len(results) - self._discover_cache = [] # 清空缓存 + results.extend(self._cache["discover"]) + remaining = count - len(results) + self._cache["discover"] = [] # 清空缓存 data = self._imdb_helper.advanced_title_search(first_page=first_page, - title_type=title_type, + title_types=title_type, genres=genres, sort_by=sort_by, sort_order=sort_order, @@ -408,18 +631,19 @@ class ImdbSource(_PluginBase): languages=languages, release_date_end=release_date_end, release_date_start=release_date_start, - award_constraint=awards) + award_constraint=awards, + ranked=ranked_lists) if not data: new_results = [] else: new_results = data.get("edges") if new_results: results.extend(new_results[:remaining]) - self._discover_cache = new_results[remaining:] + self._cache["discover"] = new_results[remaining:] if mtype == "movies": - results = [__movie_to_media(movie.get('node').get("title")) for movie in results] + results = [self.__movie_to_media(movie.get('node').get("title")) for movie in results] else: - results = [__series_to_media(series.get('node').get("title")) for series in results] + results = [self.__series_to_media(series.get('node').get("title")) for series in results] return results def get_api(self) -> List[Dict[str, Any]]: @@ -432,13 +656,50 @@ class ImdbSource(_PluginBase): "summary": "API说明" }] """ - return [{ - "path": "/imdb_discover", - "endpoint": self.imdb_discover, - "methods": ["GET"], - "summary": "TheTVDB探索数据源", - "description": "获取TheTVDB探索数据", - }] + return [ + { + "path": "/imdb_discover", + "endpoint": self.imdb_discover, + "methods": ["GET"], + "summary": "IMDb探索数据源", + "description": "获取 IMDb探索 数据", + }, + { + "path": "/imdb_trending", + "endpoint": self.imdb_trending, + "methods": ["GET"], + "summary": "IMDb Trending", + "description": "获取 IMDb Trending 数据", + }, + { + "path": "/trending_in_anime", + "endpoint": self.trending_in_anime, + "methods": ["GET"], + "summary": "IMDb Trending in Anime", + "description": "获取 IMDb Trending in Anime 数据", + }, + { + "path": "/trending_in_sitcom", + "endpoint": self.trending_in_sitcom, + "methods": ["GET"], + "summary": "IMDb Trending in Sitcom", + "description": "获取 IMDb Trending in Sitcom 数据", + }, + { + "path": "/imdb_top_250", + "endpoint": self.imdb_top_250, + "methods": ["GET"], + "summary": "IMDb Top 250 Movies", + "description": "获取 IMDb Top 250 Movies 数据", + }, + { + "path": "/trending_in_documentary", + "endpoint": self.trending_in_documentary, + "methods": ["GET"], + "summary": "IMDb Trending in Documentary", + "description": "获取 IMDb Trending in Documentary 数据", + } + ] @staticmethod def imdb_filter_ui() -> List[dict]: @@ -481,7 +742,7 @@ class ImdbSource(_PluginBase): lang_dict = { "en": "英语", "zh": "中文", - "jp": "日语", + "ja": "日语", "ko": "韩语", "fr": "法语", "de": "德语", @@ -636,6 +897,27 @@ class ImdbSource(_PluginBase): } for key, value in award_dict.items() ] + ranked_list_dict = { + "TOP_RATED_MOVIES-100": "IMDb Top 100", + "TOP_RATED_MOVIES-250": "IMDb Top 250", + "TOP_RATED_MOVIES-1000": "IMDb Top 1000", + "LOWEST_RATED_MOVIES-100": "IMDb Bottom 100", + "LOWEST_RATED_MOVIES-250": "IMDb Bottom 250", + "LOWEST_RATED_MOVIES-1000": "IMDb Bottom 1000", + } + + ranked_list_ui = [ + { + "component": "VChip", + "props": { + "filter": True, + "tile": True, + "value": key + }, + "text": value + } for key, value in ranked_list_dict.items() + ] + return [ { "component": "div", @@ -818,6 +1100,34 @@ class ImdbSource(_PluginBase): } ] }, + { + "component": "div", + "props": { + "class": "flex justify-start items-center", + "show": "{{mtype == 'movies'}}" + }, + "content": [ + { + "component": "div", + "props": { + "class": "mr-5" + }, + "content": [ + { + "component": "VLabel", + "text": "排名" + } + ] + }, + { + "component": "VChipGroup", + "props": { + "model": "ranked_list" + }, + "content": ranked_list_ui + } + ] + }, { "component": "div", "props": { @@ -943,7 +1253,11 @@ class ImdbSource(_PluginBase): "year": None, "user_rating": 1, "using_rating": False, - "award": None + "award": None, + "ranked_list": None + }, + depends={ + "ranked_list": ["mtype"] }, filter_ui=self.imdb_filter_ui() ) @@ -951,3 +1265,69 @@ class ImdbSource(_PluginBase): event_data.extra_sources = [imdb_source] else: event_data.extra_sources.append(imdb_source) + + @eventmanager.register(ChainEventType.MediaRecognizeConvert) + def media_recognize_covert(self, event: Event) -> Optional[dict]: + if not self._enabled: + return + event_data: MediaRecognizeConvertEventData = event.event_data + if not event_data: + return + api_key = settings.TMDB_API_KEY + if event_data.convert_type != "themoviedb" or not api_key: + return + if not event_data.mediaid.startswith("imdb"): + return + imdb_id = event_data.mediaid[5:] + api_url = f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}?api_key={api_key}&external_source=imdb_id" + ret = RequestUtils(accept_type="application/json").get_res(api_url) + if ret: + data = ret.json() + all_results = [] + for result_type in ["movie_results", "tv_results"]: + if data.get(result_type): + all_results.extend(data[result_type]) + if not all_results: + return # 无匹配结果 + # 按 popularity 降序排序,取最高人气的条目 + most_popular_item = max(all_results, key=lambda x: x.get("popularity", -1)) + event_data.media_dict["id"] = most_popular_item.get("id") + + @eventmanager.register(ChainEventType.RecommendSource) + def recommend_source(self, event: Event): + if not self._enabled: + return + event_data: RecommendSourceEventData = event.event_data + if not event_data: + return + imdb_trending: schemas.RecommendMediaSource = schemas.RecommendMediaSource( + name="IMDb Trending", + api_path=f"plugin/ImdbSource/imdb_trending?apikey={settings.API_TOKEN}", + type='Rankings' + ) + trending_in_anime: schemas.RecommendMediaSource = schemas.RecommendMediaSource( + name="IMDb Trending in Anime", + api_path=f"plugin/ImdbSource/trending_in_anime?apikey={settings.API_TOKEN}", + type='Anime' + ) + trending_in_sitcom: schemas.RecommendMediaSource = schemas.RecommendMediaSource( + name="IMDb Trending in Sitcom", + api_path=f"plugin/ImdbSource/trending_in_sitcom?apikey={settings.API_TOKEN}", + type='TV Shows' + ) + + imdb_top_250: schemas.RecommendMediaSource = schemas.RecommendMediaSource( + name="IMDb Top 250 Movies", + api_path=f"plugin/ImdbSource/imdb_top_250?apikey={settings.API_TOKEN}", + type='Movies' + ) + imdb_documentary: schemas.RecommendMediaSource = schemas.RecommendMediaSource( + name="IMDb Trending in Documentary", + api_path=f"plugin/ImdbSource/trending_in_documentary?apikey={settings.API_TOKEN}", + type='Rankings' + ) + trending_source = [imdb_trending, trending_in_anime, trending_in_sitcom, imdb_top_250, imdb_documentary] + if not event_data.extra_sources: + event_data.extra_sources = trending_source + else: + event_data.extra_sources.extend(trending_source) diff --git a/plugins.v2/imdbsource/imdb_helper.py b/plugins.v2/imdbsource/imdb_helper.py index ca4aab8..26c39c4 100644 --- a/plugins.v2/imdbsource/imdb_helper.py +++ b/plugins.v2/imdbsource/imdb_helper.py @@ -1,6 +1,8 @@ import re -from typing import Optional, Dict, List +from typing import Optional, Any, Dict, List, Tuple from io import StringIO +from collections import OrderedDict +from dataclasses import dataclass import graphene from requests_html import HTMLSession @@ -15,6 +17,28 @@ from app.schemas.types import MediaType from app.core.cache import cached +@dataclass(frozen=True) +class SearchParams: + title_types: Optional[Tuple[str, ...]] = None + genres: Optional[Tuple[str, ...]] = None + sort_by: str = 'POPULARITY' + sort_order: str = 'ASC' + rating_min: Optional[float] = None + rating_max: Optional[float] = None + countries: Optional[Tuple[str, ...]] = None + languages: Optional[Tuple[str, ...]] = None + release_date_end: Optional[str] = None + release_date_start: Optional[str] = None + award_constraint: Optional[Tuple[str, ...]] = None + ranked: Optional[Tuple[str, ...]] = None + interests: Optional[Tuple[str, ...]] = None + + +class SearchState: + def __init__(self, last_cursor: str): + self.last_cursor = last_cursor + + class ImdbHelper: _query_by_id = """query queryWithVariables($id: ID!) { title(id: $id) { @@ -98,15 +122,27 @@ class ImdbHelper: _hash_update_url = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/" "refs/heads/imdbsource_assets/plugins.v2/imdbsource/imdb_hash.json") _qid_map = { - MediaType.TV: ["tvSeries", "tvMiniSeries"], + MediaType.TV: ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"], MediaType.MOVIE: ["movie"] } + _imdb_headers = { "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome" "/84.0.4147.105 Safari/537.36", "Referer": "https://www.imdb.com/", } + all_title_types = ["tvSeries", "tvMiniSeries", "movie", "tvMovie", "musicVideo", "tvShort", "short", + "tvEpisode", "tvSpecial", "videoGame"] + interest_id = { + "Anime": "in0000027", + "Superhero": "in0000008", + "Sitcom": "in0000044", + "Coming-of-Age": "in0000073", + "Slasher Horror": "in0000115", + "Raunchy Comedy": "in0000041", + "Documentary": "in0000060" + } def __init__(self, proxies=None): self._proxies = proxies @@ -114,8 +150,9 @@ class ImdbHelper: self._req_utils = RequestUtils(headers=self._imdb_headers, session=self._session, timeout=10, proxies=proxies) self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", headers=self._imdb_headers, timeout=10, proxies=proxies) - self._last_cursor = '' self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None} + self._search_states = OrderedDict() + self._max_states = 30 def imdbid(self, imdbid: str) -> Optional[Dict]: params = {"operationName": "queryWithVariables", "query": self._query_by_id, "variables": {"id": imdbid}} @@ -247,69 +284,153 @@ class ImdbHelper: else: return None + @staticmethod + def __ranked_list_to_constraint(ranked: str) -> Optional[Dict]: + """ + "TOP_RATED_MOVIES-100": "IMDb Top 100", + "TOP_RATED_MOVIES-250": "IMDb Top 250", + "TOP_RATED_MOVIES-1000": "IMDb Top 1000", + "LOWEST_RATED_MOVIES-100": "IMDb Bottom 100", + "LOWEST_RATED_MOVIES-250": "IMDb Bottom 250", + "LOWEST_RATED_MOVIES-1000": "IMDb Bottom 1000" + """ + pattern = r'^(TOP_RATED_MOVIES|LOWEST_RATED_MOVIES)-(\d+)$' + match = re.match(pattern, ranked) + if match: + ranked_title_list_type = match.group(1) + rank_range = int(match.group(2)) + constraint = {"rankRange": {"max": rank_range}, "rankedTitleListType": ranked_title_list_type} + return constraint + return None + def advanced_title_search(self, - sha256: str = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4', first_page: bool = True, - title_type: MediaType = MediaType.TV, - genres: Optional[List] = None, + title_types: Optional[Tuple[str, ...]] = None, + genres: Optional[Tuple[str, ...]] = None, sort_by: str = 'POPULARITY', sort_order: str = 'ASC', rating_min: Optional[float] = None, rating_max: Optional[float] = None, - countries: Optional[List] = None, - languages: Optional[list] = None, + countries: Optional[Tuple[str, ...]] = None, + languages: Optional[Tuple[str, ...]] = None, release_date_end: Optional[str] = None, release_date_start: Optional[str] = None, - award_constraint: Optional[List[str]] = None - ) -> Optional[Dict]: + award_constraint: Optional[Tuple[str, ...]] = None, + ranked: Optional[Tuple[str, ...]] = None, + interests: Optional[Tuple[str, ...]] = None): + # 创建参数对象 + params = SearchParams( + title_types=title_types, + genres=genres, + sort_by=sort_by, + sort_order=sort_order, + rating_min=rating_min, + rating_max=rating_max, + countries=countries, + languages=languages, + release_date_end=release_date_end, + release_date_start=release_date_start, + award_constraint=award_constraint, + ranked=ranked, + interests=interests + ) + sha256 = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4' self.__update_hash() if self._imdb_api_hash.get("AdvancedTitleSearch"): sha256 = self._imdb_api_hash["AdvancedTitleSearch"] - if title_type not in [MediaType.TV, MediaType.MOVIE]: - return None + # 获取或创建搜索状态 + last_cursor = None + if not first_page and params in self._search_states: + search_state = self._search_states.pop(params) # 移除并获取 + self._search_states[params] = search_state + # 不是第一页且已有状态 - 使用上次的结果 + if search_state.last_cursor: + last_cursor = search_state.last_cursor + # 这里实现基于上次结果的逻辑 + else: + # 重新搜索 + first_page = True + else: + first_page = True + result = self.__advanced_title_search(params, sha256, first_page, last_cursor) + if result: + page_info = result.get("pageInfo", {}) + end_cursor = page_info.get("endCursor", "") + search_state = SearchState(end_cursor) + self._search_states[params] = search_state + if len(self._search_states) > self._max_states: + self._search_states.popitem(last=False) # 移除最旧的条目 + return result + + def __advanced_title_search(self, + params: SearchParams, + sha256: str, + first_page: bool = True, + last_cursor: Optional[str] = None, + ) -> Optional[Dict]: + variables = {"first": 50, "locale": "en-US", - "sortBy": sort_by, - "sortOrder": sort_order, - "titleTypeConstraint": {"anyTitleTypeIds": self._qid_map[title_type], - "excludeTitleTypeIds": []}} - if genres: - variables["genreConstraint"] = {"allGenreIds": genres, "excludeGenreIds": []} - if countries: - variables["originCountryConstraint"] = {"allCountries": countries} - if languages: - variables["languageConstraint"] = {"anyPrimaryLanguages": languages} - if rating_min or rating_max: - rating_min = rating_min if rating_min else 1 + "sortBy": params.sort_by, + "sortOrder": params.sort_order, + } + if params.title_types: + title_type_ids = [] + for title_type in params.title_types: + if title_type in self.all_title_types: + title_type_ids.append(title_type) + if len(title_type_ids): + variables["titleTypeConstraint"] = {"anyTitleTypeIds": params.title_types, + "excludeTitleTypeIds": []} + if params.genres: + variables["genreConstraint"] = {"allGenreIds": params.genres, "excludeGenreIds": []} + if params.countries: + variables["originCountryConstraint"] = {"allCountries": params.countries} + if params.languages: + variables["languageConstraint"] = {"anyPrimaryLanguages": params.languages} + if params.rating_min or params.rating_max: + rating_min = params.rating_min if params.rating_min else 1 rating_min = max(rating_min, 1) - rating_max = rating_max if rating_max else 10 + rating_max = params.rating_max if params.rating_max else 10 rating_max = min(rating_max, 10) variables["userRatingsConstraint"] = {"aggregateRatingRange": {"max": rating_max, "min": rating_min}} - if release_date_start or release_date_end: + if params.release_date_start or params.release_date_end: release_dict = {} - if release_date_start: - release_dict["start"] = release_date_start - if release_date_end: - release_dict["end"] = release_date_end + if params.release_date_start: + release_dict["start"] = params.release_date_start + if params.release_date_end: + release_dict["end"] = params.release_date_end variables["releaseDateConstraint"] = {"releaseDateRange": release_dict} - if award_constraint: + if params.award_constraint: constraints = [] - for award in award_constraint: + for award in params.award_constraint: c = self.__award_to_constraint(award) if c: constraints.append(c) variables["awardConstraint"] = {"allEventNominations": constraints} - if not first_page and self._last_cursor: - variables["after"] = self._last_cursor + if params.ranked: + constraints = [] + for r in params.ranked: + c = self.__ranked_list_to_constraint(r) + if c: + constraints.append(c) + variables["rankedTitleListConstraint"] = {"allRankedTitleLists": constraints, + "excludeRankedTitleLists": []} + if params.interests: + constraints = [] + for interest in params.interests: + in_id = self.interest_id.get(interest) + if in_id: + constraints.append(in_id) + variables["interestConstraint"] = {"allInterestIds": constraints, "excludeInterestIds": []} + if not first_page and last_cursor: + variables["after"] = last_cursor params = {"operationName": "AdvancedTitleSearch", "variables": variables} data = self.__request(params, sha256) if not data: return None - page_info = data.get("advancedTitleSearch", {}).get("pageInfo", {}) - end_cursor = page_info.get("endCursor", "") - self._last_cursor = end_cursor return data.get("advancedTitleSearch") def __known_as(self, imdbid: str, @@ -531,10 +652,10 @@ class ImdbHelper: mtype: MediaType, imdbid: str) -> dict: """ - 给定IMDB号,查询一条媒体信息 - :param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份) - :param imdbid: IMDB的ID - """ + 给定IMDB号,查询一条媒体信息 + :param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份) + :param imdbid: IMDB的ID + """ # 查询TMDB详情 if mtype == MediaType.MOVIE: imdb_info = self.imdbid(imdbid)