Files
archived-MoviePilot/app/modules/zspace/zspace.py

1085 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import re
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union, Dict, Generator, Tuple, Any
from requests import Response
from app import schemas
from app.log import logger
from app.schemas import MediaServerItem
from app.schemas.types import MediaType
from app.utils.http import RequestUtils
from app.utils.url import UrlUtils
class ZSpace:
# Client for a ZSpace media server; the endpoints used by the methods below are Emby-style ("emby/...").
# Base URL of the server; normalized with UrlUtils.standardize_base_url in __init__.
_host: Optional[str] = None
# Optional alternate base URL, preferred over _host when building play and remote image links.
_playhost: Optional[str] = None
# Access token returned by the login call; sent as the "api_key" query parameter.
_apikey: Optional[str] = None
# Library ids used to filter libraries; the value "all" is special-cased in get_librarys.
_sync_libraries: List[str] = []
# Id of the user the client is currently logged in as.
user: Optional[Union[str, int]] = None
# Credentials used by reconnect() to log in.
_username: Optional[str] = None
_password: Optional[str] = None
def __init__(self, host: Optional[str] = None, username: Optional[str] = None,
             password: Optional[str] = None, play_host: Optional[str] = None,
             sync_libraries: list = None, **kwargs):
    """
    Configure the client and attempt an initial login.

    :param host: base URL of the server
    :param username: login user name
    :param password: login password
    :param play_host: optional alternate base URL used for play/image links
    :param sync_libraries: library ids used to filter libraries
    :param kwargs: accepted and ignored
    """
    # Initialise every instance attribute before the configuration guard so
    # that methods reading them (e.g. get_play_url reads self.serverid, the
    # refresh helpers read self.folders) never raise AttributeError on an
    # instance created with incomplete configuration. Binding a fresh list
    # here also avoids sharing the class-level _sync_libraries list.
    self._sync_libraries = sync_libraries or []
    self.user = None
    self.folders = []
    self.serverid = None
    if not host or not username or not password:
        logger.error("极影视服务器配置不完整!")
        return
    self._host = UrlUtils.standardize_base_url(host)
    self._playhost = play_host
    if self._playhost:
        self._playhost = UrlUtils.standardize_base_url(self._playhost)
    self._username = username
    self._password = password
    if not self.reconnect():
        logger.error(f"请检查极影视服务端地址 {host}")
def is_inactive(self) -> bool:
    """
    Tell whether the session is stale and a reconnect is needed.

    Returns False when host/username/password are not all configured,
    True when there is no token or user id, or when the current user can
    no longer be fetched. On success the stored user id is refreshed.
    """
    fully_configured = self._host and self._username and self._password
    if not fully_configured:
        return False
    if not (self._apikey and self.user):
        return True
    me = self.__get_current_user()
    if not me:
        return True
    self.user = me.get("Id") or self.user
    return False
def reconnect(self) -> bool:
    """
    Log in again and rebuild the cached session state.

    On success the token, user id, library folders and server id are
    stored and True is returned; otherwise the cached state is cleared
    and False is returned.
    """
    access_token, login_user_id = self.__login(self._username, self._password)
    if access_token:
        self._apikey = access_token
        # Fall back to asking the server who we are when login gave no id.
        self.user = login_user_id or self.__get_current_user_id()
        if self.user:
            self.folders = self.get_library_folders()
            self.serverid = self.get_server_id()
            return True
    else:
        self.user = None
    # Failure path shared by "no token" and "no user id".
    self._apikey = None
    self.folders = []
    self.serverid = None
    return False
