diff --git a/app/core/context.py b/app/core/context.py index d8a913dc..a28e8a49 100644 --- a/app/core/context.py +++ b/app/core/context.py @@ -310,21 +310,6 @@ class MediaInfo: if isinstance(self.type, str): self.type = MediaType(self.type) - def set_image(self, name: str, image: str): - """ - 设置图片地址 - """ - setattr(self, f"{name}_path", image) - - def get_image(self, name: str): - """ - 获取图片地址 - """ - try: - return getattr(self, f"{name}_path") - except AttributeError: - return None - def set_category(self, cat: str): """ 设置二级分类 diff --git a/app/modules/fanart/__init__.py b/app/modules/fanart/__init__.py index 8522ce6d..6a12df1e 100644 --- a/app/modules/fanart/__init__.py +++ b/app/modules/fanart/__init__.py @@ -1,4 +1,6 @@ +import asyncio import re +from pathlib import Path from typing import Optional, Tuple, Union from app.core.cache import cached @@ -6,7 +8,7 @@ from app.core.context import MediaInfo, settings from app.log import logger from app.modules import _ModuleBase from app.schemas.types import MediaType, ModuleType, OtherModulesType -from app.utils.http import RequestUtils +from app.utils.http import RequestUtils, AsyncRequestUtils class FanartModule(_ModuleBase): @@ -307,8 +309,12 @@ class FanartModule(_ModuleBase): _proxies: dict = settings.PROXY # Fanart Api - _movie_url: str = f'https://webservice.fanart.tv/v3/movies/%s?api_key={settings.FANART_API_KEY}' - _tv_url: str = f'https://webservice.fanart.tv/v3/tv/%s?api_key={settings.FANART_API_KEY}' + _movie_url: str = ( + f"https://webservice.fanart.tv/v3/movies/%s?api_key={settings.FANART_API_KEY}" + ) + _tv_url: str = ( + f"https://webservice.fanart.tv/v3/tv/%s?api_key={settings.FANART_API_KEY}" + ) def init_module(self) -> None: pass @@ -361,21 +367,112 @@ class FanartModule(_ModuleBase): :param mediainfo: 识别的媒体信息 :return: 更新后的媒体信息 """ + images = self.__obtain_fanart_images(mediainfo=mediainfo) + if not images: + return None + + self.__set_mediainfo_images(mediainfo=mediainfo, images=images) + return mediainfo + + async def async_obtain_images(self, mediainfo: MediaInfo) -> Optional[MediaInfo]: + """ + 获取图片(异步版本) + :param mediainfo: 识别的媒体信息 + :return: 更新后的媒体信息 + """ + images = await self.__async_obtain_fanart_images(mediainfo=mediainfo) + if not images: + return None + + self.__set_mediainfo_images(mediainfo=mediainfo, images=images) + return mediainfo + + @classmethod + def __set_mediainfo_images(cls, mediainfo: MediaInfo, images: dict) -> None: + """ + 显式回填 MediaInfo 支持的展示图片字段 + """ + for image_name, image_url in images.items(): + image_attr = cls.__mediainfo_image_attr(image_name) + if image_attr and not getattr(mediainfo, image_attr, None): + setattr(mediainfo, image_attr, image_url) + logger.debug(f"{mediainfo.title_year} 使用 Fanart 图片回填 {image_attr}:{image_name}") + + def metadata_img( + self, + mediainfo: MediaInfo, + season: Optional[int] = None, + episode: Optional[int] = None, + ) -> Optional[dict]: + """ + 获取图片名称和url + :param mediainfo: 媒体信息 + :param season: 季号 + :param episode: 集号 + """ + if episode is not None: + # Fanart 没有集图片 + return None + return self.__obtain_fanart_images(mediainfo=mediainfo, season=season) + + def __obtain_fanart_images( + self, mediainfo: MediaInfo, season: Optional[int] = None + ) -> Optional[dict]: + """ + 获取 Fanart 图片并转换为刮削图片名称 + """ + query = self.__fanart_query(mediainfo=mediainfo) + if not query: + return None + result = self.__request_fanart(*query) + return self.__extract_images(mediainfo=mediainfo, result=result, season=season) + + async def __async_obtain_fanart_images( + self, + mediainfo: MediaInfo, + season: Optional[int] = None, + ) -> Optional[dict]: + """ + 异步获取 Fanart 图片并转换为刮削图片名称 + """ + query = self.__fanart_query(mediainfo=mediainfo) + if not query: + return None + result = await self.__async_request_fanart(*query) + return self.__extract_images(mediainfo=mediainfo, result=result, season=season) + + @staticmethod + def __fanart_query( + mediainfo: MediaInfo, + ) -> Optional[Tuple[MediaType, Union[str, int]]]: + """ + 获取 Fanart 查询参数 + """ if not settings.FANART_ENABLE: return None if not mediainfo.tmdb_id and not mediainfo.tvdb_id: return None if mediainfo.type == MediaType.MOVIE: - result = self.__request_fanart(mediainfo.type, mediainfo.tmdb_id) - else: - if mediainfo.tvdb_id: - result = self.__request_fanart(mediainfo.type, mediainfo.tvdb_id) - else: - logger.info(f"{mediainfo.title_year} 没有tvdbid,无法获取fanart图片") - return None - if not result or result.get('status') == 'error': + return mediainfo.type, mediainfo.tmdb_id + if mediainfo.tvdb_id: + return mediainfo.type, mediainfo.tvdb_id + logger.info(f"{mediainfo.title_year} 没有tvdbid,无法获取fanart图片") + return None + + def __extract_images( + self, + mediainfo: MediaInfo, + result: Optional[dict], + season: Optional[int] = None, + ) -> Optional[dict]: + """ + 从 Fanart 响应中提取图片名称和地址 + """ + if not result or result.get("status") == "error": logger.warn(f"没有获取到 {mediainfo.title_year} 的fanart图片数据") return None + + ret = {} # 获取所有图片 for name, images in result.items(): if not images: @@ -386,68 +483,92 @@ class FanartModule(_ModuleBase): # 图片属性xx_path image_name = self.__name(name) if image_name.startswith("season"): + image_type = image_name[6:] # 季图片,图片格式seasonxx-xxxx/season-specials-xxxx for image_obj in images: - image_season = image_obj.get('season') + image_season = image_obj.get("season") if image_season is not None: + if season is not None and str(image_season) != str(season): + continue # 包括poster,thumb,banner - if image_season == '0': - season_image = f"season-specials-{image_name[6:]}" + if image_season == "0": + season_image = f"season-specials-{image_type}" else: - season_image = f"season{str(image_season).rjust(2, '0')}-{image_name[6:]}" - # 设置图片,没有图片才设置 - if not mediainfo.get_image(season_image): - mediainfo.set_image(season_image, image_obj.get('url')) + season_image = ( + f"season{str(image_season).rjust(2, '0')}-{image_type}" + ) + if image_url := image_obj.get("url"): + ret.setdefault( + f"{season_image}{Path(image_url).suffix}", image_url + ) else: + if season is not None: + continue + image_obj = self.__pick_best_image(images) + if image_url := image_obj.get("url"): + ret[f"{image_name}{Path(image_url).suffix}"] = image_url - # 其他图片,优先环境变量指定语言,再like最多 - def __pick_best_image(_images): - lang_env = settings.FANART_LANG - if lang_env: - langs = [lang.strip() for lang in lang_env.split(",") if lang.strip()] - for lang in langs: - lang_images = [img for img in _images if img.get('lang') == lang] - if lang_images: - lang_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True) - return lang_images[0] - # 没设置或没找到,按原逻辑 zh、en、like最多 - zh_images = [img for img in _images if img.get('lang') == 'zh'] - if zh_images: - zh_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True) - return zh_images[0] - en_images = [img for img in _images if img.get('lang') == 'en'] - if en_images: - en_images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True) - return en_images[0] - _images.sort(key=lambda x: int(x.get('likes', 0)), reverse=True) - return _images[0] + return ret or None - image_obj = __pick_best_image(images) - # 设置图片,没有图片才设置 - if not mediainfo.get_image(image_name): - mediainfo.set_image(image_name, image_obj.get('url')) + @staticmethod + def __mediainfo_image_attr(image_name: str) -> Optional[str]: + """ + 将 Fanart 刮削图片名映射为 MediaInfo 的显式图片字段 + """ + image_key = Path(image_name).stem + if image_key == "poster": + return "poster_path" + if image_key in ("background", "fanart", "backdrop"): + return "backdrop_path" + if image_key == "logo": + return "logo_path" + return None - return mediainfo + @staticmethod + def __pick_best_image(_images: list): + """ + 其他图片,优先环境变量指定语言,再like最多 + """ + lang_env = settings.FANART_LANG + if lang_env: + langs = [lang.strip() for lang in lang_env.split(",") if lang.strip()] + for lang in langs: + lang_images = [img for img in _images if img.get("lang") == lang] + if lang_images: + lang_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True) + return lang_images[0] + # 没设置或没找到,按原逻辑 zh、en、like最多 + zh_images = [img for img in _images if img.get("lang") == "zh"] + if zh_images: + zh_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True) + return zh_images[0] + en_images = [img for img in _images if img.get("lang") == "en"] + if en_images: + en_images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True) + return en_images[0] + _images.sort(key=lambda x: int(x.get("likes", 0)), reverse=True) + return _images[0] @staticmethod def __name(fanart_name: str) -> str: """ 转换Fanart图片的名字 """ - words_to_remove = r'tv|movie|hdmovie|hdtv|show|hd' + words_to_remove = r"tv|movie|hdmovie|hdtv|show|hd" pattern = re.compile(words_to_remove, re.IGNORECASE) - result = re.sub(pattern, '', fanart_name) + result = re.sub(pattern, "", fanart_name) return result @classmethod @cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get") - def __request_fanart(cls, media_type: MediaType, queryid: Union[str, int]) -> Optional[dict]: - if media_type == MediaType.MOVIE: - image_url = cls._movie_url % queryid - else: - image_url = cls._tv_url % queryid + def __request_fanart( + cls, media_type: MediaType, queryid: Union[str, int] + ) -> Optional[dict]: + image_url = cls.__fanart_url(media_type=media_type, queryid=queryid) try: - ret = RequestUtils(proxies=cls._proxies, timeout=10).get_res(image_url, raise_exception=True) + ret = RequestUtils(proxies=cls._proxies, timeout=10).get_res( + image_url, raise_exception=True + ) if ret: return ret.json() else: @@ -457,10 +578,43 @@ class FanartModule(_ModuleBase): logger.error(f"获取{queryid}的Fanart图片失败:{str(err)}") return None + @classmethod + @cached(maxsize=settings.CONF.fanart, ttl=settings.CONF.meta, shared_key="get") + async def __async_request_fanart( + cls, media_type: MediaType, queryid: Union[str, int] + ) -> Optional[dict]: + image_url = cls.__fanart_url(media_type=media_type, queryid=queryid) + try: + ret = await AsyncRequestUtils(proxies=cls._proxies, timeout=10).get_json( + image_url + ) + if ret: + return ret + logger.debug(f"未能获取到 {queryid} 的Fanart图片") + return {} + except Exception as err: + logger.error(f"获取{queryid}的Fanart图片失败:{str(err)}") + return None + + @classmethod + def __fanart_url(cls, media_type: MediaType, queryid: Union[str, int]) -> str: + """ + 生成 Fanart 请求地址 + """ + if media_type == MediaType.MOVIE: + return cls._movie_url % queryid + return cls._tv_url % queryid + def clear_cache(self): """ 清除缓存 """ logger.info(f"开始清除{self.get_name()}缓存 ...") self.__request_fanart.cache_clear() + async_cache_clear = self.__async_request_fanart.cache_clear() + try: + loop = asyncio.get_running_loop() + loop.create_task(async_cache_clear) + except RuntimeError: + asyncio.run(async_cache_clear) logger.info(f"{self.get_name()}缓存清除完成") diff --git a/app/modules/themoviedb/scraper.py b/app/modules/themoviedb/scraper.py index 85037d39..1996a953 100644 --- a/app/modules/themoviedb/scraper.py +++ b/app/modules/themoviedb/scraper.py @@ -126,14 +126,12 @@ class TmdbScraper: poster_name, poster_url = self.get_season_poster(seasoninfo, season) if poster_name and poster_url: images[poster_name] = poster_url - return images else: - # 获取媒体信息中原有图片(TheMovieDb或Fanart) + # 获取媒体信息中原有图片 for attr_name, attr_value in vars(mediainfo).items(): if ( attr_value and attr_name.endswith("_path") - and attr_value and isinstance(attr_value, str) and attr_value.startswith("http") ): @@ -141,6 +139,7 @@ class TmdbScraper: attr_name.replace("_path", "") + Path(attr_value).suffix ) images[image_name] = attr_value + # 替换原语言Poster if settings.TMDB_SCRAP_ORIGINAL_IMAGE: _mediainfo = self.original_tmdb(mediainfo).get_info( @@ -154,7 +153,7 @@ class TmdbScraper: attr_name.replace("_path", "") + Path(image_url).suffix ) images[image_name] = image_url - return images + return images @staticmethod def get_season_poster(seasoninfo: dict, season: int) -> Tuple[str, str]: