add: IMDb源插件

This commit is contained in:
wumode
2025-05-22 18:10:25 +08:00
parent 538dcfb0cd
commit 96e8d0fbea
5 changed files with 1619 additions and 0 deletions

BIN
icons/IMDb_IOS-OSX_App.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 18 KiB

View File

@@ -419,5 +419,17 @@
"v1.3": "新增一些Trackers",
"v1.4": "异步查询DNS"
}
},
"ImdbSource": {
"name": "IMDb源",
"description": "让探索支持IMDb数据源。",
"labels": "探索",
"version": "1.0",
"icon": "IMDb_IOS-OSX_App.png",
"author": "wumode",
"level": 1,
"history": {
"v1.0": "探索支持IMDb数据源"
}
}
}

View File

@@ -0,0 +1,953 @@
import re
import json
from typing import Optional, Any, List, Dict, Tuple
from app.core.config import settings
from app.core.event import eventmanager, Event
from app.log import logger
from app.plugins import _PluginBase
from app.schemas import DiscoverSourceEventData
from app.schemas.types import EventType, ChainEventType, MediaType
from app.core.meta import MetaBase
from app.core.context import MediaInfo
from app.plugins.imdbsource.imdb_helper import ImdbHelper
from app import schemas
class ImdbSource(_PluginBase):
# Plugin that registers IMDb as an additional source for the discover feature
# (see discover_source / imdb_discover further down in this class).
# Plugin name
plugin_name = "IMDb源"
# Plugin description
plugin_desc = "让探索支持IMDb数据源。"
# Plugin icon
plugin_icon = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/refs/heads/imdbsource_assets/icons/"
"IMDb_IOS-OSX_App.png")
# Plugin version
plugin_version = "1.0"
# Plugin author
plugin_author = "wumode"
# Author home page
author_url = "https://github.com/wumode"
# Prefix for the plugin's configuration item IDs
plugin_config_prefix = "imdbsource_"
# Load order
plugin_order = 22
# User level allowed to use the plugin
auth_level = 1
# Private attributes
# Value of the "enabled" configuration switch (set in init_plugin)
_enabled = False
# Value of the "proxy" configuration switch (set in init_plugin)
_proxy = False
# ImdbHelper instance created in init_plugin; None until then
_imdb_helper = None
# Search results fetched by imdb_discover but not yet returned to the caller;
# imdb_discover always rebinds this on the instance rather than mutating it in place
_discover_cache = []
def init_plugin(self, config: dict = None):
# Initialise the plugin from its saved configuration.
# NOTE(review): indentation is lost in this view, so it cannot be told which of the
# statements below are nested under `if config:` — confirm against the real file.
if config:
# Read the two switches shown on the configuration form.
self._enabled = config.get("enabled")
self._proxy = config.get("proxy")
# Create the IMDb client; settings.PROXY is passed only when the proxy switch is on.
self._imdb_helper = ImdbHelper(proxies=settings.PROXY if self._proxy else None)
# Register the image hosts in settings.SECURITY_IMAGE_DOMAINS unless already present.
# NOTE(review): presumably required so posters served from these hosts can be displayed — confirm.
if "media-amazon.com" not in settings.SECURITY_IMAGE_DOMAINS:
settings.SECURITY_IMAGE_DOMAINS.append("media-amazon.com")
if "media-imdb.com" not in settings.SECURITY_IMAGE_DOMAINS:
settings.SECURITY_IMAGE_DOMAINS.append("media-imdb.com")
def get_state(self) -> bool:
    """
    Report whether the plugin is enabled.

    The stored flag is whatever ``config.get("enabled")`` returned, which is
    ``None`` when the key is missing, so it is coerced to a real ``bool`` to
    honour the declared return type.

    :return: True when the plugin is switched on, otherwise False
    """
    return bool(self._enabled)
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
    """
    Assemble the plugin configuration page.

    :return: a pair of (page layout, default field values). The layout is one
             form row holding two switches, bound to "enabled" and "proxy".
    """
    # (bound field, label) for every switch on the form, in display order
    switches = (
        ("enabled", "启用插件"),
        ("proxy", "使用代理服务器"),
    )
    columns = []
    for field, caption in switches:
        toggle = {
            "component": "VSwitch",
            "props": {"model": field, "label": caption},
        }
        columns.append({
            "component": "VCol",
            "props": {"cols": 12, "md": 4},
            "content": [toggle],
        })
    form_row = {"component": "VRow", "content": columns}
    layout = [{"component": "VForm", "content": [form_row]}]
    defaults = {"enabled": False, "proxy": False}
    return layout, defaults
def get_page(self) -> List[dict]:
    """
    Describe the plugin's detail page.

    This plugin has no detail page, so nothing (None) is returned.
    """
    return None
def stop_service(self):
    """
    Shut the plugin down.

    Nothing needs to be released, so this is a no-op.
    """
    return None
def get_module(self) -> Dict[str, Any]:
    """
    Declare the system-module methods this plugin takes over.

    The expected shape is a mapping of method name to implementation:
    {
        "id1": self.xxx1,
        "id2": self.xxx2,
    }
    Nothing is declared at the moment, so None is returned.
    """
    # Hijacking recognize_media is currently switched off:
    # return {"recognize_media": (self.recognize_media, ModuleExecutionType.Hijack)}
    return None
