diff --git a/package.v2.json b/package.v2.json index 34c9df8..dec5c08 100644 --- a/package.v2.json +++ b/package.v2.json @@ -433,11 +433,12 @@ "name": "IMDb源", "description": "让探索支持IMDb数据源。", "labels": "探索", - "version": "1.3.2", + "version": "1.3.3", "icon": "IMDb_IOS-OSX_App.png", "author": "wumode", "level": 1, "history": { + "v1.3.3": "修复依赖问题", "v1.3.2": "更新 API query hash", "v1.3.1": "修复按日期排序错误", "v1.3": "优化网络连接", diff --git a/plugins.v2/imdbsource/__init__.py b/plugins.v2/imdbsource/__init__.py index 076474c..8764f59 100644 --- a/plugins.v2/imdbsource/__init__.py +++ b/plugins.v2/imdbsource/__init__.py @@ -1,16 +1,11 @@ -import re -import json from typing import Optional, Any, List, Dict, Tuple from datetime import datetime from app.core.config import settings from app.core.event import eventmanager, Event -from app.log import logger from app.plugins import _PluginBase from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData from app.schemas.types import ChainEventType, MediaType -from app.core.meta import MetaBase -from app.core.context import MediaInfo from app.plugins.imdbsource.imdb_helper import ImdbHelper from app import schemas from app.utils.http import RequestUtils @@ -24,7 +19,7 @@ class ImdbSource(_PluginBase): # 插件图标 plugin_icon = "IMDb_IOS-OSX_App.png" # 插件版本 - plugin_version = "1.3.2" + plugin_version = "1.3.3" # 插件作者 plugin_author = "wumode" # 作者主页 @@ -130,132 +125,6 @@ class ImdbSource(_PluginBase): # return {"recognize_media": (self.recognize_media, ModuleExecutionType.Hijack)} pass - @staticmethod - # @MediaInfo.source_processor("imdb") - def process_imdb_info(mediainfo: MediaInfo, info: dict): - """处理 IMDB 信息""" - mediainfo.source_info["imdb"] = info - if isinstance(info.get('media_type'), MediaType): - mediainfo.type = info.get('media_type') - elif info.get('media_type'): - mediainfo.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV - mediainfo.title = info.get("title") - mediainfo.release_date = info.get('release_date') - if info.get("id"): - mediainfo.source_id["imdb"] = info.get("id") - mediainfo.imdb_id = info.get('id') - if not mediainfo.source_id: - return - mediainfo.vote_average = round(float(info.get("rating").get("aggregate_rating")), 1) if info.get("rating") else 0 - mediainfo.overview = info.get('plot') - mediainfo.genre_ids = info.get('genre') or [] - # 风格 - if not mediainfo.genres: - mediainfo.genres = [{"id": genre, "name": genre} for genre in info.get("genres") or []] - if info.get('spoken_languages', []): - mediainfo.original_language = info.get('spoken_languages', [])[0].get("name") - mediainfo.en_title = info.get('primary_title') - mediainfo.title = info.get('primary_title') - mediainfo.original_title = info.get('original_title') - # mediainfo.release_date = info.get('start_year') - mediainfo.year = info.get('start_year') - if info.get('posters', []): - mediainfo.poster_path = info.get("posters", [])[0].get("url") - directors = [] - if info.get('directors', []): - for dn in info.get('directors', []): - director = dn.get("name") - if not director: - continue - d_ = {"name": director.get("display_name"), "id": director.get("id"), "avatars": director.get("avatars")} - directors.append(d_) - if info.get('writers', []): - for wn in info.get('writers', []): - writer = wn.get("name") - d_ = {"name": writer.get("display_name"), "id": writer.get("id"), "avatars": writer.get("avatars")} - directors.append(d_) - mediainfo.directors = directors - actors = [] - if info.get('casts', []): - for cast in info.get('casts', []): - cn = cast.get("name", {}) - character_name = cast.get("characters")[0] if cast.get("characters") else '' - d_ = {"name": cn.get("display_name"), "id": cn.get("id"), - "avatars": cn.get("avatars"), "character": character_name} - actors.append(d_) - - def recognize_media(self, meta: MetaBase = None, - mtype: MediaType = None, - imdbid: Optional[str] = None, - episode_group: Optional[str] = None, - cache: Optional[bool] = True, - **kwargs) -> Optional[MediaInfo]: - logger.warn(f"IMDb Source: {MetaBase.title}") - if not self._imdb_helper: - return None - if not imdbid and not meta: - return None - if not meta: - # 未提供元数据时,直接使用imdbid查询,不使用缓存 - cache_info = {} - elif not meta.name: - logger.warn("识别媒体信息时未提供元数据名称") - return None - cache_info = {} - if not cache_info or not cache: - info = None - if imdbid: - info = self._imdb_helper.get_info(mtype=mtype, imdbid=imdbid) - if not info and meta: - info = {} - names = list(dict.fromkeys([k for k in [meta.cn_name, meta.en_name] if k])) - for name in names: - if meta.begin_season: - logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...") - else: - logger.info(f"正在识别 {name} ...") - if meta.type == MediaType.UNKNOWN and not meta.year: - info = self._imdb_helper.match_multi(name) - else: - if meta.type == MediaType.TV: - # 确定是电视 - info = self._imdb_helper.match(name=name, - year=meta.year, - mtype=meta.type,season_year=meta.year, - season_number=meta.begin_season) - if not info: - # 去掉年份再查一次 - info = self._imdb_helper.match(name=name, mtype=meta.type) - else: - # 有年份先按电影查 - info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE) - # 没有再按电视剧查 - if not info: - info = self._imdb_helper.match(name=name, - year=meta.year, - mtype=MediaType.TV) - if not info: - # 去掉年份和类型再查一次 - info = self._imdb_helper.match_multi(name=name) - if info: - break - else: - info = None - if info: - # mediainfo = MediaInfo(source_info={"imdb": info}) - mediainfo = MediaInfo() - if meta: - logger.info(f"{meta.name} IMDB识别结果:{mediainfo.type.value} " - f"{mediainfo.title_year} " - f"{mediainfo.imdb_id}") - else: - logger.info(f"{imdbid} IMDB识别结果:{mediainfo.type.value} " - f"{mediainfo.title_year}") - return mediainfo - - logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息") - return None - @staticmethod def __movie_to_media(movie_info: dict) -> schemas.MediaInfo: title = "" diff --git a/plugins.v2/imdbsource/imdb_helper.py b/plugins.v2/imdbsource/imdb_helper.py index b0031f7..7f3fe18 100644 --- a/plugins.v2/imdbsource/imdb_helper.py +++ b/plugins.v2/imdbsource/imdb_helper.py @@ -1,15 +1,12 @@ import re -from typing import Optional, Any, Dict, List, Tuple +from typing import Optional, Any, Dict, Tuple from collections import OrderedDict from dataclasses import dataclass import requests -from requests_html import HTMLSession -import ijson from app.log import logger from app.utils.http import RequestUtils -from app.utils.string import StringUtils from app.utils.common import retry from app.schemas.types import MediaType from app.core.cache import cached @@ -145,7 +142,6 @@ class ImdbHelper: def __init__(self, proxies=None): self._proxies = proxies - self._req_utils = RequestUtils(headers=self._imdb_headers, session=HTMLSession(), timeout=10, proxies=proxies) self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", headers=self._imdb_headers, @@ -172,68 +168,6 @@ class ImdbHelper: info = data.get("data").get("title", None) return info - @cached(maxsize=1000, ttl=3600) - def __episodes_by_season(self, imdbid: str, build_id: str, season: str) -> Optional[Dict]: - if not build_id or not season: - return None - prefix = "pageProps.contentData.section" - url = (f"https://www.imdb.com/_next/data/{build_id}" - f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}") - response = self._req_utils.get_res(url) - if not response or response.status_code != 200: - return None - json_content = response.text - try: - section = next(ijson.items(json_content, prefix)) - except StopIteration: - logger.warn(f"No data found at prefix: {prefix}") - return None - except (ijson.JSONError, ValueError) as e: - logger.warn(f"JSON parsing error: {e}") - return None - except TypeError as e: - logger.warn(f"Invalid input type: {e}") - return None - return section - - @cached(maxsize=1000, ttl=3600) - def __episodes(self, imdbid: str) -> Optional[Dict]: - prefix = "props.pageProps.contentData.section" - url = f"https://www.imdb.com/title/{imdbid}/episodes/" - - response = self._req_utils.get_res(url) - if not response or response.status_code != 200: - return - script_content = response.html.xpath('//script[@id="__NEXT_DATA__"]/text()') - if len(script_content) == 0: - return None - json_content = script_content[0] - # 直接定位到目标路径提取 items - try: - section = next(ijson.items(json_content, prefix)) - except StopIteration: - logger.warn(f"No data found at prefix: {prefix}") - return None - except (ijson.JSONError, ValueError) as e: - logger.warn(f"JSON parsing error: {e}") - return None - except TypeError as e: - logger.warn(f"Invalid input type: {e}") - return None - total_seasons = [] - for s in section.get("seasons"): - if s.get("value") and s.get("value") not in total_seasons: - total_seasons.append(s.get("value")) - build_id = next(ijson.items(json_content, 'buildId')) - current_season = section.get('currentSeason') or '1' - total_seasons.remove(current_season) - for season in total_seasons: - section_next = self.__episodes_by_season(imdbid, build_id=build_id, season=season) - if section_next: - section["episodes"]["items"].extend(section_next.get("episodes", {}).get("items", [])) - section["episodes"]["total"] += section_next.get("episodes", {}).get("total", 0) - return section - @retry(Exception, logger=logger) @cached(maxsize=32, ttl=1800) def __request(self, params: Dict, sha256) -> Optional[Dict]: @@ -456,352 +390,3 @@ class ImdbHelper: return None self.hash_status[operation_name] = True return data.get('advancedTitleSearch') - - def __known_as(self, imdbid: str, - sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]: - """ - 获取电影和电视别名 - :param imdbid: IMBd id - :return: 别名列表 - """ - operation_name = "TitleAkasPaginated" - self.__update_hash() - if self._imdb_api_hash.get(operation_name): - sha256 = self._imdb_api_hash[operation_name] - params = {"operationName": operation_name, - "variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}} - data = self.__request(params=params, sha256=sha256) - if not data: - return None - if 'error' in data: - error = data['error'] - if error: - logger.error(f"Error querying {operation_name} API: {error.get('message')}") - if error.get('message') == 'PersistedQueryNotFound': - self.hash_status[operation_name] = False - return None - self.hash_status[operation_name] = True - if not data.get("data", {}).get("title", {}).get("akas", {}).get("total"): - return None - akas = [] - for edge in data["data"]["title"]["akas"]["edges"]: - title = edge.get("node", {}).get("displayableProperty", {}).get("value", {}).get("plainText") - if not title: - continue - country = edge.get("node", {}).get("country", {}) - language = edge.get("node", {}).get("language", {}) - akas.append({"title": title, "country": country, "language": language}) - return akas - - def __search_on_imdb(self, term, mtype, release_year=None): - params = f"{term}" - if release_year is not None: - params += f" {release_year}" - ret = RequestUtils( - accept_type="application/json", - ).get_res(f"{self._search_endpoint % params}") - if not ret: - return None - data = ret.json() - if "d" not in data: - return None - result = [d for d in data["d"] if d.get("qid") in self._qid_map.get(mtype)] - return result - - def search_tvs(self, title: str, year: str = None) -> List[dict]: - if not title: - return [] - if year: - tvs = self.__search_on_imdb(title, MediaType.TV, year) or [] - else: - tvs = self.__search_on_imdb(title, MediaType.TV, ) or [] - ret_infos = [] - for tv in tvs: - # if title in tv.get("l"): - # if self.__compare_names(title, [tv.get("l")]): - # tv['media_type'] = MediaType.TV - ret_infos.append(tv) - return ret_infos - - def search_movies(self, title: str, year: str = None) -> List[dict]: - if not title: - return [] - if year: - movies = self.__search_on_imdb(title, MediaType.MOVIE, year) or [] - else: - movies = self.__search_on_imdb(title, MediaType.MOVIE) or [] - ret_infos = [] - for movie in movies: - # if title in movie.get("l"): - # if self.__compare_names(title, [movie.get("l")]): - # movie['media_type'] = MediaType.MOVIE - ret_infos.append(movie) - return ret_infos - - @staticmethod - def __compare_names(file_name: str, tmdb_names: list) -> bool: - """ - 比较文件名是否匹配,忽略大小写和特殊字符 - :param file_name: 识别的文件名或者种子名 - :param tmdb_names: TMDB返回的译名 - :return: True or False - """ - if not file_name or not tmdb_names: - return False - if not isinstance(tmdb_names, list): - tmdb_names = [tmdb_names] - file_name = StringUtils.clear(file_name).upper() - for tmdb_name in tmdb_names: - tmdb_name = StringUtils.clear(tmdb_name).strip().upper() - if file_name == tmdb_name: - return True - return False - - def __search_movie_by_name(self, name: str, year: str) -> Optional[dict]: - """ - 根据名称查询电影IMDB匹配 - :param name: 识别的文件名或种子名 - :param year: 电影上映日期 - :return: 匹配的媒体信息 - """ - movies = self.search_movies(name, year=year) - if (movies is None) or (len(movies) == 0): - logger.debug(f"{name} 未找到相关电影信息!") - return {} - movies = sorted( - movies, - key=lambda x: str(x.get("y") or '0000'), - reverse=True - ) - for movie in movies: - movie_year = f"{movie.get('y')}" - if year and movie_year != year: - # 年份不匹配 - continue - # 匹配标题、原标题 - movie_info = self.imdbid(movie.get("id")) - if not movie_info: - continue - if self.__compare_names(name, [movie_info.get("primary_title")]): - return movie_info - if movie_info.get("original_title") and self.__compare_names(name, [movie_info.get("original_title")]): - return movie_info - akas = self.__known_as(movie.get("id")) - if not akas: - continue - akas_names = [item.get("title") for item in akas] - if self.__compare_names(name, akas_names): - return movie_info - return {} - - def __search_tv_by_name(self, name: str, year: str) -> Optional[dict]: - """ - 根据名称查询电视剧IMDB匹配 - :param name: 识别的文件名或者种子名 - :param year: 电视剧的首播年份 - :return: 匹配的媒体信息 - """ - tvs = self.search_tvs(name, year=year) - if (tvs is None) or (len(tvs) == 0): - logger.debug(f"{name} 未找到相关电影信息!") - return {} - tvs = sorted( - tvs, - key=lambda x: str(x.get("y") or '0000'), - reverse=True - ) - for tv in tvs: - tv_year = f"{tv.get('y')}" - if year and tv_year != year: - # 年份不匹配 - continue - # 匹配标题、原标题 - tv_info = self.imdbid(tv.get("id")) - if not tv_info: - continue - if self.__compare_names(name, [tv_info.get("primary_title")]): - return tv_info - if tv_info.get("original_title") and self.__compare_names(name, [tv_info.get("original_title")]): - return tv_info - akas = self.__known_as(tv.get("id")) - if not akas: - continue - akas_names = [item.get("title") for item in akas] - if self.__compare_names(name, akas_names): - return tv_info - return {} - - def __search_tv_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]: - """ - 根据电视剧的名称和季的年份及序号匹配IMDB - :param name: 识别的文件名或者种子名 - :param season_year: 季的年份 - :param season_number: 季序号 - :return: 匹配的媒体信息 - """ - - def __season_match(_tv_info: dict, _season_year: str) -> bool: - tv_extra_info = self.__episodes(_tv_info.get("id")) - if not tv_extra_info: - return False - release_year = [] - for item in tv_extra_info["episodes"]["items"]: - if item.get("season") == season_number: - release_year.append(item.get("releaseDate").get("year") or item.get("releaseYear")) - first_release_year = min(release_year) if release_year else tv_extra_info["currentYear"] - if first_release_year == _season_year: - _tv_info["seasons"] = tv_extra_info["seasons"] - _tv_info["episodes"] = tv_extra_info["episodes"] - return True - return False - - tvs = self.search_tvs(title=name) - if (tvs is None) or (len(tvs) == 0): - logger.debug("%s 未找到季%s相关信息!" % (name, season_number)) - return {} - tvs = sorted( - tvs, - key=lambda x: str(x.get('y') or '0000'), - reverse=True - ) - for tv in tvs: - tv_info = self.imdbid(tv.get("id")) - if not tv_info: - continue - tv_year = f"{tv.get('y')}" if tv.get('y') else None - if (self.__compare_names(name, [tv_info.get('primary_title')]) - or (tv_info.get('original_title') and self.__compare_names(name, [tv_info.get('original_title')]))) \ - and (tv_year == str(season_year)): - return tv_info - akas = self.__known_as(tv.get("id")) - if not akas: - continue - akas_names = [item.get("title") for item in akas] - if not self.__compare_names(name, akas_names): - continue - if __season_match(_tv_info=tv_info, _season_year=season_year): - return tv_info - return None - - def get_info(self, - mtype: MediaType, - imdbid: str) -> dict: - """ - 给定IMDB号,查询一条媒体信息 - :param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份) - :param imdbid: IMDB的ID - """ - # 查询TMDB详情 - if mtype == MediaType.MOVIE: - imdb_info = self.imdbid(imdbid) - if imdb_info: - imdb_info['media_type'] = MediaType.MOVIE - elif mtype == MediaType.TV: - imdb_info = self.imdbid(imdbid) - if imdb_info: - imdb_info['media_type'] = MediaType.TV - tv_extra_info = self.__episodes(imdbid) - imdb_info["seasons"] = tv_extra_info["seasons"] - imdb_info["episodes"] = tv_extra_info["episodes"] - else: - imdb_info = None - logger.warn(f"IMDb id:{imdbid} 未查询到媒体信息") - return imdb_info - - def match_multi(self, name: str) -> Optional[dict]: - """ - 根据名称同时查询电影和电视剧,没有类型也没有年份时使用 - :param name: 识别的文件名或种子名 - :return: 匹配的媒体信息 - """ - - multis = self.search_tvs(name) + self.search_movies(name) - ret_info = {} - if len(multis) == 0: - logger.debug(f"{name} 未找到相关媒体息!") - return {} - else: - multis = sorted( - multis, - key=lambda x: ("1" if x.get("media_type") == MediaType.MOVIE else "0") + str(x.get('y') or '0000'), - reverse=True - ) - media_t = MediaType.UNKNOWN - for multi in multis: - media_info = self.imdbid(multi.get("id")) - if not media_info: - continue - if multi.get("media_type") == MediaType.MOVIE: - if self.__compare_names(name, media_info.get('primary_title')) \ - or self.__compare_names(name, multi.get('primary_title')): - ret_info = media_info - media_t = MediaType.MOVIE - break - elif multi.get("media_type") == MediaType.TV: - if self.__compare_names(name, media_info.get('primary_title')) \ - or self.__compare_names(name, multi.get('primary_title')): - ret_info = media_info - media_t = MediaType.TV - break - if ret_info and not isinstance(ret_info.get("media_type"), MediaType): - ret_info['media_type'] = media_t - return ret_info - - def match(self, name: str, - mtype: MediaType, - year: Optional[str] = None, - season_year: Optional[str] = None, - season_number: Optional[int] = None, - group_seasons: Optional[List[dict]] = None) -> Optional[dict]: - """ - 搜索imdb中的媒体信息,匹配返回一条尽可能正确的信息 - :param name: 检索的名称 - :param mtype: 类型:电影、电视剧 - :param year: 年份,如要是季集需要是首播年份(first_air_date) - :param season_year: 当前季集年份 - :param season_number: 季集,整数 - :param group_seasons: 集数组信息 - :return: TMDB的INFO,同时会将mtype赋值到media_type中 - """ - if not name: - return None - info = {} - if mtype != MediaType.TV: - year_range = [year] - if year: - year_range.append(str(int(year) + 1)) - year_range.append(str(int(year) - 1)) - for year in year_range: - logger.debug( - f"正在识别{mtype.value}:{name}, 年份={year} ...") - info = self.__search_movie_by_name(name, year) - if info: - info['media_type'] = MediaType.MOVIE - break - else: - # 有当前季和当前季集年份,使用精确匹配 - if season_year and season_number: - logger.debug( - f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...") - info = self.__search_tv_by_season(name, - season_year, - season_number) - if not info: - year_range = [year] - if year: - year_range.append(str(int(year) + 1)) - year_range.append(str(int(year) - 1)) - for year in year_range: - logger.debug( - f"正在识别{mtype.value}:{name}, 年份={year} ...") - info = self.__search_tv_by_name(name, year) - if info: - break - if info: - info['media_type'] = MediaType.TV - if not info.get("seasons"): - tv_extra_info = self.__episodes(info.get('id')) - if tv_extra_info: - info["seasons"] = tv_extra_info["seasons"] - info["episodes"] = tv_extra_info["episodes"] - return info diff --git a/plugins.v2/imdbsource/requirements.txt b/plugins.v2/imdbsource/requirements.txt deleted file mode 100644 index 86d7fe2..0000000 --- a/plugins.v2/imdbsource/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -graphene~=3.4.3 -ijson~=3.4.0 -requests-html~=0.10.0 \ No newline at end of file