mirror of
https://github.com/d0zingcat/MoviePilot-Plugins.git
synced 2026-05-13 15:09:12 +00:00
fix(ImdbSource): 修复插件依赖问题
This commit is contained in:
@@ -433,11 +433,12 @@
|
||||
"name": "IMDb源",
|
||||
"description": "让探索支持IMDb数据源。",
|
||||
"labels": "探索",
|
||||
"version": "1.3.2",
|
||||
"version": "1.3.3",
|
||||
"icon": "IMDb_IOS-OSX_App.png",
|
||||
"author": "wumode",
|
||||
"level": 1,
|
||||
"history": {
|
||||
"v1.3.3": "修复依赖问题",
|
||||
"v1.3.2": "更新 API query hash",
|
||||
"v1.3.1": "修复按日期排序错误",
|
||||
"v1.3": "优化网络连接",
|
||||
|
||||
@@ -1,16 +1,11 @@
|
||||
import re
|
||||
import json
|
||||
from typing import Optional, Any, List, Dict, Tuple
|
||||
from datetime import datetime
|
||||
|
||||
from app.core.config import settings
|
||||
from app.core.event import eventmanager, Event
|
||||
from app.log import logger
|
||||
from app.plugins import _PluginBase
|
||||
from app.schemas import DiscoverSourceEventData, MediaRecognizeConvertEventData, RecommendSourceEventData
|
||||
from app.schemas.types import ChainEventType, MediaType
|
||||
from app.core.meta import MetaBase
|
||||
from app.core.context import MediaInfo
|
||||
from app.plugins.imdbsource.imdb_helper import ImdbHelper
|
||||
from app import schemas
|
||||
from app.utils.http import RequestUtils
|
||||
@@ -24,7 +19,7 @@ class ImdbSource(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "IMDb_IOS-OSX_App.png"
|
||||
# 插件版本
|
||||
plugin_version = "1.3.2"
|
||||
plugin_version = "1.3.3"
|
||||
# 插件作者
|
||||
plugin_author = "wumode"
|
||||
# 作者主页
|
||||
@@ -130,132 +125,6 @@ class ImdbSource(_PluginBase):
|
||||
# return {"recognize_media": (self.recognize_media, ModuleExecutionType.Hijack)}
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
# @MediaInfo.source_processor("imdb")
|
||||
def process_imdb_info(mediainfo: MediaInfo, info: dict):
|
||||
"""处理 IMDB 信息"""
|
||||
mediainfo.source_info["imdb"] = info
|
||||
if isinstance(info.get('media_type'), MediaType):
|
||||
mediainfo.type = info.get('media_type')
|
||||
elif info.get('media_type'):
|
||||
mediainfo.type = MediaType.MOVIE if info.get("type") == "movie" else MediaType.TV
|
||||
mediainfo.title = info.get("title")
|
||||
mediainfo.release_date = info.get('release_date')
|
||||
if info.get("id"):
|
||||
mediainfo.source_id["imdb"] = info.get("id")
|
||||
mediainfo.imdb_id = info.get('id')
|
||||
if not mediainfo.source_id:
|
||||
return
|
||||
mediainfo.vote_average = round(float(info.get("rating").get("aggregate_rating")), 1) if info.get("rating") else 0
|
||||
mediainfo.overview = info.get('plot')
|
||||
mediainfo.genre_ids = info.get('genre') or []
|
||||
# 风格
|
||||
if not mediainfo.genres:
|
||||
mediainfo.genres = [{"id": genre, "name": genre} for genre in info.get("genres") or []]
|
||||
if info.get('spoken_languages', []):
|
||||
mediainfo.original_language = info.get('spoken_languages', [])[0].get("name")
|
||||
mediainfo.en_title = info.get('primary_title')
|
||||
mediainfo.title = info.get('primary_title')
|
||||
mediainfo.original_title = info.get('original_title')
|
||||
# mediainfo.release_date = info.get('start_year')
|
||||
mediainfo.year = info.get('start_year')
|
||||
if info.get('posters', []):
|
||||
mediainfo.poster_path = info.get("posters", [])[0].get("url")
|
||||
directors = []
|
||||
if info.get('directors', []):
|
||||
for dn in info.get('directors', []):
|
||||
director = dn.get("name")
|
||||
if not director:
|
||||
continue
|
||||
d_ = {"name": director.get("display_name"), "id": director.get("id"), "avatars": director.get("avatars")}
|
||||
directors.append(d_)
|
||||
if info.get('writers', []):
|
||||
for wn in info.get('writers', []):
|
||||
writer = wn.get("name")
|
||||
d_ = {"name": writer.get("display_name"), "id": writer.get("id"), "avatars": writer.get("avatars")}
|
||||
directors.append(d_)
|
||||
mediainfo.directors = directors
|
||||
actors = []
|
||||
if info.get('casts', []):
|
||||
for cast in info.get('casts', []):
|
||||
cn = cast.get("name", {})
|
||||
character_name = cast.get("characters")[0] if cast.get("characters") else ''
|
||||
d_ = {"name": cn.get("display_name"), "id": cn.get("id"),
|
||||
"avatars": cn.get("avatars"), "character": character_name}
|
||||
actors.append(d_)
|
||||
|
||||
def recognize_media(self, meta: MetaBase = None,
|
||||
mtype: MediaType = None,
|
||||
imdbid: Optional[str] = None,
|
||||
episode_group: Optional[str] = None,
|
||||
cache: Optional[bool] = True,
|
||||
**kwargs) -> Optional[MediaInfo]:
|
||||
logger.warn(f"IMDb Source: {MetaBase.title}")
|
||||
if not self._imdb_helper:
|
||||
return None
|
||||
if not imdbid and not meta:
|
||||
return None
|
||||
if not meta:
|
||||
# 未提供元数据时,直接使用imdbid查询,不使用缓存
|
||||
cache_info = {}
|
||||
elif not meta.name:
|
||||
logger.warn("识别媒体信息时未提供元数据名称")
|
||||
return None
|
||||
cache_info = {}
|
||||
if not cache_info or not cache:
|
||||
info = None
|
||||
if imdbid:
|
||||
info = self._imdb_helper.get_info(mtype=mtype, imdbid=imdbid)
|
||||
if not info and meta:
|
||||
info = {}
|
||||
names = list(dict.fromkeys([k for k in [meta.cn_name, meta.en_name] if k]))
|
||||
for name in names:
|
||||
if meta.begin_season:
|
||||
logger.info(f"正在识别 {name} 第{meta.begin_season}季 ...")
|
||||
else:
|
||||
logger.info(f"正在识别 {name} ...")
|
||||
if meta.type == MediaType.UNKNOWN and not meta.year:
|
||||
info = self._imdb_helper.match_multi(name)
|
||||
else:
|
||||
if meta.type == MediaType.TV:
|
||||
# 确定是电视
|
||||
info = self._imdb_helper.match(name=name,
|
||||
year=meta.year,
|
||||
mtype=meta.type,season_year=meta.year,
|
||||
season_number=meta.begin_season)
|
||||
if not info:
|
||||
# 去掉年份再查一次
|
||||
info = self._imdb_helper.match(name=name, mtype=meta.type)
|
||||
else:
|
||||
# 有年份先按电影查
|
||||
info = self._imdb_helper.match(name=name, year=meta.year, mtype=MediaType.MOVIE)
|
||||
# 没有再按电视剧查
|
||||
if not info:
|
||||
info = self._imdb_helper.match(name=name,
|
||||
year=meta.year,
|
||||
mtype=MediaType.TV)
|
||||
if not info:
|
||||
# 去掉年份和类型再查一次
|
||||
info = self._imdb_helper.match_multi(name=name)
|
||||
if info:
|
||||
break
|
||||
else:
|
||||
info = None
|
||||
if info:
|
||||
# mediainfo = MediaInfo(source_info={"imdb": info})
|
||||
mediainfo = MediaInfo()
|
||||
if meta:
|
||||
logger.info(f"{meta.name} IMDB识别结果:{mediainfo.type.value} "
|
||||
f"{mediainfo.title_year} "
|
||||
f"{mediainfo.imdb_id}")
|
||||
else:
|
||||
logger.info(f"{imdbid} IMDB识别结果:{mediainfo.type.value} "
|
||||
f"{mediainfo.title_year}")
|
||||
return mediainfo
|
||||
|
||||
logger.info(f"{meta.name if meta else imdbid} 未匹配到IMDB媒体信息")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __movie_to_media(movie_info: dict) -> schemas.MediaInfo:
|
||||
title = ""
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
import re
|
||||
from typing import Optional, Any, Dict, List, Tuple
|
||||
from typing import Optional, Any, Dict, Tuple
|
||||
from collections import OrderedDict
|
||||
from dataclasses import dataclass
|
||||
|
||||
import requests
|
||||
from requests_html import HTMLSession
|
||||
import ijson
|
||||
|
||||
from app.log import logger
|
||||
from app.utils.http import RequestUtils
|
||||
from app.utils.string import StringUtils
|
||||
from app.utils.common import retry
|
||||
from app.schemas.types import MediaType
|
||||
from app.core.cache import cached
|
||||
@@ -145,7 +142,6 @@ class ImdbHelper:
|
||||
|
||||
def __init__(self, proxies=None):
|
||||
self._proxies = proxies
|
||||
self._req_utils = RequestUtils(headers=self._imdb_headers, session=HTMLSession(), timeout=10, proxies=proxies)
|
||||
self._imdb_req = RequestUtils(accept_type="application/json",
|
||||
content_type="application/json",
|
||||
headers=self._imdb_headers,
|
||||
@@ -172,68 +168,6 @@ class ImdbHelper:
|
||||
info = data.get("data").get("title", None)
|
||||
return info
|
||||
|
||||
@cached(maxsize=1000, ttl=3600)
|
||||
def __episodes_by_season(self, imdbid: str, build_id: str, season: str) -> Optional[Dict]:
|
||||
if not build_id or not season:
|
||||
return None
|
||||
prefix = "pageProps.contentData.section"
|
||||
url = (f"https://www.imdb.com/_next/data/{build_id}"
|
||||
f"/en-US/title/{imdbid}/episodes.json?season={season}&ref_=ttep&tconst={imdbid}")
|
||||
response = self._req_utils.get_res(url)
|
||||
if not response or response.status_code != 200:
|
||||
return None
|
||||
json_content = response.text
|
||||
try:
|
||||
section = next(ijson.items(json_content, prefix))
|
||||
except StopIteration:
|
||||
logger.warn(f"No data found at prefix: {prefix}")
|
||||
return None
|
||||
except (ijson.JSONError, ValueError) as e:
|
||||
logger.warn(f"JSON parsing error: {e}")
|
||||
return None
|
||||
except TypeError as e:
|
||||
logger.warn(f"Invalid input type: {e}")
|
||||
return None
|
||||
return section
|
||||
|
||||
@cached(maxsize=1000, ttl=3600)
|
||||
def __episodes(self, imdbid: str) -> Optional[Dict]:
|
||||
prefix = "props.pageProps.contentData.section"
|
||||
url = f"https://www.imdb.com/title/{imdbid}/episodes/"
|
||||
|
||||
response = self._req_utils.get_res(url)
|
||||
if not response or response.status_code != 200:
|
||||
return
|
||||
script_content = response.html.xpath('//script[@id="__NEXT_DATA__"]/text()')
|
||||
if len(script_content) == 0:
|
||||
return None
|
||||
json_content = script_content[0]
|
||||
# 直接定位到目标路径提取 items
|
||||
try:
|
||||
section = next(ijson.items(json_content, prefix))
|
||||
except StopIteration:
|
||||
logger.warn(f"No data found at prefix: {prefix}")
|
||||
return None
|
||||
except (ijson.JSONError, ValueError) as e:
|
||||
logger.warn(f"JSON parsing error: {e}")
|
||||
return None
|
||||
except TypeError as e:
|
||||
logger.warn(f"Invalid input type: {e}")
|
||||
return None
|
||||
total_seasons = []
|
||||
for s in section.get("seasons"):
|
||||
if s.get("value") and s.get("value") not in total_seasons:
|
||||
total_seasons.append(s.get("value"))
|
||||
build_id = next(ijson.items(json_content, 'buildId'))
|
||||
current_season = section.get('currentSeason') or '1'
|
||||
total_seasons.remove(current_season)
|
||||
for season in total_seasons:
|
||||
section_next = self.__episodes_by_season(imdbid, build_id=build_id, season=season)
|
||||
if section_next:
|
||||
section["episodes"]["items"].extend(section_next.get("episodes", {}).get("items", []))
|
||||
section["episodes"]["total"] += section_next.get("episodes", {}).get("total", 0)
|
||||
return section
|
||||
|
||||
@retry(Exception, logger=logger)
|
||||
@cached(maxsize=32, ttl=1800)
|
||||
def __request(self, params: Dict, sha256) -> Optional[Dict]:
|
||||
@@ -456,352 +390,3 @@ class ImdbHelper:
|
||||
return None
|
||||
self.hash_status[operation_name] = True
|
||||
return data.get('advancedTitleSearch')
|
||||
|
||||
def __known_as(self, imdbid: str,
|
||||
sha256='48d4f7bfa73230fb550147bd4704d8050080e65fe2ad576da6276cac2330e446') -> Optional[List]:
|
||||
"""
|
||||
获取电影和电视别名
|
||||
:param imdbid: IMBd id
|
||||
:return: 别名列表
|
||||
"""
|
||||
operation_name = "TitleAkasPaginated"
|
||||
self.__update_hash()
|
||||
if self._imdb_api_hash.get(operation_name):
|
||||
sha256 = self._imdb_api_hash[operation_name]
|
||||
params = {"operationName": operation_name,
|
||||
"variables": {"const": imdbid, "first": 50, "locale": "en-US", "originalTitleText": False}}
|
||||
data = self.__request(params=params, sha256=sha256)
|
||||
if not data:
|
||||
return None
|
||||
if 'error' in data:
|
||||
error = data['error']
|
||||
if error:
|
||||
logger.error(f"Error querying {operation_name} API: {error.get('message')}")
|
||||
if error.get('message') == 'PersistedQueryNotFound':
|
||||
self.hash_status[operation_name] = False
|
||||
return None
|
||||
self.hash_status[operation_name] = True
|
||||
if not data.get("data", {}).get("title", {}).get("akas", {}).get("total"):
|
||||
return None
|
||||
akas = []
|
||||
for edge in data["data"]["title"]["akas"]["edges"]:
|
||||
title = edge.get("node", {}).get("displayableProperty", {}).get("value", {}).get("plainText")
|
||||
if not title:
|
||||
continue
|
||||
country = edge.get("node", {}).get("country", {})
|
||||
language = edge.get("node", {}).get("language", {})
|
||||
akas.append({"title": title, "country": country, "language": language})
|
||||
return akas
|
||||
|
||||
def __search_on_imdb(self, term, mtype, release_year=None):
|
||||
params = f"{term}"
|
||||
if release_year is not None:
|
||||
params += f" {release_year}"
|
||||
ret = RequestUtils(
|
||||
accept_type="application/json",
|
||||
).get_res(f"{self._search_endpoint % params}")
|
||||
if not ret:
|
||||
return None
|
||||
data = ret.json()
|
||||
if "d" not in data:
|
||||
return None
|
||||
result = [d for d in data["d"] if d.get("qid") in self._qid_map.get(mtype)]
|
||||
return result
|
||||
|
||||
def search_tvs(self, title: str, year: str = None) -> List[dict]:
|
||||
if not title:
|
||||
return []
|
||||
if year:
|
||||
tvs = self.__search_on_imdb(title, MediaType.TV, year) or []
|
||||
else:
|
||||
tvs = self.__search_on_imdb(title, MediaType.TV, ) or []
|
||||
ret_infos = []
|
||||
for tv in tvs:
|
||||
# if title in tv.get("l"):
|
||||
# if self.__compare_names(title, [tv.get("l")]):
|
||||
# tv['media_type'] = MediaType.TV
|
||||
ret_infos.append(tv)
|
||||
return ret_infos
|
||||
|
||||
def search_movies(self, title: str, year: str = None) -> List[dict]:
|
||||
if not title:
|
||||
return []
|
||||
if year:
|
||||
movies = self.__search_on_imdb(title, MediaType.MOVIE, year) or []
|
||||
else:
|
||||
movies = self.__search_on_imdb(title, MediaType.MOVIE) or []
|
||||
ret_infos = []
|
||||
for movie in movies:
|
||||
# if title in movie.get("l"):
|
||||
# if self.__compare_names(title, [movie.get("l")]):
|
||||
# movie['media_type'] = MediaType.MOVIE
|
||||
ret_infos.append(movie)
|
||||
return ret_infos
|
||||
|
||||
@staticmethod
|
||||
def __compare_names(file_name: str, tmdb_names: list) -> bool:
|
||||
"""
|
||||
比较文件名是否匹配,忽略大小写和特殊字符
|
||||
:param file_name: 识别的文件名或者种子名
|
||||
:param tmdb_names: TMDB返回的译名
|
||||
:return: True or False
|
||||
"""
|
||||
if not file_name or not tmdb_names:
|
||||
return False
|
||||
if not isinstance(tmdb_names, list):
|
||||
tmdb_names = [tmdb_names]
|
||||
file_name = StringUtils.clear(file_name).upper()
|
||||
for tmdb_name in tmdb_names:
|
||||
tmdb_name = StringUtils.clear(tmdb_name).strip().upper()
|
||||
if file_name == tmdb_name:
|
||||
return True
|
||||
return False
|
||||
|
||||
def __search_movie_by_name(self, name: str, year: str) -> Optional[dict]:
|
||||
"""
|
||||
根据名称查询电影IMDB匹配
|
||||
:param name: 识别的文件名或种子名
|
||||
:param year: 电影上映日期
|
||||
:return: 匹配的媒体信息
|
||||
"""
|
||||
movies = self.search_movies(name, year=year)
|
||||
if (movies is None) or (len(movies) == 0):
|
||||
logger.debug(f"{name} 未找到相关电影信息!")
|
||||
return {}
|
||||
movies = sorted(
|
||||
movies,
|
||||
key=lambda x: str(x.get("y") or '0000'),
|
||||
reverse=True
|
||||
)
|
||||
for movie in movies:
|
||||
movie_year = f"{movie.get('y')}"
|
||||
if year and movie_year != year:
|
||||
# 年份不匹配
|
||||
continue
|
||||
# 匹配标题、原标题
|
||||
movie_info = self.imdbid(movie.get("id"))
|
||||
if not movie_info:
|
||||
continue
|
||||
if self.__compare_names(name, [movie_info.get("primary_title")]):
|
||||
return movie_info
|
||||
if movie_info.get("original_title") and self.__compare_names(name, [movie_info.get("original_title")]):
|
||||
return movie_info
|
||||
akas = self.__known_as(movie.get("id"))
|
||||
if not akas:
|
||||
continue
|
||||
akas_names = [item.get("title") for item in akas]
|
||||
if self.__compare_names(name, akas_names):
|
||||
return movie_info
|
||||
return {}
|
||||
|
||||
def __search_tv_by_name(self, name: str, year: str) -> Optional[dict]:
|
||||
"""
|
||||
根据名称查询电视剧IMDB匹配
|
||||
:param name: 识别的文件名或者种子名
|
||||
:param year: 电视剧的首播年份
|
||||
:return: 匹配的媒体信息
|
||||
"""
|
||||
tvs = self.search_tvs(name, year=year)
|
||||
if (tvs is None) or (len(tvs) == 0):
|
||||
logger.debug(f"{name} 未找到相关电影信息!")
|
||||
return {}
|
||||
tvs = sorted(
|
||||
tvs,
|
||||
key=lambda x: str(x.get("y") or '0000'),
|
||||
reverse=True
|
||||
)
|
||||
for tv in tvs:
|
||||
tv_year = f"{tv.get('y')}"
|
||||
if year and tv_year != year:
|
||||
# 年份不匹配
|
||||
continue
|
||||
# 匹配标题、原标题
|
||||
tv_info = self.imdbid(tv.get("id"))
|
||||
if not tv_info:
|
||||
continue
|
||||
if self.__compare_names(name, [tv_info.get("primary_title")]):
|
||||
return tv_info
|
||||
if tv_info.get("original_title") and self.__compare_names(name, [tv_info.get("original_title")]):
|
||||
return tv_info
|
||||
akas = self.__known_as(tv.get("id"))
|
||||
if not akas:
|
||||
continue
|
||||
akas_names = [item.get("title") for item in akas]
|
||||
if self.__compare_names(name, akas_names):
|
||||
return tv_info
|
||||
return {}
|
||||
|
||||
def __search_tv_by_season(self, name: str, season_year: str, season_number: int) -> Optional[dict]:
|
||||
"""
|
||||
根据电视剧的名称和季的年份及序号匹配IMDB
|
||||
:param name: 识别的文件名或者种子名
|
||||
:param season_year: 季的年份
|
||||
:param season_number: 季序号
|
||||
:return: 匹配的媒体信息
|
||||
"""
|
||||
|
||||
def __season_match(_tv_info: dict, _season_year: str) -> bool:
|
||||
tv_extra_info = self.__episodes(_tv_info.get("id"))
|
||||
if not tv_extra_info:
|
||||
return False
|
||||
release_year = []
|
||||
for item in tv_extra_info["episodes"]["items"]:
|
||||
if item.get("season") == season_number:
|
||||
release_year.append(item.get("releaseDate").get("year") or item.get("releaseYear"))
|
||||
first_release_year = min(release_year) if release_year else tv_extra_info["currentYear"]
|
||||
if first_release_year == _season_year:
|
||||
_tv_info["seasons"] = tv_extra_info["seasons"]
|
||||
_tv_info["episodes"] = tv_extra_info["episodes"]
|
||||
return True
|
||||
return False
|
||||
|
||||
tvs = self.search_tvs(title=name)
|
||||
if (tvs is None) or (len(tvs) == 0):
|
||||
logger.debug("%s 未找到季%s相关信息!" % (name, season_number))
|
||||
return {}
|
||||
tvs = sorted(
|
||||
tvs,
|
||||
key=lambda x: str(x.get('y') or '0000'),
|
||||
reverse=True
|
||||
)
|
||||
for tv in tvs:
|
||||
tv_info = self.imdbid(tv.get("id"))
|
||||
if not tv_info:
|
||||
continue
|
||||
tv_year = f"{tv.get('y')}" if tv.get('y') else None
|
||||
if (self.__compare_names(name, [tv_info.get('primary_title')])
|
||||
or (tv_info.get('original_title') and self.__compare_names(name, [tv_info.get('original_title')]))) \
|
||||
and (tv_year == str(season_year)):
|
||||
return tv_info
|
||||
akas = self.__known_as(tv.get("id"))
|
||||
if not akas:
|
||||
continue
|
||||
akas_names = [item.get("title") for item in akas]
|
||||
if not self.__compare_names(name, akas_names):
|
||||
continue
|
||||
if __season_match(_tv_info=tv_info, _season_year=season_year):
|
||||
return tv_info
|
||||
return None
|
||||
|
||||
def get_info(self,
|
||||
mtype: MediaType,
|
||||
imdbid: str) -> dict:
|
||||
"""
|
||||
给定IMDB号,查询一条媒体信息
|
||||
:param mtype: 类型:电影、电视剧,为空时都查(此时用不上年份)
|
||||
:param imdbid: IMDB的ID
|
||||
"""
|
||||
# 查询TMDB详情
|
||||
if mtype == MediaType.MOVIE:
|
||||
imdb_info = self.imdbid(imdbid)
|
||||
if imdb_info:
|
||||
imdb_info['media_type'] = MediaType.MOVIE
|
||||
elif mtype == MediaType.TV:
|
||||
imdb_info = self.imdbid(imdbid)
|
||||
if imdb_info:
|
||||
imdb_info['media_type'] = MediaType.TV
|
||||
tv_extra_info = self.__episodes(imdbid)
|
||||
imdb_info["seasons"] = tv_extra_info["seasons"]
|
||||
imdb_info["episodes"] = tv_extra_info["episodes"]
|
||||
else:
|
||||
imdb_info = None
|
||||
logger.warn(f"IMDb id:{imdbid} 未查询到媒体信息")
|
||||
return imdb_info
|
||||
|
||||
def match_multi(self, name: str) -> Optional[dict]:
|
||||
"""
|
||||
根据名称同时查询电影和电视剧,没有类型也没有年份时使用
|
||||
:param name: 识别的文件名或种子名
|
||||
:return: 匹配的媒体信息
|
||||
"""
|
||||
|
||||
multis = self.search_tvs(name) + self.search_movies(name)
|
||||
ret_info = {}
|
||||
if len(multis) == 0:
|
||||
logger.debug(f"{name} 未找到相关媒体息!")
|
||||
return {}
|
||||
else:
|
||||
multis = sorted(
|
||||
multis,
|
||||
key=lambda x: ("1" if x.get("media_type") == MediaType.MOVIE else "0") + str(x.get('y') or '0000'),
|
||||
reverse=True
|
||||
)
|
||||
media_t = MediaType.UNKNOWN
|
||||
for multi in multis:
|
||||
media_info = self.imdbid(multi.get("id"))
|
||||
if not media_info:
|
||||
continue
|
||||
if multi.get("media_type") == MediaType.MOVIE:
|
||||
if self.__compare_names(name, media_info.get('primary_title')) \
|
||||
or self.__compare_names(name, multi.get('primary_title')):
|
||||
ret_info = media_info
|
||||
media_t = MediaType.MOVIE
|
||||
break
|
||||
elif multi.get("media_type") == MediaType.TV:
|
||||
if self.__compare_names(name, media_info.get('primary_title')) \
|
||||
or self.__compare_names(name, multi.get('primary_title')):
|
||||
ret_info = media_info
|
||||
media_t = MediaType.TV
|
||||
break
|
||||
if ret_info and not isinstance(ret_info.get("media_type"), MediaType):
|
||||
ret_info['media_type'] = media_t
|
||||
return ret_info
|
||||
|
||||
def match(self, name: str,
|
||||
mtype: MediaType,
|
||||
year: Optional[str] = None,
|
||||
season_year: Optional[str] = None,
|
||||
season_number: Optional[int] = None,
|
||||
group_seasons: Optional[List[dict]] = None) -> Optional[dict]:
|
||||
"""
|
||||
搜索imdb中的媒体信息,匹配返回一条尽可能正确的信息
|
||||
:param name: 检索的名称
|
||||
:param mtype: 类型:电影、电视剧
|
||||
:param year: 年份,如要是季集需要是首播年份(first_air_date)
|
||||
:param season_year: 当前季集年份
|
||||
:param season_number: 季集,整数
|
||||
:param group_seasons: 集数组信息
|
||||
:return: TMDB的INFO,同时会将mtype赋值到media_type中
|
||||
"""
|
||||
if not name:
|
||||
return None
|
||||
info = {}
|
||||
if mtype != MediaType.TV:
|
||||
year_range = [year]
|
||||
if year:
|
||||
year_range.append(str(int(year) + 1))
|
||||
year_range.append(str(int(year) - 1))
|
||||
for year in year_range:
|
||||
logger.debug(
|
||||
f"正在识别{mtype.value}:{name}, 年份={year} ...")
|
||||
info = self.__search_movie_by_name(name, year)
|
||||
if info:
|
||||
info['media_type'] = MediaType.MOVIE
|
||||
break
|
||||
else:
|
||||
# 有当前季和当前季集年份,使用精确匹配
|
||||
if season_year and season_number:
|
||||
logger.debug(
|
||||
f"正在识别{mtype.value}:{name}, 季集={season_number}, 季集年份={season_year} ...")
|
||||
info = self.__search_tv_by_season(name,
|
||||
season_year,
|
||||
season_number)
|
||||
if not info:
|
||||
year_range = [year]
|
||||
if year:
|
||||
year_range.append(str(int(year) + 1))
|
||||
year_range.append(str(int(year) - 1))
|
||||
for year in year_range:
|
||||
logger.debug(
|
||||
f"正在识别{mtype.value}:{name}, 年份={year} ...")
|
||||
info = self.__search_tv_by_name(name, year)
|
||||
if info:
|
||||
break
|
||||
if info:
|
||||
info['media_type'] = MediaType.TV
|
||||
if not info.get("seasons"):
|
||||
tv_extra_info = self.__episodes(info.get('id'))
|
||||
if tv_extra_info:
|
||||
info["seasons"] = tv_extra_info["seasons"]
|
||||
info["episodes"] = tv_extra_info["episodes"]
|
||||
return info
|
||||
|
||||
@@ -1,3 +0,0 @@
|
||||
graphene~=3.4.3
|
||||
ijson~=3.4.0
|
||||
requests-html~=0.10.0
|
||||
Reference in New Issue
Block a user