@staticmethod
# @MediaInfo.source_processor("imdb")
def process_imdb_info(mediainfo: MediaInfo, info: dict):
    """
    Copy the fields of an IMDb title record onto a media object, in place.

    Changes compared with the previous version:
    - the cast list is now stored on ``mediainfo.actors`` (it used to be built
      and then discarded);
    - credits without a ``name`` node are skipped for writers and cast as well
      as for directors;
    - a rating without ``aggregate_rating`` yields 0 instead of raising;
    - genres are read from ``genres`` (``genre`` is still honoured first);
    - the media type is also derived from ``type`` when ``media_type`` is absent;
    - the year is stored as text.

    :param mediainfo: media object to fill
    :param info: IMDb title record
    """

    def _person(name_node: dict, **extra) -> dict:
        # Flatten one credit's "name" node into the person dict stored on mediainfo.
        return {
            "name": name_node.get("display_name"),
            "id": name_node.get("id"),
            "avatars": name_node.get("avatars"),
            **extra,
        }

    mediainfo.source_info["imdb"] = info

    # Media type: trust an explicit MediaType, otherwise derive it from "type".
    media_type = info.get("media_type")
    if isinstance(media_type, MediaType):
        mediainfo.type = media_type
    elif media_type or info.get("type"):
        mediainfo.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV

    mediainfo.title = info.get("title")
    mediainfo.release_date = info.get("release_date")
    if info.get("id"):
        mediainfo.source_id["imdb"] = info.get("id")
        mediainfo.imdb_id = info.get("id")
    # Without any source id there is nothing further worth copying.
    if not mediainfo.source_id:
        return

    aggregate = (info.get("rating") or {}).get("aggregate_rating")
    mediainfo.vote_average = round(float(aggregate), 1) if aggregate is not None else 0
    mediainfo.overview = info.get("plot")

    genre_names = info.get("genre") or info.get("genres") or []
    mediainfo.genre_ids = genre_names
    # Genres: IMDb only gives names, so the name doubles as the id.
    if not mediainfo.genres:
        mediainfo.genres = [{"id": genre, "name": genre} for genre in info.get("genres") or []]

    languages = info.get("spoken_languages") or []
    if languages:
        mediainfo.original_language = languages[0].get("name")

    mediainfo.en_title = info.get("primary_title")
    mediainfo.title = info.get("primary_title") or mediainfo.title
    mediainfo.original_title = info.get("original_title")
    start_year = info.get("start_year")
    mediainfo.year = str(start_year) if start_year is not None else None

    posters = info.get("posters") or []
    if posters:
        mediainfo.poster_path = posters[0].get("url")

    # Directors and writers share one list, as before.
    crew = []
    for credit in (info.get("directors") or []) + (info.get("writers") or []):
        name_node = credit.get("name")
        if not name_node:
            continue
        crew.append(_person(name_node))
    mediainfo.directors = crew

    cast_list = []
    for credit in info.get("casts") or []:
        name_node = credit.get("name")
        if not name_node:
            continue
        characters = credit.get("characters")
        cast_list.append(_person(name_node, character=characters[0] if characters else ''))
    mediainfo.actors = cast_list
def recognize_media(self, meta: MetaBase = None,
                    mtype: MediaType = None,
                    imdbid: Optional[str] = None,
                    episode_group: Optional[str] = None,
                    cache: Optional[bool] = True,
                    **kwargs) -> Optional[MediaInfo]:
    """
    Recognise a media item on IMDb, by id and/or by parsed metadata.

    Lookup order: the IMDb id (when given), then every distinct name from the
    metadata (cn_name, en_name). For a name, a TV hint searches TV with and
    then without the year; otherwise movie is tried before TV; a type-agnostic
    search is the final fallback.

    Changes compared with the previous version:
    - the trace line logs the requested name/id instead of the ``MetaBase``
      class attribute, and at debug level;
    - the returned object carries the id, titles, year and type of the match
      (it used to be an empty ``MediaInfo()``), and the result log no longer
      fails when the type is unknown;
    - dead recognition-cache branches were removed.

    :param meta: parsed metadata of the item
    :param mtype: media type hint used for the id lookup
    :param imdbid: IMDb id
    :param episode_group: unused
    :param cache: unused; there is no recognition cache yet
    :return: the recognised media object, or None when nothing matched
    """
    subject = meta.name if meta else imdbid
    logger.debug(f"IMDb Source: {subject}")
    if not self._imdb_helper:
        return None
    if not imdbid and not meta:
        return None
    if meta and not meta.name:
        logger.warn("识别媒体信息时未提供元数据名称")
        return None

    info = None
    if imdbid:
        info = self._imdb_helper.get_info(mtype=mtype, imdbid=imdbid)
    if not info and meta:
        # De-duplicate the candidate names while keeping their order.
        names = list(dict.fromkeys([k for k in [meta.cn_name, meta.en_name] if k]))
        for name in names:
            if meta.begin_season:
                logger.info(f"正在识别 {name}{meta.begin_season}季 ...")
            else:
                logger.info(f"正在识别 {name} ...")
            if meta.type == MediaType.UNKNOWN and not meta.year:
                info = self._imdb_helper.match_multi(name)
            else:
                if meta.type == MediaType.TV:
                    # Known to be TV
                    info = self._imdb_helper.match(name=name,
                                                   year=meta.year,
                                                   mtype=meta.type, season_year=meta.year,
                                                   season_number=meta.begin_season)
                    if not info:
                        # Retry without the year
                        info = self._imdb_helper.match(name=name, mtype=meta.type)
                else:
                    # A year is known: try movie first
                    info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE)
                    # then TV
                    if not info:
                        info = self._imdb_helper.match(name=name,
                                                       year=meta.year,
                                                       mtype=MediaType.TV)
                if not info:
                    # Retry without year and type
                    info = self._imdb_helper.match_multi(name=name)
            if info:
                break

    if not info:
        logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息")
        return None

    # Copy the core fields of the match so callers get a usable object.
    mediainfo = MediaInfo()
    matched_type = info.get("media_type")
    if isinstance(matched_type, MediaType):
        mediainfo.type = matched_type
    mediainfo.imdb_id = info.get("id")
    mediainfo.title = info.get("primary_title")
    mediainfo.original_title = info.get("original_title")
    if info.get("start_year"):
        mediainfo.year = str(info.get("start_year"))

    type_label = mediainfo.type.value if mediainfo.type else ""
    if meta:
        logger.info(f"{meta.name} IMDB识别结果{type_label} "
                    f"{mediainfo.title_year} "
                    f"{mediainfo.imdb_id}")
    else:
        logger.info(f"{imdbid} IMDB识别结果{type_label} "
                    f"{mediainfo.title_year}")
    return mediainfo
