update(ImdbSource): 更新IMdb hash

This commit is contained in:
wumode
2025-06-17 15:25:43 +08:00
parent 6f7f72f784
commit b8a167e597
3 changed files with 186 additions and 32 deletions

View File

@@ -430,11 +430,12 @@
"name": "IMDb源",
"description": "让探索支持IMDb数据源。",
"labels": "探索",
"version": "1.3.1",
"version": "1.3.2",
"icon": "IMDb_IOS-OSX_App.png",
"author": "wumode",
"level": 1,
"history": {
"v1.3.2": "更新 API query hash",
"v1.3.1": "修复按日期排序错误",
"v1.3": "优化网络连接",
"v1.2": "推荐热门纪录片",
@@ -451,8 +452,8 @@
"author": "wumode",
"level": 1,
"history": {
"v0.1.0": "新增ClashRuleProvider",
"v1.0.0": "支持: 规则分页; 导入规则; 代理组; 附加出站代理; 按区域分组"
"v1.0.0": "支持: 规则分页; 导入规则; 代理组; 附加出站代理; 按区域分组",
"v0.1.0": "新增ClashRuleProvider"
}
},
"LexiAnnot": {

View File

@@ -1,13 +1,18 @@
from datetime import datetime
import re
import json
from typing import Optional, Any, List, Dict, Tuple
from datetime import datetime
from app import schemas
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.plugins.imdbsource.imdb_helper import ImdbHelper
from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData
from app.schemas.types import ChainEventType, MediaType
from app.core.meta import MetaBase
from app.core.context import MediaInfo
from app.plugins.imdbsource.imdb_helper import ImdbHelper
from app import schemas
from app.utils.http import RequestUtils
@@ -17,10 +22,9 @@ class ImdbSource(_PluginBase):
# 插件描述
plugin_desc = "让探索和推荐支持IMDb数据源。"
# 插件图标
plugin_icon = ("https://raw.githubusercontent.com/jxxghp/"
"MoviePilot-Plugins/refs/heads/main/icons/IMDb_IOS-OSX_App.png")
plugin_icon = "IMDb_IOS-OSX_App.png"
# 插件版本
plugin_version = "1.3.1"
plugin_version = "1.3.2"
# 插件作者
plugin_author = "wumode"
# 作者主页
@@ -123,8 +127,135 @@ class ImdbSource(_PluginBase):
"id2": self.xxx2,
}
"""
# return {"recognize_media": (self.recognize_media, ModuleExecutionType.Hijack)}
pass
@staticmethod
# @MediaInfo.source_processor("imdb")
def process_imdb_info(mediainfo: MediaInfo, info: dict):
"""处理 IMDB 信息"""
mediainfo.source_info["imdb"] = info
if isinstance(info.get('media_type'), MediaType):
mediainfo.type = info.get('media_type')
elif info.get('media_type'):
mediainfo.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV
mediainfo.title = info.get("title")
mediainfo.release_date = info.get('release_date')
if info.get("id"):
mediainfo.source_id["imdb"] = info.get("id")
mediainfo.imdb_id = info.get('id')
if not mediainfo.source_id:
return
mediainfo.vote_average = round(float(info.get("rating").get("aggregate_rating")), 1) if info.get("rating") else 0
mediainfo.overview = info.get('plot')
mediainfo.genre_ids = info.get('genre') or []
# 风格
if not mediainfo.genres:
mediainfo.genres = [{"id": genre, "name": genre} for genre in info.get("genres") or []]
if info.get('spoken_languages', []):
mediainfo.original_language = info.get('spoken_languages', [])[0].get("name")
mediainfo.en_title = info.get('primary_title')
mediainfo.title = info.get('primary_title')
mediainfo.original_title = info.get('original_title')
# mediainfo.release_date = info.get('start_year')
mediainfo.year = info.get('start_year')
if info.get('posters', []):
mediainfo.poster_path = info.get("posters", [])[0].get("url")
directors = []
if info.get('directors', []):
for dn in info.get('directors', []):
director = dn.get("name")
if not director:
continue
d_ = {"name": director.get("display_name"), "id": director.get("id"), "avatars": director.get("avatars")}
directors.append(d_)
if info.get('writers', []):
for wn in info.get('writers', []):
writer = wn.get("name")
d_ = {"name": writer.get("display_name"), "id": writer.get("id"), "avatars": writer.get("avatars")}
directors.append(d_)
mediainfo.directors = directors
actors = []
if info.get('casts', []):
for cast in info.get('casts', []):
cn = cast.get("name", {})
character_name = cast.get("characters")[0] if cast.get("characters") else ''
d_ = {"name": cn.get("display_name"), "id": cn.get("id"),
"avatars": cn.get("avatars"), "character": character_name}
actors.append(d_)
def recognize_media(self, meta: MetaBase = None,
mtype: MediaType = None,
imdbid: Optional[str] = None,
episode_group: Optional[str] = None,
cache: Optional[bool] = True,
**kwargs) -> Optional[MediaInfo]:
logger.warn(f"IMDb Source: {MetaBase.title}")
if not self._imdb_helper:
return None
if not imdbid and not meta:
return None
if not meta:
# 未提供元数据时直接使用imdbid查询不使用缓存
cache_info = {}
elif not meta.name:
logger.warn("识别媒体信息时未提供元数据名称")
return None
cache_info = {}
if not cache_info or not cache:
info = None
if imdbid:
info = self._imdb_helper.get_info(mtype=mtype, imdbid=imdbid)
if not info and meta:
info = {}
names = list(dict.fromkeys([k for k in [meta.cn_name, meta.en_name] if k]))
for name in names:
if meta.begin_season:
logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
else:
logger.info(f"正在识别 {name} ...")
if meta.type == MediaType.UNKNOWN and not meta.year:
info = self._imdb_helper.match_multi(name)
else:
if meta.type == MediaType.TV:
# 确定是电视
info = self._imdb_helper.match(name=name,
year=meta.year,
mtype=meta.type,season_year=meta.year,
season_number=meta.begin_season)
if not info:
# 去掉年份再查一次
info = self._imdb_helper.match(name=name, mtype=meta.type)
else:
# 有年份先按电影查
info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE)
# 没有再按电视剧查
if not info:
info = self._imdb_helper.match(name=name,
year=meta.year,
mtype=MediaType.TV)
if not info:
# 去掉年份和类型再查一次
info = self._imdb_helper.match_multi(name=name)
if info:
break
else:
info = None
if info:
# mediainfo = MediaInfo(source_info={"imdb": info})
mediainfo = MediaInfo()
if meta:
logger.info(f"{meta.name} IMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year} "
f"{mediainfo.imdb_id}")
else:
logger.info(f"{imdbid} IMDB识别结果{mediainfo.type.value} "
f"{mediainfo.title_year}")
return mediainfo
logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息")
return None
@staticmethod
def __movie_to_media(movie_info: dict) -> schemas.MediaInfo:
title = ""
@@ -150,7 +281,7 @@ class ImdbSource(_PluginBase):
return schemas.MediaInfo(
type="电影",
title=title,
year=release_year,
year=f'{release_year}',
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(movie_info.get("id")),
@@ -191,7 +322,7 @@ class ImdbSource(_PluginBase):
return schemas.MediaInfo(
type="电视剧",
title=title,
year=release_year,
year=f'{release_year}',
title_year=f"{title} ({release_year})",
mediaid_prefix="imdb",
media_id=str(series_info.get("id")),

View File

@@ -1,15 +1,11 @@
import re
from typing import Optional, Any, Dict, List, Tuple
from io import StringIO
from collections import OrderedDict
from dataclasses import dataclass
import graphene
import requests
from requests_html import HTMLSession
import ijson
import json
import base64
from app.log import logger
from app.utils.http import RequestUtils
@@ -149,8 +145,7 @@ class ImdbHelper:
def __init__(self, proxies=None):
self._proxies = proxies
self._session = HTMLSession()
self._req_utils = RequestUtils(headers=self._imdb_headers, session=self._session, timeout=10, proxies=proxies)
self._req_utils = RequestUtils(headers=self._imdb_headers, session=HTMLSession(), timeout=10, proxies=proxies)
self._imdb_req = RequestUtils(accept_type="application/json",
content_type="application/json",
headers=self._imdb_headers,
@@ -158,6 +153,7 @@ class ImdbHelper:
proxies=proxies,
session=requests.Session())
self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None}
self.hash_status = {"AdvancedTitleSearch": False, "TitleAkasPaginated": False}
self._search_states = OrderedDict()
self._max_states = 30
@@ -185,7 +181,7 @@ class ImdbHelper:
f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}")
response = self._req_utils.get_res(url)
if not response or response.status_code != 200:
return
return None
json_content = response.text
try:
section = next(ijson.items(json_content, prefix))
@@ -247,11 +243,14 @@ class ImdbHelper:
return None
data = ret.json()
if "errors" in data:
logger.error(f"Imdb query errors")
return None
error = data.get("errors")[0] if data.get("errors") else {}
if error and error.get("message") == 'PersistedQueryNotFound':
logger.warn(f"PersistedQuery hash has expired, trying to update...")
self.__get_hash.cache_clear()
return {'error': error}
return data.get("data")
@cached(maxsize=1, ttl=30 * 24 * 3600)
@cached(maxsize=1, ttl=6 * 3600)
def __get_hash(self) -> Optional[dict]:
"""
根据IMDb hash使用
@@ -264,11 +263,13 @@ class ImdbHelper:
proxies=self._proxies
)
if not res:
logger.error("获取IMDb hash")
logger.error("Error getting hash")
return None
return res.json()
def __update_hash(self):
def __update_hash(self, force: bool = False) -> None:
if force:
self.__get_hash.cache_clear()
imdb_hash = self.__get_hash()
if imdb_hash:
self._imdb_api_hash["AdvancedTitleSearch"] = imdb_hash.get("AdvancedTitleSearch")
@@ -325,7 +326,8 @@ class ImdbHelper:
release_date_start: Optional[str] = None,
award_constraint: Optional[Tuple[str, ...]] = None,
ranked: Optional[Tuple[str, ...]] = None,
interests: Optional[Tuple[str, ...]] = None):
interests: Optional[Tuple[str, ...]] = None
)->Optional[Dict]:
# 创建参数对象
params = SearchParams(
title_types=title_types,
@@ -342,7 +344,7 @@ class ImdbHelper:
ranked=ranked,
interests=interests
)
sha256 = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4'
sha256 = '81b46290a78cc1e8b3d713e6a43c191c55b4dccf3e1945d6b46668945846d832'
self.__update_hash()
if self._imdb_api_hash.get("AdvancedTitleSearch"):
sha256 = self._imdb_api_hash["AdvancedTitleSearch"]
@@ -359,7 +361,7 @@ class ImdbHelper:
'titleTypes': [], 'jobCategories': []}
if search_state.pageinfo.get('endCursor'):
last_cursor = search_state.pageinfo.get('endCursor')
# 这里实现基于上次结果的逻辑
# 实现基于上次结果的逻辑
else:
# 重新搜索
first_page = True
@@ -382,11 +384,12 @@ class ImdbHelper:
last_cursor: Optional[str] = None,
) -> Optional[Dict]:
variables = {"first": 50,
variables: Dict[str, Any] = {"first": 50,
"locale": "en-US",
"sortBy": params.sort_by,
"sortOrder": params.sort_order,
}
operation_name = 'AdvancedTitleSearch'
if params.title_types:
title_type_ids = []
for title_type in params.title_types:
@@ -439,12 +442,20 @@ class ImdbHelper:
if not first_page and last_cursor:
variables["after"] = last_cursor
params = {"operationName": "AdvancedTitleSearch",
params = {"operationName": operation_name,
"variables": variables}
data = self.__request(params, sha256)
if not data:
return None
return data.get("advancedTitleSearch")
if 'error' in data:
error = data['error']
if error:
logger.error(f"Error querying {operation_name}: {error.get('message')}")
if error.get('message') == 'PersistedQueryNotFound':
self.hash_status[operation_name] = False
return None
self.hash_status[operation_name] = True
return data.get('advancedTitleSearch')
def __known_as(self, imdbid: str,
sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]:
@@ -453,14 +464,23 @@ class ImdbHelper:
:param imdbid: IMBd id
:return: 别名列表
"""
operation_name = "TitleAkasPaginated"
self.__update_hash()
if self._imdb_api_hash.get("TitleAkasPaginated"):
sha256 = self._imdb_api_hash["TitleAkasPaginated"]
params = {"operationName": "TitleAkasPaginated",
if self._imdb_api_hash.get(operation_name):
sha256 = self._imdb_api_hash[operation_name]
params = {"operationName": operation_name,
"variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}}
data = self.__request(params=params, sha256=sha256)
if not data:
return None
if 'error' in data:
error = data['error']
if error:
logger.error(f"Error querying {operation_name} API: {error.get('message')}")
if error.get('message') == 'PersistedQueryNotFound':
self.hash_status[operation_name] = False
return None
self.hash_status[operation_name] = True
if not data.get("data", {}).get("title", {}).get("akas", {}).get("total"):
return None
akas = []
@@ -633,6 +653,7 @@ class ImdbHelper:
_tv_info["seasons"] = tv_extra_info["seasons"]
_tv_info["episodes"] = tv_extra_info["episodes"]
return True
return False
tvs = self.search_tvs(title=name)
if (tvs is None) or (len(tvs) == 0):
@@ -660,6 +681,7 @@ class ImdbHelper:
continue
if __season_match(_tv_info=tv_info, _season_year=season_year):
return tv_info
return None
def get_info(self,
mtype: MediaType,