diff --git a/app/modules/zspace/zspace.py b/app/modules/zspace/zspace.py index a7370023..5b0c91c5 100644 --- a/app/modules/zspace/zspace.py +++ b/app/modules/zspace/zspace.py @@ -1,14 +1,27 @@ import json -from typing import Any, Generator, List, Optional, Tuple, Union +import re +import traceback +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Union, Dict, Generator, Tuple, Any + +from requests import Response from app import schemas from app.log import logger -from app.modules.emby.emby import Emby +from app.schemas import MediaServerItem +from app.schemas.types import MediaType from app.utils.http import RequestUtils from app.utils.url import UrlUtils -class ZSpace(Emby): +class ZSpace: + _host: Optional[str] = None + _playhost: Optional[str] = None + _apikey: Optional[str] = None + _sync_libraries: List[str] = [] + user: Optional[Union[str, int]] = None + _username: Optional[str] = None _password: Optional[str] = None def __init__(self, host: Optional[str] = None, username: Optional[str] = None, @@ -25,11 +38,10 @@ class ZSpace(Emby): self._playhost = UrlUtils.standardize_base_url(self._playhost) self._username = username self._password = password - self._apikey = None + self._sync_libraries = sync_libraries or [] self.user = None self.folders = [] self.serverid = None - self._sync_libraries = sync_libraries or [] if not self.reconnect(): logger.error(f"请检查极影视服务端地址 {host}") @@ -59,20 +71,169 @@ class ZSpace(Emby): self.serverid = None return False self._apikey = token - if not user_id: - current_user = self.__get_current_user() - if not current_user: - self._apikey = None - self.user = None - self.folders = [] - self.serverid = None - return False - user_id = current_user.get("Id") - self.user = user_id + self.user = user_id or self.__get_current_user_id() + if not self.user: + self._apikey = None + self.folders = [] + self.serverid = None + return False self.folders = self.get_emby_folders() self.serverid = self.get_server_id() return True + def get_emby_folders(self) -> List[dict]: + """ + 获取极影视媒体库路径列表 + """ + if not self._host or not self._apikey: + return [] + url = f"{self._host}emby/Library/SelectableMediaFolders" + params = { + 'api_key': self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + return res.json() + else: + logger.error("Library/SelectableMediaFolders 未获取到返回数据") + return [] + except Exception as e: + logger.error(f"连接Library/SelectableMediaFolders 出错:{e}") + return [] + + def get_emby_virtual_folders(self) -> List[dict]: + """ + 获取极影视媒体库所有路径列表(包含共享路径) + """ + if not self._host or not self._apikey: + return [] + url = f"{self._host}emby/Library/VirtualFolders/Query" + params = { + 'api_key': self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + library_items = res.json().get("Items") + librarys = [] + for library_item in library_items or []: + library_id = library_item.get('ItemId') + library_name = library_item.get('Name') + path_infos = library_item.get('LibraryOptions', {}).get('PathInfos') + library_paths = [] + for path in path_infos or []: + if path.get('NetworkPath'): + library_paths.append(path.get('NetworkPath')) + else: + library_paths.append(path.get('Path')) + + if library_name and library_paths: + librarys.append({ + 'Id': library_id, + 'Name': library_name, + 'Path': library_paths + }) + return librarys + else: + logger.error("Library/VirtualFolders/Query 未获取到返回数据") + return [] + except Exception as e: + logger.error(f"连接Library/VirtualFolders/Query 出错:{e}") + return [] + + def __get_emby_librarys(self, username: Optional[str] = None) -> List[dict]: + """ + 获取极影视媒体库列表 + """ + if not self._host or not self._apikey: + return [] + if username: + user = self.get_user(username) + else: + user = self.user + if not user: + return [] + url = f"{self._host}emby/Users/{user}/Views" + params = {"api_key": self._apikey} + try: + res = RequestUtils().get_res(url, params) + if res: + return res.json().get("Items") + else: + logger.error("Users/Views 未获取到返回数据") + return [] + except Exception as e: + logger.error(f"连接Users/Views 出错:{e}") + return [] + + def get_librarys(self, username: Optional[str] = None, hidden: Optional[bool] = False) -> List[ + schemas.MediaServerLibrary]: + """ + 获取媒体服务器所有媒体库列表 + """ + if not self._host or not self._apikey: + return [] + libraries = [] + for library in self.__get_emby_librarys(username) or []: + if hidden and self._sync_libraries and "all" not in self._sync_libraries \ + and library.get("Id") not in self._sync_libraries: + continue + if library.get("CollectionType") == "movies": + library_type = MediaType.MOVIE.value + elif library.get("CollectionType") == "tvshows": + library_type = MediaType.TV.value + else: + library_type = MediaType.UNKNOWN.value + image = self.__get_local_image_by_id(library.get("Id")) + libraries.append( + schemas.MediaServerLibrary( + server="zspace", + id=library.get("Id"), + name=library.get("Name"), + path=library.get("Path"), + type=library_type, + image=image, + link=f'{self._playhost or self._host}web/index.html' + f'#!/videos?serverId={self.serverid}&parentId={library.get("Id")}', + server_type="zspace" + ) + ) + return libraries + + def get_user(self, user_name: Optional[str] = None) -> Optional[Union[str, int]]: + """ + 获取可用用户ID,优先按用户名匹配,失败时回退当前登录用户。 + """ + if not self._host or not self._apikey: + return None + url = f"{self._host}Users" + params = { + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + users = res.json() + if isinstance(users, list): + if user_name: + for user in users: + if user.get("Name") == user_name: + return user.get("Id") + for user in users: + if user.get("Policy", {}).get("IsAdministrator"): + return user.get("Id") + for user in users: + if user.get("Id"): + return user.get("Id") + else: + logger.error("Users 返回数据格式错误") + else: + logger.error("Users 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users出错:{e}") + return self.__get_current_user_id() or self.user + def authenticate(self, username: str, password: str) -> Optional[str]: """ 用户认证 @@ -85,104 +246,774 @@ class ZSpace(Emby): logger.info(f"用户 {username} 极影视认证成功") return token - def get_user(self, user_name: Optional[str] = None) -> Optional[Union[str, int]]: + def get_server_id(self) -> Optional[str]: """ - 获取用户ID。 - 极影视使用登录态 token 时,不一定总能枚举全部用户,失败时回退当前登录用户。 + 获得服务器信息 """ - if user_name and user_name == self._username and self.user: - return self.user - user_id = super().get_user(user_name) - if user_id: - return user_id - current_user = self.__get_current_user() - if current_user: - current_user_id = current_user.get("Id") - current_user_name = current_user.get("Name") - if current_user_id: - self.user = current_user_id - if not user_name or user_name == current_user_name: - return current_user_id - return self.user + if not self._host or not self._apikey: + return None + url = f"{self._host}System/Info" + params = { + 'api_key': self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + return res.json().get("Id") + else: + logger.error("System/Info 未获取到返回数据") + except Exception as e: + logger.error(f"连接System/Info出错:{e}") + return None def get_user_count(self) -> int: """ - 获取用户数量。 - 无法枚举用户时,至少返回当前登录用户数量。 + 获得用户数量 """ - count = super().get_user_count() - if count: - return count + if not self._host or not self._apikey: + return 0 + url = f"{self._host}emby/Users/Query" + params = { + 'api_key': self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + count = res.json().get("TotalRecordCount") + if count: + return count + else: + logger.error("Users/Query 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users/Query出错:{e}") return 1 if self.user else 0 - def get_librarys(self, username: Optional[str] = None, - hidden: Optional[bool] = False) -> List[schemas.MediaServerLibrary]: + def get_medias_count(self) -> schemas.Statistic: """ - 获取媒体服务器所有媒体库列表 + 获得电影、电视剧、动漫媒体数量 + :return: MovieCount SeriesCount SongCount """ - libraries = super().get_librarys(username=username, hidden=hidden) - for library in libraries or []: - library.server = "zspace" - library.server_type = "zspace" - return libraries + if not self._host or not self._apikey: + return schemas.Statistic() + url = f"{self._host}emby/Items/Counts" + params = { + 'api_key': self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + result = res.json() + return schemas.Statistic( + movie_count=result.get("MovieCount") or 0, + tv_count=result.get("SeriesCount") or 0, + episode_count=result.get("EpisodeCount") or 0 + ) + else: + logger.error("Items/Counts 未获取到返回数据") + return schemas.Statistic() + except Exception as e: + logger.error(f"连接Items/Counts出错:{e}") + return schemas.Statistic() - def get_movies(self, title: str, year: Optional[str] = None, + def __get_emby_series_id_by_name(self, name: str, year: str) -> Optional[str]: + """ + 根据名称查询极影视中剧集的 SeriesId + :param name: 标题 + :param year: 年份 + :return: None 表示连不通,""表示未找到,找到返回ID + """ + if not self._host or not self._apikey: + return None + url = f"{self._host}emby/Items" + params = { + "IncludeItemTypes": "Series", + "Fields": "ProductionYear", + "StartIndex": 0, + "Recursive": "true", + "SearchTerm": name, + "Limit": 10, + "IncludeSearchTypes": "false", + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + res_items = res.json().get("Items") + if res_items: + for res_item in res_items: + if res_item.get('Name') == name and ( + not year or str(res_item.get('ProductionYear')) == str(year)): + return res_item.get('Id') + except Exception as e: + logger.error(f"连接Items出错:{e}") + return None + return "" + + def get_movies(self, + title: str, + year: Optional[str] = None, tmdb_id: Optional[int] = None) -> Optional[List[schemas.MediaServerItem]]: """ 根据标题和年份,检查电影是否在极影视中存在,存在则返回列表 + :param title: 标题 + :param year: 年份,可以为空,为空时不按年份过滤 + :param tmdb_id: TMDB ID + :return: 含title、year属性的字典列表 """ - movies = super().get_movies(title=title, year=year, tmdb_id=tmdb_id) - for movie in movies or []: - movie.server = "zspace" - return movies + if not self._host or not self._apikey: + return None + url = f"{self._host}emby/Items" + params = { + "IncludeItemTypes": "Movie", + "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId", + "StartIndex": 0, + "Recursive": "true", + "SearchTerm": title, + "Limit": 10, + "IncludeSearchTypes": "false", + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + res_items = res.json().get("Items") + if res_items: + ret_movies = [] + for item in res_items: + if not item: + continue + mediaserver_item = self.__format_item_info(item) + if mediaserver_item: + if (not tmdb_id or mediaserver_item.tmdbid == tmdb_id) and \ + mediaserver_item.title == title and \ + (not year or str(mediaserver_item.year) == str(year)): + ret_movies.append(mediaserver_item) + return ret_movies + except Exception as e: + logger.error(f"连接Items出错:{e}") + return None + return [] + + def get_tv_episodes(self, + item_id: Optional[str] = None, + title: Optional[str] = None, + year: Optional[str] = None, + tmdb_id: Optional[int] = None, + season: Optional[int] = None + ) -> Tuple[Optional[str], Optional[Dict[int, List[int]]]]: + """ + 根据标题和年份和季,返回极影视中的剧集列表 + :param item_id: 极影视中的ID + :param title: 标题 + :param year: 年份 + :param tmdb_id: TMDBID + :param season: 季 + :return: 每一季的已有集数 + """ + if not self._host or not self._apikey: + return None, None + cached_item_id = item_id + if not item_id: + item_id = self.__get_emby_series_id_by_name(title, year) + if item_id is None: + return None, None + if not item_id: + return None, {} + item_info = self.get_iteminfo(item_id) + if not item_info and cached_item_id and title: + logger.warning(f"极影视缓存的电视剧媒体ID {cached_item_id} 已失效,尝试按标题重新搜索:{title}") + item_id = self.__get_emby_series_id_by_name(title, year) + if item_id is None: + return None, None + if not item_id: + return None, {} + item_info = self.get_iteminfo(item_id) + if not item_info: + return None, {} + if item_info and tmdb_id and item_info.tmdbid: + if str(tmdb_id) != str(item_info.tmdbid): + return None, {} + if season is None: + season = None + try: + url = f"{self._host}emby/Shows/{item_id}/Episodes" + params = { + "Season": season, + "IsMissing": "false", + "api_key": self._apikey + } + res_json = RequestUtils().get_res(url, params) + if res_json: + tv_item = res_json.json() + res_items = tv_item.get("Items") + season_episodes = {} + for res_item in res_items or []: + season_index = res_item.get("ParentIndexNumber") + if season_index is None: + continue + if season is not None and season != season_index: + continue + episode_index = res_item.get("IndexNumber") + if episode_index is None: + continue + if season_index not in season_episodes: + season_episodes[season_index] = [] + season_episodes[season_index].append(episode_index) + return item_id, season_episodes + except Exception as e: + logger.error(f"连接Shows/Id/Episodes出错:{e}") + return None, None + return None, {} + + def get_remote_image_by_id(self, item_id: str, image_type: str) -> Optional[str]: + """ + 根据ItemId从极影视查询TMDB的图片地址 + :param item_id: 在极影视中的ID + :param image_type: 图片类型,poster或者backdrop等 + :return: 图片对应在TMDB中的URL + """ + if not self._host or not self._apikey: + return None + url = f"{self._host}emby/Items/{item_id}/RemoteImages" + params = { + "api_key": self._apikey + } + try: + res = RequestUtils(timeout=10).get_res(url, params) + if res: + images = res.json().get("Images") + if images: + for image in images: + if image.get("ProviderName") == "TheMovieDb" and image.get("Type") == image_type: + return image.get("Url") + logger.info("Items/RemoteImages 未获取到返回数据,采用本地图片") + return self.generate_external_image_link(item_id, image_type) + except Exception as e: + logger.error(f"连接Items/Id/RemoteImages出错:{e}") + return None + + def generate_external_image_link(self, item_id: str, image_type: str) -> Optional[str]: + """ + 根据ItemId和imageType查询本地对应图片 + :param item_id: 在极影视中的ID + :param image_type: 图片类型,如Backdrop、Primary + :return: 图片对应在外网播放器中的URL + """ + if not self._playhost: + logger.error("极影视外网播放地址未能获取或为空") + return None + + url = f"{self._playhost}Items/{item_id}/Images/{image_type}" + try: + res = RequestUtils().get_res(url) + if res and res.status_code != 404: + logger.info(f"影片图片链接:{res.url}") + return res.url + else: + logger.info(f"Items/Id/Images 未获取到返回数据或无该影片{image_type}图片") + return None + except Exception as e: + logger.error(f"连接Items/Id/Images出错:{e}") + return None + + def __refresh_emby_library_by_id(self, item_id: str) -> bool: + """ + 通知极影视刷新一个项目的媒体库 + """ + if not self._host or not self._apikey: + return False + url = f"{self._host}emby/Items/{item_id}/Refresh" + params = { + "Recursive": "true", + "api_key": self._apikey + } + try: + res = RequestUtils().post_res(url, params=params) + if res: + return True + else: + logger.info(f"刷新媒体库对象 {item_id} 失败,无法连接极影视!") + except Exception as e: + logger.error(f"连接Items/Id/Refresh出错:{e}") + return False + return False + + def refresh_root_library(self) -> bool: + """ + 通知极影视刷新整个媒体库 + """ + if not self._host or not self._apikey: + return False + url = f"{self._host}emby/Library/Refresh" + params = { + "api_key": self._apikey + } + try: + res = RequestUtils().post_res(url, params=params) + if res: + return True + else: + logger.info("刷新媒体库失败,无法连接极影视!") + except Exception as e: + logger.error(f"连接Library/Refresh出错:{e}") + return False + return False + + def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> Optional[bool]: + """ + 按类型、名称、年份来刷新媒体库 + :param items: 已识别的需要刷新媒体库的媒体信息列表 + """ + if not items: + return False + logger.info("开始刷新极影视媒体库...") + library_ids = [] + for item in items: + library_id = self.__get_emby_library_id_by_item(item) + if library_id and library_id not in library_ids: + library_ids.append(library_id) + if "/" in library_ids: + return self.refresh_root_library() + for library_id in library_ids: + if library_id != "/": + return self.__refresh_emby_library_by_id(library_id) + logger.info("极影视媒体库刷新完成") + return True + + def __get_emby_library_id_by_item(self, item: schemas.RefreshMediaItem) -> Optional[str]: + """ + 根据媒体信息查询在哪个媒体库,返回要刷新的位置的ID + :param item: {title, year, type, category, target_path} + """ + if not item.title or not item.year or not item.type: + return None + if item.type != MediaType.MOVIE.value: + item_id = self.__get_emby_series_id_by_name(item.title, item.year) + if item_id: + return item_id + else: + if self.get_movies(item.title, item.year): + return None + item_path = Path(item.target_path) + for folder in self.folders: + for subfolder in folder.get("SubFolders") or []: + try: + subfolder_path = Path(subfolder.get("Path")) + if item_path.is_relative_to(subfolder_path): + return folder.get("Id") + except Exception as err: + logger.debug(f"匹配子目录出错:{err} - {traceback.format_exc()}") + for folder in self.folders: + for subfolder in folder.get("SubFolders") or []: + if subfolder.get("Path") and re.search(r"[/\\]%s" % item.category, + subfolder.get("Path")): + return folder.get("Id") + return "/" + + @staticmethod + def __format_item_info(item) -> Optional[schemas.MediaServerItem]: + """ + 格式化item + """ + try: + user_data = item.get("UserData", {}) + if not user_data: + user_state = None + else: + resume = item.get("UserData", {}).get("PlaybackPositionTicks") and item.get("UserData", {}).get( + "PlaybackPositionTicks") > 0 + last_played_date = item.get("UserData", {}).get("LastPlayedDate") + if last_played_date is not None and "." in last_played_date: + last_played_date = last_played_date.split(".")[0] + user_state = schemas.MediaServerItemUserState( + played=item.get("UserData", {}).get("Played"), + resume=resume, + last_played_date=datetime.strptime(last_played_date, "%Y-%m-%dT%H:%M:%S").strftime( + "%Y-%m-%d %H:%M:%S") if last_played_date else None, + play_count=item.get("UserData", {}).get("PlayCount"), + percentage=item.get("UserData", {}).get("PlayedPercentage"), + ) + tmdbid = item.get("ProviderIds", {}).get("Tmdb") + return schemas.MediaServerItem( + server="zspace", + library=item.get("ParentId"), + item_id=item.get("Id"), + item_type=item.get("Type"), + title=item.get("Name"), + original_title=item.get("OriginalTitle"), + year=item.get("ProductionYear"), + tmdbid=int(tmdbid) if tmdbid else None, + imdbid=item.get("ProviderIds", {}).get("Imdb"), + tvdbid=item.get("ProviderIds", {}).get("Tvdb"), + path=item.get("Path"), + user_state=user_state + + ) + except Exception as e: + logger.error(e) + return None def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]: """ 获取单个项目详情 """ - item = super().get_iteminfo(itemid) - if item: - item.server = "zspace" - return item + if not itemid: + return None + if not self._host or not self._apikey or not self.user: + return None + url = f"{self._host}emby/Users/{self.user}/Items/{itemid}" + params = { + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res and res.status_code == 200: + iteminfo = self.__format_item_info(res.json()) + return iteminfo + except Exception as e: + logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:{e}") + return None def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0, - limit: Optional[int] = -1) -> Generator[schemas.MediaServerItem, Any, None]: + limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]: """ - 获取媒体服务器项目列表 + 获取媒体服务器项目列表,支持分页和不分页逻辑,默认不分页获取所有数据 + + :param parent: 媒体库ID,用于标识要获取的媒体库 + :param start_index: 起始索引,用于分页获取数据。默认为 0,即从第一个项目开始获取 + :param limit: 每次请求的最大项目数,用于分页。如果为 None 或 -1,则表示一次性获取所有数据,默认为 -1 + + :return: 返回一个生成器对象,用于逐步获取媒体服务器中的项目 """ - for item in super().get_items(parent=parent, start_index=start_index, limit=limit) or []: - if item: - item.server = "zspace" - yield item + if not parent or not self._host or not self._apikey or not self.user: + return None + url = f"{self._host}emby/Users/{self.user}/Items" + params = { + "ParentId": parent, + "api_key": self._apikey, + "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId" + } + if limit is not None and limit != -1: + params.update({ + "StartIndex": start_index, + "Limit": limit + }) + try: + res = RequestUtils().get_res(url, params) + if not res or res.status_code != 200: + return None + items = res.json().get("Items") or [] + for item in items: + if not item: + continue + if "Folder" in item.get("Type"): + for sub_item in self.get_items(parent=item.get('Id')) or []: + yield sub_item + elif item.get("Type") in ["Movie", "Series"]: + yield self.__format_item_info(item) + except Exception as e: + logger.error(f"连接Users/Items出错:{e}") + return None def get_webhook_message(self, form: Any, args: dict) -> Optional[schemas.WebhookEventInfo]: """ 解析极影视 Webhook 报文 """ - event_item = super().get_webhook_message(form, args) - if event_item: - event_item.channel = "zspace" + if not form and not args: + return None + try: + if form and form.get("data"): + result = form.get("data") + else: + result = json.dumps(dict(args)) + message = json.loads(result) + except Exception as e: + logger.debug(f"解析极影视 webhook报文出错:{e}") + return None + event_type = message.get('Event') + if not event_type: + return None + logger.debug(f"接收到极影视 webhook:{message}") + event_item = schemas.WebhookEventInfo(event=event_type, channel="zspace") + if message.get('Item'): + event_item.media_type = message.get('Item', {}).get('Type') + if message.get('Item', {}).get('Type') == 'Episode' \ + or message.get('Item', {}).get('Type') == 'Series' \ + or message.get('Item', {}).get('Type') == 'Season': + event_item.item_type = "TV" + if message.get('Item', {}).get('SeriesName') \ + and message.get('Item', {}).get('ParentIndexNumber') \ + and message.get('Item', {}).get('IndexNumber'): + event_item.item_name = "%s %s%s %s" % ( + message.get('Item', {}).get('SeriesName'), + "S" + str(message.get('Item', {}).get('ParentIndexNumber')), + "E" + str(message.get('Item', {}).get('IndexNumber')), + message.get('Item', {}).get('Name')) + elif message.get('Item', {}).get('SeriesName'): + event_item.item_name = "%s %s" % ( + message.get('Item', {}).get('SeriesName'), + message.get('Item', {}).get('Name')) + else: + event_item.item_name = message.get('Item', {}).get('Name') + event_item.item_id = message.get('Item', {}).get('SeriesId') + event_item.season_id = message.get('Item', {}).get('ParentIndexNumber') + event_item.episode_id = message.get('Item', {}).get('IndexNumber') + elif message.get('Item', {}).get('Type') == 'Audio': + event_item.item_type = "AUD" + album = message.get('Item', {}).get('Album') + file_name = message.get('Item', {}).get('FileName') + event_item.item_name = album + event_item.overview = file_name + event_item.item_id = message.get('Item', {}).get('AlbumId') + else: + event_item.item_type = "MOV" + event_item.item_name = "%s %s" % ( + message.get('Item', {}).get('Name'), "(" + str(message.get('Item', {}).get('ProductionYear')) + ")") + event_item.item_id = message.get('Item', {}).get('Id') + + event_item.item_path = message.get('Item', {}).get('Path') + event_item.tmdb_id = message.get('Item', {}).get('ProviderIds', {}).get('Tmdb') + if message.get('Item', {}).get('Overview') and len(message.get('Item', {}).get('Overview')) > 100: + event_item.overview = str(message.get('Item', {}).get('Overview'))[:100] + "..." + else: + event_item.overview = message.get('Item', {}).get('Overview') + event_item.percentage = message.get('TranscodingInfo', {}).get('CompletionPercentage') + if not event_item.percentage: + if message.get('PlaybackInfo', {}).get('PositionTicks') and message.get('Item', {}).get('RunTimeTicks'): + event_item.percentage = message.get('PlaybackInfo', {}).get('PositionTicks') / \ + message.get('Item', {}).get('RunTimeTicks') * 100 + if message.get('Session'): + event_item.ip = message.get('Session').get('RemoteEndPoint') + event_item.device_name = message.get('Session').get('DeviceName') + event_item.client = message.get('Session').get('Client') + if message.get("User"): + event_item.user_name = message.get("User").get('Name') + if message.get("item_isvirtual"): + event_item.item_isvirtual = message.get("item_isvirtual") + event_item.item_type = message.get("item_type") + event_item.item_name = message.get("item_name") + event_item.item_path = message.get("item_path") + event_item.tmdb_id = message.get("tmdb_id") + event_item.season_id = message.get("season_id") + event_item.episode_id = message.get("episode_id") + + if event_item.item_id: + event_item.image_url = self.get_remote_image_by_id(item_id=event_item.item_id, + image_type="Backdrop") + + event_item.json_object = message + return event_item - def get_resume(self, num: Optional[int] = 12, - username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]: + def get_data(self, url: str) -> Optional[Response]: + """ + 自定义URL从媒体服务器获取数据,其中[HOST]、[APIKEY]、[USER]会被替换成实际的值 + :param url: 请求地址 + """ + if not self._host or not self._apikey: + return None + url = url.replace("[HOST]", self._host or '') \ + .replace("[APIKEY]", self._apikey or '') \ + .replace("[USER]", self.user or '') + try: + return RequestUtils(content_type="application/json").get_res(url=url) + except Exception as e: + logger.error(f"连接极影视出错:{e}") + return None + + def post_data(self, url: str, data: Optional[str] = None, headers: dict = None) -> Optional[Response]: + """ + 自定义URL从媒体服务器获取数据,其中[HOST]、[APIKEY]、[USER]会被替换成实际的值 + :param url: 请求地址 + :param data: 请求数据 + :param headers: 请求头 + """ + if not self._host or not self._apikey: + return None + url = url.replace("[HOST]", self._host or '') \ + .replace("[APIKEY]", self._apikey or '') \ + .replace("[USER]", self.user or '') + try: + return RequestUtils( + headers=headers, + ).post_res(url=url, data=data) + except Exception as e: + logger.error(f"连接极影视出错:{e}") + return None + + def get_play_url(self, item_id: str) -> str: + """ + 拼装媒体播放链接 + :param item_id: 媒体的ID + """ + return f"{self._playhost or self._host}web/index.html#!" \ + f"/item?id={item_id}&context=home&serverId={self.serverid}" + + def get_backdrop_url(self, item_id: str, image_tag: str, remote: Optional[bool] = False) -> str: + """ + 获取极影视的Backdrop图片地址 + :param item_id: 在极影视中的ID + :param image_tag: 图片的tag + :param remote: 是否远程使用,TG微信等客户端调用应为True + """ + if not self._host or not self._apikey: + return "" + if not image_tag or not item_id: + return "" + if remote: + host_url = self._playhost or self._host + else: + host_url = self._host + return f"{host_url}Items/{item_id}/" \ + f"Images/Backdrop?tag={image_tag}&api_key={self._apikey}" + + def __get_local_image_by_id(self, item_id: str) -> str: + """ + 根据ItemId从媒体服务器查询本地图片地址 + :param item_id: 在极影视中的ID + """ + if not self._host or not self._apikey: + return "" + return f"{self._host}Items/{item_id}/Images/Primary" + + def get_resume(self, num: Optional[int] = 12, username: Optional[str] = None) -> Optional[ + List[schemas.MediaServerPlayItem]]: """ 获得继续观看 """ - items = super().get_resume(num=num, username=username) - for item in items or []: - item.server_type = "zspace" - return items + if not self._host or not self._apikey: + return None + if username: + user = self.get_user(username) or self.user + else: + user = self.user + if not user: + return [] + url = f"{self._host}Users/{user}/Items/Resume" + params = { + "Limit": 100, + "MediaTypes": "Video", + "Fields": "ProductionYear,Path", + "api_key": self._apikey, + } + try: + res = RequestUtils().get_res(url, params) + if res: + result = res.json().get("Items") or [] + ret_resume = [] + library_folders = self.get_user_library_folders() + for item in result: + if len(ret_resume) == num: + break + if item.get("Type") not in ["Movie", "Episode"]: + continue + item_path = item.get("Path") + if item_path and library_folders and not any( + str(item_path).startswith(folder) for folder in library_folders): + continue + item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value + link = self.get_play_url(item.get("Id")) + if item_type == MediaType.MOVIE.value: + title = item.get("Name") + subtitle = str(item.get("ProductionYear")) if item.get("ProductionYear") else None + else: + title = f'{item.get("SeriesName")}' + subtitle = f'S{item.get("ParentIndexNumber")}:{item.get("IndexNumber")} - {item.get("Name")}' + if item_type == MediaType.MOVIE.value: + if item.get("BackdropImageTags"): + image = self.get_backdrop_url(item_id=item.get("Id"), + image_tag=item.get("BackdropImageTags")[0]) + else: + image = self.__get_local_image_by_id(item.get("Id")) + else: + image = self.get_backdrop_url(item_id=item.get("SeriesId"), + image_tag=item.get("SeriesPrimaryImageTag")) + if not image: + image = self.__get_local_image_by_id(item.get("SeriesId")) + ret_resume.append(schemas.MediaServerPlayItem( + id=item.get("Id"), + title=title, + subtitle=subtitle, + type=item_type, + image=image, + link=link, + percent=item.get("UserData", {}).get("PlayedPercentage"), + server_type='zspace' + )) + return ret_resume + else: + logger.error("Users/Items/Resume 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users/Items/Resume出错:{e}") + return [] - def get_latest(self, num: Optional[int] = 20, - username: Optional[str] = None) -> Optional[List[schemas.MediaServerPlayItem]]: + def get_latest(self, num: Optional[int] = 20, username: Optional[str] = None) -> Optional[ + List[schemas.MediaServerPlayItem]]: """ 获得最近更新 """ - items = super().get_latest(num=num, username=username) - for item in items or []: - item.server_type = "zspace" - return items + if not self._host or not self._apikey: + return None + if username: + user = self.get_user(username) or self.user + else: + user = self.user + if not user: + return [] + url = f"{self._host}Users/{user}/Items/Latest" + params = { + "Limit": 100, + "MediaTypes": "Video", + "Fields": "ProductionYear,Path,BackdropImageTags", + "api_key": self._apikey + } + try: + res = RequestUtils().get_res(url, params) + if res: + result = res.json() or [] + ret_latest = [] + library_folders = self.get_user_library_folders() + for item in result: + if len(ret_latest) == num: + break + if item.get("Type") not in ["Movie", "Series"]: + continue + item_path = item.get("Path") + if item_path and library_folders and not any( + str(item_path).startswith(folder) for folder in library_folders): + continue + item_type = MediaType.MOVIE.value if item.get("Type") == "Movie" else MediaType.TV.value + link = self.get_play_url(item.get("Id")) + image = self.__get_local_image_by_id(item_id=item.get("Id")) + ret_latest.append(schemas.MediaServerPlayItem( + id=item.get("Id"), + title=item.get("Name"), + subtitle=str(item.get("ProductionYear")) if item.get("ProductionYear") else None, + type=item_type, + image=image, + link=link, + BackdropImageTags=item.get("BackdropImageTags"), + server_type='zspace' + )) + return ret_latest + else: + logger.error("Users/Items/Latest 未获取到返回数据") + except Exception as e: + logger.error(f"连接Users/Items/Latest出错:{e}") + return [] + + def get_user_library_folders(self): + """ + 获取极影视媒体库文件夹列表(排除黑名单) + """ + if not self._host or not self._apikey: + return [] + library_folders = [] + for library in self.get_emby_virtual_folders() or []: + if self._sync_libraries and library.get("Id") not in self._sync_libraries: + continue + library_folders += [folder for folder in library.get("Path")] + return library_folders def __login(self, username: Optional[str], password: Optional[str]) -> Tuple[Optional[str], Optional[str]]: """ @@ -194,9 +1025,9 @@ class ZSpace(Emby): try: res = RequestUtils(headers={ 'X-Emby-Authorization': 'MediaBrowser Client="MoviePilot", ' - 'Device="requests", ' - 'DeviceId="1", ' - 'Version="1.0.0"', + 'Device="requests", ' + 'DeviceId="1", ' + 'Version="1.0.0"', 'Content-Type': 'application/json', 'Accept': 'application/json' }).post_res( @@ -208,10 +1039,10 @@ class ZSpace(Emby): ) if res: result = res.json() or {} - token = result.get("AccessToken") + auth_token = result.get("AccessToken") user_id = result.get("User", {}).get("Id") - if token: - return token, user_id + if auth_token: + return auth_token, user_id else: logger.error("Users/AuthenticateByName 未获取到返回数据") except Exception as e: @@ -231,8 +1062,23 @@ class ZSpace(Emby): try: res = RequestUtils().get_res(url, params) if res: - return res.json() - logger.error("Users/Me 未获取到返回数据") + result = res.json() + if isinstance(result, dict): + return result + else: + logger.error("Users/Me 未获取到返回数据") except Exception as e: logger.error(f"连接Users/Me出错:{e}") return None + + def __get_current_user_id(self) -> Optional[str]: + """ + 获取当前登录用户ID + """ + current_user = self.__get_current_user() + if current_user: + current_user_id = current_user.get("Id") + if current_user_id: + self.user = current_user_id + return current_user_id + return None diff --git a/tests/test_mediaserver_tv_stale_itemid.py b/tests/test_mediaserver_tv_stale_itemid.py index 40a50f5f..05937e65 100644 --- a/tests/test_mediaserver_tv_stale_itemid.py +++ b/tests/test_mediaserver_tv_stale_itemid.py @@ -160,9 +160,9 @@ class MediaServerTvStaleItemIdTest(unittest.TestCase): client._apikey = "api-key" client.user = "user-id" client.get_iteminfo = Mock(side_effect=[None, SimpleNamespace(tmdbid=12345)]) - client._Emby__get_emby_series_id_by_name = Mock(return_value="new-series-id") + client._ZSpace__get_emby_series_id_by_name = Mock(return_value="new-series-id") - with patch("app.modules.emby.emby.RequestUtils") as request_utils_cls: + with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls: request_utils_cls.return_value.get_res.return_value = _FakeResponse({ "Items": [{"ParentIndexNumber": 1, "IndexNumber": 1}] }) @@ -176,7 +176,7 @@ class MediaServerTvStaleItemIdTest(unittest.TestCase): self.assertEqual(item_id, "new-series-id") self.assertEqual(episodes, {1: [1]}) - client._Emby__get_emby_series_id_by_name.assert_called_once_with("测试剧集", "2026") + client._ZSpace__get_emby_series_id_by_name.assert_called_once_with("测试剧集", "2026") def test_ugreen_tv_episodes_fallback_when_cached_item_id_missing(self): """绿联缓存ID失效时,应重新搜索剧集ID后再查询集信息。""" diff --git a/tests/test_zspace_mediaserver.py b/tests/test_zspace_mediaserver.py index dd442257..b7de949f 100644 --- a/tests/test_zspace_mediaserver.py +++ b/tests/test_zspace_mediaserver.py @@ -14,20 +14,17 @@ class _FakeResponse: class ZSpaceMediaServerTest(unittest.TestCase): def test_reconnect_uses_username_password_login(self): - login_request_utils = Mock() - login_request_utils.post_res.return_value = _FakeResponse({ + request_utils = Mock() + request_utils.post_res.return_value = _FakeResponse({ "AccessToken": "zspace-token", "User": {"Id": "user-id"}, }) - emby_request_utils = Mock() - emby_request_utils.get_res.side_effect = [ + request_utils.get_res.side_effect = [ _FakeResponse([]), _FakeResponse({"Id": "server-id"}), ] - with patch("app.modules.zspace.zspace.RequestUtils", return_value=login_request_utils), patch( - "app.modules.emby.emby.RequestUtils", return_value=emby_request_utils - ): + with patch("app.modules.zspace.zspace.RequestUtils", return_value=request_utils): client = ZSpace( host="http://zspace.local", username="admin", @@ -40,11 +37,14 @@ class ZSpaceMediaServerTest(unittest.TestCase): def test_get_user_falls_back_to_current_login_user(self): client = ZSpace.__new__(ZSpace) + client._host = "http://zspace.local/" + client._apikey = "zspace-token" client._username = "admin" client.user = "current-user-id" client._ZSpace__get_current_user = Mock(return_value={"Id": "current-user-id", "Name": "admin"}) - with patch("app.modules.emby.emby.Emby.get_user", return_value=None): + with patch("app.modules.zspace.zspace.RequestUtils") as request_utils_cls: + request_utils_cls.return_value.get_res.return_value = _FakeResponse({"invalid": True}) user_id = client.get_user("admin") self.assertEqual(user_id, "current-user-id")