def imdb_discover(self, apikey: str, mtype: str = "series",
                  country: str = None,
                  lang: str = None,
                  genre: str = None,
                  sort_by: str = 'POPULARITY',
                  sort_order: str = 'ASC',
                  using_rating: bool = False,
                  user_rating: str = None,
                  year: str = None,
                  award: str = None,
                  page: int = 1, count: int = 30) -> List[schemas.MediaInfo]:
    """
    Discover endpoint: run an IMDb advanced title search and return one page.

    Paging is stateful: the search cursor lives in the helper and surplus
    results of a request are kept in ``self._discover_cache`` for the next
    call. Requesting page 1 restarts the search.

    Changes compared with the previous version:
    - ``count`` is honoured (the page size used to be hard-coded to 30);
    - any four-digit year ("2019") or decade ("1960s") is accepted, and a
      single year is always bounded by its own 31 December;
    - a missing ``plotText`` / ``node`` / ``title`` no longer raises;
    - a non-numeric ``user_rating`` is ignored instead of raising;
    - the request is rejected when ``apikey`` differs from the API token.

    :param apikey: API token of the caller
    :param mtype: "movies" for movies, anything else for series
    :param country: origin country code
    :param lang: primary language code
    :param genre: genre id
    :param sort_by: sort field
    :param sort_order: "ASC" or "DESC"
    :param using_rating: whether ``user_rating`` is applied
    :param user_rating: minimum aggregate rating
    :param year: a year ("2024") or a decade ("2010s")
    :param award: award filter key
    :param page: page number, 1 restarts the search
    :param count: number of items per page
    :return: list of media items, empty when nothing is available
    """

    def _dig(node, *keys, default=None):
        # Walk nested dicts; give back `default` as soon as a level is missing or null.
        for key in keys:
            if not isinstance(node, dict):
                return default
            node = node.get(key)
        return default if node is None else node

    def _to_media(title_info: dict, is_movie: bool) -> schemas.MediaInfo:
        # Convert one search-result title node into the API schema.
        title = _dig(title_info, "titleText", "text", default="")
        release_year = _dig(title_info, "releaseYear", "year", default=0)
        fields = dict(
            type="电影" if is_movie else "电视剧",
            title=title,
            year=release_year,
            title_year=f"{title} ({release_year})",
            mediaid_prefix="imdb",
            media_id=str(title_info.get("id")),
            poster_path=_dig(title_info, "primaryImage", "url"),
            vote_average=_dig(title_info, "ratingsSummary", "aggregateRating", default=0),
            runtime=_dig(title_info, "runtime", "seconds", default=0),
            overview=_dig(title_info, "plot", "plotText", "plainText", default=''),
        )
        if not is_movie:
            # Only series carry a release date, as before.
            release_date = title_info.get("releaseDate")
            if release_date:
                fields["release_date"] = (f"{release_date.get('year')}-{release_date.get('month')}"
                                          f"-{release_date.get('day')}")
            else:
                fields["release_date"] = '0000-00-00'
        return schemas.MediaInfo(**fields)

    def _year_range(value: str):
        # "2024" -> that year; "2010s" -> the ten years starting at 2010.
        parsed = re.fullmatch(r"(\d{4})(s?)", str(value).strip())
        if not parsed:
            return None, None
        first = int(parsed.group(1))
        last = first + 9 if parsed.group(2) else first
        return f"{first:04d}-01-01", f"{last:04d}-12-31"

    if apikey != settings.API_TOKEN:
        return []
    if not self._imdb_helper:
        return []

    is_movie = mtype == 'movies'
    title_type: MediaType = MediaType.MOVIE if is_movie else MediaType.TV

    rating_min = None
    if user_rating and using_rating:
        try:
            rating_min = float(user_rating)
        except (TypeError, ValueError):
            rating_min = None

    release_date_start, release_date_end = _year_range(year) if year else (None, None)
    page_size = count if count and count > 0 else 30

    first_page = page == 1
    if first_page:
        # A new search invalidates whatever was left over from the previous one.
        self._discover_cache = []

    if len(self._discover_cache) >= page_size:
        # Enough leftovers to serve the page without a request.
        edges = self._discover_cache[:page_size]
        self._discover_cache = self._discover_cache[page_size:]
    else:
        edges = list(self._discover_cache)
        remaining = page_size - len(edges)
        self._discover_cache = []
        data = self._imdb_helper.advanced_title_search(first_page=first_page,
                                                       title_type=title_type,
                                                       genres=[genre] if genre else None,
                                                       sort_by=sort_by,
                                                       sort_order=sort_order,
                                                       rating_min=rating_min,
                                                       countries=[country] if country else None,
                                                       languages=[lang] if lang else None,
                                                       release_date_end=release_date_end,
                                                       release_date_start=release_date_start,
                                                       award_constraint=[award] if award else None)
        fetched = (data or {}).get("edges") or []
        edges.extend(fetched[:remaining])
        self._discover_cache = fetched[remaining:]

    medias = []
    for edge in edges:
        title_info = _dig(edge, "node", "title")
        if title_info:
            medias.append(_to_media(title_info, is_movie))
    return medias
def get_api(self) -> List[Dict[str, Any]]:
    """
    Declare the HTTP API exposed by the plugin.

    Each entry has the shape:
    [{
        "path": "/xx",
        "endpoint": self.xxx,
        "methods": ["GET", "POST"],
        "summary": "API description"
    }]

    The summary and description used to name TheTVDB (copied from another
    plugin); they now name IMDb.

    :return: one GET route, ``/imdb_discover``, served by ``imdb_discover``
    """
    return [{
        "path": "/imdb_discover",
        "endpoint": self.imdb_discover,
        "methods": ["GET"],
        "summary": "IMDb探索数据源",
        "description": "获取IMDb探索数据",
    }]
