Merge pull request #524 from wikrin/main

This commit is contained in:
jxxghp
2024-10-30 06:14:08 +08:00
committed by GitHub
4 changed files with 766 additions and 0 deletions

BIN
icons/bangumi_b.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.0 KiB

View File

@@ -889,5 +889,21 @@
"v1.1": "修复CookieCloud覆盖到浏览器",
"v1.0": "同步MoviePilot站点Cookie到CookieCloud"
}
},
"BangumiColl": {
"name": "Bangumi收藏订阅",
"description": "Bangumi用户收藏添加到订阅",
"labels": "订阅",
"version": "1.5.1",
"icon": "bangumi_b.png",
"author": "Attente",
"level": 1,
"v2": true,
"history": {
"v1.5.1": "修复季度信息未传递的问题. 新增站点列表同步删除",
"v1.5": "修复总集数会同步TMDB变动的问题,增加开关选项",
"v1.4": "结构优化",
"v1.3.1": "修复因修改季号导致未下载剧集而完成订阅的问题"
}
}
}

View File

@@ -0,0 +1,432 @@
# 基础库
import datetime
import json
from typing import Any, Dict, List, Optional, Type
# 第三方库
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import pytz
from sqlalchemy import JSON
from sqlalchemy.orm import Session
# 项目库
from app.chain.subscribe import SubscribeChain, Subscribe
from app.core.config import settings
from app.core.context import MediaInfo
from app.core.event import eventmanager, Event
from app.core.meta import MetaBase
from app.core.metainfo import MetaInfo
from app.db.models.subscribehistory import SubscribeHistory
from app.db.site_oper import SiteOper
from app.db.subscribe_oper import SubscribeOper
from app.db import db_query
from app.helper.subscribe import SubscribeHelper
from app.log import logger
from app.plugins import _PluginBase
from app.schemas.types import EventType, NotificationType
from app.utils.http import RequestUtils
class BangumiColl(_PluginBase):
# 插件名称
plugin_name = "Bangumi收藏订阅"
# 插件描述
plugin_desc = "将Bangumi用户收藏添加到订阅"
# 插件图标
plugin_icon = "bangumi_b.png"
# 插件版本
plugin_version = "1.5.1"
# 插件作者
plugin_author = "Attente"
# 作者主页
author_url = "https://github.com/wikrin"
# 插件配置项ID前缀
plugin_config_prefix = "bangumicoll_"
# 加载顺序
plugin_order = 23
# 可使用的用户级别
auth_level = 1
# 私有属性
_scheduler: Optional[BackgroundScheduler] = None
siteoper: SiteOper = None
subscribehelper: SubscribeHelper = None
subscribeoper: SubscribeOper = None
# 配置属性
_enabled: bool = False
_total_change: bool = False
_cron: str = ""
_notify: bool = False
_onlyonce: bool = False
_include: str = ""
_exclude: str = ""
_uid: str = ""
_collection_type = []
_save_path: str = ""
_sites: list = []
def init_plugin(self, config: dict = None):
self.subscribechain = SubscribeChain()
self.siteoper = SiteOper()
self.subscribehelper = SubscribeHelper()
self.subscribeoper = SubscribeOper()
# 停止现有任务
self.stop_service()
self.load_config(config)
if self._onlyonce:
self.schedule_once()
def load_config(self, config: dict):
"""加载配置"""
if config:
# 遍历配置中的键并设置相应的属性
for key in (
"enabled",
"total_change",
"cron",
"notify",
"onlyonce",
"uid",
"collection_type",
"save_path",
"sites",
):
setattr(self, f"_{key}", config.get(key, getattr(self, f"_{key}")))
# 获得所有站点
site_ids = {site.id for site in self.siteoper.list_order_by_pri()}
# 过滤已删除的站点
self._sites = [site_id for site_id in self._sites if site_id in site_ids]
# 更新配置
self.__update_config()
def schedule_once(self):
"""调度一次性任务"""
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
logger.info("Bangumi收藏订阅立即运行一次")
self._scheduler.add_job(
func=self.bangumi_coll,
trigger='date',
run_date=datetime.datetime.now(tz=pytz.timezone(settings.TZ))
+ datetime.timedelta(seconds=3),
)
self._scheduler.start()
# 关闭一次性开关
self._onlyonce = False
self.__update_config()
def __update_config(self):
"""更新设置"""
self.update_config(
{
"enabled": self._enabled,
"notify": self._notify,
"total_change": self._total_change,
"onlyonce": self._onlyonce,
"cron": self._cron,
"uid": self._uid,
"collection_type": self._collection_type,
"include": self._include,
"exclude": self._exclude,
"save_path": self._save_path,
"sites": self._sites,
}
)
def get_form(self):
from .page_components import form
# 列出所有站点
sites_options = [
{"title": site.name, "value": site.id}
for site in self.siteoper.list_order_by_pri()
]
return form(sites_options)
def get_service(self) -> List[Dict[str, Any]]:
"""注册插件公共服务"""
if self._enabled:
trigger = CronTrigger.from_crontab(self._cron) if self._cron else "interval"
kwargs = {"hours": 6} if not self._cron else {}
return [
{
"id": "BangumiColl",
"name": "Bangumi收藏订阅",
"trigger": trigger,
"func": self.bangumi_coll,
"kwargs": kwargs,
}
]
return []
def stop_service(self):
"""退出插件"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error(f"退出插件失败:{str(e)}")
@eventmanager.register(EventType.SiteDeleted)
def site_deleted(self, event: Event):
"""
删除对应站点
"""
site_id = event.event_data.get("site_id")
if site_id in self._sites:
self._sites.remove(site_id)
self.__update_config()
def get_api(self):
pass
def get_command(self):
pass
def get_page(self):
pass
def get_state(self):
return self._enabled
def bangumi_coll(self):
"""订阅Bangumi用户收藏"""
if not self._uid:
logger.error("请设置UID")
return
try:
res = self.get_bgm_res(addr="UserCollections", id=self._uid)
items = self.parse_collection_items(res)
# 新增和移除条目
self.manage_subscriptions(items)
logger.info("Bangumi收藏订阅执行完成")
except Exception as e:
logger.error(f"执行失败: {str(e)}")
def parse_collection_items(self, response) -> Dict[int, Dict[str, Any]]:
"""解析获取的收藏条目"""
data = response.json().get("data", [])
if not data:
logger.error(f"Bangumi用户{self._uid} ,没有任何收藏")
return {}
logger.info("解析Bangumi条目信息...")
return {
item.get("subject_id"): {
"name": item['subject'].get('name'),
"name_cn": item['subject'].get('name_cn'),
"date": item['subject'].get('date'),
"eps": item['subject'].get('eps'),
}
for item in data
if item.get("type") in self._collection_type
}
def manage_subscriptions(self, items: Dict[int, Dict[str, Any]]):
"""管理订阅的新增和删除"""
db_sub = {
i.bangumiid: i.id
for i in self.subscribechain.subscribeoper.list()
if i.bangumiid
}
db_hist = self.get_subscribe_history()
new_sub = items.keys() - db_sub.keys() - db_hist
del_sub = db_sub.keys() - items.keys()
logger.debug(f"待新增条目:{new_sub}")
logger.debug(f"待移除条目:{del_sub}")
if del_sub and self._notify:
del_items = {db_sub[i]: i for i in del_sub}
logger.info("开始移除订阅...")
self.delete_subscribe(del_items)
if new_sub:
logger.info("开始添加订阅...")
msg = self.add_subscribe({i: items[i] for i in new_sub})
if msg:
logger.info("\n".ljust(49, ' ').join(list(msg.values())))
# 添加订阅
def add_subscribe(self, items: Dict[int, Dict[str, Any]]) -> Dict:
"""添加订阅"""
fail_items = {}
for self._subid, item in items.items():
meta = MetaInfo(item.get("name_cn"))
if not meta.name:
fail_items[self._subid] = f"{item.get('name_cn')} 未识别到有效数据"
logger.warn(f"{item.get('name_cn')} 未识别到有效数据")
continue
meta.year = item.get("date")[:4] if item.get("date") else None
mediainfo = self.chain.recognize_media(meta=meta)
meta.total_episode = item.get("eps", 0)
if not mediainfo:
fail_items[self._subid] = f"{item.get('name_cn')} 媒体信息识别失败"
continue
self.update_media_info(item, mediainfo)
sid = self.subscribeoper.list_by_tmdbid(
mediainfo.tmdb_id, mediainfo.number_of_seasons
)
if sid:
logger.info(f"{mediainfo.title_year} 正在订阅中")
if len(sid) == 1:
self.subscribeoper.update(
sid=sid[0].id, payload={"bangumiid": self._subid}
)
logger.info(f"{mediainfo.title_year} Bangumi条目id更新成功")
continue
sid, msg = self.subscribechain.add(
title=mediainfo.title,
year=mediainfo.year,
season=mediainfo.number_of_seasons,
bangumiid=self._subid,
exist_ok=True,
username="Bangumi订阅",
**self.prepare_kwargs(meta, mediainfo),
)
if not sid:
fail_items[self._subid] = f"{item.get('name_cn')} {msg}"
return fail_items
def prepare_kwargs(self, meta: MetaBase, mediainfo: MediaInfo) -> Dict:
"""准备额外参数"""
kwargs = {
"save_path": self._save_path,
"sites": (
self._sites
if self.are_types_equal(attribute_name='sites')
else json.dumps(self._sites)
),
}
total_episode = len(mediainfo.seasons.get(mediainfo.number_of_seasons) or [])
if (
meta.begin_season
and mediainfo.number_of_seasons != meta.begin_season
or total_episode != meta.total_episode
):
meta = self.get_eps(meta)
total_ep: int = meta.end_episode if meta.end_episode else total_episode
lock_eps: int = total_ep - meta.begin_episode + 1
prev_eps: list = [i for i in range(1, meta.begin_episode)]
kwargs.update(
{
"total_episode": total_ep,
"start_episode": meta.begin_episode,
"lack_episode": lock_eps,
"manual_total_episode": (
1 if meta.total_episode and self._total_change else 0
), # 手动修改过总集数
"note": (
prev_eps
if self.are_types_equal("note")
else json.dumps(prev_eps)
),
}
)
logger.info(
f"{mediainfo.title_year} 更新总集数为: {total_ep},开始集数为: {meta.begin_episode}"
)
return kwargs
def update_media_info(self, item: dict, mediainfo: MediaInfo):
"""更新媒体信息"""
for info in mediainfo.season_info:
if self.are_dates(item.get("date"), info.get("air_date")):
mediainfo.number_of_seasons = info.get("season_number")
mediainfo.number_of_episodes = info.get("episode_count")
break
def get_eps(self, meta: MetaBase) -> MetaBase:
"""获取Bangumi条目的集数信息"""
try:
res = self.get_bgm_res(addr="getEpisodes", id=self._subid)
data = res.json().get("data", [{}])[0]
prev = data.get("sort", 1) - data.get("ep", 1)
total = res.json().get("total", None)
meta.begin_episode = prev + 1
meta.end_episode = prev + total if total else None
except Exception as e:
logger.error(f"获取集数信息失败: {str(e)}")
finally:
return meta
# 移除订阅
def delete_subscribe(self, del_items: Dict[int, int]):
"""删除订阅"""
for subscribe_id in del_items.keys():
try:
subscribe = self.subscribeoper.get(subscribe_id)
if subscribe:
self.subscribeoper.delete(subscribe_id)
self.subscribehelper.sub_done_async(
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid}
)
self.post_message(
mtype=NotificationType.Subscribe,
title=f"{subscribe.name}({subscribe.year}) 第{subscribe.season}季 已取消订阅",
text=f"原因: 未在Bangumi收藏中找到该条目\n订阅用户: {subscribe.username}\n创建时间: {subscribe.date}",
image=subscribe.backdrop,
)
except Exception as e:
logger.error(f"删除订阅失败 {subscribe_id}: {str(e)}")
@staticmethod
def get_bgm_res(addr: str, id: int | str):
url = {
"UserCollections": f"https://api.bgm.tv/v0/users/{str(id)}/collections?subject_type=2",
"getEpisodes": f"https://api.bgm.tv/v0/episodes?subject_id={str(id)}&type=0&limit=1",
}
headers = {
"User-Agent": "wikrin/MoviePilot-Plugins (https://github.com/wikrin/MoviePilot-Plugins)"
}
return RequestUtils(headers=headers).get_res(url=url[addr])
@staticmethod
def are_dates(date_str1, date_str2, threshold_days: int = 7) -> bool:
"""对比两个日期字符串是否接近"""
date1 = datetime.datetime.strptime(date_str1, '%Y-%m-%d')
date2 = datetime.datetime.strptime(date_str2, '%Y-%m-%d')
return abs((date1 - date2).days) <= threshold_days
@db_query
def get_subscribe_history(self, db: Session = None) -> set:
"""获取已完成的订阅"""
try:
result = (
db.query(SubscribeHistory)
.filter(SubscribeHistory.bangumiid.isnot(None))
.all()
)
return {i.bangumiid for i in result}
except Exception as e:
logger.error(f"获取订阅历史失败: {str(e)}")
return set()
@staticmethod
def are_types_equal(
attribute_name: str, expected_type: Type[Any] = JSON(), class_=Subscribe
) -> bool:
"""比较类中属性的类型与expected_type是否一致"""
column = class_.__table__.columns.get(attribute_name)
if column is None:
raise AttributeError(
f"Class: {class_.__name__} 没有属性: '{attribute_name}'"
)
return isinstance(column.type, type(expected_type))

View File

@@ -0,0 +1,318 @@
from bs4 import BeautifulSoup
def form(sites_options) -> list:
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {'cols': 12, 'md': 3},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 12, 'md': 3},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'notify',
'label': '自动取消订阅并通知',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 12, 'md': 3},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'total_change',
'label': '不跟随TMDB变动',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 12, 'md': 3},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '立即运行一次',
},
}
],
},
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {'cols': 8, 'md': 4},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cron',
'label': '执行周期',
'placeholder': '5位cron表达式留空自动',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 8, 'md': 4},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'uid',
'label': 'UID/用户名',
'placeholder': '设置了用户名填写用户名否则填写UID',
},
},
],
},
{
'component': 'VCol',
'props': {'cols': 8, 'md': 4},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'collection_type',
'label': '收藏类型',
'chips': True,
'multiple': True,
'items': [
{'title': '在看', 'value': 3},
{'title': '想看', 'value': 1},
],
},
}
],
},
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {'cols': 12, 'md': 6},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'include',
'label': '包含',
'placeholder': '暂未实现',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 12, 'md': 6},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'exclude',
'label': '排除',
'placeholder': '暂未实现',
},
}
],
},
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {'cols': 12, 'md': 6},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'save_path',
'label': '保存目录',
'placeholder': '留空自动',
},
}
],
},
{
'component': 'VCol',
'props': {'cols': 12, 'md': 6},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'sites',
'label': '选择站点',
'chips': True,
'multiple': True,
'items': sites_options,
},
}
],
},
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
},
'content': parse_html(
'<p>注意: 该插件仅会将<strong>公开</strong>的收藏添加到<strong>订阅</strong>。</p>'
),
}
],
}
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
},
'content': parse_html(
'<p>注意: 开启<strong>自动取消订阅并通知</strong>后,已添加的订阅在下一次执行时若不在已选择的<strong>收藏类型</strong>中,将会被取消订阅。</p>'
),
}
],
}
],
},
],
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
},
'content': parse_html(
'<p>注意: 开启<strong>不跟随TMDB变动</strong>后,从<a href="https://bangumi.github.io/api/#/%E7%AB%A0%E8%8A%82/getEpisodes" target="_blank"><u>Bangumi API</u></a>获取的总集数将不再跟随TMDB的集数变动。</p>'
),
},
],
},
],
},
], {
"enabled": False,
"total_change": False,
"notify": False,
"onlyonce": False,
"cron": "",
"uid": "",
"collection_type": [3],
"include": "",
"exclude": "",
"save_path": "",
"sites": [],
}
def parse_html(html_string: str) -> list:
soup = BeautifulSoup(html_string, 'html.parser')
result: list = []
# 定义需要直接转为文本的标签
inline_text_tags = {'strong', 'u', 'em', 'b', 'i'}
def process_element(element: BeautifulSoup):
# 处理纯文本节点
if element.name is None:
text = element.strip()
return text if text else ""
# 处理HTML标签
component = element.name
props = {attr: element[attr] for attr in element.attrs}
content = []
# 递归处理子元素
for child in element.children:
child_content = process_element(child)
if isinstance(child_content, str):
content.append({'component': 'span', 'text': child_content})
elif child_content: # 只有在child_content不为空时添加
content.append(child_content)
# 构建标签对象
tag_data = {
'component': component,
'props': props,
'content': content if component not in inline_text_tags else [],
}
if content and component in inline_text_tags:
tag_data['text'] = ' '.join(
item['text'] for item in content if 'text' in item
)
return tag_data
# 遍历所有子元素
for element in soup.children:
element_content = process_element(element)
if element_content: # 只增加非空内容
result.append(element_content)
return result