mirror of
https://github.com/d0zingcat/MoviePilot-Plugins.git
synced 2026-05-19 15:09:29 +00:00
代码结构优化
This commit is contained in:
@@ -3,12 +3,13 @@
|
||||
"name": "Bangumi收藏订阅",
|
||||
"description": "Bangumi用户收藏添加到订阅",
|
||||
"labels": "订阅",
|
||||
"version": "1.3.1",
|
||||
"version": "1.4",
|
||||
"icon": "https://raw.githubusercontent.com/wikrin/MoviePilot-Plugins/main/icons/bangumi_b.png",
|
||||
"author": "Attente",
|
||||
"level": 1,
|
||||
"v2": true,
|
||||
"history": {
|
||||
"v1.4": "结构优化",
|
||||
"v1.3.1": "修复因修改季号导致未下载剧集而完成订阅的问题",
|
||||
"v1.3": "添加订阅逻辑优化",
|
||||
"v1.2.2": "新增: 订阅添加失败总览 修复: 其他方式添加的订阅反复添加的问题",
|
||||
|
||||
@@ -16,7 +16,6 @@ from app.log import logger
|
||||
from app.plugins import _PluginBase
|
||||
from app.schemas.types import NotificationType
|
||||
from app.utils.http import RequestUtils
|
||||
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from sqlalchemy import JSON
|
||||
@@ -31,7 +30,7 @@ class BangumiColl(_PluginBase):
|
||||
# 插件图标
|
||||
plugin_icon = "https://raw.githubusercontent.com/wikrin/MoviePilot-Plugins/main/icons/bangumi_b.png"
|
||||
# 插件版本
|
||||
plugin_version = "1.3.1"
|
||||
plugin_version = "1.4"
|
||||
# 插件作者
|
||||
plugin_author = "Attente"
|
||||
# 作者主页
|
||||
@@ -69,45 +68,43 @@ class BangumiColl(_PluginBase):
|
||||
|
||||
# 停止现有任务
|
||||
self.stop_service()
|
||||
self.load_config(config)
|
||||
|
||||
# 配置
|
||||
if self._onlyonce:
|
||||
self.schedule_once()
|
||||
|
||||
def load_config(self, config: dict):
|
||||
"""加载配置"""
|
||||
if config:
|
||||
self._enabled = config.get("enabled")
|
||||
self._cron = config.get("cron")
|
||||
self._notify = config.get("notify")
|
||||
self._onlyonce = config.get("onlyonce")
|
||||
self._include = config.get("include")
|
||||
self._exclude = config.get("exclude")
|
||||
self._uid = config.get("uid")
|
||||
self._collection_type = config.get("collection_type") or [3]
|
||||
self._save_path = config.get("save_path")
|
||||
self._sites = config.get("sites")
|
||||
self._enabled = config.get("enabled", self._enabled)
|
||||
self._cron = config.get("cron", self._cron)
|
||||
self._notify = config.get("notify", self._notify)
|
||||
self._onlyonce = config.get("onlyonce", self._onlyonce)
|
||||
self._include = config.get("include", self._include)
|
||||
self._exclude = config.get("exclude", self._exclude)
|
||||
self._uid = config.get("uid", self._uid)
|
||||
self._collection_type = config.get("collection_type", [3])
|
||||
self._save_path = config.get("save_path", self._save_path)
|
||||
self._sites = config.get("sites", self._sites)
|
||||
|
||||
if self._onlyonce:
|
||||
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
||||
logger.info(f"Bangumi收藏订阅启动,立即运行一次")
|
||||
self._scheduler.add_job(
|
||||
func=self.bangumi_coll,
|
||||
trigger='date',
|
||||
run_date=datetime.datetime.now(tz=pytz.timezone(settings.TZ))
|
||||
+ datetime.timedelta(seconds=3),
|
||||
)
|
||||
def schedule_once(self):
|
||||
"""调度一次性任务"""
|
||||
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
||||
logger.info("Bangumi收藏订阅,立即运行一次")
|
||||
self._scheduler.add_job(
|
||||
func=self.bangumi_coll,
|
||||
trigger='date',
|
||||
run_date=datetime.datetime.now(tz=pytz.timezone(settings.TZ))
|
||||
+ datetime.timedelta(seconds=3),
|
||||
)
|
||||
self._scheduler.start()
|
||||
|
||||
# 启动任务
|
||||
if self._scheduler.get_jobs():
|
||||
self._scheduler.print_jobs()
|
||||
self._scheduler.start()
|
||||
|
||||
if self._onlyonce:
|
||||
# 关闭一次性开关
|
||||
self._onlyonce = False
|
||||
# 保存设置
|
||||
self.__update_config()
|
||||
# 关闭一次性开关
|
||||
self._onlyonce = False
|
||||
self.__update_config()
|
||||
|
||||
def __update_config(self):
|
||||
"""
|
||||
更新设置
|
||||
"""
|
||||
"""更新设置"""
|
||||
self.update_config(
|
||||
{
|
||||
"enabled": self._enabled,
|
||||
@@ -123,12 +120,6 @@ class BangumiColl(_PluginBase):
|
||||
}
|
||||
)
|
||||
|
||||
def get_api(self):
|
||||
pass
|
||||
|
||||
def get_command(self):
|
||||
pass
|
||||
|
||||
def get_form(self):
|
||||
# 列出所有站点
|
||||
sites_options = [
|
||||
@@ -324,254 +315,129 @@ class BangumiColl(_PluginBase):
|
||||
"sites": [],
|
||||
}
|
||||
|
||||
def get_page(self):
|
||||
pass
|
||||
|
||||
# 注册定时任务
|
||||
def get_service(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
注册插件公共服务
|
||||
[{
|
||||
"id": "服务ID",
|
||||
"name": "服务名称",
|
||||
"trigger": "触发器:cron/interval/date/CronTrigger.from_crontab()",
|
||||
"func": self.xxx,
|
||||
"kwargs": {} # 定时器参数
|
||||
}]
|
||||
"""
|
||||
if self._enabled and self._cron:
|
||||
"""注册插件公共服务"""
|
||||
if self._enabled:
|
||||
trigger = CronTrigger.from_crontab(self._cron) if self._cron else "interval"
|
||||
kwargs = {"hours": 6} if not self._cron else {}
|
||||
return [
|
||||
{
|
||||
"id": "BangumiColl",
|
||||
"name": "Bangumi收藏订阅",
|
||||
"trigger": CronTrigger.from_crontab(self._cron),
|
||||
"trigger": trigger,
|
||||
"func": self.bangumi_coll,
|
||||
"kwargs": {},
|
||||
}
|
||||
]
|
||||
elif self._enabled:
|
||||
return [
|
||||
{
|
||||
"id": "BangumiColl",
|
||||
"name": "Bangumi收藏订阅",
|
||||
"trigger": "interval",
|
||||
"func": self.bangumi_coll,
|
||||
"kwargs": {"hours": 6},
|
||||
"kwargs": kwargs,
|
||||
}
|
||||
]
|
||||
return []
|
||||
|
||||
def stop_service(self):
|
||||
"""
|
||||
退出插件
|
||||
"""
|
||||
"""退出插件"""
|
||||
try:
|
||||
if self._scheduler:
|
||||
self._scheduler.remove_all_jobs()
|
||||
if self._scheduler.running:
|
||||
self._scheduler.shutdown()
|
||||
self._scheduler.shutdown()
|
||||
self._scheduler = None
|
||||
except Exception as e:
|
||||
logger.error("退出插件失败:%s" % str(e))
|
||||
logger.error(f"退出插件失败:{str(e)}")
|
||||
|
||||
def get_api(self):
|
||||
pass
|
||||
|
||||
def get_command(self):
|
||||
pass
|
||||
|
||||
def get_page(self):
|
||||
pass
|
||||
|
||||
def get_state(self):
|
||||
return self._enabled
|
||||
|
||||
def bangumi_coll(self):
|
||||
"""
|
||||
订阅Bangumi用户收藏
|
||||
"""
|
||||
"""订阅Bangumi用户收藏"""
|
||||
if not self._uid:
|
||||
logger.error("请设置UID")
|
||||
return
|
||||
|
||||
# 获取收藏列表
|
||||
res = self.get_bgm_res(addr="UserCollections", id=self._uid)
|
||||
res = res.json().get("data")
|
||||
if not res:
|
||||
logger.error(f"Bangumi用户:{self._uid} ,没有任何收藏")
|
||||
try:
|
||||
res = self.get_bgm_res(addr="UserCollections", id=self._uid)
|
||||
items = self.parse_collection_items(res)
|
||||
|
||||
# 解析出必要数据
|
||||
items: Dict[int, Dict[str, Any]] = {}
|
||||
logger.info(f"解析Bangumi条目信息...")
|
||||
for item in res:
|
||||
# 新增和移除条目
|
||||
self.manage_subscriptions(items)
|
||||
|
||||
logger.info("Bangumi收藏订阅执行完成")
|
||||
except Exception as e:
|
||||
logger.error(f"执行失败: {str(e)}")
|
||||
|
||||
def parse_collection_items(self, response) -> Dict[int, Dict[str, Any]]:
|
||||
"""解析获取的收藏条目"""
|
||||
data = response.json().get("data")
|
||||
if not data:
|
||||
logger.error(f"Bangumi用户:{self._uid} ,没有任何收藏")
|
||||
return {}
|
||||
|
||||
items = {}
|
||||
logger.info("解析Bangumi条目信息...")
|
||||
for item in data:
|
||||
if item.get("type") not in self._collection_type:
|
||||
logger.debug(
|
||||
f"条目: {item['subject'].get('name_cn')} 类型:{item.get('type')} 不符合"
|
||||
)
|
||||
continue
|
||||
|
||||
# 条目id
|
||||
subject_id = item.get("subject_id")
|
||||
# 主标题
|
||||
name = item['subject'].get('name')
|
||||
# 中文标题
|
||||
name_cn = item['subject'].get('name_cn')
|
||||
# 放送时间
|
||||
date = item['subject'].get('date')
|
||||
# 集数
|
||||
eps = item['subject'].get('eps')
|
||||
## 这里在后面添加排除规则
|
||||
items.update(
|
||||
{
|
||||
subject_id: {
|
||||
"name": name,
|
||||
"name_cn": name_cn,
|
||||
"date": date,
|
||||
"eps": eps,
|
||||
}
|
||||
}
|
||||
)
|
||||
## 获取已添加的订阅
|
||||
items[item.get("subject_id")] = {
|
||||
"name": item['subject'].get('name'),
|
||||
"name_cn": item['subject'].get('name_cn'),
|
||||
"date": item['subject'].get('date'),
|
||||
"eps": item['subject'].get('eps'),
|
||||
}
|
||||
return items
|
||||
|
||||
def manage_subscriptions(self, items: Dict[int, Dict[str, Any]]):
|
||||
"""管理订阅的新增和删除"""
|
||||
db_sub = {
|
||||
i.bangumiid: i.id
|
||||
for i in self.subscribechain.subscribeoper.list()
|
||||
if i.bangumiid
|
||||
}
|
||||
## 获取历史订阅
|
||||
db_hist = self.get_subscribe_history()
|
||||
# 新增条目
|
||||
new_sub = items.keys() - db_sub.keys() - db_hist
|
||||
logger.debug(f"待新增条目:{new_sub}")
|
||||
# 移除条目
|
||||
del_sub = db_sub.keys() - items.keys()
|
||||
|
||||
logger.debug(f"待新增条目:{new_sub}")
|
||||
logger.debug(f"待移除条目:{del_sub}")
|
||||
|
||||
logger.info(f"解析Bangumi条目信息完成,共{len(items)}条,新增{len(new_sub)}条")
|
||||
|
||||
# 执行移除操作
|
||||
if del_sub and self._notify:
|
||||
# 数据库id为键,bgm条目id为值
|
||||
del_items = {db_sub[i]: i for i in del_sub}
|
||||
logger.info(f"开始移除订阅...")
|
||||
logger.info("开始移除订阅...")
|
||||
self.delete_subscribe(del_items)
|
||||
|
||||
# 执行添加操作
|
||||
if new_sub:
|
||||
# bgm条目id为键,bgm条目信息为值
|
||||
new_sub = {i: items[i] for i in new_sub}
|
||||
logger.info(f"开始添加订阅...")
|
||||
msg = self.add_subscribe(new_sub)
|
||||
logger.info("开始添加订阅...")
|
||||
msg = self.add_subscribe({i: items[i] for i in new_sub})
|
||||
if msg:
|
||||
# 订阅失败的条目打印至日志
|
||||
logger.info("\n".ljust(49, ' ').join(list(msg.values())))
|
||||
|
||||
# 结束
|
||||
logger.info(f"Bangumi收藏订阅执行完成")
|
||||
|
||||
# 添加订阅
|
||||
def add_subscribe(self, items: Dict[int, Dict[str, Any]]) -> Dict:
|
||||
'''
|
||||
添加订阅
|
||||
:param items: bgm条目id为键,bgm条目信息为值
|
||||
'''
|
||||
|
||||
# 记录失败条目
|
||||
"""添加订阅"""
|
||||
fail_items = {}
|
||||
# 格式化站点
|
||||
sites = (
|
||||
self._sites
|
||||
if self.are_types_equal(attribute_name='sites')
|
||||
else json.dumps(self._sites)
|
||||
)
|
||||
|
||||
for subject_id, item in items.items():
|
||||
for self._subid, item in items.items():
|
||||
meta = MetaInfo(item.get("name_cn"))
|
||||
if not meta.name:
|
||||
fail_items.update(
|
||||
{subject_id: f"{item.get('name_cn')} 未识别到有效数据"}
|
||||
)
|
||||
fail_items[self._subid] = f"{item.get('name_cn')} 未识别到有效数据"
|
||||
logger.warn(f"{item.get('name_cn')} 未识别到有效数据")
|
||||
continue
|
||||
# 设置默认年份, 避免出现多个结果使用早期条目
|
||||
meta.year = item.get("date")[:4]
|
||||
mediainfo: MediaInfo = self.chain.recognize_media(meta=meta)
|
||||
# 识别失败则跳过
|
||||
|
||||
meta.year = item.get("date")[:4] if item.get("date") else None
|
||||
mediainfo = self.chain.recognize_media(meta=meta)
|
||||
if not mediainfo:
|
||||
fail_items.update(
|
||||
{subject_id: f"{item.get('name_cn')} 媒体信息识别失败"}
|
||||
)
|
||||
fail_items[self._subid] = f"{item.get('name_cn')} 媒体信息识别失败"
|
||||
continue
|
||||
# 对比Bangumi和tmdb的信息确定季度
|
||||
for info in mediainfo.season_info:
|
||||
# 对比日期, 误差默认7天
|
||||
if not self.are_dates(item.get("date"), info.get("air_date")):
|
||||
continue
|
||||
else:
|
||||
# 更新季度信息
|
||||
mediainfo.number_of_seasons = info.get("season_number")
|
||||
# 更新集数信息
|
||||
mediainfo.number_of_episodes = info.get("episode_count")
|
||||
|
||||
# 总集数
|
||||
total_episode = len(
|
||||
mediainfo.seasons.get(mediainfo.number_of_seasons) or []
|
||||
)
|
||||
# 额外参数
|
||||
kwargs = {
|
||||
"save_path": self._save_path,
|
||||
"sites": sites,
|
||||
"total_episode": total_episode,
|
||||
}
|
||||
self.update_media_info(item, mediainfo)
|
||||
|
||||
# 对比BGM 和 TMDB 季度和集数
|
||||
if (
|
||||
meta.begin_season
|
||||
and mediainfo.number_of_seasons != meta.begin_season
|
||||
or (item.get('eps') != 0 and total_episode != item.get('eps'))
|
||||
or (item.get('eps') == 0 and not total_episode >= 12)
|
||||
):
|
||||
'''
|
||||
1. 标题识别到季度且与tmdb不一致
|
||||
2. eps 字段为0且总集数小于12
|
||||
3. eps 字段不为0且总集数与Bangumi集数不一致
|
||||
'''
|
||||
logger.info(f"{mediainfo.title_year} 与Bangumi的集数不一致")
|
||||
|
||||
def get_eps(id: str, addr: str = "getEpisodes") -> tuple:
|
||||
"""
|
||||
获取Bangumi条目的集数信息
|
||||
:param id: bangumi条目id
|
||||
:param addr: API地址
|
||||
"""
|
||||
ep: int = 1
|
||||
sort: int = 1
|
||||
total: int = 24
|
||||
res = self.get_bgm_res(addr=addr, id=id)
|
||||
res = res.json()
|
||||
data = res.get("data")
|
||||
if data:
|
||||
# 当前季的集数
|
||||
ep = data[0].get("ep")
|
||||
# 系列的累计集数
|
||||
sort = data[0].get("sort")
|
||||
# 当前集的总集数
|
||||
total = res.get("total")
|
||||
begin_ep = sort - ep + 1
|
||||
total_ep = sort - ep + total
|
||||
return begin_ep, total_ep
|
||||
|
||||
begin_ep, total_ep = get_eps(id=subject_id)
|
||||
prev_eps = [i for i in range(1, begin_ep)]
|
||||
# 更新参数
|
||||
note = (
|
||||
prev_eps
|
||||
if self.are_types_equal(attribute_name='note')
|
||||
else json.dumps(prev_eps)
|
||||
)
|
||||
kwargs.update(
|
||||
{
|
||||
"total_episode": total_ep, # 总集数
|
||||
"start_episode": begin_ep, # 开始集数
|
||||
"lack_episode": total_ep - begin_ep + 1, # 缺失集数
|
||||
"note": note, # 忽略之前季度的集数
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
f"{mediainfo.title_year} 更新总集数为: {total_ep},开始集数为: {begin_ep}"
|
||||
)
|
||||
|
||||
# 检查是否已经订阅, 添加bangumiid
|
||||
sid = self.subscribeoper.list_by_tmdbid(
|
||||
mediainfo.tmdb_id, mediainfo.number_of_seasons
|
||||
)
|
||||
@@ -579,55 +445,114 @@ class BangumiColl(_PluginBase):
|
||||
logger.info(f"{mediainfo.title_year} {meta.season} 正在订阅中")
|
||||
if len(sid) == 1:
|
||||
self.subscribeoper.update(
|
||||
sid=sid[0].id, payload={"bangumiid": subject_id}
|
||||
sid=sid[0].id, payload={"bangumiid": self._subid}
|
||||
)
|
||||
logger.info(
|
||||
f"{mediainfo.title_year} {meta.season} Bangumi条目id更新成功"
|
||||
)
|
||||
continue
|
||||
|
||||
# 添加到订阅
|
||||
sid, msg = self.subscribechain.add(
|
||||
title=mediainfo.title,
|
||||
year=mediainfo.year,
|
||||
mtype=mediainfo.type,
|
||||
tmdbid=mediainfo.tmdb_id,
|
||||
bangumiid=subject_id,
|
||||
bangumiid=self._subid,
|
||||
season=mediainfo.number_of_seasons,
|
||||
exist_ok=True,
|
||||
username="Bangumi订阅",
|
||||
**kwargs,
|
||||
**self.prepare_kwargs(item, meta.begin_season, mediainfo),
|
||||
)
|
||||
if not sid:
|
||||
fail_items.update({subject_id: f"{item.get('name_cn')} {msg}"})
|
||||
continue
|
||||
fail_items[self._subid] = f"{item.get('name_cn')} {msg}"
|
||||
|
||||
return fail_items
|
||||
|
||||
def prepare_kwargs(self, item: dict, meta_season: int, mediainfo: MediaInfo):
|
||||
"""准备额外参数"""
|
||||
kwargs = {
|
||||
"save_path": self._save_path,
|
||||
"sites": (
|
||||
self._sites
|
||||
if self.are_types_equal(attribute_name='sites')
|
||||
else json.dumps(self._sites)
|
||||
),
|
||||
}
|
||||
|
||||
if self.check_series_info(meta_season, item.get("eps", 0), mediainfo):
|
||||
begin_ep, total_ep = self.get_eps()
|
||||
prev_eps: list = [i for i in range(1, begin_ep)]
|
||||
kwargs.update(
|
||||
{
|
||||
"total_episode": total_ep,
|
||||
"start_episode": begin_ep,
|
||||
"lack_episode": total_ep - begin_ep + 1,
|
||||
"note": (
|
||||
prev_eps
|
||||
if self.are_types_equal("note")
|
||||
else json.dumps(prev_eps)
|
||||
),
|
||||
}
|
||||
)
|
||||
logger.info(
|
||||
f"{mediainfo.title_year} 更新总集数为: {total_ep},开始集数为: {begin_ep}"
|
||||
)
|
||||
|
||||
return kwargs
|
||||
|
||||
@staticmethod
|
||||
def check_series_info(meta_season: int, bgm_eps: int, mediainfo: MediaInfo) -> bool:
|
||||
"""检查系列信息是否不一致"""
|
||||
total_episode = len(mediainfo.seasons.get(mediainfo.number_of_seasons) or [])
|
||||
return (
|
||||
meta_season
|
||||
and mediainfo.number_of_seasons != meta_season
|
||||
or (bgm_eps != 0 and total_episode != bgm_eps)
|
||||
or (bgm_eps == 0 and not total_episode >= 12)
|
||||
)
|
||||
|
||||
def update_media_info(self, item, mediainfo):
|
||||
"""更新媒体信息"""
|
||||
for info in mediainfo.season_info:
|
||||
if self.are_dates(item.get("date"), info.get("air_date")):
|
||||
mediainfo.number_of_seasons = info.get("season_number")
|
||||
mediainfo.number_of_episodes = info.get("episode_count")
|
||||
break
|
||||
|
||||
def get_eps(self) -> tuple:
|
||||
"""获取Bangumi条目的集数信息"""
|
||||
try:
|
||||
res = self.get_bgm_res(addr="getEpisodes", id=self._subid)
|
||||
data = res.json().get("data", [{}])[0]
|
||||
ep = data.get("ep", 1)
|
||||
sort = data.get("sort", 1)
|
||||
total = res.json().get("total", 24)
|
||||
begin_ep = sort - ep + 1
|
||||
total_ep = sort - ep + total
|
||||
return begin_ep, total_ep
|
||||
except Exception as e:
|
||||
logger.error(f"获取集数信息失败: {str(e)}")
|
||||
return 1, 24 # 默认值
|
||||
|
||||
# 移除订阅
|
||||
def delete_subscribe(self, del_items: Dict[int, int]):
|
||||
'''
|
||||
删除订阅
|
||||
:param del_items: 数据库id为键,bgm条目id为值
|
||||
'''
|
||||
args = [i for i in del_items.keys()]
|
||||
for arg in args:
|
||||
subscribe_id = int(arg)
|
||||
subscribe = self.subscribeoper.get(subscribe_id)
|
||||
if subscribe:
|
||||
self.subscribeoper.delete(subscribe_id)
|
||||
# 统计订阅
|
||||
self.subscribehelper.sub_done_async(
|
||||
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid}
|
||||
)
|
||||
# 发送通知
|
||||
self.post_message(
|
||||
mtype=NotificationType.Subscribe,
|
||||
title=f"{subscribe.name}({subscribe.year}) 第{subscribe.season}季 已取消订阅",
|
||||
text="原因: 未在Bangumi收藏中找到该条目\n"
|
||||
+ f"订阅用户: {subscribe.username}\n"
|
||||
+ f"创建时间: {subscribe.date}",
|
||||
image=subscribe.backdrop,
|
||||
)
|
||||
"""删除订阅"""
|
||||
for subscribe_id in del_items.keys():
|
||||
try:
|
||||
subscribe = self.subscribeoper.get(subscribe_id)
|
||||
if subscribe:
|
||||
self.subscribeoper.delete(subscribe_id)
|
||||
self.subscribehelper.sub_done_async(
|
||||
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid}
|
||||
)
|
||||
self.post_message(
|
||||
mtype=NotificationType.Subscribe,
|
||||
title=f"{subscribe.name}({subscribe.year}) 第{subscribe.season}季 已取消订阅",
|
||||
text=f"原因: 未在Bangumi收藏中找到该条目\n订阅用户: {subscribe.username}\n创建时间: {subscribe.date}",
|
||||
image=subscribe.backdrop,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"删除订阅失败 {subscribe_id}: {str(e)}")
|
||||
|
||||
@staticmethod
|
||||
def get_bgm_res(addr: str, id: int | str):
|
||||
@@ -642,45 +567,30 @@ class BangumiColl(_PluginBase):
|
||||
|
||||
@staticmethod
|
||||
def are_dates(date_str1, date_str2, threshold_days: int = 7) -> bool:
|
||||
"""
|
||||
对比两个日期字符串是否接近
|
||||
:param date_str: 日期字符串,格式为'YYYY-MM-DD'
|
||||
:param threshold_days: 阈值天数,默认为7天
|
||||
:return: 如果两个日期之间的差异小于等于阈值天数,则返回True,否则返回False
|
||||
"""
|
||||
# 将日期字符串转换为datetime对象
|
||||
"""对比两个日期字符串是否接近"""
|
||||
date1 = datetime.datetime.strptime(date_str1, '%Y-%m-%d')
|
||||
date2 = datetime.datetime.strptime(date_str2, '%Y-%m-%d')
|
||||
|
||||
# 计算两个日期之间的差异
|
||||
delta = abs(date1 - date2)
|
||||
|
||||
# 将阈值转换为timedelta对象
|
||||
threshold = datetime.timedelta(days=threshold_days)
|
||||
|
||||
# 比较差异和阈值
|
||||
return delta <= threshold
|
||||
return abs((date1 - date2).days) <= threshold_days
|
||||
|
||||
@db_query
|
||||
def get_subscribe_history(self, db: Session = None) -> set:
|
||||
'''
|
||||
获取已完成的订阅
|
||||
'''
|
||||
result = (
|
||||
db.query(SubscribeHistory).filter(SubscribeHistory.bangumiid != None).all()
|
||||
)
|
||||
return set([i.bangumiid for i in result])
|
||||
"""获取已完成的订阅"""
|
||||
try:
|
||||
result = (
|
||||
db.query(SubscribeHistory)
|
||||
.filter(SubscribeHistory.bangumiid.isnot(None))
|
||||
.all()
|
||||
)
|
||||
return {i.bangumiid for i in result}
|
||||
except Exception as e:
|
||||
logger.error(f"获取订阅历史失败: {str(e)}")
|
||||
return set()
|
||||
|
||||
@staticmethod
|
||||
def are_types_equal(
|
||||
attribute_name: str, expected_type: Type[Any] = JSON(), class_=Subscribe
|
||||
) -> bool:
|
||||
"""
|
||||
比较类中属性的类型与expected_type是否一致
|
||||
:param class_: 类
|
||||
:param attribute_name: 属性名称
|
||||
:param expected_type: 期望的类型
|
||||
"""
|
||||
"""比较类中属性的类型与expected_type是否一致"""
|
||||
column = class_.__table__.columns.get(attribute_name)
|
||||
if column is None:
|
||||
raise AttributeError(
|
||||
|
||||
Reference in New Issue
Block a user