@staticmethod
def imdb_filter_ui() -> List[dict]:
"""
IMDb过滤参数UI配置
"""
# 国家字典
country_dict = {
"US": "美国",
"CN": "中国",
"JP": "日本",
"KR": "韩国",
"IN": "印度",
"FR": "法国",
"DE": "德国",
"IT": "意大利",
"ES": "西班牙",
"UK": "英国",
"AU": "澳大利亚",
"CA": "加拿大",
"RU": "俄罗斯",
"BR": "巴西",
"MX": "墨西哥",
"AR": "阿根廷"
}
cuntry_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in country_dict.items()
]
# 原始语种字典
lang_dict = {
"en": "英语",
"zh": "中文",
"jp": "日语",
"ko": "韩语",
"fr": "法语",
"de": "德语",
"it": "意大利语",
"es": "西班牙语",
"pt": "葡萄牙语",
"ru": "俄语"
}
lang_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in lang_dict.items()
]
# 风格字典
genre_dict = {
"Action": "动作",
"Adventure": "冒险",
"Animation": "动画",
"Biography": "传记",
"Comedy": "喜剧",
"Crime": "犯罪",
"Documentary": "纪录片",
"Drama": "剧情",
"Family": "家庭",
"Fantasy": "奇幻",
"Game-Show": "游戏节目",
"History": "历史",
"Horror": "恐怖",
"Music": "音乐",
"Musical": "歌舞",
"Mystery": "悬疑",
"News": "新闻",
"Reality-TV": "真人秀",
"Romance": "爱情",
"Sci-Fi": "科幻",
"Short": "短片",
"Sport": "体育",
"Talk-Show": "脱口秀",
"Thriller": "惊悚",
"War": "战争",
"Western": "西部片"
}
genre_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in genre_dict.items()
]
# 排序字典
sort_dict = {
"POPULARITY": "人气",
"USER_RATING": "评分",
"RELEASE_DATE": "发布日期",
"TITLE_REGIONAL": "A-Z"
}
sort_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in sort_dict.items()
]
sort_order_dict = {
"ASC": "升序",
"DESC": "降序",
}
sort_order_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in sort_order_dict.items()
]
year_dict = {
"2025": "2025",
"2024": "2024",
"2023": "2023",
"2022": "2022",
"2021": "2021",
"2020": "2020",
"2020s": "2020s",
"2010s": "2010s",
"2000s": "2000s",
"1990s": "1990s",
"1980s": "1980s",
"1970s": "1970s",
}
year_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in year_dict.items()
]
award_dict = {
"ev0000003-Winning": "奥斯卡奖",
"ev0000223-Winning": "艾美奖",
"ev0000292-Winning": "金球奖",
"ev0000003-Nominated": "奥斯卡提名",
"ev0000223-Nominated": "艾美奖提名",
"ev0000292-Nominated": "金球奖提名",
"ev0000003-bestPicture-Winning": "最佳影片",
"ev0000003-bestPicture-Nominated": "最佳影片提名",
"ev0000003-bestDirector-Winning": "最佳导演",
"ev0000003-bestDirector-Nominated": "最佳导演提名",
"ev0000558-Winning": "金酸莓奖",
"ev0000558-Nominated": "金酸莓奖提名"
}
award_ui = [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": key
},
"text": value
} for key, value in award_dict.items()
]
return [
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "类型"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "mtype"
},
"content": [
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": "series"
},
"text": "电视剧"
},
{
"component": "VChip",
"props": {
"filter": True,
"tile": True,
"value": "movies"
},
"text": "电影"
}
]
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "风格"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "genre"
},
"content": genre_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "国家"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "country"
},
"content": cuntry_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "语言"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "lang"
},
"content": lang_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "年份"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "year"
},
"content": year_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "奖项"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "award"
},
"content": award_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "排序依据"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "sort_by"
},
"content": sort_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "排序方式"
}
]
},
{
"component": "VChipGroup",
"props": {
"model": "sort_order"
},
"content": sort_order_ui
}
]
},
{
"component": "div",
"props": {
"class": "flex justify-start items-center"
},
"content": [
{
"component": "div",
"props": {
"class": "mr-5"
},
"content": [
{
"component": "VLabel",
"text": "评分"
}
]
},
{
"component": "VSwitch",
"props": {
"model": "using_rating",
"label": "启用",
},
},
{
"component": "VDivider",
"props": {
"class": "my-3"
}
},
{
"component": "VSlider",
"props": {
"v-model": "user_rating",
"thumb-label": True,
"max": "10",
"min": "1",
"step": "1",
"hide-details": True,
}
}
]
}
]
@eventmanager.register(ChainEventType.DiscoverSource)
def discover_source(self, event: Event):
    """
    Handle the discover-source event by adding IMDb to the event's extra sources.

    Nothing happens while the plugin is disabled.

    :param event: event whose ``event_data`` collects the extra discover sources
    """
    if not self._enabled:
        return
    event_data: DiscoverSourceEventData = event.event_data
    # Initial values of the filter bar
    default_filters = {
        "mtype": "series",
        "company": None,
        "contentRating": None,
        "country": None,
        "genre": None,
        "lang": None,
        "sort_by": "POPULARITY",
        "sort_order": "ASC",
        "status": None,
        "year": None,
        "user_rating": 1,
        "using_rating": False,
        "award": None
    }
    endpoint = f"plugin/ImdbSource/imdb_discover?apikey={settings.API_TOKEN}"
    imdb_source = schemas.DiscoverMediaSource(
        name="IMDb",
        mediaid_prefix="imdb",
        api_path=endpoint,
        filter_params=default_filters,
        filter_ui=self.imdb_filter_ui()
    )
    # Append to the existing list, or start one when it is empty/unset.
    if event_data.extra_sources:
        event_data.extra_sources.append(imdb_source)
    else:
        event_data.extra_sources = [imdb_source]

View File

@@ -0,0 +1,651 @@
import re
from typing import Optional, Dict, List
from io import StringIO
import graphene
from requests_html import HTMLSession
import ijson
import json
import base64
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.schemas.types import MediaType
from app.core.cache import cached
class ImdbHelper:
# Client for the IMDb data used by the plugin: title lookup by ID, name search,
# advanced title search, alternative titles and episode lists.
# GraphQL query posted by imdbid() to _endpoint; fetches a single title by its ID.
_query_by_id = """query queryWithVariables($id: ID!) {
title(id: $id) {
id
type
is_adult
primary_title
original_title
start_year
end_year
runtime_minutes
plot
rating {
aggregate_rating
votes_count
}
genres
posters {
url
width
height
}
certificates {
country {
code
name
}
rating
}
spoken_languages {
code
name
}
origin_countries {
code
name
}
critic_review {
score
review_count
}
directors: credits(first: 5, categories: ["director"]) {
name {
id
display_name
avatars {
url
width
height
}
}
}
writers: credits(first: 5, categories: ["writer"]) {
name {
id
display_name
avatars {
url
width
height
}
}
}
casts: credits(first: 5, categories: ["actor", "actress"]) {
name {
id
display_name
avatars {
url
width
height
}
}
characters
}
}
}"""
# GraphQL endpoint that imdbid() posts _query_by_id to.
_endpoint = "https://graph.imdbapi.dev/v1"
# Suggestion URL template; %s is replaced by the search text in __search_on_imdb().
_search_endpoint = "https://v3.sg.media-imdb.com/suggestion/x/%s.json?includeVideos=0"
# Endpoint that __request() posts persisted queries to.
_official_endpoint = "https://caching.graphql.imdb.com/"
# JSON document holding the persisted-query hashes; downloaded by __get_hash().
_hash_update_url = ("https://raw.githubusercontent.com/wumode/MoviePilot-Plugins/"
"refs/heads/imdbsource_assets/plugins.v2/imdbsource/imdb_hash.json")
# IMDb title-type ids per media type; used to filter suggestion results by "qid"
# and as the title-type constraint of the advanced search.
_qid_map = {
MediaType.TV: ["tvSeries", "tvMiniSeries"],
MediaType.MOVIE: ["movie"]
}
# Request headers handed to the RequestUtils instances created in __init__().
_imdb_headers = {
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome"
"/84.0.4147.105 Safari/537.36",
"Referer": "https://www.imdb.com/",
}
def __init__(self, proxies=None):
    """
    Set up the HTTP clients and the paging / hash state.

    :param proxies: proxy settings handed to every request helper, or None
    """
    self._proxies = proxies
    # Cursor of the most recent advanced-search page
    self._last_cursor = ''
    # Persisted-query hashes, unknown until fetched
    self._imdb_api_hash = {"AdvancedTitleSearch": None, "TitleAkasPaginated": None}
    self._session = HTMLSession()
    shared = {"headers": self._imdb_headers, "timeout": 10, "proxies": proxies}
    # Session-backed client used for the IMDb web pages
    self._req_utils = RequestUtils(session=self._session, **shared)
    # JSON client used for the GraphQL endpoint
    self._imdb_req = RequestUtils(accept_type="application/json", content_type="application/json", **shared)
