from typing import Any, List, Dict, Tuple, Optional, Union from app.chain.download import DownloadChain from app.chain.media import MediaChain from app.core.metainfo import MetaInfo from app.db.downloadhistory_oper import DownloadHistoryOper from app.log import logger from app.plugins import _PluginBase from app.schemas import NotificationType, TransferTorrent, DownloadingTorrent from app.schemas.types import TorrentStatus, MessageChannel from app.utils.string import StringUtils class DownloadingMsg(_PluginBase): # 插件名称 plugin_name = "下载进度推送" # 插件描述 plugin_desc = "定时推送正在下载进度。" # 插件图标 plugin_icon = "downloadmsg.png" # 插件版本 plugin_version = "1.1" # 插件作者 plugin_author = "thsrite" # 作者主页 author_url = "https://github.com/thsrite" # 插件配置项ID前缀 plugin_config_prefix = "downloading_" # 加载顺序 plugin_order = 22 # 可使用的用户级别 auth_level = 2 # 私有属性 _enabled = False # 任务执行间隔 _seconds = None _type = None _adminuser = None def init_plugin(self, config: dict = None): # 停止现有任务 self.stop_service() if config: self._enabled = config.get("enabled") self._seconds = config.get("seconds") or 300 self._type = config.get("type") or 'admin' self._adminuser = config.get("adminuser") def __downloading(self): """ 定时推送正在下载进度 """ # 正在下载种子 torrents = DownloadChain().list_torrents(status=TorrentStatus.DOWNLOADING) if not torrents: logger.info("当前没有正在下载的任务!") return # 推送用户 if self._type == "admin" or self._type == "both": if not self._adminuser: logger.error("未配置管理员用户") return for username in str(self._adminuser).split(","): self.__send_msg(torrents=torrents, username=username) if self._type == "user" or self._type == "both": user_torrents = {} # 根据正在下载种子hash获取下载历史 _downloadhis = DownloadHistoryOper() for torrent in torrents: downloadhis = _downloadhis.get_by_hash(download_hash=torrent.hash) if not downloadhis: logger.warn(f"种子 {torrent.hash} 未获取到MoviePilot下载历史,无法推送下载进度") continue if not downloadhis.username: logger.debug(f"种子 {torrent.hash} 未获取到下载用户记录,无法推送下载进度") continue user_torrent = user_torrents.get(downloadhis.username) or [] user_torrent.append(torrent) user_torrents[downloadhis.username] = user_torrent if not user_torrents or not user_torrents.keys(): logger.warn("未获取到用户下载记录,无法推送下载进度") return # 推送用户下载任务进度 for username in list(user_torrents.keys()): if not username: continue # 如果用户是管理员,无需重复推送 if (self._type == "admin" or self._type == "both") and self._adminuser and username in str( self._adminuser).split(","): logger.debug("管理员已推送") continue user_torrent = user_torrents.get(username) if not user_torrent: logger.warn(f"未获取到用户 {username} 下载任务") continue self.__send_msg(torrents=user_torrent, username=username) if self._type == "all": self.__send_msg(torrents=torrents) def __send_msg(self, torrents: Optional[List[Union[TransferTorrent, DownloadingTorrent]]], username: str = None): """ 发送消息 """ title = f"共 {len(torrents)} 个任务正在下载:" messages = [] index = 1 channel_value = None _downloadhis = DownloadHistoryOper() for torrent in torrents: year = None name = None se = None ep = None # 先查询下载记录,没有再识别 downloadhis = _downloadhis.get_by_hash(download_hash=torrent.hash) if downloadhis: name = downloadhis.title year = downloadhis.year se = downloadhis.seasons ep = downloadhis.episodes if not channel_value: channel_value = downloadhis.channel else: try: meta = MetaInfo(torrent.title) media_info = MediaChain().recognize_media(meta) if not media_info: continue year = media_info.year name = media_info.title if media_info.number_of_seasons: se = f"S{str(media_info.number_of_seasons).rjust(2, '0')}" if media_info.number_of_episodes: ep = f"E{str(media_info.number_of_episodes).rjust(2, '0')}" except Exception as e: print(str(e)) # 拼装标题 if year: media_name = "%s (%s) %s%s" % (name, year, se, ep) elif name: media_name = "%s %s%s" % (name, se, ep) else: media_name = torrent.title if not self._adminuser or username not in str(self._adminuser).split(","): # 下载用户发送精简消息 messages.append(f"{index}. {media_name} {round(torrent.progress, 1)}%") else: messages.append(f"{index}. {media_name}\n" f"{torrent.title} " f"{StringUtils.str_filesize(torrent.size)} " f"{round(torrent.progress, 1)}%") index += 1 # 用户消息渠道 if channel_value: channel = next( (channel for channel in MessageChannel.__members__.values() if channel.value == channel_value), None) else: channel = None self.post_message(mtype=NotificationType.Download, channel=channel, title=title, text="\n".join(messages), userid=username) def get_state(self) -> bool: return self._enabled @staticmethod def get_command() -> List[Dict[str, Any]]: pass def get_api(self) -> List[Dict[str, Any]]: pass def get_service(self) -> List[Dict[str, Any]]: """ 注册插件公共服务 [{ "id": "服务ID", "name": "服务名称", "trigger": "触发器:cron/interval/date/CronTrigger.from_crontab()", "func": self.xxx, "kwargs": {} # 定时器参数 }] """ if self._enabled and self._seconds: return [ { "id": "downloading", "name": "下载进度推送服务", "trigger": "interval", "func": self.__downloading, "kwargs": { "seconds": int(self._seconds) } } ] return [] def get_form(self) -> Tuple[List[dict], Dict[str, Any]]: """ 拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构 """ return [ { 'component': 'VForm', 'content': [ { 'component': 'VRow', 'content': [ { 'component': 'VCol', 'props': { 'cols': 12 }, 'content': [ { 'component': 'VSwitch', 'props': { 'model': 'enabled', 'label': '启用插件', } } ] } ] }, { 'component': 'VRow', 'content': [ { 'component': 'VCol', 'props': { 'cols': 12, 'md': 4 }, 'content': [ { 'component': 'VTextField', 'props': { 'model': 'seconds', 'label': '执行间隔', 'placeholder': '单位(秒)' } } ] }, { 'component': 'VCol', 'props': { 'cols': 12, 'md': 4 }, 'content': [ { 'component': 'VTextField', 'props': { 'model': 'adminuser', 'label': '管理员用户', 'placeholder': '多个用户,分割' } } ] }, { 'component': 'VCol', 'props': { 'cols': 12, 'md': 4 }, 'content': [ { 'component': 'VSelect', 'props': { 'model': 'type', 'label': '推送类型', 'items': [ {'title': '管理员', 'value': 'admin'}, {'title': '下载用户', 'value': 'user'}, {'title': '管理员和下载用户', 'value': 'both'}, {'title': '所有用户', 'value': 'all'} ] } } ] } ] } ] } ], { "enabled": False, "seconds": 300, "adminuser": "", "type": "admin" } def get_page(self) -> List[dict]: pass def stop_service(self): pass