mirror of
https://github.com/jxxghp/MoviePilot-Plugins.git
synced 2026-03-27 10:05:57 +00:00
610 lines
28 KiB
Python
610 lines
28 KiB
Python
import base64
|
||
from collections import OrderedDict
|
||
from json import JSONDecodeError
|
||
import json
|
||
from typing import Dict, List, Optional, Union, AsyncGenerator
|
||
|
||
from pydantic import ValidationError
|
||
|
||
from app import schemas
|
||
from app.core.cache import cached
|
||
from app.core.config import settings
|
||
from app.core.context import MediaInfo
|
||
from app.log import logger
|
||
from app.schemas.types import MediaType
|
||
from app.utils.http import AsyncRequestUtils
|
||
from app.utils.string import StringUtils
|
||
|
||
from .imdbapi import ImdbApiClient
|
||
from .officialapi import SearchParams, OfficialApiClient, PersistedQueryNotFound
|
||
from .schema import StaffPickApiResponse, ImdbMediaInfo, ImdbApiHash, TitleEdge
|
||
from .schema.imdbapi import ImdbapiPrecisionDate, ImdbApiTitle
|
||
from .schema.imdbtypes import ImdbType, AkasNode, ImdbTitle, ImdbDate
|
||
from ...utils.common import retry
|
||
|
||
|
||
class ImdbHelper:
|
||
MAX_STATES = 128
|
||
|
||
def __init__(self, proxies: Dict[str, str] = None):
|
||
self._proxies = proxies
|
||
self.imdbapi_client = ImdbApiClient(proxies=self._proxies, ua=settings.NORMAL_USER_AGENT)
|
||
self.official_api_client = OfficialApiClient(proxies=self._proxies, ua=settings.NORMAL_USER_AGENT)
|
||
self._imdb_api_hash = ImdbApiHash(
|
||
AdvancedTitleSearch='d32303ed2711e4d03bd5e36cfe0e5304bcffd7e31d1898695f6b6919736ff2a8'
|
||
)
|
||
self._search_states = OrderedDict()
|
||
self._title_generators: OrderedDict[SearchParams, AsyncGenerator[TitleEdge, None]] = OrderedDict()
|
||
|
||
def get_interests_id(self) -> Dict[str, str]:
|
||
return self.official_api_client.interests_id
|
||
|
||
@staticmethod
|
||
@retry(Exception, logger=logger, delay=1)
|
||
async def _async_fetch_github_file(proxies: Dict[str, str] | None, repo: str, owner: str, file_path: str,
|
||
branch: str = None) -> Optional[str]:
|
||
"""
|
||
异步从GitHub仓库获取指定文本文件内容
|
||
|
||
:param proxies: 代理配置
|
||
:param repo: 仓库名称
|
||
:param owner: 仓库所有者
|
||
:param file_path: 文件路径(相对于仓库根目录)
|
||
:param branch: 分支名称,默认为 None(使用默认分支)
|
||
:return: 文件内容字符串,若获取失败则返回 None
|
||
"""
|
||
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}"
|
||
if branch:
|
||
api_url = f"{api_url}?ref={branch}"
|
||
response = await AsyncRequestUtils(headers=settings.GITHUB_HEADERS, proxies=proxies
|
||
).get_res(api_url, raise_exception=True)
|
||
if not response or response.status_code != 200:
|
||
return None
|
||
data = response.json()
|
||
content_base64 = data['content']
|
||
json_bytes = base64.b64decode(content_base64)
|
||
json_text = json_bytes.decode('utf-8')
|
||
return json_text
|
||
|
||
@staticmethod
|
||
async def async_fetch_github_file(proxies: Dict[str, str] | None, repo: str, owner: str, file_path: str,
|
||
branch: str = None) -> Optional[str]:
|
||
try:
|
||
return await ImdbHelper._async_fetch_github_file(proxies, repo, owner, file_path, branch)
|
||
except Exception as e:
|
||
logger.error(f"Error getting GitHub file: {str(e)}")
|
||
return None
|
||
|
||
@cached(maxsize=1)
|
||
async def async_fetch_hash(self) -> Optional[ImdbApiHash]:
|
||
"""
|
||
异步获取 IMDb hash
|
||
"""
|
||
res = await ImdbHelper.async_fetch_github_file(
|
||
self._proxies,
|
||
'MoviePilot-Plugins',
|
||
'wumode',
|
||
'plugins.v2/imdbsource/imdb_hash.json',
|
||
'imdbsource_assets'
|
||
)
|
||
if not res:
|
||
return None
|
||
try:
|
||
hash_data = json.loads(res)
|
||
data = ImdbApiHash.model_validate(hash_data)
|
||
except (JSONDecodeError, ValidationError):
|
||
return None
|
||
return data
|
||
|
||
@cached(maxsize=2, ttl=6 * 3600)
|
||
async def async_fetch_staff_picks(self, zh: bool = False) -> Optional[StaffPickApiResponse]:
|
||
"""
|
||
获取 IMDb Staff Picks
|
||
"""
|
||
file = 'staff_picks.zh.json' if zh else 'staff_picks.json'
|
||
res = await ImdbHelper.async_fetch_github_file(
|
||
self._proxies,
|
||
'MoviePilot-Plugins',
|
||
'wumode',
|
||
f'plugins.v2/imdbsource/{file}',
|
||
'imdbsource_assets'
|
||
)
|
||
if not res:
|
||
logger.error("Error getting staff picks")
|
||
return None
|
||
try:
|
||
data = StaffPickApiResponse.model_validate_json(res, by_name=True)
|
||
except (JSONDecodeError, ValidationError):
|
||
return None
|
||
return data
|
||
|
||
async def _async_update_hash(self, force: bool = False):
|
||
if force:
|
||
await self.async_fetch_hash.cache_clear()
|
||
imdb_hash = await self.async_fetch_hash()
|
||
if isinstance(imdb_hash, ImdbApiHash):
|
||
self._imdb_api_hash = imdb_hash
|
||
|
||
@staticmethod
|
||
def compare_names(file_name: str, names: Union[list, str]) -> bool:
|
||
"""
|
||
比较文件名是否匹配,忽略大小写和特殊字符
|
||
|
||
:param file_name: 识别的文件名或者种子名
|
||
:param names: TMDB返回的译名
|
||
:return: True or False
|
||
"""
|
||
if not file_name or not names:
|
||
return False
|
||
if not isinstance(names, list):
|
||
names = [names]
|
||
names = [name for name in names if name]
|
||
file_name = StringUtils.clear(file_name).upper()
|
||
for name in names:
|
||
name = StringUtils.clear(name).strip().upper()
|
||
if file_name == name:
|
||
return True
|
||
return False
|
||
|
||
@staticmethod
|
||
def type_to_mtype(title_id: str) -> MediaType:
|
||
if title_id in ["tvSeries", "tvMiniSeries", "tvShort", "tvEpisode"]:
|
||
return MediaType.TV
|
||
elif title_id in ["movie", "tvMovie"]:
|
||
return MediaType.MOVIE
|
||
return MediaType.UNKNOWN
|
||
|
||
@staticmethod
|
||
def release_date_string(release_date: ImdbDate) -> Optional[str]:
|
||
year = release_date.year or 0
|
||
month = release_date.month or 0
|
||
day = release_date.day or 0
|
||
return f"{year:04d}-{month:02d}-{day:02d}"
|
||
|
||
@staticmethod
|
||
def get_category(mtype: MediaType, imdb_info: dict) -> str:
|
||
tv_category = {
|
||
'国漫': {'genres': 'Animation', 'originCountries': 'CN,TW,HK'},
|
||
'日番': {'genres': 'Animation', 'originCountries': 'JP'},
|
||
'纪录片': {'genres': 'Documentary'},
|
||
'综艺': {'genres': 'Reality-TV,Game-Show'},
|
||
'国产剧': {'originCountries': 'CN,TW,HK'},
|
||
'欧美剧': {'originCountries': 'US,FR,GB,DE,ES,IT,NL,PT,RU,UK'},
|
||
'日韩剧': {'originCountries': 'JP,KP,KR,TH,IN,SG'},
|
||
'未分类': None
|
||
}
|
||
movie_category = {
|
||
'动画电影': {'genres': 'Animation'},
|
||
'华语电影': {'spokenLanguages': 'zho,cmn,yue,nan'},
|
||
'外语电影': None}
|
||
categories = {MediaType.TV: tv_category, MediaType.MOVIE: movie_category}
|
||
category = categories.get(mtype)
|
||
if not imdb_info or not category:
|
||
return ""
|
||
for key, item in category.items():
|
||
if not item:
|
||
return key
|
||
match_flag = True
|
||
for attr, value in item.items():
|
||
if not value:
|
||
continue
|
||
if attr == 'originCountries':
|
||
origin_countries = imdb_info.get('originCountries')
|
||
info_value = origin_countries[0].get('code') or [] if origin_countries else []
|
||
elif attr == 'spokenLanguages':
|
||
spoken_languages = imdb_info.get('spokenLanguages')
|
||
info_value = spoken_languages[0].get('code') or [] if spoken_languages else []
|
||
else:
|
||
info_value = imdb_info.get(attr)
|
||
if isinstance(info_value, list):
|
||
info_values = info_value
|
||
else:
|
||
info_values = [info_value]
|
||
if value.find(',') != -1:
|
||
values = [str(val) for val in value.split(',') if val]
|
||
else:
|
||
values = [str(value)]
|
||
if not set(values).intersection(set(info_values)):
|
||
match_flag = False
|
||
if match_flag:
|
||
return key
|
||
return ""
|
||
|
||
async def advanced_title_search_generator(self, params: SearchParams, first_page: bool = True
|
||
) -> AsyncGenerator[TitleEdge, None]:
|
||
await self._async_update_hash()
|
||
sha256 = self._imdb_api_hash.advanced_title_search
|
||
if not first_page and params in self._title_generators:
|
||
return self._title_generators[params]
|
||
generator = self.official_api_client.advanced_title_search_generator(params, sha256)
|
||
self._title_generators[params] = generator
|
||
if len(self._title_generators) > ImdbHelper.MAX_STATES:
|
||
_, popped = self._title_generators.popitem(last=False)
|
||
await popped.aclose()
|
||
return generator
|
||
|
||
async def async_advanced_title_search(self, params: SearchParams, first_page: bool = True, count: int = 50
|
||
) -> List[TitleEdge]:
|
||
edges: List[TitleEdge] = []
|
||
generator = await self.advanced_title_search_generator(params=params, first_page=first_page)
|
||
try:
|
||
async for edge in generator:
|
||
edges.append(edge)
|
||
if len(edges) >= count:
|
||
break
|
||
except PersistedQueryNotFound:
|
||
await self.async_fetch_hash.cache_clear()
|
||
except RuntimeError:
|
||
pass
|
||
return edges
|
||
|
||
def _tv_release_data_by_season(self, title_id: str) -> Optional[Dict[str, ImdbapiPrecisionDate]]:
|
||
seasons_dict = {}
|
||
for episode in self.imdbapi_client.episodes_generator(title_id):
|
||
s = episode.season
|
||
if not seasons_dict.get(s):
|
||
seasons_dict[s] = episode.release_date
|
||
return seasons_dict
|
||
|
||
async def _async_tv_release_data_by_season(self, title_id: str) -> Optional[Dict[str, ImdbapiPrecisionDate]]:
|
||
seasons_dict = {}
|
||
async for episode in self.imdbapi_client.async_episodes_generator(title_id):
|
||
s = episode.season
|
||
if not seasons_dict.get(s):
|
||
seasons_dict[s] = episode.release_date
|
||
return seasons_dict
|
||
|
||
def match_by(self, name: str, mtype: MediaType | None = None, year: str | None = None) -> ImdbMediaInfo | None:
|
||
"""
|
||
根据名称同时查询电影和电视剧,没有类型也没有年份时使用
|
||
|
||
:param name: 识别的文件名或种子名
|
||
:param mtype: 类型:电影、电视剧
|
||
:param year: 年份,如要是季集需要是首播年份
|
||
:return: 匹配的媒体信息
|
||
"""
|
||
|
||
mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype]
|
||
search_types: List[ImdbType] = []
|
||
if MediaType.TV in mtypes:
|
||
search_types.extend([ImdbType.TV_SERIES, ImdbType.TV_MINI_SERIES, ImdbType.TV_SPECIAL])
|
||
if MediaType.MOVIE in mtypes:
|
||
search_types.extend([ImdbType.MOVIE, ImdbType.TV_MOVIE])
|
||
if year:
|
||
multi_res = self.imdbapi_client.advanced_search(query=name, year=int(year),
|
||
media_types=search_types)
|
||
else:
|
||
multi_res = self.imdbapi_client.advanced_search(query=name, media_types=search_types)
|
||
ret_info = None
|
||
if multi_res is None or len(multi_res) == 0:
|
||
logger.debug(f"{name} 未找到相关媒体息!")
|
||
return None
|
||
multi_res = [r for r in multi_res if r.id and ImdbHelper.type_to_mtype(r.type.value) in mtypes]
|
||
multi_res = sorted(
|
||
multi_res,
|
||
key=lambda x: ('1' if x.type in [ImdbType.MOVIE, ImdbType.TV_MOVIE] else '0') + f"{x.start_year}",
|
||
reverse=True
|
||
)
|
||
items = self.official_api_client.vertical_list_page_items([x.id for x in multi_res])
|
||
titles = items.titles if items else []
|
||
|
||
for result in multi_res:
|
||
title = next((t for t in titles if t.id == result.id), None)
|
||
if not title:
|
||
continue
|
||
title_akas = title.akas
|
||
akas = [edge.node for edge in title_akas.edges] if title_akas is not None else []
|
||
start_year = result.start_year
|
||
if year and str(start_year) != year:
|
||
continue
|
||
if ImdbHelper.compare_names(name, [result.primary_title or '', result.original_title or '']):
|
||
ret_info = ImdbMediaInfo.from_title(result, akas=akas)
|
||
return ret_info
|
||
names = [edge.node.text for edge in title.akas.edges] if title.akas is not None else []
|
||
if ImdbHelper.compare_names(name, names):
|
||
ret_info = ImdbMediaInfo.from_title(result, akas=akas)
|
||
return ret_info
|
||
return ret_info
|
||
|
||
async def async_match_by(self, name: str, mtype: Optional[MediaType] = None, year: Optional[str] = None
|
||
) -> Optional[ImdbMediaInfo]:
|
||
mtypes = [MediaType.MOVIE, MediaType.TV] if not mtype else [mtype]
|
||
search_types: List[ImdbType] = []
|
||
if MediaType.TV in mtypes:
|
||
search_types.extend([ImdbType.TV_SERIES, ImdbType.TV_MINI_SERIES, ImdbType.TV_SPECIAL])
|
||
if MediaType.MOVIE in mtypes:
|
||
search_types.extend([ImdbType.MOVIE, ImdbType.TV_MOVIE])
|
||
if year:
|
||
multi_res = await self.imdbapi_client.async_advanced_search(query=name, year=int(year),
|
||
media_types=search_types)
|
||
else:
|
||
multi_res = await self.imdbapi_client.async_advanced_search(query=name, media_types=search_types)
|
||
ret_info = None
|
||
if multi_res is None or len(multi_res) == 0:
|
||
logger.debug(f"{name} 未找到相关媒体息!")
|
||
return None
|
||
multi_res = [r for r in multi_res if r.id and ImdbHelper.type_to_mtype(r.type.value) in mtypes]
|
||
multi_res = sorted(
|
||
multi_res,
|
||
key=lambda x: ('1' if x.type in [ImdbType.MOVIE, ImdbType.TV_MOVIE] else '0') + f"{x.start_year}",
|
||
reverse=True
|
||
)
|
||
items = await self.official_api_client.async_vertical_list_page_items([x.id for x in multi_res])
|
||
titles = items.titles if items else []
|
||
|
||
for result in multi_res:
|
||
title = next((t for t in titles if t.id == result.id), None)
|
||
if not title:
|
||
continue
|
||
title_akas = title.akas
|
||
akas = [edge.node for edge in title_akas.edges] if title_akas is not None else []
|
||
start_year = result.start_year
|
||
if year and str(start_year) != year:
|
||
continue
|
||
if ImdbHelper.compare_names(name, [result.primary_title or '', result.original_title or '']):
|
||
ret_info = ImdbMediaInfo.from_title(result, akas=akas)
|
||
return ret_info
|
||
names = [edge.node.text for edge in title.akas.edges] if title.akas is not None else []
|
||
if ImdbHelper.compare_names(name, names):
|
||
ret_info = ImdbMediaInfo.from_title(result, akas=akas)
|
||
return ret_info
|
||
return ret_info
|
||
|
||
def match_by_season(self, name: str, season_year: str, season_number: int) -> Optional[ImdbMediaInfo]:
|
||
"""
|
||
根据电视剧的名称和季的年份及序号匹配 IMDb
|
||
|
||
:param name: 识别的文件名或者种子名
|
||
:param season_year: 季的年份
|
||
:param season_number: 季序号
|
||
:return: 匹配的媒体信息
|
||
"""
|
||
|
||
def __season_match(imdb_id: str, _season_year: str, _season_number: int) -> bool:
|
||
release_dates = self._tv_release_data_by_season(imdb_id)
|
||
for s, release_date in release_dates.items():
|
||
if not release_date or not release_date.year:
|
||
continue
|
||
if str(release_date.year) == _season_year and s == str(_season_number):
|
||
return True
|
||
return False
|
||
|
||
search_types = [ImdbType.TV_SERIES, ImdbType.TV_MINI_SERIES, ImdbType.TV_SPECIAL]
|
||
res = self.imdbapi_client.advanced_search(query=name, media_types=search_types)
|
||
if not res:
|
||
logger.debug(f"{name} 未找到季{season_number}相关信息!")
|
||
return None
|
||
tvs: List[ImdbApiTitle] = [r for r in res if r.id and ImdbHelper.type_to_mtype(r.type.value) == MediaType.TV]
|
||
tvs = sorted(tvs, key=lambda x: x.start_year or 0, reverse=True)
|
||
items = self.official_api_client.vertical_list_page_items([x.id for x in tvs])
|
||
titles = items.titles if items else []
|
||
titles_dict: Dict[str, ImdbTitle] = {}
|
||
for title in titles:
|
||
titles_dict[title.id] = title
|
||
for tv in tvs:
|
||
# 年份
|
||
title = titles_dict.get(tv.id)
|
||
if not title:
|
||
continue
|
||
akas: List[AkasNode] = [e.node for e in title.akas.edges]
|
||
tv_year = tv.start_year
|
||
if self.compare_names(name, [tv.primary_title or '', tv.original_title or '']) and str(tv_year) == season_year:
|
||
info = ImdbMediaInfo.from_title(tv, akas=akas)
|
||
return info
|
||
names = [aka.text for aka in akas]
|
||
if not tv or not self.compare_names(name, names):
|
||
continue
|
||
if __season_match(imdb_id=tv.id, _season_year=season_year, _season_number=season_number):
|
||
info = ImdbMediaInfo.from_title(tv, akas=akas)
|
||
return info
|
||
return None
|
||
|
||
async def async_match_by_season(self, name: str, season_year: str, season_number: int) -> Optional[ImdbMediaInfo]:
|
||
|
||
async def __season_match(imdb_id: str, _season_year: str, _season_number: int) -> bool:
|
||
release_dates = await self._async_tv_release_data_by_season(imdb_id)
|
||
if not release_dates:
|
||
return False
|
||
for s, release_date in release_dates.items():
|
||
if not release_date or not release_date.year:
|
||
continue
|
||
if str(release_date.year) == _season_year and s == str(_season_number):
|
||
return True
|
||
return False
|
||
|
||
search_types = [ImdbType.TV_SERIES, ImdbType.TV_MINI_SERIES, ImdbType.TV_SPECIAL]
|
||
res = await self.imdbapi_client.async_advanced_search(query=name, media_types=search_types)
|
||
if not res:
|
||
logger.debug(f"{name} 未找到季{season_number}相关信息!")
|
||
return None
|
||
tvs: List[ImdbApiTitle] = [r for r in res if r.id and ImdbHelper.type_to_mtype(r.type.value) == MediaType.TV]
|
||
tvs = sorted(tvs, key=lambda x: x.start_year or 0, reverse=True)
|
||
items = await self.official_api_client.async_vertical_list_page_items([x.id for x in tvs])
|
||
titles = items.titles if items else []
|
||
titles_dict: Dict[str, ImdbTitle] = {}
|
||
for title in titles:
|
||
titles_dict[title.id] = title
|
||
for tv in tvs:
|
||
# 年份
|
||
title = titles_dict.get(tv.id)
|
||
if not title:
|
||
continue
|
||
akas: List[AkasNode] = [e.node for e in title.akas.edges]
|
||
tv_year = tv.start_year
|
||
if self.compare_names(name, [tv.primary_title or '', tv.original_title or '']) and str(tv_year) == season_year:
|
||
info = ImdbMediaInfo.from_title(tv, akas=akas)
|
||
return info
|
||
names = [aka.text for aka in akas]
|
||
if not tv or not self.compare_names(name, names):
|
||
continue
|
||
if await __season_match(imdb_id=tv.id, _season_year=season_year, _season_number=season_number):
|
||
info = ImdbMediaInfo.from_title(tv, akas=akas)
|
||
return info
|
||
return None
|
||
|
||
def match(self, name: str,
|
||
mtype: MediaType,
|
||
year: Optional[str] = None,
|
||
season_year: Optional[str] = None,
|
||
season_number: Optional[int] = None,
|
||
) -> Optional[ImdbMediaInfo]:
|
||
"""
|
||
搜索 IMDb 中的媒体信息,匹配返回一条尽可能正确的信息
|
||
|
||
:param name: 检索的名称
|
||
:param mtype: 类型:电影、电视剧
|
||
:param year: 年份,如要是季集需要是首播年份
|
||
:param season_year: 当前季集年份
|
||
:param season_number: 季集,整数
|
||
:return: 匹配的媒体信息
|
||
"""
|
||
if not name:
|
||
return None
|
||
info = None
|
||
if mtype == MediaType.TV:
|
||
# 有当前季和当前季集年份,使用精确匹配
|
||
if season_year and season_number:
|
||
logger.debug(f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...")
|
||
info = self.match_by_season(name, season_year, season_number)
|
||
if info:
|
||
return info
|
||
year_range = [year, str(int(year) + 1), str(int(year) - 1)] if year else [None]
|
||
for year in year_range:
|
||
logger.debug(f"正在识别{mtype.value}:{name}, 年份={year} ...")
|
||
info = self.match_by(name, mtype, year)
|
||
if info:
|
||
break
|
||
return info
|
||
|
||
async def async_match(self, name: str,
|
||
mtype: MediaType,
|
||
year: Optional[str] = None,
|
||
season_year: Optional[str] = None,
|
||
season_number: Optional[int] = None,
|
||
) -> Optional[ImdbMediaInfo]:
|
||
|
||
if not name:
|
||
return None
|
||
info = None
|
||
if mtype == MediaType.TV:
|
||
# 有当前季和当前季集年份,使用精确匹配
|
||
if season_year and season_number:
|
||
logger.debug(f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...")
|
||
info = await self.async_match_by_season(name, season_year, season_number)
|
||
if info:
|
||
return info
|
||
year_range = [year, str(int(year) + 1), str(int(year) - 1)] if year else [None]
|
||
for year in year_range:
|
||
logger.debug(f"正在识别{mtype.value}:{name}, 年份={year} ...")
|
||
info = await self.async_match_by(name, mtype, year)
|
||
if info:
|
||
break
|
||
return info
|
||
|
||
def update_info(self, title_id: str, info: ImdbMediaInfo) -> ImdbMediaInfo:
|
||
"""
|
||
Given a Title ID, update its media information.
|
||
|
||
:param title_id: IMDb ID.
|
||
:param info: Media information to be updated.
|
||
:return: IMDb info.
|
||
"""
|
||
details = self.imdbapi_client.title(title_id) or info
|
||
akas = info.akas
|
||
if not akas:
|
||
resp = self.imdbapi_client.akas(title_id)
|
||
akas = resp.akas if resp else []
|
||
credit_list = [credit for credit in self.imdbapi_client.credits_generator(title_id)]
|
||
episodes = [episode for episode in self.imdbapi_client.episodes_generator(title_id)]
|
||
images = [image for image in self.imdbapi_client.images_generator(title_id)]
|
||
return ImdbMediaInfo.from_title(details, akas=akas, api_credits=credit_list, episodes=episodes, images=images)
|
||
|
||
async def async_update_info(self, title_id: str, info: ImdbMediaInfo) -> ImdbMediaInfo:
|
||
details = await self.imdbapi_client.async_title(title_id) or info
|
||
akas = info.akas
|
||
if not akas:
|
||
resp = await self.imdbapi_client.async_akas(title_id)
|
||
akas = resp.akas if resp else []
|
||
credit_list = [credit async for credit in self.imdbapi_client.async_credits_generator(title_id)]
|
||
episodes = [episode async for episode in self.imdbapi_client.async_episodes_generator(title_id)]
|
||
images = [image async for image in self.imdbapi_client.async_images_generator(title_id)]
|
||
return ImdbMediaInfo.from_title(details, akas=akas, api_credits=credit_list, episodes=episodes, images=images)
|
||
|
||
@staticmethod
|
||
def convert_mediainfo(info: ImdbMediaInfo) -> MediaInfo:
|
||
mediainfo = MediaInfo()
|
||
mediainfo.source = 'imdb'
|
||
mediainfo.type = ImdbHelper.type_to_mtype(info.type.value)
|
||
mediainfo.title = info.primary_title or ""
|
||
mediainfo.year = f"{info.start_year}" if info.start_year else ""
|
||
mediainfo.imdb_id = info.id
|
||
mediainfo.overview = info.plot or ""
|
||
if info.spoken_languages:
|
||
original_language = info.spoken_languages[0].code
|
||
if original_language:
|
||
mediainfo.original_language = original_language
|
||
if info.original_title:
|
||
mediainfo.original_title = info.original_title
|
||
mediainfo.names = [aka.text for aka in info.akas]
|
||
if info.origin_countries:
|
||
mediainfo.origin_country = [origin_country.code for origin_country in info.origin_countries]
|
||
if info.primary_image and info.primary_image.url:
|
||
mediainfo.poster_path = info.primary_image.url
|
||
if info.images:
|
||
mediainfo.backdrop_path = info.backdrop_path() # noqa
|
||
mediainfo.genres = [{"id": genre, "name": genre} for genre in info.genres or []]
|
||
directors = []
|
||
actors = []
|
||
for credit in (info.credits or []):
|
||
if not credit.name:
|
||
continue
|
||
if credit.category == 'DIRECTOR':
|
||
directors.append({'name': f"{credit.name.display_name or ''}"})
|
||
elif credit.category in ['CAST', 'ACTOR', 'ACTRESS']:
|
||
actors.append({'name': f"{credit.name.display_name or ''}"})
|
||
mediainfo.director = directors
|
||
mediainfo.actor = actors
|
||
vote = info.rating.aggregate_rating if info.rating and info.rating.aggregate_rating else None
|
||
if vote:
|
||
mediainfo.vote_average = round(float(vote), 1)
|
||
season_years: Dict[int, int] = {}
|
||
if mediainfo.type == MediaType.TV:
|
||
for episode in info.episodes:
|
||
ep_season = episode.season
|
||
if ep_season is None:
|
||
continue
|
||
season = int(ep_season) if StringUtils.is_number(ep_season) else 0
|
||
if season not in season_years:
|
||
if episode.release_date and episode.release_date.year:
|
||
season_years[season] = episode.release_date.year
|
||
else:
|
||
season_years[season] = 0
|
||
mediainfo.seasons.setdefault(season, []).append(episode)
|
||
mediainfo.season_years[season] = season_years[season]
|
||
|
||
return mediainfo
|
||
|
||
@staticmethod
|
||
def title_to_mediainfo(info: ImdbTitle) -> schemas.MediaInfo:
|
||
mediainfo = schemas.MediaInfo(mediaid_prefix="imdb", media_id=info.id, imdb_id=info.id)
|
||
mediainfo.title = info.title_text.text if info.title_text else ''
|
||
if ImdbHelper.type_to_mtype(info.title_type.id.value) == MediaType.TV:
|
||
mediainfo.type = '电视剧'
|
||
elif ImdbHelper.type_to_mtype(info.title_type.id.value) == MediaType.MOVIE:
|
||
mediainfo.type = '电影'
|
||
if info.release_year:
|
||
mediainfo.year = f"{info.release_year.year}"
|
||
mediainfo.title_year = f"{mediainfo.title} ({mediainfo.year})" if mediainfo.year else mediainfo.title
|
||
if info.primary_image:
|
||
mediainfo.poster_path = info.primary_image.poster_path()
|
||
if info.ratings_summary:
|
||
mediainfo.vote_average = info.ratings_summary.aggregate_rating
|
||
if info.runtime:
|
||
mediainfo.runtime = info.runtime.seconds
|
||
if info.plot and info.plot.plot_text:
|
||
mediainfo.overview = info.plot.plot_text.plain_text
|
||
if info.release_date:
|
||
mediainfo.release_date = ImdbHelper.release_date_string(info.release_date)
|
||
|
||
return mediainfo
|