def imdbid(self, imdbid: str) -> Optional[Dict]:
    """
    Fetch one title record by its IMDb id.

    Changes compared with the previous version: the request uses the
    configured proxy and a 10 second timeout like the other clients of this
    class, and an invalid JSON body or a null ``data`` member yields None
    instead of raising.

    :param imdbid: IMDb id of the title
    :return: the ``title`` object of the response, or None on any failure
    """
    if not imdbid:
        return None
    params = {"operationName": "queryWithVariables", "query": self._query_by_id, "variables": {"id": imdbid}}
    ret = RequestUtils(
        accept_type="application/json", content_type="application/json",
        timeout=10, proxies=self._proxies
    ).post_res(f"{self._endpoint}", json=params)
    if not ret:
        return None
    try:
        data = ret.json()
    except ValueError as err:
        logger.error(f"Imdb query ({imdbid}) invalid response: {err}")
        return None
    if not isinstance(data, dict):
        return None
    if "errors" in data:
        logger.error(f"Imdb query ({imdbid}) errors {data.get('errors')}")
        logger.error(f"{params}")
        return None
    return (data.get("data") or {}).get("title", None)
@cached(maxsize=1000, ttl=3600)
def __episodes_by_season(self, imdbid: str, build_id: str, season: str) -> Optional[Dict]:
    """
    Download the episode section of one season from IMDb's page-data JSON.

    :param imdbid: IMDb id of the series
    :param build_id: build id of the IMDb site, part of the data URL
    :param season: season value to request
    :return: the ``pageProps.contentData.section`` object, or None on failure
    """
    if not (build_id and season):
        return None
    prefix = "pageProps.contentData.section"
    url = (f"https://www.imdb.com/_next/data/{build_id}"
           f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}")
    response = self._req_utils.get_res(url)
    if not response or response.status_code != 200:
        return None
    body = response.text
    try:
        # Only the first object found at the prefix is needed.
        found = next(ijson.items(body, prefix))
    except StopIteration:
        logger.warn(f"No data found at prefix: {prefix}")
        return None
    except (ijson.JSONError, ValueError) as e:
        logger.warn(f"JSON parsing error: {e}")
        return None
    except TypeError as e:
        logger.warn(f"Invalid input type: {e}")
        return None
    else:
        return found
@cached(maxsize=1000, ttl=3600)
def __episodes(self, imdbid: str) -> Optional[Dict]:
    """
    Collect the episode list of every season of a series.

    The episodes page embeds the section of the current season; the other
    seasons are fetched one by one and merged into that section.

    Changes compared with the previous version: a missing or null ``seasons``
    list, a current season that is not listed, a missing ``buildId`` and a
    missing ``episodes`` object no longer raise.

    :param imdbid: IMDb id of the series
    :return: the merged section (``seasons``, ``episodes``, ...), or None on failure
    """
    prefix = "props.pageProps.contentData.section"
    url = f"https://www.imdb.com/title/{imdbid}/episodes/"
    response = self._req_utils.get_res(url)
    if not response or response.status_code != 200:
        return None
    script_content = response.html.xpath('//script[@id="__NEXT_DATA__"]/text()')
    if not script_content:
        return None
    json_content = script_content[0]
    # Extract only the wanted paths from the embedded JSON
    try:
        section = next(ijson.items(json_content, prefix))
        build_id = next(ijson.items(json_content, 'buildId'), None)
    except StopIteration:
        logger.warn(f"No data found at prefix: {prefix}")
        return None
    except (ijson.JSONError, ValueError) as e:
        logger.warn(f"JSON parsing error: {e}")
        return None
    except TypeError as e:
        logger.warn(f"Invalid input type: {e}")
        return None
    if not isinstance(section, dict):
        return None

    # Seasons still to download: every distinct listed season except the embedded one.
    current_season = section.get('currentSeason') or '1'
    other_seasons = []
    for s in section.get("seasons") or []:
        value = s.get("value")
        if value and value != current_season and value not in other_seasons:
            other_seasons.append(value)

    episodes = section.get("episodes")
    if not isinstance(episodes, dict):
        episodes = section["episodes"] = {}
    episodes["items"] = episodes.get("items") or []
    episodes["total"] = episodes.get("total") or 0

    # Without a build id the per-season URL cannot be formed.
    if build_id:
        for season in other_seasons:
            section_next = self.__episodes_by_season(imdbid, build_id=build_id, season=season)
            if not section_next:
                continue
            next_episodes = section_next.get("episodes") or {}
            episodes["items"].extend(next_episodes.get("items") or [])
            episodes["total"] += next_episodes.get("total") or 0
    return section
@cached(maxsize=32, ttl=1800)
def __request(self, params: Dict, sha256) -> Optional[Dict]:
    """
    Post a persisted GraphQL query to the official IMDb endpoint.

    Changes compared with the previous version: the caller's ``params`` dict
    is no longer modified, the error log includes the reported errors, and an
    invalid JSON body yields None instead of raising.

    :param params: request body without the ``extensions`` member
    :param sha256: hash of the persisted query
    :return: the ``data`` member of the response, or None on failure
    """
    payload = dict(params)
    payload["extensions"] = {"persistedQuery": {"sha256Hash": sha256, "version": 1}}
    ret = self._imdb_req.post_res(f"{self._official_endpoint}", json=payload)
    if not ret:
        return None
    try:
        data = ret.json()
    except ValueError as err:
        logger.error(f"Imdb query invalid response: {err}")
        return None
    if not isinstance(data, dict):
        return None
    if "errors" in data:
        logger.error(f"Imdb query errors {data.get('errors')}")
        return None
    return data.get("data")
@cached(maxsize=1, ttl=30 * 24 * 3600)
def __get_hash(self) -> Optional[dict]:
    """
    Download the current persisted-query hashes.

    Changes compared with the previous version: the failure log now says that
    the download failed, the proxy is passed to the request helper's
    constructor like everywhere else in this class, and a body that is not a
    JSON object yields None instead of raising.

    :return: mapping of operation name to hash, or None on failure
    """
    headers = {
        "Accept": "text/html",
    }
    res = RequestUtils(headers=headers, proxies=self._proxies).get_res(self._hash_update_url)
    if not res:
        logger.error("获取IMDb hash失败")
        return None
    try:
        hashes = res.json()
    except ValueError as err:
        logger.error(f"解析IMDb hash失败: {err}")
        return None
    return hashes if isinstance(hashes, dict) else None
