import re from typing import Optional, Any, Dict, List, Tuple from io import StringIO from collections import OrderedDict from dataclasses import dataclass import graphene import requests from requests_html import HTMLSession import ijson import json import base64 from app.log import logger from app.utils.http import RequestUtils from app.utils.string import StringUtils from app.utils.common import retry from app.schemas.types import MediaType from app.core.cache import cached @dataclass(frozen=True) class SearchParams: title_types: Optional[Tuple[str, ...]] = None genres: Optional[Tuple[str, ...]] = None sort_by: str = 'POPULARITY' sort_order: str = 'ASC' rating_min: Optional[float] = None rating_max: Optional[float] = None countries: Optional[Tuple[str, ...]] = None languages: Optional[Tuple[str, ...]] = None release_date_end: Optional[str] = None release_date_start: Optional[str] = None award_constraint: Optional[Tuple[str, ...]] = None ranked: Optional[Tuple[str, ...]] = None interests: Optional[Tuple[str, ...]] = None class SearchState: def __init__(self, pageinfo: dict, total: int): self.pageinfo = pageinfo self.total = total class ImdbHelper: _query_by_id = """query queryWithVariables($id: ID!) { title(id: $id) { id type is_adult primary_title original_title start_year end_year runtime_minutes plot rating { aggregate_rating votes_count } genres posters { url width height } certificates { country { code name } rating } spoken_languages { code name } origin_countries { code name } critic_review { score review_count } directors: credits(first: 5, categories: ["director"]) { name { id display_name avatars { url width height } } } writers: credits(first: 5, categories: ["writer"]) { name { id display_name avatars { url width height } } } casts: credits(first: 5, categories: ["actor", "actress"]) { name { id display_name avatars { url width height } } characters } } }""" _endpoint = "https://graph.imdbapi.dev/v1" _search_endpoint = "https://v3.sg.media-imdb.com/suggestion/x/%s.json?includeVideos=0" _official_endpoint = "https://caching.graphql.imdb.com/" _hash_update_url = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/" "refs/heads/imdbsource_assets/plugins.v2/imdbsource/imdb_hash.json") _qid_map = { MediaType.TV: ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"], MediaType.MOVIE: ["movie"] } _imdb_headers = { "Accept": "application/json, text/plain, */*", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome" "/84.0.4147.105 Safari/537.36", "Referer": "https://www.imdb.com/", } all_title_types = ["tvSeries", "tvMiniSeries", "movie", "tvMovie", "musicVideo", "tvShort", "short", "tvEpisode", "tvSpecial", "videoGame"] interest_id = { "Anime": "in0000027", "Superhero": "in0000008", "Sitcom": "in0000044", "Coming-of-Age": "in0000073", "Slasher Horror": "in0000115", "Raunchy Comedy": "in0000041", "Documentary": "in0000060" } def __init__(self, proxies=None): self._proxies = proxies self._session = HTMLSession() self._req_utils = RequestUtils(headers=self._imdb_headers, session=self._session, timeout=10, proxies=proxies) self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", headers=self._imdb_headers, timeout=10, proxies=proxies, session=requests.Session()) self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None} self._search_states = OrderedDict() self._max_states = 30 def imdbid(self, imdbid: str) -> Optional[Dict]: params = {"operationName": "queryWithVariables", "query": self._query_by_id, "variables": {"id": imdbid}} ret = RequestUtils( accept_type="application/json", content_type="application/json" ).post_res(f"{self._endpoint}", json=params) if not ret: return None data = ret.json() if "errors" in data: logger.error(f"Imdb query ({imdbid}) errors {data.get('errors')}") logger.error(f"{params}") return None info = data.get("data").get("title", None) return info @cached(maxsize=1000, ttl=3600) def __episodes_by_season(self, imdbid: str, build_id: str, season: str) -> Optional[Dict]: if not build_id or not season: return None prefix = "pageProps.contentData.section" url = (f"https://www.imdb.com/_next/data/{build_id}" f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}") response = self._req_utils.get_res(url) if not response or response.status_code != 200: return json_content = response.text try: section = next(ijson.items(json_content, prefix)) except StopIteration: logger.warn(f"No data found at prefix: {prefix}") return None except (ijson.JSONError, ValueError) as e: logger.warn(f"JSON parsing error: {e}") return None except TypeError as e: logger.warn(f"Invalid input type: {e}") return None return section @cached(maxsize=1000, ttl=3600) def __episodes(self, imdbid: str) -> Optional[Dict]: prefix = "props.pageProps.contentData.section" url = f"https://www.imdb.com/title/{imdbid}/episodes/" response = self._req_utils.get_res(url) if not response or response.status_code != 200: return script_content = response.html.xpath('//script[@id="__NEXT_DATA__"]/text()') if len(script_content) == 0: return None json_content = script_content[0] # 直接定位到目标路径提取 items try: section = next(ijson.items(json_content, prefix)) except StopIteration: logger.warn(f"No data found at prefix: {prefix}") return None except (ijson.JSONError, ValueError) as e: logger.warn(f"JSON parsing error: {e}") return None except TypeError as e: logger.warn(f"Invalid input type: {e}") return None total_seasons = [] for s in section.get("seasons"): if s.get("value") and s.get("value") not in total_seasons: total_seasons.append(s.get("value")) build_id = next(ijson.items(json_content, 'buildId')) current_season = section.get('currentSeason') or '1' total_seasons.remove(current_season) for season in total_seasons: section_next = self.__episodes_by_season(imdbid, build_id=build_id, season=season) if section_next: section["episodes"]["items"].extend(section_next.get("episodes", {}).get("items", [])) section["episodes"]["total"] += section_next.get("episodes", {}).get("total", 0) return section @retry(Exception, logger=logger) @cached(maxsize=32, ttl=1800) def __request(self, params: Dict, sha256) -> Optional[Dict]: params["extensions"] = {"persistedQuery": {"sha256Hash": sha256, "version": 1}} ret = self._imdb_req.post_res(f"{self._official_endpoint}", json=params, raise_exception=True) if not ret: return None data = ret.json() if "errors" in data: logger.error(f"Imdb query errors") return None return data.get("data") @cached(maxsize=1, ttl=30 * 24 * 3600) def __get_hash(self) -> Optional[dict]: """ 根据IMDb hash使用 """ headers = { "Accept": "text/html", } res = RequestUtils(headers=headers).get_res( self._hash_update_url, proxies=self._proxies ) if not res: logger.error("获取IMDb hash") return None return res.json() def __update_hash(self): imdb_hash = self.__get_hash() if imdb_hash: self._imdb_api_hash["AdvancedTitleSearch"] = imdb_hash.get("AdvancedTitleSearch") self._imdb_api_hash["TitleAkasPaginated"] = imdb_hash.get("TitleAkasPaginated") @staticmethod def __award_to_constraint(award: str) -> Optional[Dict]: pattern = r'^(ev\d+)(?:-(best\w+))?-(Winning|Nominated)$' match = re.match(pattern, award) constraint = {} if match: ev_id = match.group(1) # 第一部分:evXXXXXXXX best = match.group(2) # 第二部分:bestXX(可选) status = match.group(3) # 第三部分:Winning/Nominated constraint["eventId"] = ev_id if status == "Winning": constraint["winnerFilter"] = "WINNER_ONLY" if best: constraint["searchAwardCategoryId"] = best return constraint else: return None @staticmethod def __ranked_list_to_constraint(ranked: str) -> Optional[Dict]: """ "TOP_RATED_MOVIES-100": "IMDb Top 100", "TOP_RATED_MOVIES-250": "IMDb Top 250", "TOP_RATED_MOVIES-1000": "IMDb Top 1000", "LOWEST_RATED_MOVIES-100": "IMDb Bottom 100", "LOWEST_RATED_MOVIES-250": "IMDb Bottom 250", "LOWEST_RATED_MOVIES-1000": "IMDb Bottom 1000" """ pattern = r'^(TOP_RATED_MOVIES|LOWEST_RATED_MOVIES)-(\d+)$' match = re.match(pattern, ranked) if match: ranked_title_list_type = match.group(1) rank_range = int(match.group(2)) constraint = {"rankRange": {"max": rank_range}, "rankedTitleListType": ranked_title_list_type} return constraint return None def advanced_title_search(self, first_page: bool = True, title_types: Optional[Tuple[str, ...]] = None, genres: Optional[Tuple[str, ...]] = None, sort_by: str = 'POPULARITY', sort_order: str = 'ASC', rating_min: Optional[float] = None, rating_max: Optional[float] = None, countries: Optional[Tuple[str, ...]] = None, languages: Optional[Tuple[str, ...]] = None, release_date_end: Optional[str] = None, release_date_start: Optional[str] = None, award_constraint: Optional[Tuple[str, ...]] = None, ranked: Optional[Tuple[str, ...]] = None, interests: Optional[Tuple[str, ...]] = None): # 创建参数对象 params = SearchParams( title_types=title_types, genres=genres, sort_by=sort_by, sort_order=sort_order, rating_min=rating_min, rating_max=rating_max, countries=countries, languages=languages, release_date_end=release_date_end, release_date_start=release_date_start, award_constraint=award_constraint, ranked=ranked, interests=interests ) sha256 = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4' self.__update_hash() if self._imdb_api_hash.get("AdvancedTitleSearch"): sha256 = self._imdb_api_hash["AdvancedTitleSearch"] # 获取或创建搜索状态 last_cursor = None if not first_page and params in self._search_states: search_state: SearchState = self._search_states.pop(params) # 移除并获取 self._search_states[params] = search_state # 不是第一页且已有状态 - 使用上次的结果 if not search_state.pageinfo.get("hasNextPage"): return {'pageInfo': {'endCursor': None, 'hasNextPage': False, 'hasPreviousPage': True, 'startCursor': None}, 'edges': [], 'total': search_state.total, 'genres': [], 'keywords': [], 'titleTypes': [], 'jobCategories': []} if search_state.pageinfo.get('endCursor'): last_cursor = search_state.pageinfo.get('endCursor') # 这里实现基于上次结果的逻辑 else: # 重新搜索 first_page = True else: first_page = True result = self.__advanced_title_search(params, sha256, first_page, last_cursor) if result: page_info = result.get("pageInfo", {}) total = result.get("total", 0) search_state = SearchState(page_info, total) self._search_states[params] = search_state if len(self._search_states) > self._max_states: self._search_states.popitem(last=False) # 移除最旧的条目 return result def __advanced_title_search(self, params: SearchParams, sha256: str, first_page: bool = True, last_cursor: Optional[str] = None, ) -> Optional[Dict]: variables = {"first": 50, "locale": "en-US", "sortBy": params.sort_by, "sortOrder": params.sort_order, } if params.title_types: title_type_ids = [] for title_type in params.title_types: if title_type in self.all_title_types: title_type_ids.append(title_type) if len(title_type_ids): variables["titleTypeConstraint"] = {"anyTitleTypeIds": params.title_types, "excludeTitleTypeIds": []} if params.genres: variables["genreConstraint"] = {"allGenreIds": params.genres, "excludeGenreIds": []} if params.countries: variables["originCountryConstraint"] = {"allCountries": params.countries} if params.languages: variables["languageConstraint"] = {"anyPrimaryLanguages": params.languages} if params.rating_min or params.rating_max: rating_min = params.rating_min if params.rating_min else 1 rating_min = max(rating_min, 1) rating_max = params.rating_max if params.rating_max else 10 rating_max = min(rating_max, 10) variables["userRatingsConstraint"] = {"aggregateRatingRange": {"max": rating_max, "min": rating_min}} if params.release_date_start or params.release_date_end: release_dict = {} if params.release_date_start: release_dict["start"] = params.release_date_start if params.release_date_end: release_dict["end"] = params.release_date_end variables["releaseDateConstraint"] = {"releaseDateRange": release_dict} if params.award_constraint: constraints = [] for award in params.award_constraint: c = self.__award_to_constraint(award) if c: constraints.append(c) variables["awardConstraint"] = {"allEventNominations": constraints} if params.ranked: constraints = [] for r in params.ranked: c = self.__ranked_list_to_constraint(r) if c: constraints.append(c) variables["rankedTitleListConstraint"] = {"allRankedTitleLists": constraints, "excludeRankedTitleLists": []} if params.interests: constraints = [] for interest in params.interests: in_id = self.interest_id.get(interest) if in_id: constraints.append(in_id) variables["interestConstraint"] = {"allInterestIds": constraints, "excludeInterestIds": []} if not first_page and last_cursor: variables["after"] = last_cursor params = {"operationName": "AdvancedTitleSearch", "variables": variables} data = self.__request(params, sha256) if not data: return None return data.get("advancedTitleSearch") def __known_as(self, imdbid: str, sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]: """ 获取电影和电视别名 :param imdbid: IMBd id :return: 别名列表 """ self.__update_hash() if self._imdb_api_hash.get("TitleAkasPaginated"): sha256 = self._imdb_api_hash["TitleAkasPaginated"] params = {"operationName": "TitleAkasPaginated", "variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}} data = self.__request(params=params, sha256=sha256) if not data: return None if not data.get("data", {}).get("title", {}).get("akas", {}).get("total"): return None akas = [] for edge in data["data"]["title"]["akas"]["edges"]: title = edge.get("node", {}).get("displayableProperty", {}).get("value", {}).get("plainText") if not title: continue country = edge.get("node", {}).get("country", {}) language = edge.get("node", {}).get("language", {}) akas.append({"title": title, "country": country, "language": language}) return akas def __search_on_imdb(self, term, mtype, release_year=None): params = f"{term}" if release_year is not None: params += f" {release_year}" ret = RequestUtils( accept_type="application/json", ).get_res(f"{self._search_endpoint % params}") if not ret: return None data = ret.json() if "d" not in data: return None result = [d for d in data["d"] if d.get("qid") in self._qid_map.get(mtype)] return result def search_tvs(self, title: str, year: str = None) -> List[dict]: if not title: return [] if year: tvs = self.__search_on_imdb(title, MediaType.TV, year) or [] else: tvs = self.__search_on_imdb(title, MediaType.TV, ) or [] ret_infos = [] for tv in tvs: # if title in tv.get("l"): # if self.__compare_names(title, [tv.get("l")]): # tv['media_type'] = MediaType.TV ret_infos.append(tv) return ret_infos def search_movies(self, title: str, year: str = None) -> List[dict]: if not title: return [] if year: movies = self.__search_on_imdb(title, MediaType.MOVIE, year) or [] else: movies = self.__search_on_imdb(title, MediaType.MOVIE) or [] ret_infos = [] for movie in movies: # if title in movie.get("l"): # if self.__compare_names(title, [movie.get("l")]): # movie['media_type'] = MediaType.MOVIE ret_infos.append(movie) return ret_infos @staticmethod def __compare_names(file_name: str, tmdb_names: list) -> bool: """ 比较文件名是否匹配,忽略大小写和特殊字符 :param file_name: 识别的文件名或者种子名 :param tmdb_names: TMDB返回的译名 :return: True or False """ if not file_name or not tmdb_names: return False if not isinstance(tmdb_names, list): tmdb_names = [tmdb_names] file_name = StringUtils.clear(file_name).upper() for tmdb_name in tmdb_names: tmdb_name = StringUtils.clear(tmdb_name).strip().upper() if file_name == tmdb_name: return True return False def __search_movie_by_name(self, name: str, year: str) -> Optional[dict]: """ 根据名称查询电影IMDB匹配 :param name: 识别的文件名或种子名 :param year: 电影上映日期 :return: 匹配的媒体信息 """ movies = self.search_movies(name, year=year) if (movies is None) or (len(movies) == 0): logger.debug(f"{name} 未找到相关电影信息!") return {} movies = sorted( movies, key=lambda x: str(x.get("y") or '0000'), reverse=True ) for movie in movies: movie_year = f"{movie.get('y')}" if year and movie_year != year: # 年份不匹配 continue # 匹配标题、原标题 movie_info = self.imdbid(movie.get("id")) if not movie_info: continue if self.__compare_names(name, [movie_info.get("primary_title")]): return movie_info if movie_info.get("original_title") and self.__compare_names(name, [movie_info.get("original_title")]): return movie_info akas = self.__known_as(movie.get("id")) if not akas: continue akas_names = [item.get("title") for item in akas] if self.__compare_names(name, akas_names): return movie_info return {} def __search_tv_by_name(self, name: str, year: str) -> Optional[dict]: """ 根据名称查询电视剧IMDB匹配 :param name: 识别的文件名或者种子名 :param year: 电视剧的首播年份 :return: 匹配的媒体信息 """ tvs = self.search_tvs(name, year=year) if (tvs is None) or (len(tvs) == 0): logger.debug(f"{name} 未找到相关电影信息!") return {} tvs = sorted( tvs, key=lambda x: str(x.get("y") or '0000'), reverse=True ) for tv in tvs: tv_year = f"{tv.get('y')}" if year and tv_year != year: # 年份不匹配 continue # 匹配标题、原标题 tv_info = self.imdbid(tv.get("id")) if not tv_info: continue if self.__compare_names(name, [tv_info.get("primary_title")]): return tv_info if tv_info.get("original_title") and self.__compare_names(name, [tv_info.get("original_title")]): return tv_info akas = self.__known_as(tv.get("id")) if not akas: continue akas_names = [item.get("title") for item in akas] if self.__compare_names(name, akas_names): return tv_info return {} def __search_tv_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]: """ 根据电视剧的名称和季的年份及序号匹配IMDB :param name: 识别的文件名或者种子名 :param season_year: 季的年份 :param season_number: 季序号 :return: 匹配的媒体信息 """ def __season_match(_tv_info: dict, _season_year: str) -> bool: tv_extra_info = self.__episodes(_tv_info.get("id")) if not tv_extra_info: return False release_year = [] for item in tv_extra_info["episodes"]["items"]: if item.get("season") == season_number: release_year.append(item.get("releaseDate").get("year") or item.get("releaseYear")) first_release_year = min(release_year) if release_year else tv_extra_info["currentYear"] if first_release_year == _season_year: _tv_info["seasons"] = tv_extra_info["seasons"] _tv_info["episodes"] = tv_extra_info["episodes"] return True tvs = self.search_tvs(title=name) if (tvs is None) or (len(tvs) == 0): logger.debug("%s 未找到季%s相关信息!" % (name, season_number)) return {} tvs = sorted( tvs, key=lambda x: str(x.get('y') or '0000'), reverse=True ) for tv in tvs: tv_info = self.imdbid(tv.get("id")) if not tv_info: continue tv_year = f"{tv.get('y')}" if tv.get('y') else None if (self.__compare_names(name, [tv_info.get('primary_title')]) or (tv_info.get('original_title') and self.__compare_names(name, [tv_info.get('original_title')]))) \ and (tv_year == str(season_year)): return tv_info akas = self.__known_as(tv.get("id")) if not akas: continue akas_names = [item.get("title") for item in akas] if not self.__compare_names(name, akas_names): continue if __season_match(_tv_info=tv_info, _season_year=season_year): return tv_info def get_info(self, mtype: MediaType, imdbid: str) -> dict: """ 给定IMDB号,查询一条媒体信息 :param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份) :param imdbid: IMDB的ID """ # 查询TMDB详情 if mtype == MediaType.MOVIE: imdb_info = self.imdbid(imdbid) if imdb_info: imdb_info['media_type'] = MediaType.MOVIE elif mtype == MediaType.TV: imdb_info = self.imdbid(imdbid) if imdb_info: imdb_info['media_type'] = MediaType.TV tv_extra_info = self.__episodes(imdbid) imdb_info["seasons"] = tv_extra_info["seasons"] imdb_info["episodes"] = tv_extra_info["episodes"] else: imdb_info = None logger.warn(f"IMDb id:{imdbid} 未查询到媒体信息") return imdb_info def match_multi(self, name: str) -> Optional[dict]: """ 根据名称同时查询电影和电视剧,没有类型也没有年份时使用 :param name: 识别的文件名或种子名 :return: 匹配的媒体信息 """ multis = self.search_tvs(name) + self.search_movies(name) ret_info = {} if len(multis) == 0: logger.debug(f"{name} 未找到相关媒体息!") return {} else: multis = sorted( multis, key=lambda x: ("1" if x.get("media_type") == MediaType.MOVIE else "0") + str(x.get('y') or '0000'), reverse=True ) media_t = MediaType.UNKNOWN for multi in multis: media_info = self.imdbid(multi.get("id")) if not media_info: continue if multi.get("media_type") == MediaType.MOVIE: if self.__compare_names(name, media_info.get('primary_title')) \ or self.__compare_names(name, multi.get('primary_title')): ret_info = media_info media_t = MediaType.MOVIE break elif multi.get("media_type") == MediaType.TV: if self.__compare_names(name, media_info.get('primary_title')) \ or self.__compare_names(name, multi.get('primary_title')): ret_info = media_info media_t = MediaType.TV break if ret_info and not isinstance(ret_info.get("media_type"), MediaType): ret_info['media_type'] = media_t return ret_info def match(self, name: str, mtype: MediaType, year: Optional[str] = None, season_year: Optional[str] = None, season_number: Optional[int] = None, group_seasons: Optional[List[dict]] = None) -> Optional[dict]: """ 搜索imdb中的媒体信息,匹配返回一条尽可能正确的信息 :param name: 检索的名称 :param mtype: 类型:电影、电视剧 :param year: 年份,如要是季集需要是首播年份(first_air_date) :param season_year: 当前季集年份 :param season_number: 季集,整数 :param group_seasons: 集数组信息 :return: TMDB的INFO,同时会将mtype赋值到media_type中 """ if not name: return None info = {} if mtype != MediaType.TV: year_range = [year] if year: year_range.append(str(int(year) + 1)) year_range.append(str(int(year) - 1)) for year in year_range: logger.debug( f"正在识别{mtype.value}:{name}, 年份={year} ...") info = self.__search_movie_by_name(name, year) if info: info['media_type'] = MediaType.MOVIE break else: # 有当前季和当前季集年份,使用精确匹配 if season_year and season_number: logger.debug( f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...") info = self.__search_tv_by_season(name, season_year, season_number) if not info: year_range = [year] if year: year_range.append(str(int(year) + 1)) year_range.append(str(int(year) - 1)) for year in year_range: logger.debug( f"正在识别{mtype.value}:{name}, 年份={year} ...") info = self.__search_tv_by_name(name, year) if info: break if info: info['media_type'] = MediaType.TV if not info.get("seasons"): tv_extra_info = self.__episodes(info.get('id')) if tv_extra_info: info["seasons"] = tv_extra_info["seasons"] info["episodes"] = tv_extra_info["episodes"] return info