Merge pull request #852 from wumode/imdbsource

This commit is contained in:
jxxghp
2025-07-19 19:24:10 +08:00
committed by GitHub
3 changed files with 583 additions and 41 deletions

View File

@@ -433,13 +433,14 @@
},
"ImdbSource": {
"name": "IMDb源",
"description": "让探索推荐支持IMDb数据源。",
"description": "让探索推荐和媒体识别支持IMDb数据源。",
"labels": "探索",
"version": "1.4.4",
"version": "1.5.0",
"icon": "IMDb_IOS-OSX_App.png",
"author": "wumode",
"level": 1,
"history": {
"v1.5.0": "支持媒体识别",
"v1.4.4": "更新数据源",
"v1.4.3": "为仪表盘组件添加缓存",
"v1.4.2": "优化小屏幕组件显示",

View File

@@ -3,26 +3,32 @@ from datetime import datetime
import re
from apscheduler.schedulers.background import BackgroundScheduler
import zhconv
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.chain import ChainBase
from app.plugins import _PluginBase
from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData
from app.schemas.types import ChainEventType, MediaType
from app.schemas.types import ChainEventType
from app.plugins.imdbsource.imdbhelper import ImdbHelper
from app import schemas
from app.utils.http import RequestUtils
from app.schemas.types import MediaType
from app.core.meta import MetaBase
from app.core.context import MediaInfo
from app.log import logger
class ImdbSource(_PluginBase):
# 插件名称
plugin_name = "IMDb源"
# 插件描述
plugin_desc = "让探索推荐支持IMDb数据源。"
plugin_desc = "让探索推荐和媒体识别支持IMDb数据源。"
# 插件图标
plugin_icon = "IMDb_IOS-OSX_App.png"
# 插件版本
plugin_version = "1.4.4"
plugin_version = "1.5.0"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -38,7 +44,9 @@ class ImdbSource(_PluginBase):
_enabled: bool = False
_proxy: bool = False
_staff_picks: bool = False
_recognize_media: bool = False
_component_size: str = 'medium'
_recognition_mode = 'auxiliary'
# 私有属性
_imdb_helper = None
@@ -46,14 +54,41 @@ class ImdbSource(_PluginBase):
"trending_in_documentary": [], "imdb_top_250": [], "staff_picks": {}}
_img_proxy_prefix = ''
_scheduler: Optional[BackgroundScheduler] = None
_original_method = None
def init_plugin(self, config: dict = None):
plugin_instance = self
def patched_recognize_media(chain_self, meta: MetaBase = None,
mtype: Optional[MediaType] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
bangumiid: Optional[int] = None,
episode_group: Optional[str] = None,
cache: bool = True):
# 调用原始方法
if not plugin_instance._original_method:
return None
result = plugin_instance._original_method(chain_self, meta, mtype, tmdbid, doubanid, bangumiid,
episode_group, cache)
if result is None and plugin_instance._enabled and plugin_instance._recognize_media:
logger.info(f"通过插件 {plugin_instance.plugin_name} 执行recognize_media ...")
return plugin_instance.recognize_media(meta, mtype)
return result
# 给 patch 函数加唯一标记
patched_recognize_media._patched_by = id(self)
# 保存原始方法
if not (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self)):
self._original_method = getattr(ChainBase, "recognize_media", None)
if config:
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
self._staff_picks = config.get("staff_picks")
self._component_size = config.get("component_size", "medium")
self._imdb_helper = ImdbHelper()
self._recognize_media = config.get("recognize_media")
self._component_size = config.get("component_size") or "medium"
self._recognition_mode = config.get("recognition_mode") or "auxiliary"
self._imdb_helper = ImdbHelper(proxies=settings.PROXY if self._proxy else None)
if "media-amazon.com" not in settings.SECURITY_IMAGE_DOMAINS:
settings.SECURITY_IMAGE_DOMAINS.append("media-amazon.com")
@@ -63,6 +98,20 @@ class ImdbSource(_PluginBase):
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
self._scheduler.start()
self._scheduler.add_job(self.__cache_staff_picks, trigger='date', run_date=None)
if self._recognize_media and self._recognition_mode == 'auxiliary':
# 替换 ChainBase.recognize_media
if not (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self)):
ChainBase.recognize_media = patched_recognize_media
else:
# 恢复 ChainBase.recognize_media
if (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self) and
self._original_method):
ChainBase.recognize_media = self._original_method
else:
self.stop_service()
def get_state(self) -> bool:
return self._enabled
@@ -99,7 +148,7 @@ class ImdbSource(_PluginBase):
return MediaType.MOVIE, datetime.now().date().strftime("%Y"), ''
media_id = title.get('titleType', {}).get('id')
release_year = title.get('releaseYear', {}).get('year') or datetime.now().date().strftime("%Y")
media_type = ImdbSource.title_id_to_mtype(media_id)
media_type = ImdbHelper.type_to_mtype(media_id)
media_plot = title.get("plot", {}).get("plotText", {}).get("plainText", '')
return media_type, release_year, media_plot
@@ -440,7 +489,7 @@ class ImdbSource(_PluginBase):
"content": [
{
"component": "VCol",
"props": {"cols": 12, "md": 4},
"props": {"cols": 12, "md": 3},
"content": [
{
"component": "VSwitch",
@@ -455,7 +504,7 @@ class ImdbSource(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
'md': 3
},
'content': [
{
@@ -471,7 +520,7 @@ class ImdbSource(_PluginBase):
'component': 'VCol',
'props': {
'cols': 12,
'md': 4
'md': 3
},
'content': [
{
@@ -483,6 +532,22 @@ class ImdbSource(_PluginBase):
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'recognize_media',
'label': '媒体识别',
}
}
]
}
],
},
{
@@ -507,6 +572,26 @@ class ImdbSource(_PluginBase):
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'recognition_mode',
'label': '媒体识别工作模式',
'items': [
{"title": "仅当系统无法识别", "value": "auxiliary"},
{"title": "正常", "value": "hijacking"}
]
}
}
]
}
]
}
@@ -516,7 +601,9 @@ class ImdbSource(_PluginBase):
"enabled": False,
"proxy": False,
"staff_picks": False,
"component_size": "medium"
"recognize_media": False,
"component_size": "medium",
"recognition_mode": "auxiliary"
}
def get_page(self) -> List[dict]:
@@ -526,7 +613,10 @@ class ImdbSource(_PluginBase):
"""
退出插件
"""
pass
if (hasattr(ChainBase.recognize_media, "_patched_by") and
ChainBase.recognize_media._patched_by == id(self) and
self._original_method):
ChainBase.recognize_media = self._original_method
def get_module(self) -> Dict[str, Any]:
"""
@@ -536,7 +626,10 @@ class ImdbSource(_PluginBase):
"id2": self.xxx2,
}
"""
pass
modules = {}
if self._recognize_media and self._recognition_mode == 'hijacking':
modules['recognize_media'] = self.recognize_media
return modules
def __cache_staff_picks(self):
entries = self._imdb_helper.staff_picks()
@@ -613,7 +706,7 @@ class ImdbSource(_PluginBase):
release_date_str = '0000-00-00'
if series_info.get("releaseDate"):
release_date = series_info.get('releaseDate')
release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}"
release_date_str = ImdbHelper.release_date_string(release_date)
return schemas.MediaInfo(
type="电视剧",
title=title,
@@ -629,14 +722,6 @@ class ImdbSource(_PluginBase):
imdb_id=series_info.get("id")
)
@staticmethod
def title_id_to_mtype(title_id: str) -> MediaType:
if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]:
return MediaType.TV
elif title_id in ["movie", "tvMovie"]:
return MediaType.MOVIE
return MediaType.UNKNOWN
@staticmethod
def is_mobile(user_agent):
mobile_keywords = [
@@ -681,7 +766,7 @@ class ImdbSource(_PluginBase):
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
mtype = ImdbHelper.type_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
elif mtype == MediaType.TV:
@@ -722,7 +807,7 @@ class ImdbSource(_PluginBase):
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
mtype = ImdbHelper.type_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
return res
@@ -761,7 +846,7 @@ class ImdbSource(_PluginBase):
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
mtype = ImdbHelper.type_to_mtype(title_type_id)
if mtype == MediaType.TV:
res.append(self.__series_to_media(item.get('node').get("title")))
return res
@@ -800,7 +885,7 @@ class ImdbSource(_PluginBase):
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
mtype = ImdbHelper.type_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
elif mtype == MediaType.TV:
@@ -840,7 +925,7 @@ class ImdbSource(_PluginBase):
res = []
for item in results:
title_type_id = item.get('node').get("title").get("titleType", {}).get("id")
mtype = self.title_id_to_mtype(title_type_id)
mtype = ImdbHelper.type_to_mtype(title_type_id)
if mtype == MediaType.MOVIE:
res.append(self.__movie_to_media(item.get('node').get("title")))
elif mtype == MediaType.TV:
@@ -1620,12 +1705,12 @@ class ImdbSource(_PluginBase):
type='Rankings'
)
trending_in_anime: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending in Anime",
name="Trending Anime on IMDb",
api_path=f"plugin/ImdbSource/trending_in_anime?apikey={settings.API_TOKEN}",
type='Anime'
)
trending_in_sitcom: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending in Sitcom",
name="Trending Sitcom on IMDb",
api_path=f"plugin/ImdbSource/trending_in_sitcom?apikey={settings.API_TOKEN}",
type='TV Shows'
)
@@ -1636,7 +1721,7 @@ class ImdbSource(_PluginBase):
type='Movies'
)
imdb_documentary: schemas.RecommendMediaSource = schemas.RecommendMediaSource(
name="IMDb Trending in Documentary",
name="Trending Documentary on IMDb",
api_path=f"plugin/ImdbSource/trending_in_documentary?apikey={settings.API_TOKEN}",
type='Rankings'
)
@@ -1645,3 +1730,99 @@ class ImdbSource(_PluginBase):
event_data.extra_sources = trending_source
else:
event_data.extra_sources.extend(trending_source)
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
**kwargs) -> Optional[MediaInfo]:
"""
识别媒体信息
:param meta: 识别的元数据
:param mtype: 识别的媒体类型
:return: 识别的媒体信息,包括剧集信息
"""
if not self._enabled:
return None
if not meta:
return None
elif not meta.name:
logger.warn("识别媒体信息时未提供元数据名称")
return None
else:
if mtype:
meta.type = mtype
info = {}
# 简体名称
zh_name = zhconv.convert(meta.cn_name, 'zh-hans') if meta.cn_name else None
names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k]))
for name in names:
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
info = self._imdb_helper.match_by(name)
else:
if meta.type == MediaType.TV:
info = self._imdb_helper.match(name=name, year=meta.year, mtype=meta.type, season_year=meta.year,
season_number=meta.begin_season)
if not info:
# 去掉年份再查一次
info = self._imdb_helper.match(name=name, mtype=meta.type)
else:
# 有年份先按电影查
info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.TV)
if not info:
# 去掉年份和类型再查一次
info = self._imdb_helper.match_by(name=name)
if info:
break
if info:
info = self._imdb_helper.update_info(info.get('id'), info=info) or {}
mediainfo = ImdbSource._convert_mediainfo(info)
logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.imdb_id}")
return mediainfo
return None
@staticmethod
def _convert_mediainfo(info: Dict[str, Any]) -> MediaInfo:
mediainfo = MediaInfo()
mediainfo.source = 'imdb'
mediainfo.type = info.get('media_type')
mediainfo.title = info.get('primaryTitle', '')
mediainfo.year = f"{info.get('startYear', 0)}"
mediainfo.imdb_id = info.get('id')
mediainfo.overview = info.get('plot') or ''
spoken_languages = info.get('spokenLanguages') or []
mediainfo.original_language = spoken_languages[0].get('code') if spoken_languages else None
mediainfo.original_title = info.get('originalTitle')
mediainfo.names = [aka.get('text', '') for aka in (info.get('akas') or [])]
origin_countries = info.get('originCountries') or []
mediainfo.origin_country = [origin_country.get('code', '') for origin_country in origin_countries]
mediainfo.poster_path = (info.get('primaryImage') or {}).get('url')
mediainfo.genres = info.get('genres') or []
directors = []
actors = []
for credit in (info.get('credits') or []):
name = credit.get('name') or {}
if credit.get('category') == 'DIRECTOR':
directors.append({'name': name.get('displayName')})
elif credit.get('category') in ['CAST', 'ACTOR', 'ACTRESS']:
actors.append({'name': name.get('displayName')})
mediainfo.director = directors
mediainfo.actor = actors
mediainfo.vote_average = round(float((info.get('rating') or {}).get('aggregateRating') or 0), 1)
if mediainfo.type == MediaType.TV:
for season, season_info in info.get('seasons', {}).items():
episode_count = season_info.get("episode_count")
mediainfo.seasons[season] = list(range(1, episode_count + 1))
air_date = season_info.get("air_date")
if air_date:
mediainfo.season_years[season] = air_date[:4]
if not mediainfo.release_date:
mediainfo.release_date = air_date
return mediainfo

View File

@@ -1,6 +1,6 @@
import re
from json import JSONDecodeError
from typing import Optional, Any, Dict, Tuple, List
from typing import Optional, Any, Dict, Tuple, List, Union
from collections import OrderedDict
from dataclasses import dataclass
import json
@@ -11,8 +11,10 @@ import requests
from app.core.config import settings
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.utils.common import retry
from app.core.cache import cached
from app.schemas.types import MediaType
@dataclass(frozen=True)
@@ -39,15 +41,9 @@ class SearchState:
class ImdbHelper:
_official_endpoint = "https://caching.graphql.imdb.com/"
_imdb_headers = {
"Accept": "text/html,application/json,text/plain,*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
"/84.0.4147.105 Safari/537.36",
"Referer": "https://www.imdb.com/",
}
all_title_types = ["tvSeries", "tvMiniSeries", "movie", "tvMovie", "musicVideo", "tvShort", "short",
"tvEpisode", "tvSpecial", "videoGame"]
"tvEpisode", "tvSpecial"]
interest_id = {
"Anime": "in0000027",
"Superhero": "in0000008",
@@ -57,6 +53,14 @@ class ImdbHelper:
"Raunchy Comedy": "in0000041",
"Documentary": "in0000060"
}
_official_endpoint = "https://caching.graphql.imdb.com/"
_imdb_headers = {
"Accept": "text/html,application/json,text/plain,*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
"/84.0.4147.105 Safari/537.36",
"Referer": "https://www.imdb.com/",
}
_free_api = "https://api.imdbapi.dev"
def __init__(self, proxies=None):
self._proxies = proxies
@@ -66,6 +70,7 @@ class ImdbHelper:
timeout=10,
proxies=proxies,
session=requests.Session())
self._free_imdb_req = RequestUtils(accept_type="application/json", proxies=proxies, session=requests.Session())
self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None}
self.hash_status = {"AdvancedTitleSearch": False, "TitleAkasPaginated": False}
self._search_states = OrderedDict()
@@ -212,6 +217,37 @@ class ImdbHelper:
return constraint
return None
@staticmethod
def __compare_names(file_name: str, names: Union[list,str]) -> bool:
"""
比较文件名是否匹配,忽略大小写和特殊字符
:param file_name: 识别的文件名或者种子名
:param names: TMDB返回的译名
:return: True or False
"""
if not file_name or not names:
return False
if not isinstance(names, list):
names = [names]
file_name = StringUtils.clear(file_name).upper()
for name in names:
name = StringUtils.clear(name).strip().upper()
if file_name == name:
return True
return False
@staticmethod
def type_to_mtype(title_id: str) -> MediaType:
if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]:
return MediaType.TV
elif title_id in ["movie", "tvMovie"]:
return MediaType.MOVIE
return MediaType.UNKNOWN
@staticmethod
def release_date_string(release_date: Dict) -> Optional[str]:
return f"{release_date.get('year', 0):04d}-{release_date.get('month', 0):02d}-{release_date.get('day', 0):02d}"
def advanced_title_search(self,
first_page: bool = True,
title_types: Optional[Tuple[str, ...]] = None,
@@ -374,6 +410,7 @@ class ImdbHelper:
"""
return (self.__get_staff_picks() or {}).get('entries')
@cached(maxsize=128, ttl=3600)
def vertical_list_page_items(self,
titles: Optional[List[str]] = None,
names: Optional[List[str]] = None,
@@ -391,6 +428,7 @@ class ImdbHelper:
},
'titleType': {'id': 'movie'},
'releaseYear': {'year': 2025},
'akas': {'edges': [{'node': {'text': 'Kite Festival of Love', 'country': None, 'language': None}}]}
'primaryImage': {
'id': 'rm3920935426',
'url': '',
@@ -429,7 +467,7 @@ class ImdbHelper:
]
}
"""
query = "query VerticalListPageItems( $titles: [ID!]! $names: [ID!]! $images: [ID!]! $videos: [ID!]! ) {\n titles(ids: $titles) { ...TitleParts meterRanking { currentRank meterType rankChange {changeDirection difference} } ratingsSummary { aggregateRating } }\n names(ids: $names) { ...NameParts }\n videos(ids: $videos) { ...VideoParts }\n images(ids: $images) { ...ImageParts }\n }\n fragment TitleParts on Title {\n id\n titleText { text }\n titleType { id }\n releaseYear { year }\n plot { plotText {plainText}}\n primaryImage { id url width height }\n}\n fragment NameParts on Name {\n id\n nameText { text }\n primaryImage { id url width height }\n}\n fragment ImageParts on Image {\n id\n height\n width\n url \n}\n fragment VideoParts on Video {\n id\n name { value }\n contentType { displayName { value } id }\n previewURLs { displayName { value } url videoDefinition videoMimeType }\n playbackURLs { displayName { value } url videoDefinition videoMimeType }\n thumbnail { height url width }\n}\n "
query = "query VerticalListPageItems( $titles: [ID!]! $names: [ID!]! $images: [ID!]! $videos: [ID!]!) {\n titles(ids: $titles) { ...TitleParts meterRanking { currentRank meterType rankChange {changeDirection difference} } ratingsSummary { aggregateRating } }\n names(ids: $names) { ...NameParts }\n videos(ids: $videos) { ...VideoParts }\n images(ids: $images) { ...ImageParts }\n}\nfragment TitleParts on Title {\n id\n titleText { text }\n titleType { id }\n releaseYear { year }\n akas(first: 50) { edges { node { text country { id text } language { text text } } } }\n plot { plotText {plainText}}\n primaryImage { id url width height }\n}\nfragment NameParts on Name {\n id\n nameText { text }\n primaryImage { id url width height }\n}\nfragment ImageParts on Image {\n id\n height\n width\n url\n}\nfragment VideoParts on Video {\n id\n name { value }\n contentType { displayName { value } id }\n previewURLs { displayName { value } url videoDefinition videoMimeType }\n playbackURLs { displayName { value } url videoDefinition videoMimeType }\n thumbnail { height url width }\n}"
variables = {'images': images or [],
'titles': titles or [],
'names': names or [],
@@ -444,3 +482,325 @@ class ImdbHelper:
return None
return data
@retry(Exception, logger=logger)
@cached(ttl=6 * 3600)
def __free_imdb_api(self, path: str, params: Optional[dict] = None) -> Optional[dict]:
r = self._free_imdb_req.get_res(url=f"{self._free_api}{path}", params=params, raise_exception=True)
if r is None:
return None
if r.status_code != 200:
logger.warn(f"{r.json().get('message')}")
return None
return r.json()
def advanced_search(self, query: str, media_types: Optional[List[str]] = None, start_year: Optional[int] = None,
end_year: Optional[int] = None, country_code: Optional[str] = None) -> Optional[list]:
"""
Perform an advanced search for titles using a query string with additional filters.
:param query: The search query for titles.
:param media_types: The type of titles to filter by.
MOVIE: Represents a movie title.
TV_SERIES: Represents a TV series title.
TV_MINI_SERIES: Represents a TV mini-series title.
TV_SPECIAL: Represents a TV special title.
TV_MOVIE: Represents a TV movie title.
SHORT: Represents a short title.
VIDEO: Represents a video title.
:param start_year: The start year for filtering titles.
:param end_year: The end year for filtering titles.
:param country_code: The country code for filtering titles.
:return: Search results.
See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'`
"""
endpoint = '/advancedSearch/titles'
params: Dict[str, Any] = {'query': query}
if media_types:
params['types'] = media_types
if start_year:
params['startYear'] = start_year
if end_year:
params['endYear'] = end_year
if country_code:
params['countryCode'] = country_code
r = self.__free_imdb_api(path=endpoint, params=params)
if r is None:
return None
return r.get('titles')
def details(self, title_id: str) -> Optional[dict]:
"""
Retrieve a title's details using its IMDb ID.
:param title_id: IMDb title ID in the format "tt1234567".
:return: Details.
See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947' -H 'accept: application/json'`
"""
endpoint = '/titles/%s'
r = self.__free_imdb_api(path=endpoint % title_id)
return r
def episodes(self, title_id: str, season: Optional[str]=None,
page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]:
"""
Retrieve the episodes associated with a specific title.
:param title_id: IMDb title ID in the format "tt1234567".
:param season: The season number to filter episodes by.
:param page_size: The maximum number of episodes to return per page.
The value must be between 1 and 50. Default is 20.
:param page_token: Token for pagination, if applicable.
:return: Episodes.
See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/episodes?season=1&pageSize=5' \
-H 'accept: application/json'`
"""
endpoint = '/titles/%s/episodes'
param: Dict[str, Any] = {}
if season is not None:
param['season'] = season
if page_size is not None:
param['pageSize'] = page_size
if page_token is not None:
param['pageToken'] = page_token
r = self.__free_imdb_api(path=endpoint % title_id, params=param)
return r
def seasons(self, title_id: str) -> Optional[List[dict]]:
"""
Retrieve the seasons associated with a specific title.
:param title_id: IMDb title ID in the format "tt1234567".
:return: Seasons.
"""
"""
{[{"season": "1", "episodeCount": 11}]}
"""
endpoint = '/titles/%s/seasons'
r = self.__free_imdb_api(path=endpoint % title_id)
if r is None:
return None
return r.get('seasons')
def credits(self, title_id: str, categories: Optional[List[str]] = None,
page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]:
"""
Retrieve the credits associated with a specific title.
:param title_id: IMDb title ID in the format "tt1234567".
:param categories: The categories to filter credits by.
DIRECTOR: The director category.
WRITER: The writer category.
CAST: The cast category, which includes all actors and actresses.
ACTOR: The actor category.
ACTRESS: The actress category.
:param page_size: The maximum number of episodes to return per page.
The value must be between 1 and 50. Default is 20.
:param page_token: Token for pagination, if applicable.
:return: Credits.
See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/credits?categories=CAST' \
-H 'accept: application/json'`
"""
endpoint = '/titles/%s/credits'
param: Dict[str, Any] = {}
if categories:
param['categories'] = categories
if page_size is not None:
param['pageSize'] = page_size
if page_token is not None:
param['pageToken'] = page_token
r = self.__free_imdb_api(path=endpoint % title_id, params=param) or {}
return r.get('credits')
def akas(self, title_id: str) -> Optional[list]:
"""
Retrieve the alternative titles (AKAs) associated with a specific title.
:param title_id: IMDb title ID in the format "tt1234567".
:return: AKAs.
[{
"text": "Kite Festival of Love",
"country": {
"code": "CA",
"name": "Canada"
},
"language": {
"code": "fra",
"name": "French"
}
},]
"""
endpoint = '/titles/%s/akas'
r = self.__free_imdb_api(path=endpoint % title_id)
if r is None:
return None
return r.get('akas')
def __get_tv_seasons(self, title_id: str) -> Optional[dict]:
seasons = self.seasons(title_id)
if not seasons:
return None
seasons_dict = {season.get('season'): {**season, 'episode_count': 0, 'air_date': '0000-00-00'}
for season in seasons}
page_token = None
while True:
episodes = self.episodes(title_id, page_size=50, page_token=page_token) or {}
for episode in episodes.get('episodes', []):
s = episode.get('season')
seasons_dict[s]['episode_count'] += 1
if not seasons_dict[s].get('release_date'):
seasons_dict[s]['air_date'] = ImdbHelper.release_date_string(episode.get('releaseDate', {}))
seasons_dict[s]['release_date'] = episode.get('releaseDate')
page_token = episodes.get('nextPageToken')
if not page_token:
break
return seasons_dict
def match_by(self, name: str, mtype: Optional[MediaType] = None, year: Optional[str] = None) -> Optional[dict]:
"""
根据名称同时查询电影和电视剧,没有类型也没有年份时使用
:param name: 识别的文件名或种子名
:param mtype: 类型:电影、电视剧
:param year: 年份,如要是季集需要是首播年份
:return: 匹配的媒体信息
"""
mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype]
search_types = []
if MediaType.TV in mtypes:
search_types.extend(['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL'])
if MediaType.MOVIE in mtypes:
search_types.extend(['MOVIE', 'TV_MOVIE'])
if year:
multi_res = self.advanced_search(query=name, start_year=int(year), end_year=int(year), media_types=search_types)
else:
multi_res = self.advanced_search(query=name, media_types=search_types)
ret_info = {}
if multi_res is None or len(multi_res) == 0:
logger.debug(f"{name} 未找到相关媒体息!")
return None
multi_res = [r for r in multi_res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) in mtypes]
multi_res = sorted(
multi_res,
key=lambda x: ('1' if x.get('type') in ['movie', 'tvMovie'] else '0') + (f"{x.get('startYear')}" or '0000'),
reverse=True
)
items = self.vertical_list_page_items([ x.get('id') for x in multi_res])
titles = items.get('titles') if items else []
titles_dict = {}
for title in titles:
titles_dict[title.get('id')] = title
for result in multi_res:
title = titles_dict.get(result.get('id'), {})
start_year = result.get('startYear')
if year and str(start_year) != year:
continue
if ImdbHelper.__compare_names(name, [result.get('primaryTitle', ''), result.get('originalTitle', '')]):
ret_info = result
break
names = [edge.get('node', {}).get('text', '') for edge in title.get('akas', {}).get('edges', [])]
if ImdbHelper.__compare_names(name, names):
ret_info = result
break
if ret_info:
title = titles_dict.get(ret_info.get('id'), {})
ret_info['akas'] = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])]
ret_info['rating'] = title.get('ratingsSummary') or {}
ret_info['media_type'] = ImdbHelper.type_to_mtype(ret_info.get('type'))
return ret_info
def match_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]:
"""
根据电视剧的名称和季的年份及序号匹配 IMDb
:param name: 识别的文件名或者种子名
:param season_year: 季的年份
:param season_number: 季序号
:return: 匹配的媒体信息
"""
def __season_match(_tv_info: dict, _season_year: str) -> bool:
if not _tv_info:
return False
seasons = self.__get_tv_seasons(_tv_info.get('id')) or {}
for season, season_info in seasons.items():
if season_info.get("air_date"):
if season_info.get("air_date")[0:4] == str(_season_year) \
and season == str(season_number):
_tv_info['seasons'] = seasons
return True
return False
search_types = ['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL']
res = self.advanced_search(query=name, media_types=search_types)
if not res:
logger.debug(f"{name} 未找到季{season_number}相关信息!")
return None
tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV]
tvs = sorted(tvs, key=lambda x: x.get('startYear') or 0, reverse=True)
items = self.vertical_list_page_items([x.get('id') for x in tvs])
titles = items.get('titles') if items else []
titles_dict = {}
for title in titles:
titles_dict[title.get('id')] = title
for tv in tvs:
# 年份
title = titles_dict.get(tv.get('id'), {})
akas = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])]
tv_year = tv.get('startYear')
if self.__compare_names(name, [tv.get('primaryTitle', ''), tv.get('originalTitle', '')]) and \
str(tv_year) == season_year:
tv['akas'] = akas
tv['rating'] = title.get('ratingsSummary') or {}
return tv
names = [aka.get('text', '') for aka in akas]
if not tv or not self.__compare_names(name, names):
continue
if __season_match(_tv_info=tv, _season_year=season_year):
tv['akas'] = akas
tv['rating'] = title.get('ratingsSummary') or {}
return tv
return None
def match(self, name: str,
mtype: MediaType,
year: Optional[str] = None,
season_year: Optional[str] = None,
season_number: Optional[int] = None,
) -> Optional[dict]:
"""
搜索 IMDb 中的媒体信息,匹配返回一条尽可能正确的信息
:param name: 检索的名称
:param mtype: 类型:电影、电视剧
:param year: 年份,如要是季集需要是首播年份
:param season_year: 当前季集年份
:param season_number: 季集,整数
:return: 匹配的媒体信息
"""
if not name:
return None
info = {}
if mtype == MediaType.TV:
# 有当前季和当前季集年份,使用精确匹配
if season_year and season_number:
logger.debug(f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
info = self.match_by_season(name, season_year, season_number)
if info:
info['media_type'] = MediaType.TV
return info
year_range = [year, str(int(year) + 1), str(int(year) - 1)] if year else [None]
for year in year_range:
logger.debug(f"正在识别{mtype.value}{name}, 年份={year} ...")
info = self.match_by(name, mtype, year)
if info:
break
return info
def update_info(self, title_id: str, info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
Given a Title ID, update its media information.
:param title_id: IMDb ID.
:param info: Media information to be updated.
:return: IMDb info.
"""
details = self.details(title_id) or {}
info = info or {}
info.update(details)
if info.get("akas") is None:
info['akas'] = self.akas(title_id) or []
info['credits'] = self.credits(title_id, page_size=30)
if info.get('media_type') == MediaType.TV and info.get('seasons') is None:
info['seasons'] = self.__get_tv_seasons(info.get('id')) or {}
return info