def get_library_folders(self) -> List[dict]:
    """
    Fetch the list returned by Library/SelectableMediaFolders.

    Returns an empty list when the client is not connected, when no
    response is received, or when the request raises.
    """
    if not (self._host and self._apikey):
        return []
    endpoint = f"{self._host}emby/Library/SelectableMediaFolders"
    query = {'api_key': self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Library/SelectableMediaFolders 未获取到返回数据")
            return []
        return response.json()
    except Exception as e:
        logger.error(f"连接Library/SelectableMediaFolders 出错:{e}")
        return []
def get_virtual_folders(self) -> List[dict]:
    """
    Fetch all libraries with their paths from Library/VirtualFolders/Query.

    Each returned dict has the keys 'Id', 'Name' and 'Path' (a list of
    paths). For every path entry the 'NetworkPath' is used when set,
    otherwise 'Path'. Libraries without a name or without paths are
    skipped. Returns an empty list on any failure.
    """
    if not (self._host and self._apikey):
        return []
    endpoint = f"{self._host}emby/Library/VirtualFolders/Query"
    query = {'api_key': self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Library/VirtualFolders/Query 未获取到返回数据")
            return []
        collected = []
        for entry in response.json().get("Items") or []:
            entry_id = entry.get('ItemId')
            entry_name = entry.get('Name')
            path_infos = entry.get('LibraryOptions', {}).get('PathInfos')
            entry_paths = [info.get('NetworkPath') or info.get('Path') for info in path_infos or []]
            if entry_name and entry_paths:
                collected.append({
                    'Id': entry_id,
                    'Name': entry_name,
                    'Path': entry_paths
                })
        return collected
    except Exception as e:
        logger.error(f"连接Library/VirtualFolders/Query 出错:{e}")
        return []
def __get_library_views(self, username: Optional[str] = None) -> List[dict]:
    """
    Fetch the "Items" of Users/{user}/Views.

    :param username: when given, the user id is resolved through get_user;
                     otherwise the logged-in user id is used
    :return: the "Items" value of the response, or an empty list on failure
    """
    if not (self._host and self._apikey):
        return []
    user_id = self.get_user(username) if username else self.user
    if not user_id:
        return []
    endpoint = f"{self._host}emby/Users/{user_id}/Views"
    query = {"api_key": self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users/Views 未获取到返回数据")
            return []
        return response.json().get("Items")
    except Exception as e:
        logger.error(f"连接Users/Views 出错:{e}")
        return []
def get_librarys(self, username: Optional[str] = None, hidden: Optional[bool] = False) -> List[
        schemas.MediaServerLibrary]:
    """
    List the media libraries visible to a user.

    :param username: user whose views are listed; defaults to the logged-in user
    :param hidden: when truthy, libraries whose id is not in the configured
                   sync list are left out (unless the list contains "all")
    :return: list of MediaServerLibrary, empty when not connected
    """
    if not (self._host and self._apikey):
        return []
    type_by_collection = {
        "movies": MediaType.MOVIE.value,
        "tvshows": MediaType.TV.value,
    }
    link_base = self._playhost or self._host
    found = []
    for view in self.__get_library_views(username) or []:
        view_id = view.get("Id")
        filtered_out = (hidden and self._sync_libraries
                        and "all" not in self._sync_libraries
                        and view_id not in self._sync_libraries)
        if filtered_out:
            continue
        view_type = type_by_collection.get(view.get("CollectionType"), MediaType.UNKNOWN.value)
        cover = self.__get_local_image_by_id(view_id)
        found.append(
            schemas.MediaServerLibrary(
                server="zspace",
                id=view_id,
                name=view.get("Name"),
                path=view.get("Path"),
                type=view_type,
                image=cover,
                link=f'{link_base}web/index.html'
                     f'#!/videos?serverId={self.serverid}&parentId={view_id}',
                server_type="zspace"
            )
        )
    return found
def get_user(self, user_name: Optional[str] = None) -> Optional[Union[str, int]]:
    """
    Resolve a usable user id.

    Lookup order over the Users listing: exact name match (when user_name
    is given), then the first administrator, then the first entry with an
    id. If none of these yields a result, or the request fails, fall back
    to the current user id and finally to the stored user id.
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}Users"
    query = {"api_key": self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users 未获取到返回数据")
        else:
            accounts = response.json()
            if not isinstance(accounts, list):
                logger.error("Users 返回数据格式错误")
            else:
                if user_name:
                    for account in accounts:
                        if account.get("Name") == user_name:
                            return account.get("Id")
                for account in accounts:
                    if account.get("Policy", {}).get("IsAdministrator"):
                        return account.get("Id")
                for account in accounts:
                    if account.get("Id"):
                        return account.get("Id")
    except Exception as e:
        logger.error(f"连接Users出错{e}")
    return self.__get_current_user_id() or self.user
def authenticate(self, username: str, password: str) -> Optional[str]:
    """
    Authenticate a user against the server.

    :param username: user name
    :param password: password
    :return: the access token, or None when the login failed
    """
    access_token = self.__login(username, password)[0]
    if access_token:
        logger.info(f"用户 {username} 极影视认证成功")
    return access_token
def get_server_id(self) -> Optional[str]:
    """
    Read the "Id" field of System/Info.

    :return: the server id, or None when not connected or on failure
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}System/Info"
    query = {'api_key': self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("System/Info 未获取到返回数据")
        else:
            return response.json().get("Id")
    except Exception as e:
        logger.error(f"连接System/Info出错{e}")
    return None
def get_user_count(self) -> int:
    """
    Return the number of users reported by Users/Query.

    When the count is missing or the request fails, 1 is returned if a
    user id is stored and 0 otherwise. Returns 0 when not connected.
    """
    if not (self._host and self._apikey):
        return 0
    endpoint = f"{self._host}emby/Users/Query"
    query = {'api_key': self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users/Query 未获取到返回数据")
        else:
            total = response.json().get("TotalRecordCount")
            if total:
                return total
    except Exception as e:
        logger.error(f"连接Users/Query出错{e}")
    return 1 if self.user else 0
def get_medias_count(self) -> schemas.Statistic:
    """
    Read movie, series and episode counts from Items/Counts.

    :return: Statistic filled from MovieCount / SeriesCount / EpisodeCount
             (missing values become 0); an empty Statistic on any failure
    """
    if not (self._host and self._apikey):
        return schemas.Statistic()
    endpoint = f"{self._host}emby/Items/Counts"
    query = {'api_key': self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Items/Counts 未获取到返回数据")
            return schemas.Statistic()
        counts = response.json()
        return schemas.Statistic(
            movie_count=counts.get("MovieCount") or 0,
            tv_count=counts.get("SeriesCount") or 0,
            episode_count=counts.get("EpisodeCount") or 0
        )
    except Exception as e:
        logger.error(f"连接Items/Counts出错{e}")
        return schemas.Statistic()
def __get_series_id_by_name(self, name: str, year: str) -> Optional[str]:
    """
    Look up the id of a series by exact name (and year, when given).

    :param name: series title
    :param year: production year; a falsy value disables the year check
    :return: None when not connected or the request raised, "" when no
             series matched, otherwise the id of the first match
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}emby/Items"
    query = {
        "IncludeItemTypes": "Series",
        "Fields": "ProductionYear",
        "StartIndex": 0,
        "Recursive": "true",
        "SearchTerm": name,
        "Limit": 10,
        "IncludeSearchTypes": "false",
        "api_key": self._apikey
    }
    try:
        response = RequestUtils().get_res(endpoint, query)
        candidates = (response.json().get("Items") or []) if response else []
        for candidate in candidates:
            if candidate.get('Name') != name:
                continue
            if not year or str(candidate.get('ProductionYear')) == str(year):
                return candidate.get('Id')
    except Exception as e:
        logger.error(f"连接Items出错{e}")
        return None
    return ""
def get_movies(self,
               title: str,
               year: Optional[str] = None,
               tmdb_id: Optional[int] = None) -> Optional[List[schemas.MediaServerItem]]:
    """
    Search for movies matching a title, and optionally a year and TMDB id.

    :param title: title, compared for equality with the item name
    :param year: production year; not checked when falsy
    :param tmdb_id: TMDB id; not checked when falsy
    :return: None when not connected or the request raised; otherwise the
             list of matching items (possibly empty)
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}emby/Items"
    query = {
        "IncludeItemTypes": "Movie",
        "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId",
        "StartIndex": 0,
        "Recursive": "true",
        "SearchTerm": title,
        "Limit": 10,
        "IncludeSearchTypes": "false",
        "api_key": self._apikey
    }
    try:
        response = RequestUtils().get_res(endpoint, query)
        raw_items = response.json().get("Items") if response else None
        if raw_items:
            matches = []
            for raw in raw_items:
                if not raw:
                    continue
                info = self.__format_item_info(raw)
                if not info:
                    continue
                tmdb_ok = not tmdb_id or info.tmdbid == tmdb_id
                year_ok = not year or str(info.year) == str(year)
                if tmdb_ok and info.title == title and year_ok:
                    matches.append(info)
            return matches
    except Exception as e:
        logger.error(f"连接Items出错{e}")
        return None
    return []
def get_tv_episodes(self,
                    item_id: Optional[str] = None,
                    title: Optional[str] = None,
                    year: Optional[str] = None,
                    tmdb_id: Optional[int] = None,
                    season: Optional[int] = None
                    ) -> Tuple[Optional[str], Optional[Dict[int, List[int]]]]:
    """
    Return the episodes already present for a series, grouped by season.

    :param item_id: server-side id of the series; looked up by title/year when empty
    :param title: series title
    :param year: production year
    :param tmdb_id: TMDB id; when both this and the item's TMDB id are set
                    they must match
    :param season: season number; None means all seasons
    :return: (item_id, {season: [episodes]}) on success;
             (None, None) when not connected or a request failed;
             (None, {}) when the series was not found or did not match
    """
    if not self._host or not self._apikey:
        return None, None
    cached_item_id = item_id
    if not item_id:
        item_id = self.__get_series_id_by_name(title, year)
        if item_id is None:
            return None, None
        if not item_id:
            return None, {}
    item_info = self.get_iteminfo(item_id)
    if not item_info and cached_item_id and title:
        # The caller-supplied id no longer resolves; retry by title.
        logger.warning(f"极影视缓存的电视剧媒体ID {cached_item_id} 已失效,尝试按标题重新搜索:{title}")
        item_id = self.__get_series_id_by_name(title, year)
        if item_id is None:
            return None, None
        if not item_id:
            return None, {}
        item_info = self.get_iteminfo(item_id)
    if not item_info:
        return None, {}
    if tmdb_id and item_info.tmdbid and str(tmdb_id) != str(item_info.tmdbid):
        return None, {}
    # NOTE: a former "if season is None: season = None" no-op was removed here.
    try:
        url = f"{self._host}emby/Shows/{item_id}/Episodes"
        params = {
            "Season": season,
            "IsMissing": "false",
            "api_key": self._apikey
        }
        res_json = RequestUtils().get_res(url, params)
        if res_json:
            season_episodes = {}
            for res_item in res_json.json().get("Items") or []:
                season_index = res_item.get("ParentIndexNumber")
                if season_index is None:
                    continue
                # Filter again locally in case other seasons are returned.
                if season is not None and season != season_index:
                    continue
                episode_index = res_item.get("IndexNumber")
                if episode_index is None:
                    continue
                season_episodes.setdefault(season_index, []).append(episode_index)
            return item_id, season_episodes
    except Exception as e:
        logger.error(f"连接Shows/Id/Episodes出错{e}")
        return None, None
    return None, {}
def get_remote_image_by_id(self, item_id: str, image_type: str) -> Optional[str]:
    """
    Find the TheMovieDb image URL of an item via Items/{id}/RemoteImages.

    :param item_id: server-side item id
    :param image_type: image type to match against the "Type" field
    :return: the remote URL when a TheMovieDb image of that type exists;
             otherwise the result of generate_external_image_link;
             None when not connected or the request raised
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}emby/Items/{item_id}/RemoteImages"
    query = {"api_key": self._apikey}
    try:
        response = RequestUtils(timeout=10).get_res(endpoint, query)
        candidates = (response.json().get("Images") or []) if response else []
        for candidate in candidates:
            if candidate.get("ProviderName") == "TheMovieDb" and candidate.get("Type") == image_type:
                return candidate.get("Url")
        logger.info("Items/RemoteImages 未获取到返回数据,采用本地图片")
        return self.generate_external_image_link(item_id, image_type)
    except Exception as e:
        logger.error(f"连接Items/Id/RemoteImages出错{e}")
    return None
def generate_external_image_link(self, item_id: str, image_type: str) -> Optional[str]:
    """
    Resolve the image URL of an item on the play host.

    :param item_id: server-side item id
    :param image_type: image type, used as the last path segment
    :return: the final URL of the response, or None when no play host is
             configured, the image is missing (404 / no response) or the
             request raised
    """
    if not self._playhost:
        logger.error("极影视外网播放地址未能获取或为空")
        return None
    endpoint = f"{self._playhost}Items/{item_id}/Images/{image_type}"
    try:
        response = RequestUtils().get_res(endpoint)
        if not response or response.status_code == 404:
            logger.info(f"Items/Id/Images 未获取到返回数据或无该影片{image_type}图片")
            return None
        logger.info(f"影片图片链接:{response.url}")
        return response.url
    except Exception as e:
        logger.error(f"连接Items/Id/Images出错{e}")
        return None
def __refresh_library_by_id(self, item_id: str) -> bool:
    """
    Ask the server to recursively refresh one item.

    :param item_id: id of the item to refresh
    :return: True when the POST returned a truthy response, else False
    """
    if not (self._host and self._apikey):
        return False
    endpoint = f"{self._host}emby/Items/{item_id}/Refresh"
    query = {
        "Recursive": "true",
        "api_key": self._apikey
    }
    try:
        if RequestUtils().post_res(endpoint, params=query):
            return True
        logger.info(f"刷新媒体库对象 {item_id} 失败,无法连接极影视!")
    except Exception as e:
        logger.error(f"连接Items/Id/Refresh出错{e}")
    return False
def refresh_root_library(self) -> bool:
    """
    Ask the server to refresh the whole library via Library/Refresh.

    :return: True when the POST returned a truthy response, else False
    """
    if not (self._host and self._apikey):
        return False
    endpoint = f"{self._host}emby/Library/Refresh"
    query = {"api_key": self._apikey}
    try:
        if RequestUtils().post_res(endpoint, params=query):
            return True
        logger.info("刷新媒体库失败,无法连接极影视!")
    except Exception as e:
        logger.error(f"连接Library/Refresh出错{e}")
    return False
def refresh_library_by_items(self, items: List[schemas.RefreshMediaItem]) -> Optional[bool]:
    """
    Refresh the libraries affected by a list of recognised media items.

    Every item is mapped to a refresh target id. If any item maps to the
    root ("/"), the whole library is refreshed once; otherwise each
    distinct target is refreshed.

    :param items: recognised media items that need a library refresh
    :return: False for empty input; the result of the root refresh when
             one is needed; otherwise True only if every individual
             refresh succeeded (True when there was nothing to refresh)
    """
    if not items:
        return False
    logger.info("开始刷新极影视媒体库...")
    library_ids = []
    for item in items:
        library_id = self.__get_library_id_by_item(item)
        if library_id and library_id not in library_ids:
            library_ids.append(library_id)
    if "/" in library_ids:
        return self.refresh_root_library()
    # Refresh every target. The previous code returned from inside the
    # loop, so only the first library was ever refreshed.
    all_refreshed = True
    for library_id in library_ids:
        if not self.__refresh_library_by_id(library_id):
            all_refreshed = False
    logger.info("极影视媒体库刷新完成")
    return all_refreshed
def __get_library_id_by_item(self, item: schemas.RefreshMediaItem) -> Optional[str]:
    """
    Work out which server-side id should be refreshed for a media item.

    :param item: object with title, year, type, category and target_path
    :return: None when the item lacks title/year/type or the movie is
             already present; the series id when the series exists; the
             id of the library folder matching the target path or the
             category; "/" (refresh everything) when nothing matched
    """
    if not item.title or not item.year or not item.type:
        return None
    if item.type != MediaType.MOVIE.value:
        item_id = self.__get_series_id_by_name(item.title, item.year)
        if item_id:
            # The series exists: refreshing it alone is enough.
            return item_id
    elif self.get_movies(item.title, item.year):
        # The movie is already present: nothing to refresh.
        return None
    # First try: the library whose sub-folder contains the target path.
    # Skipped when no target path is set (Path(None) would raise).
    if item.target_path:
        item_path = Path(item.target_path)
        for folder in self.folders:
            for subfolder in folder.get("SubFolders") or []:
                try:
                    subfolder_path = Path(subfolder.get("Path"))
                    if item_path.is_relative_to(subfolder_path):
                        return folder.get("Id")
                except Exception as err:
                    logger.debug(f"匹配子目录出错:{err} - {traceback.format_exc()}")
    # Second try: a sub-folder path containing the category as a path
    # segment prefix. The category is escaped so that characters such as
    # "(" or "+" are matched literally, and an empty category is skipped
    # because it would otherwise match any path containing a separator.
    if item.category:
        category_pattern = re.compile(r"[/\\]" + re.escape(str(item.category)))
        for folder in self.folders:
            for subfolder in folder.get("SubFolders") or []:
                subfolder_path = subfolder.get("Path")
                if subfolder_path and category_pattern.search(subfolder_path):
                    return folder.get("Id")
    return "/"
@staticmethod
def __format_item_info(item) -> Optional[schemas.MediaServerItem]:
    """
    Convert a raw item dict from the server into a MediaServerItem.

    The per-user play state is taken from "UserData" when present.
    Any exception raised during conversion is logged and None is returned.
    """
    try:
        play_data = item.get("UserData", {})
        if play_data:
            position_ticks = play_data.get("PlaybackPositionTicks")
            in_progress = position_ticks and position_ticks > 0
            played_at = play_data.get("LastPlayedDate")
            # Drop the fractional-seconds part before parsing.
            if played_at is not None and "." in played_at:
                played_at = played_at.split(".")[0]
            if played_at:
                played_at_text = datetime.strptime(played_at, "%Y-%m-%dT%H:%M:%S").strftime(
                    "%Y-%m-%d %H:%M:%S")
            else:
                played_at_text = None
            user_state = schemas.MediaServerItemUserState(
                played=play_data.get("Played"),
                resume=in_progress,
                last_played_date=played_at_text,
                play_count=play_data.get("PlayCount"),
                percentage=play_data.get("PlayedPercentage"),
            )
        else:
            user_state = None
        provider_ids = item.get("ProviderIds", {})
        tmdbid = provider_ids.get("Tmdb")
        return schemas.MediaServerItem(
            server="zspace",
            library=item.get("ParentId"),
            item_id=item.get("Id"),
            item_type=item.get("Type"),
            title=item.get("Name"),
            original_title=item.get("OriginalTitle"),
            year=item.get("ProductionYear"),
            tmdbid=int(tmdbid) if tmdbid else None,
            imdbid=provider_ids.get("Imdb"),
            tvdbid=provider_ids.get("Tvdb"),
            path=item.get("Path"),
            user_state=user_state
        )
    except Exception as e:
        logger.error(e)
    return None
def get_iteminfo(self, itemid: str) -> Optional[schemas.MediaServerItem]:
    """
    Fetch the details of a single item for the logged-in user.

    :param itemid: server-side item id
    :return: the formatted item on HTTP 200; None for an empty id, when
             not connected, on any other response or on error
    """
    if not itemid:
        return None
    if not (self._host and self._apikey and self.user):
        return None
    endpoint = f"{self._host}emby/Users/{self.user}/Items/{itemid}"
    query = {"api_key": self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if response and response.status_code == 200:
            return self.__format_item_info(response.json())
    except Exception as e:
        logger.error(f"连接/Users/{self.user}/Items/{itemid}出错:{e}")
    return None
def get_items(self, parent: Union[str, int], start_index: Optional[int] = 0,
              limit: Optional[int] = -1) -> Generator[MediaServerItem | None | Any, Any, None]:
    """
    Yield the movies and series found under a parent item.

    Entries whose type contains "Folder" are descended into recursively
    (without paging); entries of type Movie or Series are yielded after
    formatting.

    :param parent: id of the parent item (library or folder)
    :param start_index: first index to fetch; only sent when paging
    :param limit: page size; None or -1 fetches everything in one request
    :return: generator of formatted items; yields nothing when not
             connected or when the request fails
    """
    if not (parent and self._host and self._apikey and self.user):
        return None
    endpoint = f"{self._host}emby/Users/{self.user}/Items"
    query = {
        "ParentId": parent,
        "api_key": self._apikey,
        "Fields": "ProviderIds,OriginalTitle,ProductionYear,Path,UserDataPlayCount,UserDataLastPlayedDate,ParentId"
    }
    if limit is not None and limit != -1:
        query["StartIndex"] = start_index
        query["Limit"] = limit
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response or response.status_code != 200:
            return None
        for entry in response.json().get("Items") or []:
            if not entry:
                continue
            entry_type = entry.get("Type")
            if "Folder" in entry_type:
                yield from self.get_items(parent=entry.get('Id'))
            elif entry_type in ["Movie", "Series"]:
                yield self.__format_item_info(entry)
    except Exception as e:
        logger.error(f"连接Users/Items出错{e}")
    return None
def get_webhook_message(self, form: Any, args: dict) -> Optional[schemas.WebhookEventInfo]:
"""
Parse a ZSpace webhook payload into a WebhookEventInfo.

:param form: form payload; its "data" entry, when present, is parsed as JSON
:param args: query arguments, used as the message when the form carries no "data"
:return: the populated event, or None when both inputs are empty, the
payload cannot be parsed, or the message has no 'Event' field
"""
if not form and not args:
return None
try:
# Prefer the JSON string in form["data"]; otherwise round-trip args through JSON.
if form and form.get("data"):
result = form.get("data")
else:
result = json.dumps(dict(args))
message = json.loads(result)
except Exception as e:
logger.debug(f"解析极影视 webhook报文出错{e}")
return None
event_type = message.get('Event')
if not event_type:
return None
logger.debug(f"接收到极影视 webhook{message}")
event_item = schemas.WebhookEventInfo(event=event_type, channel="zspace")
if message.get('Item'):
event_item.media_type = message.get('Item', {}).get('Type')
# Episode / Series / Season items are reported as TV.
if message.get('Item', {}).get('Type') == 'Episode' \
or message.get('Item', {}).get('Type') == 'Series' \
or message.get('Item', {}).get('Type') == 'Season':
event_item.item_type = "TV"
# Full "Series SxEy Name" form needs truthy season and episode numbers.
if message.get('Item', {}).get('SeriesName') \
and message.get('Item', {}).get('ParentIndexNumber') \
and message.get('Item', {}).get('IndexNumber'):
event_item.item_name = "%s %s%s %s" % (
message.get('Item', {}).get('SeriesName'),
"S" + str(message.get('Item', {}).get('ParentIndexNumber')),
"E" + str(message.get('Item', {}).get('IndexNumber')),
message.get('Item', {}).get('Name'))
elif message.get('Item', {}).get('SeriesName'):
event_item.item_name = "%s %s" % (
message.get('Item', {}).get('SeriesName'),
message.get('Item', {}).get('Name'))
else:
event_item.item_name = message.get('Item', {}).get('Name')
# For TV the item id is the id of the series, not of the episode.
event_item.item_id = message.get('Item', {}).get('SeriesId')
event_item.season_id = message.get('Item', {}).get('ParentIndexNumber')
event_item.episode_id = message.get('Item', {}).get('IndexNumber')
elif message.get('Item', {}).get('Type') == 'Audio':
event_item.item_type = "AUD"
album = message.get('Item', {}).get('Album')
file_name = message.get('Item', {}).get('FileName')
event_item.item_name = album
event_item.overview = file_name
event_item.item_id = message.get('Item', {}).get('AlbumId')
else:
# Everything else is treated as a movie: "Name (Year)".
event_item.item_type = "MOV"
event_item.item_name = "%s %s" % (
message.get('Item', {}).get('Name'), "(" + str(message.get('Item', {}).get('ProductionYear')) + ")")
event_item.item_id = message.get('Item', {}).get('Id')
event_item.item_path = message.get('Item', {}).get('Path')
event_item.tmdb_id = message.get('Item', {}).get('ProviderIds', {}).get('Tmdb')
# Overviews longer than 100 characters are truncated with "...".
if message.get('Item', {}).get('Overview') and len(message.get('Item', {}).get('Overview')) > 100:
event_item.overview = str(message.get('Item', {}).get('Overview'))[:100] + "..."
else:
event_item.overview = message.get('Item', {}).get('Overview')
# Progress: transcoding completion if reported, else position / runtime * 100.
event_item.percentage = message.get('TranscodingInfo', {}).get('CompletionPercentage')
if not event_item.percentage:
if message.get('PlaybackInfo', {}).get('PositionTicks') and message.get('Item', {}).get('RunTimeTicks'):
event_item.percentage = message.get('PlaybackInfo', {}).get('PositionTicks') / \
message.get('Item', {}).get('RunTimeTicks') * 100
if message.get('Session'):
event_item.ip = message.get('Session').get('RemoteEndPoint')
event_item.device_name = message.get('Session').get('DeviceName')
event_item.client = message.get('Session').get('Client')
if message.get("User"):
event_item.user_name = message.get("User").get('Name')
# NOTE(review): presumably the seven assignments below all belong to this "if"
# (flat top-level fields overriding the values above) — confirm against the indented original.
if message.get("item_isvirtual"):
event_item.item_isvirtual = message.get("item_isvirtual")
event_item.item_type = message.get("item_type")
event_item.item_name = message.get("item_name")
event_item.item_path = message.get("item_path")
event_item.tmdb_id = message.get("tmdb_id")
event_item.season_id = message.get("season_id")
event_item.episode_id = message.get("episode_id")
# Resolve a backdrop image for the event when an item id is known.
if event_item.item_id:
event_item.image_url = self.get_remote_image_by_id(item_id=event_item.item_id,
image_type="Backdrop")
event_item.json_object = message
return event_item
def get_data(self, url: str) -> Optional[Response]:
    """
    GET a caller-supplied URL after substituting the placeholders
    [HOST], [APIKEY] and [USER] with the current connection values.

    :param url: request URL, may contain the placeholders above
    :return: the response, or None when not connected or on error
    """
    if not self._host or not self._apikey:
        return None
    # self.user may be an int (it is typed Optional[Union[str, int]]), and
    # str.replace only accepts str arguments, so convert explicitly.
    url = url.replace("[HOST]", self._host or '') \
        .replace("[APIKEY]", self._apikey or '') \
        .replace("[USER]", str(self.user or ''))
    try:
        return RequestUtils(content_type="application/json").get_res(url=url)
    except Exception as e:
        logger.error(f"连接极影视出错:{e}")
        return None
def post_data(self, url: str, data: Optional[str] = None, headers: dict = None) -> Optional[Response]:
    """
    POST to a caller-supplied URL after substituting the placeholders
    [HOST], [APIKEY] and [USER] with the current connection values.

    :param url: request URL, may contain the placeholders above
    :param data: request body
    :param headers: request headers
    :return: the response, or None when not connected or on error
    """
    if not self._host or not self._apikey:
        return None
    # self.user may be an int (it is typed Optional[Union[str, int]]), and
    # str.replace only accepts str arguments, so convert explicitly.
    url = url.replace("[HOST]", self._host or '') \
        .replace("[APIKEY]", self._apikey or '') \
        .replace("[USER]", str(self.user or ''))
    try:
        return RequestUtils(
            headers=headers,
        ).post_res(url=url, data=data)
    except Exception as e:
        logger.error(f"连接极影视出错:{e}")
        return None
def get_play_url(self, item_id: str) -> str:
    """
    Build the web player link of an item.

    The play host is used when configured, otherwise the server host.

    :param item_id: id of the media item
    """
    base = self._playhost or self._host
    return f"{base}web/index.html#!/item?id={item_id}&context=home&serverId={self.serverid}"
def get_backdrop_url(self, item_id: str, image_tag: str, remote: Optional[bool] = False) -> str:
    """
    Build the Backdrop image URL of an item.

    :param item_id: server-side item id
    :param image_tag: tag of the image
    :param remote: when truthy, prefer the play host over the server host
    :return: the URL, or "" when not connected or an argument is empty
    """
    if not (self._host and self._apikey):
        return ""
    if not (image_tag and item_id):
        return ""
    base = (self._playhost or self._host) if remote else self._host
    return f"{base}Items/{item_id}/Images/Backdrop?tag={image_tag}&api_key={self._apikey}"
def __get_local_image_by_id(self, item_id: str) -> str:
    """
    Build the Primary image URL of an item on the server host.

    :param item_id: server-side item id
    :return: the URL, or "" when not connected
    """
    connected = self._host and self._apikey
    return f"{self._host}Items/{item_id}/Images/Primary" if connected else ""
def get_resume(self, num: Optional[int] = 12, username: Optional[str] = None) -> Optional[
        List[schemas.MediaServerPlayItem]]:
    """
    List the items the user can continue watching.

    :param num: maximum number of items returned
    :param username: user to query; defaults to the logged-in user
    :return: None when not connected; otherwise a list of play items
             (empty when no user is available or the request fails)
    """
    if not (self._host and self._apikey):
        return None
    user = (self.get_user(username) or self.user) if username else self.user
    if not user:
        return []
    endpoint = f"{self._host}Users/{user}/Items/Resume"
    query = {
        "Limit": 100,
        "MediaTypes": "Video",
        "Fields": "ProductionYear,Path",
        "api_key": self._apikey,
    }
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users/Items/Resume 未获取到返回数据")
        else:
            entries = response.json().get("Items") or []
            resumed = []
            allowed_roots = self.get_user_library_folders()
            for entry in entries:
                if len(resumed) == num:
                    break
                if entry.get("Type") not in ["Movie", "Episode"]:
                    continue
                # Keep only entries located under one of the allowed library folders.
                entry_path = entry.get("Path")
                if entry_path and allowed_roots and not any(
                        str(entry_path).startswith(root) for root in allowed_roots):
                    continue
                item_type = MediaType.MOVIE.value if entry.get("Type") == "Movie" else MediaType.TV.value
                link = self.get_play_url(entry.get("Id"))
                if item_type == MediaType.MOVIE.value:
                    title = entry.get("Name")
                    subtitle = str(entry.get("ProductionYear")) if entry.get("ProductionYear") else None
                    if entry.get("BackdropImageTags"):
                        image = self.get_backdrop_url(item_id=entry.get("Id"),
                                                      image_tag=entry.get("BackdropImageTags")[0])
                    else:
                        image = self.__get_local_image_by_id(entry.get("Id"))
                else:
                    title = f'{entry.get("SeriesName")}'
                    subtitle = f'S{entry.get("ParentIndexNumber")}:{entry.get("IndexNumber")} - {entry.get("Name")}'
                    image = self.get_backdrop_url(item_id=entry.get("SeriesId"),
                                                  image_tag=entry.get("SeriesPrimaryImageTag"))
                    if not image:
                        image = self.__get_local_image_by_id(entry.get("SeriesId"))
                resumed.append(schemas.MediaServerPlayItem(
                    id=entry.get("Id"),
                    title=title,
                    subtitle=subtitle,
                    type=item_type,
                    image=image,
                    link=link,
                    percent=entry.get("UserData", {}).get("PlayedPercentage"),
                    server_type='zspace'
                ))
            return resumed
    except Exception as e:
        logger.error(f"连接Users/Items/Resume出错{e}")
    return []
def get_latest(self, num: Optional[int] = 20, username: Optional[str] = None) -> Optional[
        List[schemas.MediaServerPlayItem]]:
    """
    List the most recently added movies and series.

    :param num: maximum number of items returned
    :param username: user to query; defaults to the logged-in user
    :return: None when not connected; otherwise a list of play items
             (empty when no user is available or the request fails)
    """
    if not (self._host and self._apikey):
        return None
    user = (self.get_user(username) or self.user) if username else self.user
    if not user:
        return []
    endpoint = f"{self._host}Users/{user}/Items/Latest"
    query = {
        "Limit": 100,
        "MediaTypes": "Video",
        "Fields": "ProductionYear,Path,BackdropImageTags",
        "api_key": self._apikey
    }
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users/Items/Latest 未获取到返回数据")
        else:
            entries = response.json() or []
            latest = []
            allowed_roots = self.get_user_library_folders()
            for entry in entries:
                if len(latest) == num:
                    break
                if entry.get("Type") not in ["Movie", "Series"]:
                    continue
                # Keep only entries located under one of the allowed library folders.
                entry_path = entry.get("Path")
                if entry_path and allowed_roots and not any(
                        str(entry_path).startswith(root) for root in allowed_roots):
                    continue
                production_year = entry.get("ProductionYear")
                latest.append(schemas.MediaServerPlayItem(
                    id=entry.get("Id"),
                    title=entry.get("Name"),
                    subtitle=str(production_year) if production_year else None,
                    type=MediaType.MOVIE.value if entry.get("Type") == "Movie" else MediaType.TV.value,
                    image=self.__get_local_image_by_id(item_id=entry.get("Id")),
                    link=self.get_play_url(entry.get("Id")),
                    BackdropImageTags=entry.get("BackdropImageTags"),
                    server_type='zspace'
                ))
            return latest
    except Exception as e:
        logger.error(f"连接Users/Items/Latest出错{e}")
    return []
def get_user_library_folders(self):
    """
    Collect the folder paths of the libraries selected for syncing.

    When a sync list is configured, only libraries whose id is in that
    list contribute their paths; with an empty sync list every library
    does. Returns an empty list when not connected.
    """
    if not (self._host and self._apikey):
        return []
    collected = []
    for library in self.get_virtual_folders() or []:
        selected = not self._sync_libraries or library.get("Id") in self._sync_libraries
        if selected:
            collected.extend(library.get("Path"))
    return collected
def __login(self, username: Optional[str], password: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Log in with user name and password via Users/AuthenticateByName.

    :return: (access token, user id) on success, (None, None) when the
             host or credentials are missing or the login failed
    """
    if not (self._host and username and password):
        return None, None
    endpoint = f"{self._host}emby/Users/AuthenticateByName"
    client_identity = ", ".join((
        'MediaBrowser Client="MoviePilot"',
        'Device="requests"',
        'DeviceId="1"',
        'Version="1.0.0"',
    ))
    request_headers = {
        'X-Emby-Authorization': client_identity,
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    credentials = json.dumps({
        "Username": username,
        "Pw": password
    })
    try:
        response = RequestUtils(headers=request_headers).post_res(url=endpoint, data=credentials)
        if not response:
            logger.error("Users/AuthenticateByName 未获取到返回数据")
        else:
            payload = response.json() or {}
            access_token = payload.get("AccessToken")
            login_user_id = payload.get("User", {}).get("Id")
            if access_token:
                return access_token, login_user_id
    except Exception as e:
        logger.error(f"连接Users/AuthenticateByName出错{e}")
    return None, None
def __get_current_user(self) -> Optional[dict]:
    """
    Fetch the profile of the logged-in user from Users/Me.

    :return: the response body when it is a dict, otherwise None
    """
    if not (self._host and self._apikey):
        return None
    endpoint = f"{self._host}emby/Users/Me"
    query = {"api_key": self._apikey}
    try:
        response = RequestUtils().get_res(endpoint, query)
        if not response:
            logger.error("Users/Me 未获取到返回数据")
        else:
            profile = response.json()
            if isinstance(profile, dict):
                return profile
    except Exception as e:
        logger.error(f"连接Users/Me出错{e}")
    return None
def __get_current_user_id(self) -> Optional[str]:
    """
    Return the id of the logged-in user and remember it on the instance.

    :return: the user id, or None when the profile or its id is unavailable
    """
    profile = self.__get_current_user()
    user_id = profile.get("Id") if profile else None
    if not user_id:
        return None
    self.user = user_id
    return user_id