fix 更新刷新封面逻辑

This commit is contained in:
thsrite
2025-02-24 16:55:11 +08:00
parent 4bd879622f
commit 2f426ebecd
2 changed files with 125 additions and 76 deletions

View File

@@ -3,11 +3,12 @@
"name": "Emby元数据刷新",
"description": "定时刷新Emby媒体库元数据演职人员中文。",
"labels": "Emby",
"version": "2.2.5",
"version": "2.2.6",
"icon": "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/emby-icon.png",
"author": "thsrite",
"level": 1,
"history": {
"v2.2.6": "更新封面逻辑",
"v2.2.5": "覆盖元数据和图片增加auto选项",
"v2.2.4": "优化执行周期输入需要MoviePilot v2.2.1+",
"v2.2.3": "优先插件更新剧集封面图",

View File

@@ -37,7 +37,7 @@ class EmbyMetaRefresh(_PluginBase):
# 插件图标
plugin_icon = "https://raw.githubusercontent.com/thsrite/MoviePilot-Plugins/main/icons/emby-icon.png"
# 插件版本
plugin_version = "2.2.5"
plugin_version = "2.2.6"
# 插件作者
plugin_author = "thsrite"
# 作者主页
@@ -72,6 +72,7 @@ class EmbyMetaRefresh(_PluginBase):
_EMBY_APIKEY = None
_scheduler: Optional[BackgroundScheduler] = None
_tmdb_cache = {}
_episodes_images = []
def init_plugin(self, config: dict = None):
# 停止现有任务
@@ -95,6 +96,8 @@ class EmbyMetaRefresh(_PluginBase):
self._mediaservers = config.get("mediaservers") or []
self._interval = config.get("interval") or 5
self._episodes_images = self.get_data("episodes_images") or []
# 加载模块
if self._enabled or self._onlyonce:
# 定时服务
@@ -242,84 +245,97 @@ class EmbyMetaRefresh(_PluginBase):
# 刷新媒体库
for item in latest:
refresh_meta = self._ReplaceAllMetadata
refresh_image = self._ReplaceAllImages
# 信息不全再刷新
if self._ReplaceAllMetadata == "auto":
refresh_meta = "false"
if (str(item.get('Type')) == 'Episode' and (str(
item.get("Name")) == f"{item.get('IndexNumber')}") or not item.get("Overview")):
refresh_meta = "true"
if self._ReplaceAllImages == "auto":
refresh_image = "false"
# 判断图片是否tmdb封面不是则刷新
if str(item.get('Type')) == 'Episode' and (not item.get("PrimaryImageAspectRatio") or float(
item.get("PrimaryImageAspectRatio")) >= 1.8):
# 判断是否有缓存
tv_info = None
key = f"{item.get('Type')}-{item.get('SeriesName')}-{str(item.get('ProductionYear'))}"
if key in self._tmdb_cache.keys():
tv_info = self._tmdb_cache[key]
if not tv_info:
# 判断下tmdb有没有封面没有则不刷新封面
tv_info = self.tmdbapi.match(name=item.get('SeriesName'),
mtype=MediaType.TV,
year=str(item.get('ProductionYear')))
if tv_info:
self._tmdb_cache[key] = tv_info
episode_info = TmdbApi().get_tv_episode_detail(tv_info["id"],
item.get('ParentIndexNumber'),
item.get('IndexNumber'))
if episode_info and episode_info.get("still_path"):
# 更新封面
flag = self.__update_item_image(item_id=item.get("Id"),
image_url=f"https://image.tmdb.org/t/p/original{episode_info.get('still_path')}")
if flag:
refresh_image = "false"
try:
refresh_meta = self._ReplaceAllMetadata
refresh_image = self._ReplaceAllImages
# 信息不全再刷新
if self._ReplaceAllMetadata == "auto":
refresh_meta = "false"
if (str(item.get('Type')) == 'Episode' and (
self.__contains_episode(item.get("Name")) or not item.get(
"Overview") or not self.__contains_chinese(item.get("Overview")))):
refresh_meta = "true"
if self._ReplaceAllImages == "auto":
refresh_image = "false"
# 判断图片是否tmdb封面不是则刷新
if str(item.get('Type')) == 'Episode':
if item.get("Id") in self._episodes_images:
refresh_image = "false"
logger.info(
f"最新媒体:电视剧 {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 封面更新 {flag}")
f"最新媒体:电视剧 {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 封面无需更新")
else:
# 判断是否有缓存
tv_info = None
key = f"{item.get('Type')}-{item.get('SeriesName')}-{str(item.get('ProductionYear'))}"
if key in self._tmdb_cache.keys():
tv_info = self._tmdb_cache[key]
if refresh_meta == "true" or refresh_image == "true":
logger.info(
f"开始刷新媒体库元数据,最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')}")
self.__refresh_emby_library_by_id(item_id=item.get("Id"),
refresh_meta=refresh_meta,
refresh_image=refresh_image)
if self._interval:
logger.info(f"等待 {self._interval} 秒后继续刷新")
time.sleep(int(self._interval))
else:
logger.info(
f"最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 元数据完整,跳过处理")
if not tv_info:
# 判断下tmdb有没有封面没有则不刷新封面
tv_info = self.tmdbapi.match(name=item.get('SeriesName'),
mtype=MediaType.TV,
year=str(item.get('ProductionYear')))
if tv_info:
self._tmdb_cache[key] = tv_info
episode_info = TmdbApi().get_tv_episode_detail(tv_info["id"],
item.get('ParentIndexNumber'),
item.get('IndexNumber'))
if episode_info and episode_info.get("still_path"):
# 更新封面
flag = self.__update_item_image(item_id=item.get("Id"),
image_url=f"https://image.tmdb.org/t/p/original{episode_info.get('still_path')}")
if flag:
refresh_image = "false"
# 缓存已处理的剧集
self._episodes_images.append(item.get("Id"))
self.save_data("episodes_images", self._episodes_images)
logger.info(
f"最新媒体:电视剧 {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 封面更新 {flag}")
# 刮演员中文
if self._actor_chi:
logger.info(
f"最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 开始处理演员中文名")
key = f"{item.get('Type')}-{item.get('SeriesName') if str(item.get('Type')) == 'Episode' else item.get('Name')}"
peoples = None
if key not in handle_items.keys():
peoples = self.__update_people_chi(
item_id=item.get("SeriesId") if str(item.get('Type')) == 'Episode' else item.get("Id"),
title=item.get('SeriesName') if str(item.get('Type')) == 'Episode' else item.get(
'Name'),
type=MediaType('电视剧' if str(item.get('Type')) == 'Episode' else '电影'),
season=item.get("ParentIndexNumber") if str(item.get('Type')) == 'Episode' else None,
emby=emby
)
if refresh_meta == "true" or refresh_image == "true":
logger.info(
f"开始刷新媒体库元数据,最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')}")
self.__refresh_emby_library_by_id(item_id=item.get("Id"),
refresh_meta=refresh_meta,
refresh_image=refresh_image)
if self._interval:
logger.info(f"等待 {self._interval} 秒后继续刷新")
time.sleep(int(self._interval))
else:
logger.info(
f"最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 元数据完整,跳过处理")
# 是否有演员信息
if str(item.get('Type')) == 'Episode':
item_dicts = handle_items.get(key, {})
item_ids = item_dicts.get('itemIds', [])
item_actors = item_dicts.get('actors', [])
item_ids.append(item.get("Id"))
handle_items[key] = {
'itemIds': item_ids,
'actors': peoples or item_actors
}
# 刮演员中文
if self._actor_chi:
logger.info(
f"最新媒体:{'电视剧' if str(item.get('Type')) == 'Episode' else '电影'} {'%s S%02dE%02d %s' % (item.get('SeriesName'), item.get('ParentIndexNumber'), item.get('IndexNumber'), item.get('Name')) if str(item.get('Type')) == 'Episode' else item.get('Name')} {item.get('Id')} 开始处理演员中文名")
key = f"{item.get('Type')}-{item.get('SeriesName') if str(item.get('Type')) == 'Episode' else item.get('Name')}"
peoples = None
if key not in handle_items.keys():
peoples = self.__update_people_chi(
item_id=item.get("SeriesId") if str(item.get('Type')) == 'Episode' else item.get(
"Id"),
title=item.get('SeriesName') if str(item.get('Type')) == 'Episode' else item.get(
'Name'),
type=MediaType('电视剧' if str(item.get('Type')) == 'Episode' else '电影'),
season=item.get("ParentIndexNumber") if str(
item.get('Type')) == 'Episode' else None,
emby=emby
)
# 是否有演员信息
if str(item.get('Type')) == 'Episode':
item_dicts = handle_items.get(key, {})
item_ids = item_dicts.get('itemIds', [])
item_actors = item_dicts.get('actors', [])
item_ids.append(item.get("Id"))
handle_items[key] = {
'itemIds': item_ids,
'actors': peoples or item_actors
}
except Exception as e:
logger.error(f"刷新媒体库元数据失败:{str(e)}")
continue
# 处理剧集
for key, value in handle_items.items():
@@ -350,6 +366,38 @@ class EmbyMetaRefresh(_PluginBase):
logger.error(f"关闭 神医助手 独占模式失败:{str(e)}")
logger.info(f"刷新 {emby_name} 媒体库元数据完成")
@staticmethod
def __contains_chinese(text: str) -> bool:
"""
判断给定的字符串是否包含中文字符。
参数:
text (str): 要检查的字符串。
返回:
bool: 如果字符串包含中文字符,则返回 True否则返回 False。
"""
# 使用正则表达式查找中文字符
pattern = re.compile(r'[\u4e00-\u9fa5]')
contains = bool(pattern.search(text))
return contains
@staticmethod
def __contains_episode(text: str) -> bool:
"""
判断给定的字符串是否包含 "第***集" 的模式。
参数:
text (str): 要检查的字符串。
返回:
bool: 如果字符串包含 "第***集" 的模式,则返回 True否则返回 False。
"""
# 使用正则表达式查找 "第***集" 的模式
pattern = re.compile(r'\s*([0-9]|[十|一|二|三|四|五|六|七|八|九|零])+\s*集')
contains = bool(pattern.search(text))
return contains
def __get_latest_media(self) -> List[dict]:
"""
获取Emby中最新媒体