From 81ac3bb3ed5698181374fb66aa2597743a415a58 Mon Sep 17 00:00:00 2001 From: wumode Date: Sat, 19 Jul 2025 15:57:42 +0800 Subject: [PATCH 1/3] =?UTF-8?q?update(ImdbSource)=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=AA=92=E4=BD=93=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.v2.json | 5 +- plugins.v2/imdbsource/__init__.py | 221 ++++++++++++++-- plugins.v2/imdbsource/imdbhelper.py | 381 +++++++++++++++++++++++++++- 3 files changed, 569 insertions(+), 38 deletions(-) diff --git a/package.v2.json b/package.v2.json index 32c7001..1638045 100644 --- a/package.v2.json +++ b/package.v2.json @@ -433,13 +433,14 @@ }, "ImdbSource": { "name": "IMDb源", - "description": "让探索和推荐支持IMDb数据源。", + "description": "让探索,推荐和媒体识别支持IMDb数据源。", "labels": "探索", - "version": "1.4.4", + "version": "1.5.0", "icon": "IMDb_IOS-OSX_App.png", "author": "wumode", "level": 1, "history": { + "v1.5.0": "支持媒体识别", "v1.4.4": "更新数据源", "v1.4.3": "为仪表盘组件添加缓存", "v1.4.2": "优化小屏幕组件显示", diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index b242ba8..36fb59d 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -3,26 +3,32 @@ from datetime import datetime import re from apscheduler.schedulers.background import BackgroundScheduler +import zhconv from app.core.config import settings from app.core.event import eventmanager, Event +from app.chain import ChainBase from app.plugins import _PluginBase from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData -from app.schemas.types import ChainEventType, MediaType +from app.schemas.types import ChainEventType from app.plugins.imdbsource.imdbhelper import ImdbHelper from app import schemas from app.utils.http import RequestUtils +from app.schemas.types import MediaType +from app.core.meta import MetaBase +from app.core.context import MediaInfo +from app.log import logger class ImdbSource(_PluginBase): # 插件名称 plugin_name = "IMDb源" # 插件描述 - plugin_desc = "让探索和推荐支持IMDb数据源。" + plugin_desc = "让探索,推荐和媒体识别支持IMDb数据源。" # 插件图标 plugin_icon = "IMDb_IOS-OSX_App.png" # 插件版本 - plugin_version = "1.4.4" + plugin_version = "1.5.0" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -38,7 +44,9 @@ class ImdbSource(_PluginBase): _enabled: bool = False _proxy: bool = False _staff_picks: bool = False + _recognize_media: bool = False _component_size: str = 'medium' + _recognition_mode = 'auxiliary' # 私有属性 _imdb_helper = None @@ -46,14 +54,35 @@ class ImdbSource(_PluginBase): "trending_in_documentary": [], "imdb_top_250": [], "staff_picks": {}} _img_proxy_prefix = '' _scheduler: Optional[BackgroundScheduler] = None + _original_method = None def init_plugin(self, config: dict = None): + # monkey patching + if ChainBase.recognize_media.__name__ != 'patched_recognize_media': + self._original_method = ChainBase.recognize_media + plugin_instance = self + # 通过闭包捕获 plugin_instance + def patched_recognize_media(chain_self, meta: MetaBase = None, + mtype: Optional[MediaType] = None, + tmdbid: Optional[int] = None, + doubanid: Optional[str] = None, + bangumiid: Optional[int] = None, + episode_group: Optional[str] = None, + cache: bool = True): + result = plugin_instance._original_method(chain_self, meta, mtype, tmdbid, doubanid, bangumiid, + episode_group, cache) + if result is None and plugin_instance._enabled and plugin_instance._recognize_media: + logger.info(f"通过插件 {self.plugin_name} 执行:recognize_media ...") + return plugin_instance.recognize_media(meta, mtype) + return result + if config: self._enabled = config.get("enabled") self._proxy = config.get("proxy") self._staff_picks = config.get("staff_picks") - self._component_size = config.get("component_size", "medium") - self._imdb_helper = ImdbHelper() + self._recognize_media = config.get("recognize_media") + self._component_size = config.get("component_size") or "medium" + self._recognition_mode = config.get("recognition_mode") or "auxiliary" self._imdb_helper = ImdbHelper(proxies=settings.PROXY if self._proxy else None) if "media-amazon.com" not in settings.SECURITY_IMAGE_DOMAINS: settings.SECURITY_IMAGE_DOMAINS.append("media-amazon.com") @@ -63,6 +92,16 @@ class ImdbSource(_PluginBase): self._scheduler = BackgroundScheduler(timezone=settings.TZ) self._scheduler.start() self._scheduler.add_job(self.__cache_staff_picks, trigger='date', run_date=None) + if self._recognize_media and self._recognition_mode == 'auxiliary': + # 替换 ChainBase.recognize_media + ChainBase.recognize_media = patched_recognize_media + else: + # 恢复 ChainBase.recognize_media + if self._original_method and ChainBase.recognize_media.__name__ == 'patched_recognize_media': + ChainBase.recognize_media = self._original_method + else: + self.stop_service() + def get_state(self) -> bool: return self._enabled @@ -99,7 +138,7 @@ class ImdbSource(_PluginBase): return MediaType.MOVIE, datetime.now().date().strftime("%Y"), '' media_id = title.get('titleType', {}).get('id') release_year = title.get('releaseYear', {}).get('year') or datetime.now().date().strftime("%Y") - media_type = ImdbSource.title_id_to_mtype(media_id) + media_type = ImdbHelper.type_to_mtype(media_id) media_plot = title.get("plot", {}).get("plotText", {}).get("plainText", '') return media_type, release_year, media_plot @@ -440,7 +479,7 @@ class ImdbSource(_PluginBase): "content": [ { "component": "VCol", - "props": {"cols": 12, "md": 4}, + "props": {"cols": 12, "md": 3}, "content": [ { "component": "VSwitch", @@ -455,7 +494,7 @@ class ImdbSource(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 4 + 'md': 3 }, 'content': [ { @@ -471,7 +510,7 @@ class ImdbSource(_PluginBase): 'component': 'VCol', 'props': { 'cols': 12, - 'md': 4 + 'md': 3 }, 'content': [ { @@ -483,6 +522,22 @@ class ImdbSource(_PluginBase): } ] }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSwitch', + 'props': { + 'model': 'recognize_media', + 'label': '媒体识别', + } + } + ] + } ], }, { @@ -507,6 +562,26 @@ class ImdbSource(_PluginBase): } } ] + }, + { + 'component': 'VCol', + 'props': { + 'cols': 12, + 'md': 3 + }, + 'content': [ + { + 'component': 'VSelect', + 'props': { + 'model': 'recognition_mode', + 'label': '媒体识别工作模式', + 'items': [ + {"title": "仅当系统无法识别", "value": "auxiliary"}, + {"title": "正常", "value": "hijacking"} + ] + } + } + ] } ] } @@ -516,7 +591,9 @@ class ImdbSource(_PluginBase): "enabled": False, "proxy": False, "staff_picks": False, - "component_size": "medium" + "recognize_media": False, + "component_size": "medium", + "recognition_mode": "auxiliary" } def get_page(self) -> List[dict]: @@ -526,7 +603,8 @@ class ImdbSource(_PluginBase): """ 退出插件 """ - pass + if ChainBase.recognize_media.__name__ == 'patched_recognize_media' and self._original_method: + ChainBase.recognize_media = self._original_method def get_module(self) -> Dict[str, Any]: """ @@ -536,7 +614,10 @@ class ImdbSource(_PluginBase): "id2": self.xxx2, } """ - pass + modules = {} + if self._recognize_media and self._recognition_mode == 'hijacking': + modules['recognize_media'] = self.recognize_media + return modules def __cache_staff_picks(self): entries = self._imdb_helper.staff_picks() @@ -613,7 +694,7 @@ class ImdbSource(_PluginBase): release_date_str = '0000-00-00' if series_info.get("releaseDate"): release_date = series_info.get('releaseDate') - release_date_str = f"{release_date.get('year')}-{release_date.get('month')}-{release_date.get('day')}" + release_date_str = ImdbHelper.release_date_string(release_date) return schemas.MediaInfo( type="电视剧", title=title, @@ -629,14 +710,6 @@ class ImdbSource(_PluginBase): imdb_id=series_info.get("id") ) - @staticmethod - def title_id_to_mtype(title_id: str) -> MediaType: - if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]: - return MediaType.TV - elif title_id in ["movie", "tvMovie"]: - return MediaType.MOVIE - return MediaType.UNKNOWN - @staticmethod def is_mobile(user_agent): mobile_keywords = [ @@ -681,7 +754,7 @@ class ImdbSource(_PluginBase): res = [] for item in results: title_type_id = item.get('node').get("title").get("titleType", {}).get("id") - mtype = self.title_id_to_mtype(title_type_id) + mtype = ImdbHelper.type_to_mtype(title_type_id) if mtype == MediaType.MOVIE: res.append(self.__movie_to_media(item.get('node').get("title"))) elif mtype == MediaType.TV: @@ -722,7 +795,7 @@ class ImdbSource(_PluginBase): res = [] for item in results: title_type_id = item.get('node').get("title").get("titleType", {}).get("id") - mtype = self.title_id_to_mtype(title_type_id) + mtype = ImdbHelper.type_to_mtype(title_type_id) if mtype == MediaType.MOVIE: res.append(self.__movie_to_media(item.get('node').get("title"))) return res @@ -761,7 +834,7 @@ class ImdbSource(_PluginBase): res = [] for item in results: title_type_id = item.get('node').get("title").get("titleType", {}).get("id") - mtype = self.title_id_to_mtype(title_type_id) + mtype = ImdbHelper.type_to_mtype(title_type_id) if mtype == MediaType.TV: res.append(self.__series_to_media(item.get('node').get("title"))) return res @@ -800,7 +873,7 @@ class ImdbSource(_PluginBase): res = [] for item in results: title_type_id = item.get('node').get("title").get("titleType", {}).get("id") - mtype = self.title_id_to_mtype(title_type_id) + mtype = ImdbHelper.type_to_mtype(title_type_id) if mtype == MediaType.MOVIE: res.append(self.__movie_to_media(item.get('node').get("title"))) elif mtype == MediaType.TV: @@ -840,7 +913,7 @@ class ImdbSource(_PluginBase): res = [] for item in results: title_type_id = item.get('node').get("title").get("titleType", {}).get("id") - mtype = self.title_id_to_mtype(title_type_id) + mtype = ImdbHelper.type_to_mtype(title_type_id) if mtype == MediaType.MOVIE: res.append(self.__movie_to_media(item.get('node').get("title"))) elif mtype == MediaType.TV: @@ -1645,3 +1718,99 @@ class ImdbSource(_PluginBase): event_data.extra_sources = trending_source else: event_data.extra_sources.extend(trending_source) + + def recognize_media(self, meta: MetaBase = None, + mtype: MediaType = None, + **kwargs) -> Optional[MediaInfo]: + """ + 识别媒体信息 + :param meta: 识别的元数据 + :param mtype: 识别的媒体类型 + :return: 识别的媒体信息,包括剧集信息 + """ + if not self._enabled: + return None + if not meta: + return None + elif not meta.name: + logger.warn("识别媒体信息时未提供元数据名称") + return None + else: + if mtype: + meta.type = mtype + info = {} + # 简体名称 + zh_name = zhconv.convert(meta.cn_name, 'zh-hans') if meta.cn_name else None + names = list(dict.fromkeys([k for k in [meta.cn_name, zh_name, meta.en_name] if k])) + for name in names: + if meta.begin_season: + logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") + else: + logger.info(f"正在识别 {name} ...") + if meta.type == MediaType.UNKNOWN and not meta.year: + info = self._imdb_helper.match_by(name) + else: + if meta.type == MediaType.TV: + info = self._imdb_helper.match(name=name, year=meta.year, mtype=meta.type, season_year=meta.year, + season_number=meta.begin_season) + if not info: + # 去掉年份再查一次 + info = self._imdb_helper.match(name=name, mtype=meta.type) + else: + # 有年份先按电影查 + info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE) + # 没有再按电视剧查 + if not info: + info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.TV) + if not info: + # 去掉年份和类型再查一次 + info = self._imdb_helper.match_by(name=name) + if info: + break + if info: + info = self._imdb_helper.update_info(info.get('id'), info=info) or {} + mediainfo = ImdbSource._convert_mediainfo(info) + logger.info(f"{meta.name} IMDb 识别结果:{mediainfo.type.value} " + f"{mediainfo.title_year} " + f"{mediainfo.imdb_id}") + return mediainfo + return None + + @staticmethod + def _convert_mediainfo(info: Dict[str, Any]) -> MediaInfo: + mediainfo = MediaInfo() + mediainfo.source = 'imdb' + mediainfo.type = info.get('media_type') + mediainfo.title = info.get('primaryTitle', '') + mediainfo.year = f"{info.get('startYear', 0)}" + mediainfo.imdb_id = info.get('id') + mediainfo.overview = info.get('plot') or '' + spoken_languages = info.get('spokenLanguages') or [] + mediainfo.original_language = spoken_languages[0].get('code') if spoken_languages else None + mediainfo.original_title = info.get('originalTitle') + mediainfo.names = [aka.get('text', '') for aka in (info.get('akas') or [])] + origin_countries = info.get('originCountries') or [] + mediainfo.origin_country = [origin_country.get('code', '') for origin_country in origin_countries] + mediainfo.poster_path = (info.get('primaryImage') or {}).get('url') + mediainfo.genres = info.get('genres') or [] + directors = [] + actors = [] + for credit in (info.get('credits') or []): + name = credit.get('name') or {} + if credit.get('category') == 'DIRECTOR': + directors.append({'name': name.get('displayName')}) + elif credit.get('category') in ['CAST', 'ACTOR', 'ACTRESS']: + actors.append({'name': name.get('displayName')}) + mediainfo.director = directors + mediainfo.actor = actors + mediainfo.vote_average = round(float((info.get('rating') or {}).get('aggregateRating') or 0), 1) + if mediainfo.type == MediaType.TV: + for season, season_info in info.get('seasons', {}).items(): + episode_count = season_info.get("episode_count") + mediainfo.seasons[season] = list(range(1, episode_count + 1)) + air_date = season_info.get("air_date") + if air_date: + mediainfo.season_years[season] = air_date[:4] + if not mediainfo.release_date: + mediainfo.release_date = air_date + return mediainfo diff --git a/plugins.v2/imdbsource/imdbhelper.py b/plugins.v2/imdbsource/imdbhelper.py index addd559..aa5f279 100644 --- a/plugins.v2/imdbsource/imdbhelper.py +++ b/plugins.v2/imdbsource/imdbhelper.py @@ -1,6 +1,6 @@ import re from json import JSONDecodeError -from typing import Optional, Any, Dict, Tuple, List +from typing import Optional, Any, Dict, Tuple, List, Union from collections import OrderedDict from dataclasses import dataclass import json @@ -11,8 +11,10 @@ import requests from app.core.config import settings from app.log import logger from app.utils.http import RequestUtils +from app.utils.string import StringUtils from app.utils.common import retry from app.core.cache import cached +from app.schemas.types import MediaType @dataclass(frozen=True) @@ -39,15 +41,9 @@ class SearchState: class ImdbHelper: - _official_endpoint = "https://caching.graphql.imdb.com/" - _imdb_headers = { - "Accept": "text/html,application/json,text/plain,*/*", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome" - "/84.0.4147.105 Safari/537.36", - "Referer": "https://www.imdb.com/", - } + all_title_types = ["tvSeries", "tvMiniSeries", "movie", "tvMovie", "musicVideo", "tvShort", "short", - "tvEpisode", "tvSpecial", "videoGame"] + "tvEpisode", "tvSpecial"] interest_id = { "Anime": "in0000027", "Superhero": "in0000008", @@ -57,6 +53,14 @@ class ImdbHelper: "Raunchy Comedy": "in0000041", "Documentary": "in0000060" } + _official_endpoint = "https://caching.graphql.imdb.com/" + _imdb_headers = { + "Accept": "text/html,application/json,text/plain,*/*", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome" + "/84.0.4147.105 Safari/537.36", + "Referer": "https://www.imdb.com/", + } + _free_api = "https://api.imdbapi.dev" def __init__(self, proxies=None): self._proxies = proxies @@ -66,6 +70,7 @@ class ImdbHelper: timeout=10, proxies=proxies, session=requests.Session()) + self._free_imdb_req = RequestUtils(accept_type="application/json", proxies=proxies, session=requests.Session()) self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None} self.hash_status = {"AdvancedTitleSearch": False, "TitleAkasPaginated": False} self._search_states = OrderedDict() @@ -212,6 +217,37 @@ class ImdbHelper: return constraint return None + @staticmethod + def __compare_names(file_name: str, names: Union[list|str]) -> bool: + """ + 比较文件名是否匹配,忽略大小写和特殊字符 + :param file_name: 识别的文件名或者种子名 + :param names: TMDB返回的译名 + :return: True or False + """ + if not file_name or not names: + return False + if not isinstance(names, list): + names = [names] + file_name = StringUtils.clear(file_name).upper() + for name in names: + name = StringUtils.clear(name).strip().upper() + if file_name == name: + return True + return False + + @staticmethod + def type_to_mtype(title_id: str) -> MediaType: + if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]: + return MediaType.TV + elif title_id in ["movie", "tvMovie"]: + return MediaType.MOVIE + return MediaType.UNKNOWN + + @staticmethod + def release_date_string(release_date: Dict) -> Optional[str]: + return f"{release_date.get('year', 0):04d}-{release_date.get('month', 0):02d}-{release_date.get('day', 0):02d}" + def advanced_title_search(self, first_page: bool = True, title_types: Optional[Tuple[str, ...]] = None, @@ -374,6 +410,7 @@ class ImdbHelper: """ return (self.__get_staff_picks() or {}).get('entries') + @cached(maxsize=128, ttl=3600) def vertical_list_page_items(self, titles: Optional[List[str]] = None, names: Optional[List[str]] = None, @@ -391,6 +428,7 @@ class ImdbHelper: }, 'titleType': {'id': 'movie'}, 'releaseYear': {'year': 2025}, + 'akas': {'edges': [{'node': {'text': 'Kite Festival of Love', 'country': None, 'language': None}}]} 'primaryImage': { 'id': 'rm3920935426', 'url': '', @@ -429,7 +467,7 @@ class ImdbHelper: ] } """ - query = "query VerticalListPageItems( $titles: [ID!]! $names: [ID!]! $images: [ID!]! $videos: [ID!]! ) {\n titles(ids: $titles) { ...TitleParts meterRanking { currentRank meterType rankChange {changeDirection difference} } ratingsSummary { aggregateRating } }\n names(ids: $names) { ...NameParts }\n videos(ids: $videos) { ...VideoParts }\n images(ids: $images) { ...ImageParts }\n }\n fragment TitleParts on Title {\n id\n titleText { text }\n titleType { id }\n releaseYear { year }\n plot { plotText {plainText}}\n primaryImage { id url width height }\n}\n fragment NameParts on Name {\n id\n nameText { text }\n primaryImage { id url width height }\n}\n fragment ImageParts on Image {\n id\n height\n width\n url \n}\n fragment VideoParts on Video {\n id\n name { value }\n contentType { displayName { value } id }\n previewURLs { displayName { value } url videoDefinition videoMimeType }\n playbackURLs { displayName { value } url videoDefinition videoMimeType }\n thumbnail { height url width }\n}\n " + query = "query VerticalListPageItems( $titles: [ID!]! $names: [ID!]! $images: [ID!]! $videos: [ID!]!) {\n titles(ids: $titles) { ...TitleParts meterRanking { currentRank meterType rankChange {changeDirection difference} } ratingsSummary { aggregateRating } }\n names(ids: $names) { ...NameParts }\n videos(ids: $videos) { ...VideoParts }\n images(ids: $images) { ...ImageParts }\n}\nfragment TitleParts on Title {\n id\n titleText { text }\n titleType { id }\n releaseYear { year }\n akas(first: 50) { edges { node { text country { id text } language { text text } } } }\n plot { plotText {plainText}}\n primaryImage { id url width height }\n}\nfragment NameParts on Name {\n id\n nameText { text }\n primaryImage { id url width height }\n}\nfragment ImageParts on Image {\n id\n height\n width\n url\n}\nfragment VideoParts on Video {\n id\n name { value }\n contentType { displayName { value } id }\n previewURLs { displayName { value } url videoDefinition videoMimeType }\n playbackURLs { displayName { value } url videoDefinition videoMimeType }\n thumbnail { height url width }\n}" variables = {'images': images or [], 'titles': titles or [], 'names': names or [], @@ -444,3 +482,326 @@ class ImdbHelper: return None return data + @retry(Exception, logger=logger) + @cached(ttl=6 * 3600) + def __free_imdb_api(self, path: str, params: Optional[dict] = None) -> Optional[dict]: + r = self._free_imdb_req.get_res(url=f"{self._free_api}{path}", params=params, raise_exception=True) + if r is None: + return None + if r.status_code != 200: + logger.warn(f"{r.json().get('message')}") + return None + return r.json() + + def search(self, query: str, media_types: Optional[List[str]] = None, start_year: Optional[int] = None, + end_year: Optional[int] = None, country_code: Optional[str] = None) -> Optional[list]: + """ + Search for titles using a query string. + :param query: The search query for titles. + :param media_types: The type of titles to filter by. + MOVIE: Represents a movie title. + TV_SERIES: Represents a TV series title. + TV_MINI_SERIES: Represents a TV mini-series title. + TV_SPECIAL: Represents a TV special title. + TV_MOVIE: Represents a TV movie title. + SHORT: Represents a short title. + VIDEO: Represents a video title. + VIDEO_GAME: Represents a video game title. + :param start_year: The start year for filtering titles. + :param end_year: The end year for filtering titles. + :param country_code: The country code for filtering titles. + :return: Search results. + See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` + """ + endpoint = '/search/titles' + params: Dict[str, Any] = {'query': query} + if media_types: + params['types'] = media_types + if start_year: + params['startYear'] = start_year + if end_year: + params['endYear'] = end_year + if country_code: + params['countryCode'] = country_code + r = self.__free_imdb_api(path=endpoint, params=params) + if r is None: + return None + return r.get('titles') + + def details(self, title_id: str) -> Optional[dict]: + """ + Retrieve a title's details using its IMDb ID. + :param title_id: IMDb title ID in the format "tt1234567". + :return: Details. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947' -H 'accept: application/json'` + """ + endpoint = '/titles/%s' + r = self.__free_imdb_api(path=endpoint % title_id) + return r + + def episodes(self, title_id: str, season: Optional[str]=None, + page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]: + """ + Retrieve the episodes associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :param season: The season number to filter episodes by. + :param page_size: The maximum number of episodes to return per page. + The value must be between 1 and 50. Default is 20. + :param page_token: Token for pagination, if applicable. + :return: Episodes. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/episodes?season=1&pageSize=5' \ + -H 'accept: application/json'` + """ + endpoint = '/titles/%s/episodes' + param: Dict[str, Any] = {} + if season is not None: + param['season'] = season + if page_size is not None: + param['pageSize'] = page_size + if page_token is not None: + param['pageToken'] = page_token + r = self.__free_imdb_api(path=endpoint % title_id, params=param) + return r + + def seasons(self, title_id: str) -> Optional[List[dict]]: + """ + Retrieve the seasons associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :return: Seasons. + """ + """ + {[{"season": "1", "episodeCount": 11}]} + """ + endpoint = '/titles/%s/seasons' + r = self.__free_imdb_api(path=endpoint % title_id) + if r is None: + return None + return r.get('seasons') + + def credits(self, title_id: str, categories: Optional[List[str]] = None, + page_size: Optional[int] = None, page_token: Optional[str] = None) -> Optional[dict]: + """ + Retrieve the credits associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :param categories: The categories to filter credits by. + DIRECTOR: The director category. + WRITER: The writer category. + CAST: The cast category, which includes all actors and actresses. + ACTOR: The actor category. + ACTRESS: The actress category. + :param page_size: The maximum number of episodes to return per page. + The value must be between 1 and 50. Default is 20. + :param page_token: Token for pagination, if applicable. + :return: Credits. + See `curl -X 'GET' 'https://api.imdbapi.dev/titles/tt0944947/credits?categories=CAST' \ + -H 'accept: application/json'` + """ + endpoint = '/titles/%s/credits' + param: Dict[str, Any] = {} + if categories: + param['categories'] = categories + if page_size is not None: + param['pageSize'] = page_size + if page_token is not None: + param['pageToken'] = page_token + r = self.__free_imdb_api(path=endpoint % title_id, params=param) or {} + return r.get('credits') + + def akas(self, title_id: str) -> Optional[list]: + """ + Retrieve the alternative titles (AKAs) associated with a specific title. + :param title_id: IMDb title ID in the format "tt1234567". + :return: AKAs. + [{ + "text": "Kite Festival of Love", + "country": { + "code": "CA", + "name": "Canada" + }, + "language": { + "code": "fra", + "name": "French" + } + },] + """ + endpoint = '/titles/%s/akas' + r = self.__free_imdb_api(path=endpoint % title_id) + if r is None: + return None + return r.get('akas') + + def __get_tv_seasons(self, title_id: str) -> Optional[dict]: + seasons = self.seasons(title_id) + if not seasons: + return None + seasons_dict = {season.get('season'): {**season, 'episode_count': 0, 'air_date': '0000-00-00'} + for season in seasons} + page_token = None + while True: + episodes = self.episodes(title_id, page_size=50, page_token=page_token) or {} + for episode in episodes.get('episodes', []): + s = episode.get('season') + seasons_dict[s]['episode_count'] += 1 + if not seasons_dict[s].get('release_date'): + seasons_dict[s]['air_date'] = ImdbHelper.release_date_string(episode.get('releaseDate', {})) + seasons_dict[s]['release_date'] = episode.get('releaseDate') + page_token = episodes.get('nextPageToken') + if not page_token: + break + return seasons_dict + + def match_by(self, name: str, mtype: Optional[MediaType] = None, year: Optional[str] = None) -> Optional[dict]: + """ + 根据名称同时查询电影和电视剧,没有类型也没有年份时使用 + :param name: 识别的文件名或种子名 + :param mtype: 类型:电影、电视剧 + :param year: 年份,如要是季集需要是首播年份 + :return: 匹配的媒体信息 + """ + + mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype] + search_types = [] + if MediaType.TV in mtypes: + search_types.extend(['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL']) + if MediaType.MOVIE in mtypes: + search_types.extend(['MOVIE', 'TV_MOVIE']) + if year: + multi_res = self.search(query=name, start_year=int(year), end_year=int(year), media_types=search_types) + else: + multi_res = self.search(query=name, media_types=search_types) + ret_info = {} + if multi_res is None or len(multi_res) == 0: + logger.debug(f"{name} 未找到相关媒体息!") + return None + multi_res = [r for r in multi_res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) in mtypes] + multi_res = sorted( + multi_res, + key=lambda x: ('1' if x.get('type') in ['movie', 'tvMovie'] else '0') + (f"{x.get('startYear')}" or '0000'), + reverse=True + ) + items = self.vertical_list_page_items([ x.get('id') for x in multi_res]) + titles = items.get('titles') if items else [] + titles_dict = {} + for title in titles: + titles_dict[title.get('id')] = title + for result in multi_res: + title = titles_dict.get(result.get('id'), {}) + start_year = result.get('startYear') + if year and str(start_year) != year: + continue + if ImdbHelper.__compare_names(name, [result.get('primaryTitle', ''), result.get('originalTitle', '')]): + ret_info = result + break + names = [edge.get('node', {}).get('text', '') for edge in title.get('akas', {}).get('edges', [])] + if ImdbHelper.__compare_names(name, names): + ret_info = result + break + if ret_info: + title = titles_dict.get(ret_info.get('id'), {}) + ret_info['akas'] = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])] + ret_info['rating'] = title.get('ratingsSummary') or {} + ret_info['media_type'] = ImdbHelper.type_to_mtype(ret_info.get('type')) + return ret_info + + def match_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]: + """ + 根据电视剧的名称和季的年份及序号匹配 IMDb + :param name: 识别的文件名或者种子名 + :param season_year: 季的年份 + :param season_number: 季序号 + :return: 匹配的媒体信息 + """ + + def __season_match(_tv_info: dict, _season_year: str) -> bool: + if not _tv_info: + return False + seasons = self.__get_tv_seasons(_tv_info.get('id')) or {} + for season, season_info in seasons.items(): + if season_info.get("air_date"): + if season_info.get("air_date")[0:4] == str(_season_year) \ + and season == str(season_number): + _tv_info['seasons'] = seasons + return True + return False + + search_types = ['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL'] + res = self.search(query=name, media_types=search_types) + if not res: + logger.debug("%s 未找到季%s相关信息!" % (name, season_number)) + return None + tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV] + tvs = sorted(tvs, key=lambda x: x.get('startYear') or 0, reverse=True) + items = self.vertical_list_page_items([x.get('id') for x in tvs]) + titles = items.get('titles') if items else [] + titles_dict = {} + for title in titles: + titles_dict[title.get('id')] = title + for tv in tvs: + # 年份 + title = titles_dict.get(tv.get('id'), {}) + akas = [e.get('node', {}) for e in title.get('akas', {}).get('edges', [])] + tv_year = tv.get('startYear') + if self.__compare_names(name, [tv.get('primaryTitle', ''), tv.get('originalTitle', '')]) and \ + str(tv_year) == season_year: + tv['akas'] = akas + tv['rating'] = title.get('ratingsSummary') or {} + return tv + names = [aka.get('text', '') for aka in akas] + if not tv or not self.__compare_names(name, names): + continue + if __season_match(_tv_info=tv, _season_year=season_year): + tv['akas'] = akas + tv['rating'] = title.get('ratingsSummary') or {} + return tv + return None + + def match(self, name: str, + mtype: MediaType, + year: Optional[str] = None, + season_year: Optional[str] = None, + season_number: Optional[int] = None, + ) -> Optional[dict]: + """ + 搜索 IMDb 中的媒体信息,匹配返回一条尽可能正确的信息 + :param name: 检索的名称 + :param mtype: 类型:电影、电视剧 + :param year: 年份,如要是季集需要是首播年份 + :param season_year: 当前季集年份 + :param season_number: 季集,整数 + :return: 匹配的媒体信息 + """ + if not name: + return None + info = {} + if mtype == MediaType.TV: + # 有当前季和当前季集年份,使用精确匹配 + if season_year and season_number: + logger.debug(f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...") + info = self.match_by_season(name, season_year, season_number) + if info: + info['media_type'] = MediaType.TV + return info + year_range = [year, str(int(year) + 1), str(int(year) - 1)] if year else [None] + for year in year_range: + logger.debug(f"正在识别{mtype.value}:{name}, 年份={year} ...") + info = self.match_by(name, mtype, year) + if info: + break + return info + + def update_info(self, title_id: str, info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """ + Given a Title ID, update its media information. + :param title_id: IMDb ID. + :param info: Media information to be updated. + :return: IMDb info. + """ + details = self.details(title_id) or {} + info = info or {} + info.update(details) + if info.get("akas") is None: + info['akas'] = self.akas(title_id) or [] + info['credits'] = self.credits(title_id, page_size=30) + if info.get('media_type') == MediaType.TV and info.get('seasons') is None: + info['seasons'] = self.__get_tv_seasons(info.get('id')) or {} + return info \ No newline at end of file From 89adf2c9ac1cb6b9a3ff9d75ad59638f79c3daf1 Mon Sep 17 00:00:00 2001 From: wumode Date: Sat, 19 Jul 2025 17:17:59 +0800 Subject: [PATCH 2/3] =?UTF-8?q?update(ImdbSource)=20=E6=9B=B4=E6=96=B0API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- plugins.v2/imdbsource/__init__.py | 6 +++--- plugins.v2/imdbsource/imdbhelper.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index 36fb59d..74f4726 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -1693,12 +1693,12 @@ class ImdbSource(_PluginBase): type='Rankings' ) trending_in_anime: schemas.RecommendMediaSource = schemas.RecommendMediaSource( - name="IMDb Trending in Anime", + name="Trending Anime on IMDb", api_path=f"plugin/ImdbSource/trending_in_anime?apikey={settings.API_TOKEN}", type='Anime' ) trending_in_sitcom: schemas.RecommendMediaSource = schemas.RecommendMediaSource( - name="IMDb Trending in Sitcom", + name="Trending Sitcom on IMDb", api_path=f"plugin/ImdbSource/trending_in_sitcom?apikey={settings.API_TOKEN}", type='TV Shows' ) @@ -1709,7 +1709,7 @@ class ImdbSource(_PluginBase): type='Movies' ) imdb_documentary: schemas.RecommendMediaSource = schemas.RecommendMediaSource( - name="IMDb Trending in Documentary", + name="Trending Documentary on IMDb", api_path=f"plugin/ImdbSource/trending_in_documentary?apikey={settings.API_TOKEN}", type='Rankings' ) diff --git a/plugins.v2/imdbsource/imdbhelper.py b/plugins.v2/imdbsource/imdbhelper.py index aa5f279..4583cac 100644 --- a/plugins.v2/imdbsource/imdbhelper.py +++ b/plugins.v2/imdbsource/imdbhelper.py @@ -493,10 +493,10 @@ class ImdbHelper: return None return r.json() - def search(self, query: str, media_types: Optional[List[str]] = None, start_year: Optional[int] = None, + def advanced_search(self, query: str, media_types: Optional[List[str]] = None, start_year: Optional[int] = None, end_year: Optional[int] = None, country_code: Optional[str] = None) -> Optional[list]: """ - Search for titles using a query string. + Perform an advanced search for titles using a query string with additional filters. :param query: The search query for titles. :param media_types: The type of titles to filter by. MOVIE: Represents a movie title. @@ -513,7 +513,7 @@ class ImdbHelper: :return: Search results. See `curl -X 'GET' 'https://api.imdbapi.dev/search/titles?query=Kite' -H 'accept: application/json'` """ - endpoint = '/search/titles' + endpoint = '/advancedSearch/titles' params: Dict[str, Any] = {'query': query} if media_types: params['types'] = media_types @@ -666,9 +666,9 @@ class ImdbHelper: if MediaType.MOVIE in mtypes: search_types.extend(['MOVIE', 'TV_MOVIE']) if year: - multi_res = self.search(query=name, start_year=int(year), end_year=int(year), media_types=search_types) + multi_res = self.advanced_search(query=name, start_year=int(year), end_year=int(year), media_types=search_types) else: - multi_res = self.search(query=name, media_types=search_types) + multi_res = self.advanced_search(query=name, media_types=search_types) ret_info = {} if multi_res is None or len(multi_res) == 0: logger.debug(f"{name} 未找到相关媒体息!") @@ -725,7 +725,7 @@ class ImdbHelper: return False search_types = ['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL'] - res = self.search(query=name, media_types=search_types) + res = self.advanced_search(query=name, media_types=search_types) if not res: logger.debug("%s 未找到季%s相关信息!" % (name, season_number)) return None From 53ae912bfc7fcac48dbaafd1c83ccd4f9290ddc1 Mon Sep 17 00:00:00 2001 From: wumode Date: Sat, 19 Jul 2025 18:38:52 +0800 Subject: [PATCH 3/3] fix: bugs --- plugins.v2/imdbsource/__init__.py | 28 ++++++++++++++++++++-------- plugins.v2/imdbsource/imdbhelper.py | 5 ++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index 74f4726..83a4fa1 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -57,11 +57,8 @@ class ImdbSource(_PluginBase): _original_method = None def init_plugin(self, config: dict = None): - # monkey patching - if ChainBase.recognize_media.__name__ != 'patched_recognize_media': - self._original_method = ChainBase.recognize_media + plugin_instance = self - # 通过闭包捕获 plugin_instance def patched_recognize_media(chain_self, meta: MetaBase = None, mtype: Optional[MediaType] = None, tmdbid: Optional[int] = None, @@ -69,12 +66,21 @@ class ImdbSource(_PluginBase): bangumiid: Optional[int] = None, episode_group: Optional[str] = None, cache: bool = True): + # 调用原始方法 + if not plugin_instance._original_method: + return None result = plugin_instance._original_method(chain_self, meta, mtype, tmdbid, doubanid, bangumiid, episode_group, cache) if result is None and plugin_instance._enabled and plugin_instance._recognize_media: - logger.info(f"通过插件 {self.plugin_name} 执行:recognize_media ...") + logger.info(f"通过插件 {plugin_instance.plugin_name} 执行:recognize_media ...") return plugin_instance.recognize_media(meta, mtype) return result + # 给 patch 函数加唯一标记 + patched_recognize_media._patched_by = id(self) + # 保存原始方法 + if not (hasattr(ChainBase.recognize_media, "_patched_by") and + ChainBase.recognize_media._patched_by == id(self)): + self._original_method = getattr(ChainBase, "recognize_media", None) if config: self._enabled = config.get("enabled") @@ -94,10 +100,14 @@ class ImdbSource(_PluginBase): self._scheduler.add_job(self.__cache_staff_picks, trigger='date', run_date=None) if self._recognize_media and self._recognition_mode == 'auxiliary': # 替换 ChainBase.recognize_media - ChainBase.recognize_media = patched_recognize_media + if not (hasattr(ChainBase.recognize_media, "_patched_by") and + ChainBase.recognize_media._patched_by == id(self)): + ChainBase.recognize_media = patched_recognize_media else: # 恢复 ChainBase.recognize_media - if self._original_method and ChainBase.recognize_media.__name__ == 'patched_recognize_media': + if (hasattr(ChainBase.recognize_media, "_patched_by") and + ChainBase.recognize_media._patched_by == id(self) and + self._original_method): ChainBase.recognize_media = self._original_method else: self.stop_service() @@ -603,7 +613,9 @@ class ImdbSource(_PluginBase): """ 退出插件 """ - if ChainBase.recognize_media.__name__ == 'patched_recognize_media' and self._original_method: + if (hasattr(ChainBase.recognize_media, "_patched_by") and + ChainBase.recognize_media._patched_by == id(self) and + self._original_method): ChainBase.recognize_media = self._original_method def get_module(self) -> Dict[str, Any]: diff --git a/plugins.v2/imdbsource/imdbhelper.py b/plugins.v2/imdbsource/imdbhelper.py index 4583cac..9679b16 100644 --- a/plugins.v2/imdbsource/imdbhelper.py +++ b/plugins.v2/imdbsource/imdbhelper.py @@ -218,7 +218,7 @@ class ImdbHelper: return None @staticmethod - def __compare_names(file_name: str, names: Union[list|str]) -> bool: + def __compare_names(file_name: str, names: Union[list,str]) -> bool: """ 比较文件名是否匹配,忽略大小写和特殊字符 :param file_name: 识别的文件名或者种子名 @@ -506,7 +506,6 @@ class ImdbHelper: TV_MOVIE: Represents a TV movie title. SHORT: Represents a short title. VIDEO: Represents a video title. - VIDEO_GAME: Represents a video game title. :param start_year: The start year for filtering titles. :param end_year: The end year for filtering titles. :param country_code: The country code for filtering titles. @@ -727,7 +726,7 @@ class ImdbHelper: search_types = ['TV_SERIES', 'TV_MINI_SERIES', 'TV_SPECIAL'] res = self.advanced_search(query=name, media_types=search_types) if not res: - logger.debug("%s 未找到季%s相关信息!" % (name, season_number)) + logger.debug(f"{name} 未找到季{season_number}相关信息!") return None tvs = [r for r in res if r.get('id') and ImdbHelper.type_to_mtype(r.get('type')) == MediaType.TV] tvs = sorted(tvs, key=lambda x: x.get('startYear') or 0, reverse=True)