def __update_hash(self):
    """
    Refresh the stored persisted-query hashes from the downloaded hash file.

    The stored values are left untouched when the download yields nothing.
    """
    imdb_hash = self.__get_hash()
    if not imdb_hash:
        return
    for operation in ("AdvancedTitleSearch", "TitleAkasPaginated"):
        self._imdb_api_hash[operation] = imdb_hash.get(operation)
@staticmethod
def __award_to_constraint(award: str) -> Optional[Dict]:
    """
    Translate an award filter key into a search constraint.

    The key has the form ``<event id>[-<category>]-<Winning|Nominated>``, for
    example ``ev0000003-bestPicture-Winning``.

    :param award: award filter key
    :return: constraint dict, or None when the key does not have that form
    """
    parsed = re.match(r'^(ev\d+)(?:-(best\w+))?-(Winning|Nominated)$', award)
    if parsed is None:
        return None
    # event id, optional "bestXX" category, Winning/Nominated
    event_id, category, outcome = parsed.groups()
    constraint = {"eventId": event_id}
    if outcome == "Winning":
        constraint["winnerFilter"] = "WINNER_ONLY"
    if category:
        constraint["searchAwardCategoryId"] = category
    return constraint
def advanced_title_search(self,
                          sha256: str = 'be358d7b41add9fd174461f4c8c673dfee5e2a88744e2d5dc037362a96e2b4e4',
                          first_page: bool = True,
                          title_type: MediaType = MediaType.TV,
                          genres: Optional[List] = None,
                          sort_by: str = 'POPULARITY',
                          sort_order: str = 'ASC',
                          rating_min: Optional[float] = None,
                          rating_max: Optional[float] = None,
                          countries: Optional[List] = None,
                          languages: Optional[list] = None,
                          release_date_end: Optional[str] = None,
                          release_date_start: Optional[str] = None,
                          award_constraint: Optional[List[str]] = None
                          ) -> Optional[Dict]:
    """
    Run the "AdvancedTitleSearch" persisted query and return one page of 50 results.

    Paging is stateful: the end cursor of each response is remembered and
    sent as ``after`` when ``first_page`` is False.

    Changes compared with the previous version:
    - a follow-up page requested without a stored cursor returns None instead
      of silently re-running the search from the start;
    - the cursor is dropped once the response reports ``hasNextPage`` false;
    - the award constraint is omitted when no award key could be parsed.

    :param sha256: fallback query hash, used when no downloaded hash is available
    :param first_page: True to start a new search, False to continue the last one
    :param title_type: MediaType.TV or MediaType.MOVIE
    :param genres: genre ids that must all apply
    :param sort_by: sort field
    :param sort_order: "ASC" or "DESC"
    :param rating_min: minimum aggregate rating (clamped to at least 1)
    :param rating_max: maximum aggregate rating (clamped to at most 10)
    :param countries: origin country codes that must all apply
    :param languages: primary language codes, any of which may apply
    :param release_date_end: latest release date
    :param release_date_start: earliest release date
    :param award_constraint: award filter keys
    :return: the ``advancedTitleSearch`` object of the response, or None
    """
    if title_type not in [MediaType.TV, MediaType.MOVIE]:
        return None
    # Continuing a search is only possible from a known position.
    if not first_page and not self._last_cursor:
        return None

    self.__update_hash()
    sha256 = self._imdb_api_hash.get("AdvancedTitleSearch") or sha256

    variables = {"first": 50,
                 "locale": "en-US",
                 "sortBy": sort_by,
                 "sortOrder": sort_order,
                 "titleTypeConstraint": {"anyTitleTypeIds": self._qid_map[title_type],
                                         "excludeTitleTypeIds": []}}
    if genres:
        variables["genreConstraint"] = {"allGenreIds": genres, "excludeGenreIds": []}
    if countries:
        variables["originCountryConstraint"] = {"allCountries": countries}
    if languages:
        variables["languageConstraint"] = {"anyPrimaryLanguages": languages}
    if rating_min or rating_max:
        lowest = max(rating_min if rating_min else 1, 1)
        highest = min(rating_max if rating_max else 10, 10)
        variables["userRatingsConstraint"] = {"aggregateRatingRange": {"max": highest, "min": lowest}}
    if release_date_start or release_date_end:
        release_range = {}
        if release_date_start:
            release_range["start"] = release_date_start
        if release_date_end:
            release_range["end"] = release_date_end
        variables["releaseDateConstraint"] = {"releaseDateRange": release_range}
    if award_constraint:
        parsed_awards = [self.__award_to_constraint(award) for award in award_constraint]
        constraints = [c for c in parsed_awards if c]
        if constraints:
            variables["awardConstraint"] = {"allEventNominations": constraints}
    if not first_page:
        variables["after"] = self._last_cursor

    params = {"operationName": "AdvancedTitleSearch",
              "variables": variables}
    data = self.__request(params, sha256)
    if not data:
        return None

    page_info = (data.get("advancedTitleSearch") or {}).get("pageInfo") or {}
    if page_info.get("hasNextPage") is False:
        self._last_cursor = ""
    else:
        self._last_cursor = page_info.get("endCursor") or ""
    return data.get("advancedTitleSearch")
