Files
archived-MoviePilot-Plugins/plugins/subscribegroup/__init__.py
2024-04-23 21:58:20 +08:00

668 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import re
import time
from app.db.downloadhistory_oper import DownloadHistoryOper
from app.db.subscribe_oper import SubscribeOper
from app.db.site_oper import SiteOper
from app.plugins import _PluginBase
from typing import Any, List, Dict, Tuple
from app.log import logger
from app.core.event import eventmanager, Event
from app.schemas.types import EventType
class SubscribeGroup(_PluginBase):
# 插件名称
plugin_name = "订阅规则自动填充"
# 插件描述
plugin_desc = "电视剧下载后自动添加官组等信息到订阅;添加订阅后根据二级分类名称自定义订阅规则。"
# 插件图标
plugin_icon = "teamwork.png"
# 插件版本
plugin_version = "1.5"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "subscribegroup_"
# 加载顺序
plugin_order = 26
# 可使用的用户级别
auth_level = 2
# 私有属性
_enabled: bool = False
_clear = False
_clear_handle = False
_update_details = []
_update_confs = []
_subscribe_confs = {}
_subscribeoper = None
_downloadhistoryoper = None
_siteoper = None
def init_plugin(self, config: dict = None):
self._downloadhistoryoper = DownloadHistoryOper()
self._subscribeoper = SubscribeOper()
self._siteoper = SiteOper()
if config:
self._enabled = config.get("enabled")
self._clear = config.get("clear")
self._clear_handle = config.get("clear_handle")
self._update_details = config.get("update_details") or []
self._update_confs = config.get("update_confs") or []
if len(self._update_confs) > 0:
active_sites = self._siteoper.list_active()
for confs in self._update_confs:
category = None
resolution = None
quality = None
effect = None
include = None
exclude = None
sites = []
for conf in str(confs).split("#"):
if "=" in conf:
k, v = conf.split("=")
if k == "category":
category = v
if k == "resolution":
resolution = v
if k == "quality":
quality = v
if k == "effect":
effect = v
if k == "include":
include = v
if k == "exclude":
exclude = v
if k == "sites":
for site_name in str(v).split(","):
for active_site in active_sites:
if str(site_name) == str(active_site.name):
sites.append(active_site.id)
break
if category:
self._subscribe_confs[category] = {
'resolution': resolution,
'quality': quality,
'effect': effect,
'include': include,
'exclude': exclude,
'sites': sites
}
# 清理已处理历史
if self._clear_handle:
self.del_data(key="history_handle")
self._clear_handle = False
self.__update_config()
logger.info("已处理历史清理完成")
# 清理历史记录
if self._clear:
self.del_data(key="history")
self._clear = False
self.__update_config()
logger.info("历史记录清理完成")
def __update_config(self):
self.update_config({
"enabled": self._enabled,
"clear": self._clear,
"clear_handle": self._clear_handle,
"update_details": self._update_details,
"update_confs": self._update_confs,
})
@eventmanager.register(EventType.SubscribeAdded)
def subscribe_notice(self, event: Event = None):
"""
添加订阅根据二级分类填充订阅
"""
if not self._enabled:
logger.error("插件未开启")
return
if len(self._subscribe_confs.keys()) == 0:
logger.error("插件未开启二级分类自定义填充")
return
if event:
event_data = event.event_data
if not event_data or not event_data.get("subscribe_id") or not event_data.get("mediainfo"):
return
sid = event_data.get("subscribe_id")
category = event_data.get("mediainfo").get("category")
if not category:
logger.error(f"订阅ID:{sid} 未获取到二级分类")
return
if category not in self._subscribe_confs.keys():
logger.error(f"订阅ID:{sid} 二级分类:{category} 未配置自定义规则")
return
# 查询订阅
subscribe = self._subscribeoper.get(sid)
# 二级分类自定义配置
category_conf = self._subscribe_confs.get(category)
# 更新订阅自定义配置
self._subscribeoper.update(sid, {
'include': category_conf.get('include') if category_conf.get('include') else subscribe.include,
'exclude': category_conf.get('exclude') if category_conf.get('exclude') else subscribe.exclude,
'sites': json.dumps(category_conf.get('sites')) if category_conf.get('sites') else subscribe.sites,
'resolution': self.__parse_pix(category_conf.get('resolution')) if category_conf.get(
'resolution') else subscribe.resolution,
'quality': self.__parse_type(category_conf.get('quality')) if category_conf.get(
'quality') else subscribe.quality,
'effect': self.__parse_effect(category_conf.get('effect')) if category_conf.get(
'effect') else subscribe.effect,
})
logger.info(f"订阅记录:{subscribe.name} 填充成功\n"
f"包含关键词 {category_conf.get('include') if category_conf.get('include') else subscribe.include} \n"
f"排除关键词 {category_conf.get('exclude') if category_conf.get('exclude') else subscribe.exclude} \n"
f"站点 {json.dumps(category_conf.get('sites')) if category_conf.get('sites') else subscribe.sites} \n"
f"分辨率 {self.__parse_pix(category_conf.get('resolution')) if category_conf.get('resolution') else subscribe.resolution} \n"
f"质量 {self.__parse_type(category_conf.get('quality')) if category_conf.get('quality') else subscribe.quality} \n"
f"特效 {self.__parse_effect(category_conf.get('effect')) if category_conf.get('effect') else subscribe.effect}")
# 读取历史记录
history = self.get_data('history') or []
history.append({
'name': subscribe.name,
'type': f'{category}订阅自定义配置',
'content': json.dumps(category_conf),
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
})
# 保存历史
self.save_data(key="history", value=history)
@eventmanager.register(EventType.DownloadAdded)
def download_notice(self, event: Event = None):
"""
添加下载填充订阅制作组等信息
"""
if not self._enabled:
logger.error("插件未开启")
return
if len(self._update_details) == 0:
logger.error("插件未开启更新填充内容")
return
history_handle: List[str] = self.get_data('history_handle') or []
if event:
event_data = event.event_data
if not event_data or not event_data.get("hash") or not event_data.get("context"):
return
download_hash = event_data.get("hash")
# 根据hash查询下载记录
download_history = self._downloadhistoryoper.get_by_hash(download_hash)
if not download_history:
logger.warning(f"种子hash:{download_hash} 对应下载记录不存在")
return
if f"{download_history.type}:{download_history.tmdbid}" in history_handle:
logger.warning(f"下载历史:{download_history.title} 已处理过,不再重复处理")
return
# 保存已处理历史
history_handle.append(f"{download_history.type}:{download_history.tmdbid}")
self.save_data('history_handle', history_handle)
if download_history.type != '电视剧':
logger.warning(f"下载历史:{download_history.title} 不是电视剧,不进行官组填充")
return
# 根据下载历史查询订阅记录
subscribes = self._subscribeoper.list_by_tmdbid(tmdbid=download_history.tmdbid,
season=int(download_history.seasons.replace('S', ''))
if download_history.seasons and
download_history.seasons.count('-') == 0 else None)
if not subscribes or len(subscribes) == 0:
logger.warning(f"下载历史:{download_history.title} tmdbid:{download_history.tmdbid} 对应订阅记录不存在")
return
for subscribe in subscribes:
if subscribe.type != '电视剧':
logger.warning(f"订阅记录:{subscribe.name} 不是电视剧,不进行官组填充")
return
# 开始填充官组和站点
context = event_data.get("context")
_torrent = context.torrent_info
_meta = context.meta_info
# 分辨率
resource_pix = None
if "resource_pix" in self._update_details and not subscribe.resolution:
resource_pix = _meta.resource_pix if _meta else None
if resource_pix:
resource_pix = self.__parse_pix(resource_pix)
# 资源类型
resource_type = None
if "resource_type" in self._update_details and not subscribe.quality:
resource_type = _meta.resource_type if _meta else None
if resource_type:
resource_type = self.__parse_type(resource_type)
# 特效
resource_effect = None
if "resource_effect" in self._update_details and not subscribe.effect:
resource_effect = _meta.resource_effect if _meta else None
if resource_effect:
resource_effect = self.__parse_effect(resource_effect)
resource_team = None
sites = None
if ("group" in self._update_details and not subscribe.include
and (not subscribe.sites or len(json.loads(subscribe.sites)) == 0)):
# 官组
resource_team = _meta.resource_team if _meta else None
# 站点
sites = json.dumps([_torrent.site]) if _torrent and _torrent.site else None
# 更新订阅记录
if resource_pix or resource_type or resource_effect or resource_team or sites:
self._subscribeoper.update(subscribe.id, {
'include': resource_team,
'sites': sites,
'quality': resource_type,
'resolution': resource_pix,
'effect': resource_effect,
})
logger.info(f"订阅记录:{subscribe.name} 填充成功\n"
f"官组 {resource_team} \n"
f"站点 {sites} \n"
f"分辨率 {resource_pix} \n"
f"质量 {resource_type} \n"
f"特效 {resource_effect}")
# 读取历史记录
history = self.get_data('history') or []
history.append({
'name': subscribe.name,
'type': '种子下载自定义配置',
'content': f'官组 {resource_team} 站点 {sites} 分辨率 {resource_pix} 质量 {resource_type} 特效 {resource_effect}',
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
})
# 保存历史
self.save_data(key="history", value=history)
else:
logger.warning(f"订阅记录:{subscribe.name} 已配置相关参数,无需自动填充")
def __parse_pix(self, resource_pix):
# 识别1080或者4k或720
if re.match(r"1080[pi]|x1080", resource_pix):
resource_pix = "1080[pi]|x1080"
if re.match(r"4K|2160p|x2160", resource_pix):
resource_pix = "4K|2160p|x2160"
if re.match(r"720[pi]|x720", resource_pix):
resource_pix = "720[pi]|x720"
return resource_pix
def __parse_type(self, resource_type):
if re.match(r"Blu-?Ray.+VC-?1|Blu-?Ray.+AVC|UHD.+blu-?ray.+HEVC|MiniBD", resource_type):
resource_type = "Blu-?Ray.+VC-?1|Blu-?Ray.+AVC|UHD.+blu-?ray.+HEVC|MiniBD"
if re.match(r"Remux", resource_type):
resource_type = "Remux"
if re.match(r"Blu-?Ray", resource_type):
resource_type = "Blu-?Ray"
if re.match(r"UHD|UltraHD", resource_type):
resource_type = "UHD|UltraHD"
if re.match(r"WEB-?DL|WEB-?RIP", resource_type):
resource_type = "WEB-?DL|WEB-?RIP"
if re.match(r"HDTV", resource_type):
resource_type = "HDTV"
if re.match(r"[Hx].?265|HEVC", resource_type):
resource_type = "[Hx].?265|HEVC"
if re.match(r"[Hx].?264|AVC", resource_type):
resource_type = "[Hx].?264|AVC"
return resource_type
def __parse_effect(self, resource_effect):
if re.match(r"Dolby[\\s.]+Vision|DOVI|[\\s.]+DV[\\s.]+", resource_effect):
resource_effect = "Dolby[\\s.]+Vision|DOVI|[\\s.]+DV[\\s.]+"
if re.match(r"Dolby[\\s.]*\\+?Atmos|Atmos", resource_effect):
resource_effect = "Dolby[\\s.]*\\+?Atmos|Atmos"
if re.match(r"[\\s.]+HDR[\\s.]+|HDR10|HDR10\\+", resource_effect):
resource_effect = "[\\s.]+HDR[\\s.]+|HDR10|HDR10\\+"
if re.match(r"[\\s.]+SDR[\\s.]+", resource_effect):
resource_effect = "[\\s.]+SDR[\\s.]+"
return resource_effect
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面需要返回两块数据1、页面配置2、数据结构
"""
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'clear',
'label': '清理历史记录',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 3
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'clear_handle',
'label': '清理已处理记录',
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VSelect',
'props': {
'multiple': True,
'chips': True,
'model': 'update_details',
'label': '填充内容',
'items': [
{
"title": "资源质量",
"vale": "resource_type"
},
{
"title": "分辨率",
"vale": "resource_pix"
},
{
"title": "特效",
"vale": "resource_effect"
},
{
"title": "制作组",
"vale": "group"
}
]
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12
},
'content': [
{
'component': 'VTextarea',
'props': {
'model': 'update_confs',
'label': '二级分类自定义填充',
'rows': 3,
'placeholder': 'category:#resolution:#quality:#effect:#include:#exclude:#sites:'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '电视剧订阅未配置包含关键词和订阅站点等配置时,订阅或搜索下载后,'
'将下载种子的制作组和站点等信息填充到订阅信息中,以保证后续订阅资源的统一性。'
'(适用于订阅新出的电视剧。)'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '二级分类自定义填充用于根据二级分类自定义订阅规则,具体属性明细请查看电视剧订阅设置页面。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': 'category:二级分类名称,resolution:分辨率,quality:质量,effect:特效,include:包含关键词,'
'exclude:排除关键词,sites:站点名称(多个站点用逗号拼接)。'
'category必填多组属性用#分割。例如category:动漫#resolution:1080p'
'添加的动漫订阅指定分辨率为1080p'
}
}
]
}
]
},
]
}
], {
"enabled": False,
"clear": False,
"clear_handle": False,
"update_details": [],
"update_confs": [],
}
def get_page(self) -> List[dict]:
historys = self.get_data('history')
if not historys:
return [
{
'component': 'div',
'text': '暂无数据',
'props': {
'class': 'text-center',
}
}
]
if not isinstance(historys, list):
historys = [historys]
# 按照时间倒序
historys = sorted(historys, key=lambda x: x.get("time") or 0, reverse=True)
contens = [
{
'component': 'tr',
'props': {
'class': 'text-sm'
},
'content': [
{
'component': 'td',
'text': history.get("time")
},
{
'component': 'td',
'text': history.get("name")
},
{
'component': 'td',
'text': history.get("type")
},
{
'component': 'td',
'text': history.get("content")
}
]
} for history in historys
]
# 拼装页面
return [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VTable',
'props': {
'hover': True
},
'content': [
{
'component': 'thead',
'content': [
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '执行时间'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '订阅名称'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '更新类型'
},
{
'component': 'th',
'props': {
'class': 'text-start ps-4'
},
'text': '更新内容'
},
]
},
{
'component': 'tbody',
'content': contens
}
]
}
]
}
]
}
]
def stop_service(self):
"""
退出插件
"""
pass