imdbsource: 推荐支持IMDb数据源

This commit is contained in:
wumode
2025-05-23 17:09:31 +08:00
parent 5f5246a726
commit 8d20085565
3 changed files with 593 additions and 145 deletions

View File

@@ -6,24 +6,25 @@ from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas import DiscoverSourceEventData
from app.schemas.types import EventType, ChainEventType, MediaType
from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData
from app.schemas.types import ChainEventType, MediaType
from app.core.meta import MetaBase
from app.core.context import MediaInfo
from app.plugins.imdbsource.imdb_helper import ImdbHelper
from app import schemas
from app.utils.http import RequestUtils
class ImdbSource(_PluginBase):
# 插件名称
plugin_name = "IMDb源"
# 插件描述
plugin_desc = "让探索支持IMDb数据源。"
plugin_desc = "让探索和推荐支持IMDb数据源。"
# 插件图标
plugin_icon = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/refs/heads/imdbsource_assets/icons/"
"IMDb_IOS-OSX_App.png")
plugin_icon = ("https://raw.githubusercontent.com/jxxghp/"
"MoviePilot-Plugins/refs/heads/main/icons/IMDb_IOS-OSX_App.png")
# 插件版本
plugin_version = "1.0"
plugin_version = "1.1"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -40,7 +41,7 @@ class ImdbSource(_PluginBase):
_proxy = False
_imdb_helper = None
_discover_cache = []
_cache = {"discover": [], "trending": [], "trending_in_anime": [], "trending_in_sitcom": []}
def init_plugin(self, config: dict = None):
if config:
@@ -254,6 +255,251 @@ class ImdbSource(_PluginBase):
logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息")
return None
@staticmethod
def __movie_to_media(movie_info: dict) -> schemas.MediaInfo:
title = ""
if movie_info.get("titleText"):
title = movie_info.get("titleText", {}).get("text", "")
release_year = 0
if movie_info.get("releaseYear"):
release_year = movie_info.get("releaseYear", {}).get("year")
poster_path = None
if movie_info.get("primaryImage"):
primary_image = movie_info.get("primaryImage").get("url")
if primary_image:
poster_path = primary_image.replace('@._V1', '@._V1_QL75_UY414_CR6,0,280,414_')
vote_average = 0
if movie_info.get("ratingsSummary"):
vote_average = movie_info.get("ratingsSummary").get("aggregateRating")
runtime = 0
if movie_info.get("runtime"):
runtime = movie_info.get("runtime").get("seconds")
overview = ''
if movie_info.get("plot"):
overview = movie_info.get("plot").get("plotText").get("plainText")
return schemas.MediaInfo(
type="电影",
title=title,
year=release_year,
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(movie_info.get("id")),
poster_path=poster_path,
vote_average=vote_average,
runtime=runtime,
overview=overview,
imdb_id=movie_info.get("id")
)
@staticmethod
def __series_to_media(series_info: dict) -> schemas.MediaInfo:
title = ""
if series_info.get("titleText"):
title = series_info.get("titleText", {}).get("text", "")
release_year = 0
if series_info.get("releaseYear"):
release_year = series_info.get("releaseYear", {}).get("year")
poster_path = None
if series_info.get("primaryImage"):
primary_image = series_info.get("primaryImage").get("url")
if primary_image:
poster_path = primary_image.replace('@._V1', '@._V1_QL75_UY414_CR6,0,280,414_')
vote_average = 0
if series_info.get("ratingsSummary"):
vote_average = series_info.get("ratingsSummary").get("aggregateRating")
runtime = 0
if series_info.get("runtime"):
runtime = series_info.get("runtime").get("seconds")
overview = ''
if series_info.get("plot"):
if series_info.get("plot").get("plotText"):
overview = series_info.get("plot").get("plotText").get("plainText")
release_date_str = '0000-00-00'
if series_info.get("releaseDate"):
release_date = series_info.get('releaseDate')
release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}"
return schemas.MediaInfo(
type="电视剧",
title=title,
year=release_year,
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(series_info.get("id")),
release_date=release_date_str,
poster_path=poster_path,
vote_average=vote_average,
runtime=runtime,
overview=overview,
imdb_id=series_info.get("id")
)
@staticmethod
def title_id_to_mtype(title_id: str) -> MediaType:
if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]:
return MediaType.TV
elif title_id in ["movie", "tvMovie"]:
return MediaType.MOVIE
return MediaType.UNKNOWN
def imdb_top_250(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if apikey != settings.API_TOKEN:
return []
if not self._imdb_helper:
return []
title_types = ("movie",)
first_page = False
if page == 1:
first_page = True
self._cache["trending_in_sitcom"] = [] # 清空缓存
results = []
if len(self._cache["trending_in_sitcom"]) >= count:
results = self._cache["trending_in_sitcom"][:count]
self._cache["trending_in_sitcom"] = self._cache["trending_in_sitcom"][count:]
else:
results.extend(self._cache["trending_in_sitcom"])
remaining = count - len(results)
self._cache["trending_in_sitcom"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
ranked=("TOP_RATED_MOVIES-250",)
)
if not data:
new_results = []
else:
new_results = data.get("edges")
if new_results:
results.extend(new_results[:remaining])
self._cache["trending_in_sitcom"] = new_results[remaining:]
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
return res
def trending_in_sitcom(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if apikey != settings.API_TOKEN:
return []
if not self._imdb_helper:
return []
title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode")
first_page = False
if page == 1:
first_page = True
self._cache["trending_in_sitcom"] = [] # 清空缓存
results = []
if len(self._cache["trending_in_sitcom"]) >= count:
results = self._cache["trending_in_sitcom"][:count]
self._cache["trending_in_sitcom"] = self._cache["trending_in_sitcom"][count:]
else:
results.extend(self._cache["trending_in_sitcom"])
remaining = count - len(results)
self._cache["trending_in_sitcom"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
interests=("Sitcom",)
)
if not data:
new_results = []
else:
new_results = data.get("edges")
if new_results:
results.extend(new_results[:remaining])
self._cache["trending_in_sitcom"] = new_results[remaining:]
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
if mtype == MediaType.TV:
res.append(self.__series_to_media(item.get('node').get("title")))
return res
def trending_in_anime(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if apikey != settings.API_TOKEN:
return []
if not self._imdb_helper:
return []
title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode", 'movie')
first_page = False
if page == 1:
first_page = True
self._cache["trending_in_anime"] = [] # 清空缓存
results = []
if len(self._cache["trending_in_anime"]) >= count:
results = self._cache["trending_in_anime"][:count]
self._cache["trending_in_anime"] = self._cache["trending_in_anime"][count:]
else:
results.extend(self._cache["trending_in_anime"])
remaining = count - len(results)
self._cache["trending_in_anime"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
interests=("Anime",)
)
if not data:
new_results = []
else:
new_results = data.get("edges")
if new_results:
results.extend(new_results[:remaining])
self._cache["trending_in_anime"] = new_results[remaining:]
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
elif mtype == MediaType.TV:
res.append(self.__series_to_media(item.get('node').get("title")))
return res
def imdb_trending(self, apikey: str, page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
if apikey != settings.API_TOKEN:
return []
if not self._imdb_helper:
return []
title_types = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode", 'movie')
first_page = False
if page == 1:
first_page = True
self._cache["discover"] = [] # 清空缓存
results = []
if len(self._cache["discover"]) >= count:
results = self._cache["discover"][:count]
self._cache["discover"] = self._cache["discover"][count:]
else:
results.extend(self._cache["discover"])
remaining = count - len(results)
self._cache["discover"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_types=title_types,
sort_by="POPULARITY",
sort_order="ASC",
)
if not data:
new_results = []
else:
new_results = data.get("edges")
if new_results:
results.extend(new_results[:remaining])
self._cache["discover"] = new_results[remaining:]
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
elif mtype == MediaType.TV:
res.append(self.__series_to_media(item.get('node').get("title")))
return res
def imdb_discover(self, apikey: str, mtype: str = "series",
country: str = None,
lang: str = None,
@@ -264,89 +510,23 @@ class ImdbSource(_PluginBase):
user_rating: str = None,
year: str = None,
award: str = None,
ranked_list: str = None,
page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
def __movie_to_media(movie_info: dict) -> schemas.MediaInfo:
title = ""
if movie_info.get("titleText"):
title = movie_info.get("titleText", {}).get("text", "")
release_year = 0
if movie_info.get("releaseYear"):
release_year = movie_info.get("releaseYear", {}).get("year")
poster_path = None
if movie_info.get("primaryImage"):
poster_path = movie_info.get("primaryImage").get("url")
vote_average = 0
if movie_info.get("ratingsSummary"):
vote_average = movie_info.get("ratingsSummary").get("aggregateRating")
runtime = 0
if movie_info.get("runtime"):
runtime = movie_info.get("runtime").get("seconds")
overview = ''
if movie_info.get("plot"):
overview = movie_info.get("plot").get("plotText").get("plainText")
return schemas.MediaInfo(
type="电影",
title=title,
year=release_year,
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(movie_info.get("id")),
poster_path=poster_path,
vote_average=vote_average,
runtime=runtime,
overview=overview
)
def __series_to_media(series_info: dict) -> schemas.MediaInfo:
title = ""
if series_info.get("titleText"):
title = series_info.get("titleText", {}).get("text", "")
release_year = 0
if series_info.get("releaseYear"):
release_year = series_info.get("releaseYear", {}).get("year")
poster_path = None
if series_info.get("primaryImage"):
poster_path = series_info.get("primaryImage").get("url")
vote_average = 0
if series_info.get("ratingsSummary"):
vote_average = series_info.get("ratingsSummary").get("aggregateRating")
runtime = 0
if series_info.get("runtime"):
runtime = series_info.get("runtime").get("seconds")
overview = ''
if series_info.get("plot"):
if series_info.get("plot").get("plotText"):
overview = series_info.get("plot").get("plotText").get("plainText")
release_date_str = '0000-00-00'
if series_info.get("releaseDate"):
release_date = series_info.get('releaseDate')
release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}"
return schemas.MediaInfo(
type="电视剧",
title=title,
year=release_year,
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(series_info.get("id")),
release_date=release_date_str,
poster_path=poster_path,
vote_average=vote_average,
runtime=runtime,
overview=overview
)
if apikey != settings.API_TOKEN:
return []
if not self._imdb_helper:
return []
title_type: MediaType = MediaType.TV
title_type = ("tvSeries", "tvMiniSeries", "tvShort", "tvEpisode")
if mtype == 'movies':
title_type = MediaType.MOVIE
title_type = ("movie",)
if user_rating and using_rating:
user_rating = float(user_rating)
else:
user_rating = None
genres = [genre] if genre else None
countries = [country] if country else None
languages = [lang] if lang else None
genres = (genre,) if genre else None
countries = (country,) if country else None
languages = (lang,) if lang else None
release_date_start = None
release_date_end = None
if year:
@@ -385,21 +565,22 @@ class ImdbSource(_PluginBase):
elif year == "1970s":
release_date_start = "1970-01-01"
release_date_end = "1979-12-31"
awards = [award] if award else None
awards = (award,) if award else None
ranked_lists = (ranked_list,) if ranked_list else None
first_page = False
if page == 1:
first_page = True
self._discover_cache = [] # 清空缓存
self._cache["discover"] = [] # 清空缓存
results = []
if len(self._discover_cache) >= count:
results = self._discover_cache[:30]
self._discover_cache = self._discover_cache[30:]
if len(self._cache["discover"]) >= count:
results = self._cache["discover"][:count]
self._cache["discover"] = self._cache["discover"][count:]
else:
results.extend(self._discover_cache)
remaining = 30 - len(results)
self._discover_cache = [] # 清空缓存
results.extend(self._cache["discover"])
remaining = count - len(results)
self._cache["discover"] = [] # 清空缓存
data = self._imdb_helper.advanced_title_search(first_page=first_page,
title_type=title_type,
title_types=title_type,
genres=genres,
sort_by=sort_by,
sort_order=sort_order,
@@ -408,18 +589,19 @@ class ImdbSource(_PluginBase):
languages=languages,
release_date_end=release_date_end,
release_date_start=release_date_start,
award_constraint=awards)
award_constraint=awards,
ranked=ranked_lists)
if not data:
new_results = []
else:
new_results = data.get("edges")
if new_results:
results.extend(new_results[:remaining])
self._discover_cache = new_results[remaining:]
self._cache["discover"] = new_results[remaining:]
if mtype == "movies":
results = [__movie_to_media(movie.get('node').get("title")) for movie in results]
results = [self.__movie_to_media(movie.get('node').get("title")) for movie in results]
else:
results = [__series_to_media(series.get('node').get("title")) for series in results]
results = [self.__series_to_media(series.get('node').get("title")) for series in results]
return results
def get_api(self) -> List[Dict[str, Any]]:
@@ -432,13 +614,43 @@ class ImdbSource(_PluginBase):
"summary": "API说明"
}]
"""
return [{
"path": "/imdb_discover",
"endpoint": self.imdb_discover,
"methods": ["GET"],
"summary": "TheTVDB探索数据源",
"description": "获取TheTVDB探索数据",
}]
return [
{
"path": "/imdb_discover",
"endpoint": self.imdb_discover,
"methods": ["GET"],
"summary": "IMDb探索数据",
"description": "获取 IMDb探索 数据",
},
{
"path": "/imdb_trending",
"endpoint": self.imdb_trending,
"methods": ["GET"],
"summary": "IMDb Trending",
"description": "获取 IMDb Trending 数据",
},
{
"path": "/trending_in_anime",
"endpoint": self.trending_in_anime,
"methods": ["GET"],
"summary": "IMDb Trending in Anime",
"description": "获取 IMDb Trending in Anime 数据",
},
{
"path": "/trending_in_sitcom",
"endpoint": self.trending_in_sitcom,
"methods": ["GET"],
"summary": "IMDb Trending in Sitcom",
"description": "获取 IMDb Trending in Sitcom 数据",
},
{
"path": "/imdb_top_250",
"endpoint": self.imdb_top_250,
"methods": ["GET"],
"summary": "IMDb Top 250 Movies",
"description": "获取 IMDb Top 250 Movies 数据",
}
]
@staticmethod
def imdb_filter_ui() -> List[dict]:
@@ -481,7 +693,7 @@ class ImdbSource(_PluginBase):
lang_dict = {
"en": "英语",
"zh": "中文",
"jp": "日语",
"ja": "日语",
"ko": "韩语",
"fr": "法语",
"de": "德语",
@@ -636,6 +848,27 @@ class ImdbSource(_PluginBase):
} for key, value in award_dict.items()
]
ranked_list_dict = {
"TOP_RATED_MOVIES-100": "IMDb Top 100",
"TOP_RATED_MOVIES-250": "IMDb Top 250",
"TOP_RATED_MOVIES-1000": "IMDb Top 1000",
"LOWEST_RATED_MOVIES-100": "IMDb Bottom 100",
"LOWEST_RATED_MOVIES-250": "IMDb Bottom 250",
"LOWEST_RATED_MOVIES-1000": "IMDb Bottom 1000",
}
ranked_list_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in ranked_list_dict.items()
]
return [
{
"component": "div",
@@ -818,6 +1051,34 @@ class ImdbSource(_PluginBase):
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center",
"show": "{{mtype == 'movies'}}"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "排名"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "ranked_list"
},
"content": ranked_list_ui
}
]
},
{
"component": "div",
"props": {
@@ -943,7 +1204,11 @@ class ImdbSource(_PluginBase):
"year": None,
"user_rating": 1,
"using_rating": False,
"award": None
"award": None,
"ranked_list": None
},
depends={
"ranked_list": ["mtype"]
},
filter_ui=self.imdb_filter_ui()
)
@@ -951,3 +1216,64 @@ class ImdbSource(_PluginBase):
event_data.extra_sources = [imdb_source]
else:
event_data.extra_sources.append(imdb_source)
@eventmanager.register(ChainEventType.MediaRecognizeConvert)
def media_recognize_covert(self, event: Event) -> Optional[dict]:
if not self._enabled:
return
event_data: MediaRecognizeConvertEventData = event.event_data
if not event_data:
return
api_key = settings.TMDB_API_KEY
if event_data.convert_type != "themoviedb" or not api_key:
return
if not event_data.mediaid.startswith("imdb"):
return
imdb_id = event_data.mediaid[5:]
api_url = f"https://{settings.TMDB_API_DOMAIN}/3/find/{imdb_id}?api_key={api_key}&external_source=imdb_id"
ret = RequestUtils(accept_type="application/json").get_res(api_url)
if ret:
data = ret.json()
all_results = []
for result_type in ["movie_results", "tv_results"]:
if data.get(result_type):
all_results.extend(data[result_type])
if not all_results:
return # 无匹配结果
# 按 popularity 降序排序,取最高人气的条目
most_popular_item = max(all_results, key=lambda x: x.get("popularity", -1))
event_data.media_dict["id"] = most_popular_item.get("id")
@eventmanager.register(ChainEventType.RecommendSource)
def recommend_source(self, event: Event):
if not self._enabled:
return
event_data: RecommendSourceEventData = event.event_data
if not event_data:
return
imdb_trending: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending",
api_path=f"plugin/ImdbSource/imdb_trending?apikey={settings.API_TOKEN}",
type='Rankings'
)
trending_in_anime: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending in Anime",
api_path=f"plugin/ImdbSource/trending_in_anime?apikey={settings.API_TOKEN}",
type='Anime'
)
trending_in_sitcom: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending in Sitcom",
api_path=f"plugin/ImdbSource/trending_in_sitcom?apikey={settings.API_TOKEN}",
type='TV Shows'
)
imdb_top_250: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Top 250 Movies",
api_path=f"plugin/ImdbSource/imdb_top_250?apikey={settings.API_TOKEN}",
type='Movies'
)
trending_source = [imdb_trending, trending_in_anime, trending_in_sitcom, imdb_top_250]
if not event_data.extra_sources:
event_data.extra_sources = trending_source
else:
event_data.extra_sources.extend(trending_source)

View File

@@ -1,6 +1,8 @@
import re
from typing import Optional, Dict, List
from typing import Optional, Any, Dict, List, Tuple
from io import StringIO
from collections import OrderedDict
from dataclasses import dataclass
import graphene
from requests_html import HTMLSession
@@ -15,6 +17,28 @@ from app.schemas.types import MediaType
from app.core.cache import cached
@dataclass(frozen=True)
class SearchParams:
title_types: Optional[Tuple[str, ...]] = None
genres: Optional[Tuple[str, ...]] = None
sort_by: str = 'POPULARITY'
sort_order: str = 'ASC'
rating_min: Optional[float] = None
rating_max: Optional[float] = None
countries: Optional[Tuple[str, ...]] = None
languages: Optional[Tuple[str, ...]] = None
release_date_end: Optional[str] = None
release_date_start: Optional[str] = None
award_constraint: Optional[Tuple[str, ...]] = None
ranked: Optional[Tuple[str, ...]] = None
interests: Optional[Tuple[str, ...]] = None
class SearchState:
def __init__(self, last_cursor: str):
self.last_cursor = last_cursor
class ImdbHelper:
_query_by_id = """query queryWithVariables($id: ID!) {
title(id: $id) {
@@ -98,15 +122,27 @@ class ImdbHelper:
_hash_update_url = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/"
"refs/heads/imdbsource_assets/plugins.v2/imdbsource/imdb_hash.json")
_qid_map = {
MediaType.TV: ["tvSeries", "tvMiniSeries"],
MediaType.TV: ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"],
MediaType.MOVIE: ["movie"]
}
_imdb_headers = {
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
"/84.0.4147.105 Safari/537.36",
"Referer": "https://www.imdb.com/",
}
all_title_types = ["tvSeries", "tvMiniSeries", "movie", "tvMovie", "musicVideo", "tvShort", "short",
"tvEpisode", "tvSpecial", "videoGame"]
interest_id = {
"Anime": "in0000027",
"Superhero": "in0000008",
"Sitcom": "in0000044",
"Coming-of-Age": "in0000073",
"Slasher Horror": "in0000115",
"Raunchy Comedy": "in0000041",
"Documentary": "in0000060"
}
def __init__(self, proxies=None):
self._proxies = proxies
@@ -114,8 +150,9 @@ class ImdbHelper:
self._req_utils = RequestUtils(headers=self._imdb_headers, session=self._session, timeout=10, proxies=proxies)
self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json",
headers=self._imdb_headers, timeout=10, proxies=proxies)
self._last_cursor = ''
self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None}
self._search_states = OrderedDict()
self._max_states = 30
def imdbid(self, imdbid: str) -> Optional[Dict]:
params = {"operationName": "queryWithVariables", "query": self._query_by_id, "variables": {"id": imdbid}}
@@ -247,69 +284,153 @@ class ImdbHelper:
else:
return None
@staticmethod
def __ranked_list_to_constraint(ranked: str) -> Optional[Dict]:
"""
"TOP_RATED_MOVIES-100": "IMDb Top 100",
"TOP_RATED_MOVIES-250": "IMDb Top 250",
"TOP_RATED_MOVIES-1000": "IMDb Top 1000",
"LOWEST_RATED_MOVIES-100": "IMDb Bottom 100",
"LOWEST_RATED_MOVIES-250": "IMDb Bottom 250",
"LOWEST_RATED_MOVIES-1000": "IMDb Bottom 1000"
"""
pattern = r'^(TOP_RATED_MOVIES|LOWEST_RATED_MOVIES)-(\d+)$'
match = re.match(pattern, ranked)
if match:
ranked_title_list_type = match.group(1)
rank_range = int(match.group(2))
constraint = {"rankRange": {"max": rank_range}, "rankedTitleListType": ranked_title_list_type}
return constraint
return None
def advanced_title_search(self,
sha256: str = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4',
first_page: bool = True,
title_type: MediaType = MediaType.TV,
genres: Optional[List] = None,
title_types: Optional[Tuple[str, ...]] = None,
genres: Optional[Tuple[str, ...]] = None,
sort_by: str = 'POPULARITY',
sort_order: str = 'ASC',
rating_min: Optional[float] = None,
rating_max: Optional[float] = None,
countries: Optional[List] = None,
languages: Optional[list] = None,
countries: Optional[Tuple[str, ...]] = None,
languages: Optional[Tuple[str, ...]] = None,
release_date_end: Optional[str] = None,
release_date_start: Optional[str] = None,
award_constraint: Optional[List[str]] = None
) -> Optional[Dict]:
award_constraint: Optional[Tuple[str, ...]] = None,
ranked: Optional[Tuple[str, ...]] = None,
interests: Optional[Tuple[str, ...]] = None):
# 创建参数对象
params = SearchParams(
title_types=title_types,
genres=genres,
sort_by=sort_by,
sort_order=sort_order,
rating_min=rating_min,
rating_max=rating_max,
countries=countries,
languages=languages,
release_date_end=release_date_end,
release_date_start=release_date_start,
award_constraint=award_constraint,
ranked=ranked,
interests=interests
)
sha256 = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4'
self.__update_hash()
if self._imdb_api_hash.get("AdvancedTitleSearch"):
sha256 = self._imdb_api_hash["AdvancedTitleSearch"]
if title_type not in [MediaType.TV, MediaType.MOVIE]:
return None
# 获取或创建搜索状态
last_cursor = None
if not first_page and params in self._search_states:
search_state = self._search_states.pop(params) # 移除并获取
self._search_states[params] = search_state
# 不是第一页且已有状态 - 使用上次的结果
if search_state.last_cursor:
last_cursor = search_state.last_cursor
# 这里实现基于上次结果的逻辑
else:
# 重新搜索
first_page = True
else:
first_page = True
result = self.__advanced_title_search(params, sha256, first_page, last_cursor)
if result:
page_info = result.get("pageInfo", {})
end_cursor = page_info.get("endCursor", "")
search_state = SearchState(end_cursor)
self._search_states[params] = search_state
if len(self._search_states) > self._max_states:
self._search_states.popitem(last=False) # 移除最旧的条目
return result
def __advanced_title_search(self,
params: SearchParams,
sha256: str,
first_page: bool = True,
last_cursor: Optional[str] = None,
) -> Optional[Dict]:
variables = {"first": 50,
"locale": "en-US",
"sortBy": sort_by,
"sortOrder": sort_order,
"titleTypeConstraint": {"anyTitleTypeIds": self._qid_map[title_type],
"excludeTitleTypeIds": []}}
if genres:
variables["genreConstraint"] = {"allGenreIds": genres, "excludeGenreIds": []}
if countries:
variables["originCountryConstraint"] = {"allCountries": countries}
if languages:
variables["languageConstraint"] = {"anyPrimaryLanguages": languages}
if rating_min or rating_max:
rating_min = rating_min if rating_min else 1
"sortBy": params.sort_by,
"sortOrder": params.sort_order,
}
if params.title_types:
title_type_ids = []
for title_type in params.title_types:
if title_type in self.all_title_types:
title_type_ids.append(title_type)
if len(title_type_ids):
variables["titleTypeConstraint"] = {"anyTitleTypeIds": params.title_types,
"excludeTitleTypeIds": []}
if params.genres:
variables["genreConstraint"] = {"allGenreIds": params.genres, "excludeGenreIds": []}
if params.countries:
variables["originCountryConstraint"] = {"allCountries": params.countries}
if params.languages:
variables["languageConstraint"] = {"anyPrimaryLanguages": params.languages}
if params.rating_min or params.rating_max:
rating_min = params.rating_min if params.rating_min else 1
rating_min = max(rating_min, 1)
rating_max = rating_max if rating_max else 10
rating_max = params.rating_max if params.rating_max else 10
rating_max = min(rating_max, 10)
variables["userRatingsConstraint"] = {"aggregateRatingRange": {"max": rating_max, "min": rating_min}}
if release_date_start or release_date_end:
if params.release_date_start or params.release_date_end:
release_dict = {}
if release_date_start:
release_dict["start"] = release_date_start
if release_date_end:
release_dict["end"] = release_date_end
if params.release_date_start:
release_dict["start"] = params.release_date_start
if params.release_date_end:
release_dict["end"] = params.release_date_end
variables["releaseDateConstraint"] = {"releaseDateRange": release_dict}
if award_constraint:
if params.award_constraint:
constraints = []
for award in award_constraint:
for award in params.award_constraint:
c = self.__award_to_constraint(award)
if c:
constraints.append(c)
variables["awardConstraint"] = {"allEventNominations": constraints}
if not first_page and self._last_cursor:
variables["after"] = self._last_cursor
if params.ranked:
constraints = []
for r in params.ranked:
c = self.__ranked_list_to_constraint(r)
if c:
constraints.append(c)
variables["rankedTitleListConstraint"] = {"allRankedTitleLists": constraints,
"excludeRankedTitleLists": []}
if params.interests:
constraints = []
for interest in params.interests:
in_id = self.interest_id.get(interest)
if in_id:
constraints.append(in_id)
variables["interestConstraint"] = {"allInterestIds": constraints, "excludeInterestIds": []}
if not first_page and last_cursor:
variables["after"] = last_cursor
params = {"operationName": "AdvancedTitleSearch",
"variables": variables}
data = self.__request(params, sha256)
if not data:
return None
page_info = data.get("advancedTitleSearch", {}).get("pageInfo", {})
end_cursor = page_info.get("endCursor", "")
self._last_cursor = end_cursor
return data.get("advancedTitleSearch")
def __known_as(self, imdbid: str,
@@ -531,10 +652,10 @@ class ImdbHelper:
mtype: MediaType,
imdbid: str) -> dict:
"""
给定IMDB号查询一条媒体信息
:param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份)
:param imdbid: IMDB的ID
"""
给定IMDB号查询一条媒体信息
:param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份)
:param imdbid: IMDB的ID
"""
# 查询TMDB详情
if mtype == MediaType.MOVIE:
imdb_info = self.imdbid(imdbid)