def __known_as(self, imdbid: str,
               sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]:
    """
    Fetch the alternative titles ("also known as") of a movie or series.

    The request helper already returns the ``data`` member of the response,
    so the title is read directly from it. The previous version looked for a
    second, nested ``data`` member and therefore always returned None.

    :param imdbid: IMDb id
    :param sha256: fallback query hash, used when no downloaded hash is available
    :return: list of {"title", "country", "language"} dicts, or None when there are none
    """
    self.__update_hash()
    sha256 = self._imdb_api_hash.get("TitleAkasPaginated") or sha256
    params = {"operationName": "TitleAkasPaginated",
              "variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}}
    data = self.__request(params=params, sha256=sha256)
    if not data:
        return None
    akas_node = (data.get("title") or {}).get("akas") or {}
    if not akas_node.get("total"):
        return None
    akas = []
    for edge in akas_node.get("edges") or []:
        node = (edge or {}).get("node") or {}
        title = ((node.get("displayableProperty") or {}).get("value") or {}).get("plainText")
        if not title:
            continue
        akas.append({"title": title,
                     "country": node.get("country") or {},
                     "language": node.get("language") or {}})
    return akas
def __search_on_imdb(self, term, mtype, release_year=None):
    """
    Query IMDb's suggestion service and keep the hits of one media type.

    Changes compared with the previous version: the search text is
    percent-encoded before it is placed in the URL path (a "/", "?" or "#" in
    a title used to break the URL), the request uses the configured proxy and
    a timeout, and an unknown media type or an invalid body yields None
    instead of raising.

    :param term: text to search for
    :param mtype: media type whose title-type ids are accepted
    :param release_year: optional year appended to the search text
    :return: list of matching suggestion entries, or None on failure
    """
    from urllib.parse import quote

    accepted_qids = self._qid_map.get(mtype)
    if not term or not accepted_qids:
        return None
    query = f"{term}"
    if release_year is not None:
        query += f" {release_year}"
    url = self._search_endpoint % quote(query, safe="")
    ret = RequestUtils(
        accept_type="application/json",
        timeout=10,
        proxies=self._proxies,
    ).get_res(url)
    if not ret:
        return None
    try:
        data = ret.json()
    except ValueError:
        return None
    if not isinstance(data, dict) or "d" not in data:
        return None
    return [d for d in data["d"] or [] if d.get("qid") in accepted_qids]
def search_tvs(self, title: str, year: str = None) -> List[dict]:
    """
    Search IMDb suggestions for TV series.

    :param title: text to search for
    :param year: optional year added to the search text
    :return: list of suggestion entries, empty when nothing was found
    """
    if not title:
        return []
    if year:
        found = self.__search_on_imdb(title, MediaType.TV, year)
    else:
        found = self.__search_on_imdb(title, MediaType.TV, )
    # Always hand back a fresh list, never None.
    return list(found or [])
def search_movies(self, title: str, year: str = None) -> List[dict]:
    """
    Search IMDb suggestions for movies.

    :param title: text to search for
    :param year: optional year added to the search text
    :return: list of suggestion entries, empty when nothing was found
    """
    if not title:
        return []
    if year:
        found = self.__search_on_imdb(title, MediaType.MOVIE, year)
    else:
        found = self.__search_on_imdb(title, MediaType.MOVIE)
    # Always hand back a fresh list, never None.
    return list(found or [])
@staticmethod
def __compare_names(file_name: str, tmdb_names: list) -> bool:
    """
    Tell whether a recognised name equals any candidate title.

    Both sides are passed through ``StringUtils.clear`` and upper-cased
    before they are compared.

    :param file_name: name recognised from a file or torrent
    :param tmdb_names: candidate titles (a single value is treated as a one-item list)
    :return: True or False
    """
    if not (file_name and tmdb_names):
        return False
    candidates = tmdb_names if isinstance(tmdb_names, list) else [tmdb_names]
    wanted = StringUtils.clear(file_name).upper()
    return any(
        StringUtils.clear(candidate).strip().upper() == wanted
        for candidate in candidates
    )
def __search_movie_by_name(self, name: str, year: str) -> Optional[dict]:
    """
    Find the IMDb movie whose title matches a recognised name.

    Candidates are tried newest first. A candidate matches when the name
    equals its primary title, its original title or one of its alternative
    titles.

    :param name: name recognised from a file or torrent
    :param year: release year of the movie
    :return: the matching title record, or an empty dict
    """
    candidates = self.search_movies(name, year=year)
    if not candidates:
        logger.debug(f"{name} 未找到相关电影信息!")
        return {}
    candidates.sort(key=lambda item: str(item.get("y") or '0000'), reverse=True)
    for candidate in candidates:
        if year and f"{candidate.get('y')}" != year:
            # Year does not match
            continue
        # Compare against the primary and the original title
        details = self.imdbid(candidate.get("id"))
        if not details:
            continue
        titles = [details.get("primary_title")]
        if details.get("original_title"):
            titles.append(details.get("original_title"))
        if self.__compare_names(name, titles):
            return details
        # Then against the alternative titles
        akas = self.__known_as(candidate.get("id"))
        if not akas:
            continue
        if self.__compare_names(name, [aka.get("title") for aka in akas]):
            return details
    return {}
def __search_tv_by_name(self, name: str, year: str) -> Optional[dict]:
    """
    Find the IMDb series whose title matches a recognised name.

    Candidates are tried newest first. A candidate matches when the name
    equals its primary title, its original title or one of its alternative
    titles. The "nothing found" debug message now says series; it used to
    say movie.

    :param name: name recognised from a file or torrent
    :param year: year the series first aired
    :return: the matching title record, or an empty dict
    """
    tvs = self.search_tvs(name, year=year)
    if not tvs:
        logger.debug(f"{name} 未找到相关电视剧信息!")
        return {}
    tvs = sorted(
        tvs,
        key=lambda x: str(x.get("y") or '0000'),
        reverse=True
    )
    for tv in tvs:
        tv_year = f"{tv.get('y')}"
        if year and tv_year != year:
            # Year does not match
            continue
        # Compare against the primary and the original title
        tv_info = self.imdbid(tv.get("id"))
        if not tv_info:
            continue
        if self.__compare_names(name, [tv_info.get("primary_title")]):
            return tv_info
        if tv_info.get("original_title") and self.__compare_names(name, [tv_info.get("original_title")]):
            return tv_info
        # Then against the alternative titles
        akas = self.__known_as(tv.get("id"))
        if not akas:
            continue
        akas_names = [item.get("title") for item in akas]
        if self.__compare_names(name, akas_names):
            return tv_info
    return {}
def __search_tv_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]:
    """
    Match an IMDb TV series by its name plus the year and number of one season.

    :param name: name recognised from a file or torrent title
    :param season_year: year in which the season was released
    :param season_number: season number
    :return: detail dict of the matching series, or an empty dict when nothing matches
    """

    def _season_match(_tv_info: dict, _season_year: str) -> bool:
        """
        Check whether the requested season first aired in ``_season_year``.

        On success the season and episode data are attached to ``_tv_info``.
        """
        tv_extra_info = self.__episodes(_tv_info.get("id"))
        if not tv_extra_info:
            return False
        release_years = []
        for item in tv_extra_info["episodes"]["items"]:
            # Compare as strings so the match works whether the payload carries
            # the season number as an int or as a string.
            if str(item.get("season")) != str(season_number):
                continue
            # An episode may have no release date; do not dereference None.
            release_year = (item.get("releaseDate") or {}).get("year") or item.get("releaseYear")
            if release_year:
                release_years.append(str(release_year))
        first_release_year = min(release_years) if release_years else tv_extra_info["currentYear"]
        # Normalise both sides: the season year is passed as a string while the
        # fallback year may be numeric.
        if str(first_release_year) != str(_season_year):
            return False
        _tv_info["seasons"] = tv_extra_info["seasons"]
        _tv_info["episodes"] = tv_extra_info["episodes"]
        return True

    tvs = self.search_tvs(title=name)
    if not tvs:
        logger.debug("%s 未找到季%s相关信息!" % (name, season_number))
        return {}
    # Newest first; entries without a year get the '0000' placeholder and sort last.
    tvs = sorted(
        tvs,
        key=lambda x: str(x.get('y') or '0000'),
        reverse=True
    )
    for tv in tvs:
        tv_info = self.imdbid(tv.get("id"))
        if not tv_info:
            continue
        tv_year = f"{tv.get('y')}" if tv.get('y') else None
        # Direct hit: title matches and the series year equals the season year.
        title_matched = self.__compare_names(name, [tv_info.get('primary_title')]) \
            or (tv_info.get('original_title')
                and self.__compare_names(name, [tv_info.get('original_title')]))
        if title_matched and tv_year == str(season_year):
            return tv_info
        # Otherwise require an alias match, then verify the season's release year.
        akas = self.__known_as(tv.get("id"))
        if not akas:
            continue
        akas_names = [item.get("title") for item in akas]
        if not self.__compare_names(name, akas_names):
            continue
        if _season_match(_tv_info=tv_info, _season_year=season_year):
            return tv_info
    # Explicit empty result, consistent with the other name-based searches.
    return {}
