refactor: 添加订阅协程处理

This commit is contained in:
jxxghp
2025-08-02 09:14:38 +08:00
parent e5d2ade6e6
commit c2c8214075
8 changed files with 479 additions and 89 deletions

View File

@@ -17,7 +17,7 @@ from app.db.models.subscribe import Subscribe
from app.db.models.subscribehistory import SubscribeHistory
from app.db.models.user import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.db.user_oper import get_current_active_user_async
from app.helper.subscribe import SubscribeHelper
from app.scheduler import Scheduler
from app.schemas.types import MediaType, EventType, SystemConfigKey
@@ -53,10 +53,10 @@ async def list_subscribes(_: Annotated[str, Depends(verify_apitoken)]) -> Any:
@router.post("/", summary="新增订阅", response_model=schemas.Response)
def create_subscribe(
async def create_subscribe(
*,
subscribe_in: schemas.Subscribe,
current_user: User = Depends(get_current_active_user),
current_user: User = Depends(get_current_active_user_async),
) -> schemas.Response:
"""
新增订阅
@@ -78,10 +78,10 @@ def create_subscribe(
title = None
# 订阅用户
subscribe_in.username = current_user.name
sid, message = SubscribeChain().add(mtype=mtype,
title=title,
exist_ok=True,
**subscribe_in.dict())
sid, message = await SubscribeChain().async_add(mtype=mtype,
title=title,
exist_ok=True,
**subscribe_in.dict())
return schemas.Response(
success=bool(sid), message=message, data={"id": sid}
)
@@ -495,9 +495,9 @@ async def subscribe_share_delete(
@router.post("/fork", summary="复用订阅", response_model=schemas.Response)
def subscribe_fork(
async def subscribe_fork(
sub: schemas.SubscribeShare,
current_user: User = Depends(get_current_active_user)) -> Any:
current_user: User = Depends(get_current_active_user_async)) -> Any:
"""
复用订阅
"""
@@ -506,10 +506,10 @@ def subscribe_fork(
for key in list(sub_dict.keys()):
if not hasattr(schemas.Subscribe(), key):
sub_dict.pop(key)
result = create_subscribe(subscribe_in=schemas.Subscribe(**sub_dict),
current_user=current_user)
result = await create_subscribe(subscribe_in=schemas.Subscribe(**sub_dict),
current_user=current_user)
if result.success:
SubscribeHelper().sub_fork(share_id=sub.id)
await SubscribeHelper().async_sub_fork(share_id=sub.id)
return result

View File

@@ -6,8 +6,8 @@ from sqlalchemy.orm import Session
from app import schemas
from app.chain.media import MediaChain
from app.chain.tvdb import TvdbChain
from app.chain.subscribe import SubscribeChain
from app.chain.tvdb import TvdbChain
from app.core.metainfo import MetaInfo
from app.core.security import verify_apikey
from app.db import get_db, get_async_db
@@ -307,7 +307,8 @@ def arr_movie_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db: S
@arr_router.get("/movie/{mid}", summary="电影订阅详情", response_model=schemas.RadarrMovie)
async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)],
db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Rardar电影订阅
"""
@@ -333,25 +334,25 @@ async def arr_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: Asy
@arr_router.post("/movie", summary="新增电影订阅")
def arr_add_movie(_: Annotated[str, Depends(verify_apikey)],
movie: RadarrMovie,
db: Session = Depends(get_db)
) -> Any:
async def arr_add_movie(_: Annotated[str, Depends(verify_apikey)],
movie: RadarrMovie,
db: AsyncSession = Depends(get_async_db)
) -> Any:
"""
新增Rardar电影订阅
"""
# 检查订阅是否已存在
subscribe = Subscribe.get_by_tmdbid(db, movie.tmdbId)
subscribe = await Subscribe.async_get_by_tmdbid(db, movie.tmdbId)
if subscribe:
return {
"id": subscribe.id
}
# 添加订阅
sid, message = SubscribeChain().add(title=movie.title,
year=movie.year,
mtype=MediaType.MOVIE,
tmdbid=movie.tmdbId,
username="Seerr")
sid, message = await SubscribeChain().async_add(title=movie.title,
year=movie.year,
mtype=MediaType.MOVIE,
tmdbid=movie.tmdbId,
username="Seerr")
if sid:
return {
"id": sid
@@ -364,7 +365,8 @@ def arr_add_movie(_: Annotated[str, Depends(verify_apikey)],
@arr_router.delete("/movie/{mid}", summary="删除电影订阅", response_model=schemas.Response)
async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
async def arr_remove_movie(mid: int, _: Annotated[str, Depends(verify_apikey)],
db: AsyncSession = Depends(get_async_db)) -> Any:
"""
删除Rardar电影订阅
"""
@@ -606,7 +608,8 @@ def arr_series_lookup(term: str, _: Annotated[str, Depends(verify_apikey)], db:
@arr_router.get("/series/{tid}", summary="剧集详情")
async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)],
db: AsyncSession = Depends(get_async_db)) -> Any:
"""
查询Sonarr剧集
"""
@@ -640,17 +643,17 @@ async def arr_serie(tid: int, _: Annotated[str, Depends(verify_apikey)], db: Asy
@arr_router.post("/series", summary="新增剧集订阅")
def arr_add_series(tv: schemas.SonarrSeries,
_: Annotated[str, Depends(verify_apikey)],
db: Session = Depends(get_db)) -> Any:
async def arr_add_series(tv: schemas.SonarrSeries,
_: Annotated[str, Depends(verify_apikey)],
db: AsyncSession = Depends(get_async_db)) -> Any:
"""
新增Sonarr剧集订阅
"""
# 检查订阅是否存在
left_seasons = []
for season in tv.seasons:
subscribe = Subscribe.get_by_tmdbid(db, tmdbid=tv.tmdbId,
season=season.get("seasonNumber"))
subscribe = await Subscribe.async_get_by_tmdbid(db, tmdbid=tv.tmdbId,
season=season.get("seasonNumber"))
if subscribe:
continue
left_seasons.append(season)
@@ -665,12 +668,12 @@ def arr_add_series(tv: schemas.SonarrSeries,
for season in left_seasons:
if not season.get("monitored"):
continue
sid, message = SubscribeChain().add(title=tv.title,
year=tv.year,
season=season.get("seasonNumber"),
tmdbid=tv.tmdbId,
mtype=MediaType.TV,
username="Seerr")
sid, message = await SubscribeChain().async_add(title=tv.title,
year=tv.year,
season=season.get("seasonNumber"),
tmdbid=tv.tmdbId,
mtype=MediaType.TV,
username="Seerr")
if sid:
return {
@@ -684,15 +687,16 @@ def arr_add_series(tv: schemas.SonarrSeries,
@arr_router.put("/series", summary="更新剧集订阅")
def arr_update_series(tv: schemas.SonarrSeries) -> Any:
async def arr_update_series(tv: schemas.SonarrSeries, _: Annotated[str, Depends(verify_apikey)]) -> Any:
"""
更新Sonarr剧集订阅
"""
return arr_add_series(tv)
return await arr_add_series(tv)
@arr_router.delete("/series/{tid}", summary="删除剧集订阅")
async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)], db: AsyncSession = Depends(get_async_db)) -> Any:
async def arr_remove_series(tid: int, _: Annotated[str, Depends(verify_apikey)],
db: AsyncSession = Depends(get_async_db)) -> Any:
"""
删除Sonarr剧集订阅
"""

View File

@@ -922,6 +922,85 @@ class ChainBase(metaclass=ABCMeta):
self.messagequeue.send_message("post_message", message=message,
immediately=True if message.userid else False)
async def async_post_message(self,
message: Optional[Notification] = None,
meta: Optional[MetaBase] = None,
mediainfo: Optional[MediaInfo] = None,
torrentinfo: Optional[TorrentInfo] = None,
transferinfo: Optional[TransferInfo] = None,
**kwargs) -> None:
"""
异步发送消息
:param message: Notification实例
:param meta: 元数据
:param mediainfo: 媒体信息
:param torrentinfo: 种子信息
:param transferinfo: 文件整理信息
:param kwargs: 其他参数(覆盖业务对象属性值)
:return: 成功或失败
"""
# 渲染消息
message = MessageTemplateHelper.render(message=message, meta=meta, mediainfo=mediainfo,
torrentinfo=torrentinfo, transferinfo=transferinfo, **kwargs)
# 保存消息
self.messagehelper.put(message, role="user", title=message.title)
await self.messageoper.async_add(**message.dict())
# 发送消息按设置隔离
if not message.userid and message.mtype:
# 消息隔离设置
notify_action = ServiceConfigHelper.get_notification_switch(message.mtype)
if notify_action:
# 'admin' 'user,admin' 'user' 'all'
actions = notify_action.split(",")
# 是否已发送管理员标志
admin_sended = False
send_orignal = False
useroper = UserOper()
for action in actions:
send_message = copy.deepcopy(message)
if action == "admin" and not admin_sended:
# 仅发送管理员
logger.info(f"{send_message.mtype} 的消息已设置发送给管理员")
# 读取管理员消息IDS
send_message.targets = useroper.get_settings(settings.SUPERUSER)
admin_sended = True
elif action == "user" and send_message.username:
# 发送对应用户
logger.info(f"{send_message.mtype} 的消息已设置发送给用户 {send_message.username}")
# 读取用户消息IDS
send_message.targets = useroper.get_settings(send_message.username)
if send_message.targets is None:
# 没有找到用户
if not admin_sended:
# 回滚发送管理员
logger.info(f"用户 {send_message.username} 不存在,消息将发送给管理员")
# 读取管理员消息IDS
send_message.targets = useroper.get_settings(settings.SUPERUSER)
admin_sended = True
else:
# 管理员发过了,此消息不发了
logger.info(f"用户 {send_message.username} 不存在,消息无法发送到对应用户")
continue
elif send_message.username == settings.SUPERUSER:
# 管理员同名已发送
admin_sended = True
else:
# 按原消息发送全体
if not admin_sended:
send_orignal = True
break
# 按设定发送
await self.eventmanager.async_send_event(etype=EventType.NoticeMessage,
data={**send_message.dict(), "type": send_message.mtype})
await self.messagequeue.async_send_message("post_message", message=send_message)
if not send_orignal:
return
# 发送消息事件
await self.eventmanager.async_send_event(etype=EventType.NoticeMessage, data={**message.dict(), "type": message.mtype})
# 按原消息发送
await self.messagequeue.async_send_message("post_message", message=message,
immediately=True if message.userid else False)
def post_medias_message(self, message: Notification, medias: List[MediaInfo]) -> None:
"""
发送媒体信息选择列表

View File

@@ -41,6 +41,82 @@ class SubscribeChain(ChainBase):
# 避免莫名原因导致长时间持有锁
_LOCK_TIMOUT = 3600 * 2
@staticmethod
def __get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]:
"""
广播事件解析媒体信息
"""
event_data = MediaRecognizeConvertEventData(
mediaid=_mediaid,
convert_type=settings.RECOGNIZE_SOURCE
)
event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data)
# 使用事件返回的上下文数据
if event and event.event_data:
event_data: MediaRecognizeConvertEventData = event.event_data
if event_data.media_dict:
mediachain = MediaChain()
new_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
return mediachain.recognize_media(meta=_meta, tmdbid=new_id)
elif event_data.convert_type == "douban":
return mediachain.recognize_media(meta=_meta, doubanid=new_id)
return None
@staticmethod
async def __async_get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]:
"""
广播事件解析媒体信息
"""
event_data = MediaRecognizeConvertEventData(
mediaid=_mediaid,
convert_type=settings.RECOGNIZE_SOURCE
)
event = await eventmanager.async_send_event(ChainEventType.MediaRecognizeConvert, event_data)
# 使用事件返回的上下文数据
if event and event.event_data:
event_data: MediaRecognizeConvertEventData = event.event_data
if event_data.media_dict:
mediachain = MediaChain()
new_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
return await mediachain.async_recognize_media(meta=_meta, tmdbid=new_id)
elif event_data.convert_type == "douban":
return await mediachain.async_recognize_media(meta=_meta, doubanid=new_id)
return None
def __get_default_kwargs(self, mtype: MediaType, **kwargs) -> dict:
"""
获取订阅默认配置
:param mtype: 媒体类型
:param key: 配置键
:return: 配置值
"""
return {
'quality': self.__get_default_subscribe_config(mtype, "quality") if not kwargs.get(
"quality") else kwargs.get("quality"),
'resolution': self.__get_default_subscribe_config(mtype, "resolution") if not kwargs.get(
"resolution") else kwargs.get("resolution"),
'effect': self.__get_default_subscribe_config(mtype, "effect") if not kwargs.get(
"effect") else kwargs.get("effect"),
'include': self.__get_default_subscribe_config(mtype, "include") if not kwargs.get(
"include") else kwargs.get("include"),
'exclude': self.__get_default_subscribe_config(mtype, "exclude") if not kwargs.get(
"exclude") else kwargs.get("exclude"),
'best_version': self.__get_default_subscribe_config(mtype, "best_version") if not kwargs.get(
"best_version") else kwargs.get("best_version"),
'search_imdbid': self.__get_default_subscribe_config(mtype, "search_imdbid") if not kwargs.get(
"search_imdbid") else kwargs.get("search_imdbid"),
'sites': self.__get_default_subscribe_config(mtype, "sites") or None if not kwargs.get(
"sites") else kwargs.get("sites"),
'downloader': self.__get_default_subscribe_config(mtype, "downloader") if not kwargs.get(
"downloader") else kwargs.get("downloader"),
'save_path': self.__get_default_subscribe_config(mtype, "save_path") if not kwargs.get(
"save_path") else kwargs.get("save_path"),
'filter_groups': self.__get_default_subscribe_config(mtype, "filter_groups") if not kwargs.get(
"filter_groups") else kwargs.get("filter_groups")
}
def add(self, title: str, year: str,
mtype: MediaType = None,
tmdbid: Optional[int] = None,
@@ -60,27 +136,6 @@ class SubscribeChain(ChainBase):
识别媒体信息并添加订阅
"""
def __get_event_meida(_mediaid: str, _meta: MetaBase) -> Optional[MediaInfo]:
"""
广播事件解析媒体信息
"""
event_data = MediaRecognizeConvertEventData(
mediaid=_mediaid,
convert_type=settings.RECOGNIZE_SOURCE
)
event = eventmanager.send_event(ChainEventType.MediaRecognizeConvert, event_data)
# 使用事件返回的上下文数据
if event and event.event_data:
event_data: MediaRecognizeConvertEventData = event.event_data
if event_data.media_dict:
mediachain = MediaChain()
new_id = event_data.media_dict.get("id")
if event_data.convert_type == "themoviedb":
return mediachain.recognize_media(meta=_meta, tmdbid=new_id)
elif event_data.convert_type == "douban":
return mediachain.recognize_media(meta=_meta, doubanid=new_id)
return None
logger.info(f'开始添加订阅,标题:{title} ...')
mediainfo = None
@@ -103,7 +158,7 @@ class SubscribeChain(ChainBase):
mediainfo = MediaInfo(tmdb_info=tmdbinfo)
elif mediaid:
# 未知前缀,广播事件解析媒体信息
mediainfo = __get_event_meida(mediaid, metainfo)
mediainfo = self.__get_event_meida(mediaid, metainfo)
else:
# 使用TMDBID识别
mediainfo = self.recognize_media(meta=metainfo, mtype=mtype, tmdbid=tmdbid,
@@ -114,7 +169,7 @@ class SubscribeChain(ChainBase):
mediainfo = self.recognize_media(meta=metainfo, mtype=mtype, doubanid=doubanid, cache=False)
elif mediaid:
# 未知前缀,广播事件解析媒体信息
mediainfo = __get_event_meida(mediaid, metainfo)
mediainfo = self.__get_event_meida(mediaid, metainfo)
if mediainfo:
# 豆瓣标题处理
meta = MetaInfo(mediainfo.title)
@@ -176,30 +231,8 @@ class SubscribeChain(ChainBase):
mediainfo.bangumi_id = bangumiid
# 添加订阅
kwargs.update({
'quality': self.__get_default_subscribe_config(mediainfo.type, "quality") if not kwargs.get(
"quality") else kwargs.get("quality"),
'resolution': self.__get_default_subscribe_config(mediainfo.type, "resolution") if not kwargs.get(
"resolution") else kwargs.get("resolution"),
'effect': self.__get_default_subscribe_config(mediainfo.type, "effect") if not kwargs.get(
"effect") else kwargs.get("effect"),
'include': self.__get_default_subscribe_config(mediainfo.type, "include") if not kwargs.get(
"include") else kwargs.get("include"),
'exclude': self.__get_default_subscribe_config(mediainfo.type, "exclude") if not kwargs.get(
"exclude") else kwargs.get("exclude"),
'best_version': self.__get_default_subscribe_config(mediainfo.type, "best_version") if not kwargs.get(
"best_version") else kwargs.get("best_version"),
'search_imdbid': self.__get_default_subscribe_config(mediainfo.type, "search_imdbid") if not kwargs.get(
"search_imdbid") else kwargs.get("search_imdbid"),
'sites': self.__get_default_subscribe_config(mediainfo.type, "sites") or None if not kwargs.get(
"sites") else kwargs.get("sites"),
'downloader': self.__get_default_subscribe_config(mediainfo.type, "downloader") if not kwargs.get(
"downloader") else kwargs.get("downloader"),
'save_path': self.__get_default_subscribe_config(mediainfo.type, "save_path") if not kwargs.get(
"save_path") else kwargs.get("save_path"),
'filter_groups': self.__get_default_subscribe_config(mediainfo.type, "filter_groups") if not kwargs.get(
"filter_groups") else kwargs.get("filter_groups")
})
kwargs.update(self.__get_default_kwargs(mediainfo.type, **kwargs))
# 操作数据库
sid, err_msg = SubscribeOper().add(mediainfo=mediainfo, season=season, username=username, **kwargs)
if not sid:
@@ -261,6 +294,183 @@ class SubscribeChain(ChainBase):
# 返回结果
return sid, ""
async def async_add(self, title: str, year: str,
mtype: MediaType = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
bangumiid: Optional[int] = None,
mediaid: Optional[str] = None,
episode_group: Optional[str] = None,
season: Optional[int] = None,
channel: MessageChannel = None,
source: Optional[str] = None,
userid: Optional[str] = None,
username: Optional[str] = None,
message: Optional[bool] = True,
exist_ok: Optional[bool] = False,
**kwargs) -> Tuple[Optional[int], str]:
"""
异步识别媒体信息并添加订阅
"""
logger.info(f'开始添加订阅,标题:{title} ...')
mediainfo = None
metainfo = MetaInfo(title)
if year:
metainfo.year = year
if mtype:
metainfo.type = mtype
if season:
metainfo.type = MediaType.TV
metainfo.begin_season = season
# 识别媒体信息
if settings.RECOGNIZE_SOURCE == "themoviedb":
# TMDB识别模式
if not tmdbid:
if doubanid:
# 将豆瓣信息转换为TMDB信息
tmdbinfo = await MediaChain().async_get_tmdbinfo_by_doubanid(doubanid=doubanid, mtype=mtype)
if tmdbinfo:
mediainfo = MediaInfo(tmdb_info=tmdbinfo)
elif mediaid:
# 未知前缀,广播事件解析媒体信息
mediainfo = await self.__async_get_event_meida(mediaid, metainfo)
else:
# 使用TMDBID识别
mediainfo = await self.async_recognize_media(meta=metainfo, mtype=mtype, tmdbid=tmdbid,
episode_group=episode_group, cache=False)
else:
if doubanid:
# 豆瓣识别模式,不使用缓存
mediainfo = await self.async_recognize_media(meta=metainfo, mtype=mtype, doubanid=doubanid, cache=False)
elif mediaid:
# 未知前缀,广播事件解析媒体信息
mediainfo = await self.__async_get_event_meida(mediaid, metainfo)
if mediainfo:
# 豆瓣标题处理
meta = MetaInfo(mediainfo.title)
mediainfo.title = meta.name
if not season:
season = meta.begin_season
# 使用名称识别兜底
if not mediainfo:
mediainfo = await self.async_recognize_media(meta=metainfo, episode_group=episode_group)
# 识别失败
if not mediainfo:
logger.warn(f'未识别到媒体信息,标题:{title}tmdbid{tmdbid}doubanid{doubanid}')
return None, "未识别到媒体信息"
# 总集数
if mediainfo.type == MediaType.TV:
if not season:
season = 1
# 总集数
if not kwargs.get('total_episode'):
if not mediainfo.seasons or episode_group:
# 补充媒体信息
mediainfo = await self.async_recognize_media(mtype=mediainfo.type,
tmdbid=mediainfo.tmdb_id,
doubanid=mediainfo.douban_id,
bangumiid=mediainfo.bangumi_id,
episode_group=episode_group,
cache=False)
if not mediainfo:
logger.error(f"媒体信息识别失败!")
return None, "媒体信息识别失败"
if not mediainfo.seasons:
logger.error(f"媒体信息中没有季集信息,标题:{title}tmdbid{tmdbid}doubanid{doubanid}")
return None, "媒体信息中没有季集信息"
total_episode = len(mediainfo.seasons.get(season) or [])
if not total_episode:
logger.error(f'未获取到总集数,标题:{title}tmdbid{tmdbid}, doubanid{doubanid}')
return None, f"未获取到第 {season} 季的总集数"
kwargs.update({
'total_episode': total_episode
})
# 缺失集
if not kwargs.get('lack_episode'):
kwargs.update({
'lack_episode': kwargs.get('total_episode')
})
else:
# 避免season为0的问题
season = None
# 更新媒体图片
await self.async_obtain_images(mediainfo=mediainfo)
# 合并信息
if doubanid:
mediainfo.douban_id = doubanid
if bangumiid:
mediainfo.bangumi_id = bangumiid
# 列新默认参数
kwargs.update(self.__get_default_kwargs(mediainfo.type, **kwargs))
# 操作数据库
sid, err_msg = await SubscribeOper().async_add(mediainfo=mediainfo, season=season, username=username, **kwargs)
if not sid:
logger.error(f'{mediainfo.title_year} {err_msg}')
if not exist_ok and message:
# 失败发回原用户
await self.async_post_message(schemas.Notification(channel=channel,
source=source,
mtype=NotificationType.Subscribe,
title=f"{mediainfo.title_year} {metainfo.season} "
f"添加订阅失败!",
text=f"{err_msg}",
image=mediainfo.get_message_image(),
userid=userid))
return None, err_msg
elif message:
if mediainfo.type == MediaType.TV:
link = settings.MP_DOMAIN('#/subscribe/tv?tab=mysub')
else:
link = settings.MP_DOMAIN('#/subscribe/movie?tab=mysub')
# 订阅成功按规则发送消息
await self.async_post_message(
schemas.Notification(
channel=channel,
source=source,
mtype=NotificationType.Subscribe,
ctype=ContentType.SubscribeAdded,
image=mediainfo.get_message_image(),
link=link,
userid=userid,
username=username
),
meta=metainfo,
mediainfo=mediainfo,
username=username
)
# 发送事件
await eventmanager.async_send_event(EventType.SubscribeAdded, {
"subscribe_id": sid,
"username": username,
"mediainfo": mediainfo.to_dict(),
})
# 统计订阅
await SubscribeHelper().async_sub_reg({
"name": title,
"year": year,
"type": metainfo.type.value,
"tmdbid": mediainfo.tmdb_id,
"imdbid": mediainfo.imdb_id,
"tvdbid": mediainfo.tvdb_id,
"doubanid": mediainfo.douban_id,
"bangumiid": mediainfo.bangumi_id,
"season": metainfo.begin_season,
"poster": mediainfo.get_poster_image(),
"backdrop": mediainfo.get_backdrop_image(),
"vote": mediainfo.vote_average,
"description": mediainfo.overview
})
# 返回结果
return sid, ""
@staticmethod
def exists(mediainfo: MediaInfo, meta: MetaBase = None):
"""

View File

@@ -29,7 +29,7 @@ class MessageOper(DbOper):
note: Union[list, dict] = None,
**kwargs):
"""
新增媒体服务器数据
新增消息
:param channel: 消息渠道
:param source: 来源
:param mtype: 消息类型
@@ -57,11 +57,47 @@ class MessageOper(DbOper):
# 从kwargs中去掉Message中没有的字段
for k in list(kwargs.keys()):
if k not in Message.__table__.columns.keys(): # noqa
if k not in Message.__table__.columns.keys(): # noqa
kwargs.pop(k)
Message(**kwargs).create(self._db)
async def async_add(self,
channel: MessageChannel = None,
source: Optional[str] = None,
mtype: NotificationType = None,
title: Optional[str] = None,
text: Optional[str] = None,
image: Optional[str] = None,
link: Optional[str] = None,
userid: Optional[str] = None,
action: Optional[int] = 1,
note: Union[list, dict] = None,
**kwargs):
"""
异步新增消息
"""
kwargs.update({
"channel": channel.value if channel else '',
"source": source,
"mtype": mtype.value if mtype else '',
"title": title,
"text": text,
"image": image,
"link": link,
"userid": userid,
"action": action,
"reg_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"note": note or {}
})
# 从kwargs中去掉Message中没有的字段
for k in list(kwargs.keys()):
if k not in Message.__table__.columns.keys(): # noqa
kwargs.pop(k)
await Message(**kwargs).async_create(self._db)
def list_by_page(self, page: Optional[int] = 1, count: Optional[int] = 30) -> Optional[str]:
"""
获取媒体服务器数据ID

View File

@@ -48,6 +48,42 @@ class SubscribeOper(DbOper):
else:
return subscribe.id, "订阅已存在"
async def async_add(self, mediainfo: MediaInfo, **kwargs) -> Tuple[int, str]:
"""
异步新增订阅
"""
subscribe = await Subscribe.async_exists(self._db,
tmdbid=mediainfo.tmdb_id,
doubanid=mediainfo.douban_id,
season=kwargs.get('season'))
kwargs.update({
"name": mediainfo.title,
"year": mediainfo.year,
"type": mediainfo.type.value,
"tmdbid": mediainfo.tmdb_id,
"imdbid": mediainfo.imdb_id,
"tvdbid": mediainfo.tvdb_id,
"doubanid": mediainfo.douban_id,
"bangumiid": mediainfo.bangumi_id,
"episode_group": mediainfo.episode_group,
"poster": mediainfo.get_poster_image(),
"backdrop": mediainfo.get_backdrop_image(),
"vote": mediainfo.vote_average,
"description": mediainfo.overview,
"date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
})
if not subscribe:
subscribe = Subscribe(**kwargs)
await subscribe.async_create(self._db)
# 查询订阅
subscribe = await Subscribe.async_exists(self._db,
tmdbid=mediainfo.tmdb_id,
doubanid=mediainfo.douban_id,
season=kwargs.get('season'))
return subscribe.id, "新增订阅成功"
else:
return subscribe.id, "订阅已存在"
def exists(self, tmdbid: Optional[int] = None, doubanid: Optional[str] = None,
season: Optional[int] = None) -> bool:
"""

View File

@@ -657,6 +657,17 @@ class MessageQueueManager(metaclass=SingletonClass):
})
logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}")
async def async_send_message(self, *args, **kwargs) -> None:
"""
异步发送消息(直接加入队列)
"""
kwargs.pop("immediately", False)
self.queue.put({
"args": args,
"kwargs": kwargs
})
logger.info(f"消息已加入队列,当前队列长度:{self.queue.qsize()}")
def _send(self, *args, **kwargs) -> None:
"""
实际发送消息(可通过回调函数自定义)

View File

@@ -173,6 +173,20 @@ class SubscribeHelper(metaclass=WeakSingleton):
return True
return False
async def async_sub_reg(self, sub: dict) -> bool:
"""
异步新增订阅统计
"""
enabled, _ = self._check_subscribe_share_enabled()
if not enabled:
return False
res = await AsyncRequestUtils(proxies=settings.PROXY, timeout=5, headers={
"Content-Type": "application/json"
}).post_res(self._sub_reg, json=sub)
if res and res.status_code == 200:
return True
return False
def sub_done(self, sub: dict) -> bool:
"""
完成订阅统计