def get_info(self,
             mtype: MediaType,
             imdbid: str) -> Optional[dict]:
    """
    Query a single media record by its IMDb id.

    :param mtype: media type; only MediaType.MOVIE and MediaType.TV are looked up,
                  any other value yields None
    :param imdbid: IMDb id of the title
    :return: detail dict with ``media_type`` set (plus ``seasons`` and ``episodes``
             for TV when episode data is available), or a falsy value when nothing
             was found
    """
    imdb_info = None
    if mtype == MediaType.MOVIE:
        imdb_info = self.imdbid(imdbid)
        if imdb_info:
            imdb_info['media_type'] = MediaType.MOVIE
    elif mtype == MediaType.TV:
        imdb_info = self.imdbid(imdbid)
        if imdb_info:
            imdb_info['media_type'] = MediaType.TV
            tv_extra_info = self.__episodes(imdbid)
            # The episode lookup can come back empty; only attach the data when
            # present instead of subscripting a missing result.
            if tv_extra_info:
                imdb_info["seasons"] = tv_extra_info["seasons"]
                imdb_info["episodes"] = tv_extra_info["episodes"]
    if not imdb_info:
        # Covers both an unsupported media type and a failed lookup.
        logger.warn(f"IMDb id:{imdbid} 未查询到媒体信息")
    return imdb_info
def match_multi(self, name: str) -> Optional[dict]:
    """
    Search movies and TV series together; used when neither type nor year is known.

    :param name: name recognised from a file or torrent title
    :return: detail dict of the first matching title with ``media_type`` set,
             or an empty dict when nothing matches
    """
    # Either search may yield nothing; treat a None result as an empty list.
    multis = (self.search_tvs(name) or []) + (self.search_movies(name) or [])
    if not multis:
        logger.debug(f"{name} 未找到相关媒体信息!")
        return {}
    # Movies before other types, and within each group the newest first.
    multis = sorted(
        multis,
        key=lambda x: ("1" if x.get("media_type") == MediaType.MOVIE else "0") + str(x.get('y') or '0000'),
        reverse=True
    )
    for multi in multis:
        media_t = multi.get("media_type")
        # Only movies and TV series can be matched; skip others before the
        # detail lookup.
        if media_t not in (MediaType.MOVIE, MediaType.TV):
            continue
        media_info = self.imdbid(multi.get("id"))
        if not media_info:
            continue
        # __compare_names iterates over its second argument, so pass a list of
        # the titles that are actually present rather than a bare string.
        titles = [title for title in (media_info.get('primary_title'), multi.get('primary_title')) if title]
        if titles and self.__compare_names(name, titles):
            if not isinstance(media_info.get("media_type"), MediaType):
                media_info['media_type'] = media_t
            return media_info
    return {}
def match(self, name: str,
          mtype: MediaType,
          year: Optional[str] = None,
          season_year: Optional[str] = None,
          season_number: Optional[int] = None,
          group_seasons: Optional[List[dict]] = None) -> Optional[dict]:
    """
    Search IMDb and return the single most plausible media record.

    :param name: name to search for
    :param mtype: media type: movie or TV series
    :param year: release year; for TV this should be the premiere year
    :param season_year: release year of the current season
    :param season_number: season number, integer
    :param group_seasons: episode group information (not used by this method)
    :return: matched detail dict with ``media_type`` filled in; an empty dict when
             nothing matches; None when ``name`` is empty
    """
    if not name:
        return None

    def _year_window(base: Optional[str]) -> list:
        # The given year first, then the following and the preceding year.
        window = [base]
        if base:
            window.append(str(int(base) + 1))
            window.append(str(int(base) - 1))
        return window

    matched = {}

    if mtype != MediaType.TV:
        for candidate_year in _year_window(year):
            logger.debug(
                f"正在识别{mtype.value}{name}, 年份={candidate_year} ...")
            matched = self.__search_movie_by_name(name, candidate_year)
            if matched:
                matched['media_type'] = MediaType.MOVIE
                break
        return matched

    # With both a season number and a season year, try the precise season match first.
    if season_year and season_number:
        logger.debug(
            f"正在识别{mtype.value}{name}, 季集={season_number}, 季集年份={season_year} ...")
        matched = self.__search_tv_by_season(name,
                                             season_year,
                                             season_number)

    # Otherwise (or if that failed) fall back to matching by name and premiere year.
    if not matched:
        for candidate_year in _year_window(year):
            logger.debug(
                f"正在识别{mtype.value}{name}, 年份={candidate_year} ...")
            matched = self.__search_tv_by_name(name, candidate_year)
            if matched:
                break

    if not matched:
        return matched

    matched['media_type'] = MediaType.TV
    # Attach season and episode data unless the season match already did.
    if not matched.get("seasons"):
        extra = self.__episodes(matched.get('id'))
        if extra:
            matched["seasons"] = extra["seasons"]
            matched["episodes"] = extra["episodes"]
    return matched

View File

@@ -0,0 +1,3 @@
graphene~=3.4.3
ijson~=3.4.0
requests-html~=0.10.0