Compare commits

..

15 Commits

Author SHA1 Message Date
jxxghp
8896867bb3 更新 fetch_medias.py 2025-03-02 14:23:37 +08:00
jxxghp
ba7c9eec7b fix 2025-03-02 13:16:46 +08:00
jxxghp
9b95fde8d1 v2.3.3
- 增加了多个索引和认证站点支持
- HDDolby切换为使用API(需要调整站点设置,否则无法正常刷新站点数据)
- 调整了IYUU认证使用的域名地址
- 继续完善任务工作流
2025-03-02 12:48:32 +08:00
jxxghp
2851f16395 feat:actions增加缓存机制 2025-03-02 12:27:36 +08:00
jxxghp
0d63dfb931 fix actions 2025-03-02 11:15:52 +08:00
jxxghp
37558e3135 更新 hddolby.py 2025-03-02 10:24:17 +08:00
jxxghp
96021e42a2 fix 2025-03-02 10:08:03 +08:00
jxxghp
c32b845515 feat:actions增加识别选项 2025-03-02 09:45:24 +08:00
jxxghp
147d980c54 fix hddolby 2025-03-02 08:51:09 +08:00
jxxghp
f91c43dde9 fix hddolby 2025-03-02 08:08:46 +08:00
jxxghp
4cf5cb06a0 fix hddolby 2025-03-02 08:06:25 +08:00
jxxghp
8e4b4c3144 add hddolby userdata api 2025-03-01 21:28:15 +08:00
jxxghp
c302013696 add hddolby api 2025-03-01 21:24:01 +08:00
jxxghp
37cb94c59d add hddolby api 2025-03-01 21:08:37 +08:00
jxxghp
01f7c6bc2b fix 2025-03-01 18:55:16 +08:00
27 changed files with 676 additions and 147 deletions

View File

@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
from typing import List, Any, Union
from app.chain import ChainBase
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas import ActionContext, ActionParams
@@ -13,27 +15,35 @@ class BaseAction(ABC):
工作流动作基类
"""
# 动作ID
_action_id = None
# 完成标志
_done_flag = False
# 执行信息
_message = ""
# 缓存键值
_cache_key = "WorkflowCache-%s"
def __init__(self, action_id: str):
self._action_id = action_id
self.systemconfigoper = SystemConfigOper()
@classmethod
@property
@abstractmethod
def name(cls) -> str:
def name(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def description(cls) -> str:
def description(cls) -> str: # noqa
pass
@classmethod
@property
@abstractmethod
def data(cls) -> dict:
def data(cls) -> dict: # noqa
pass
@property
@@ -65,6 +75,29 @@ class BaseAction(ABC):
self._message = message
self._done_flag = True
def check_cache(self, workflow_id: int, key: str) -> bool:
"""
检查是否处理过
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
return key in action_cache
def save_cache(self, workflow_id: int, data: Union[list, str]):
"""
保存缓存
"""
workflow_key = self._cache_key % workflow_id
workflow_cache = self.systemconfigoper.get(workflow_key) or {}
action_cache = workflow_cache.get(self._action_id) or []
if isinstance(data, list):
action_cache.extend(data)
else:
action_cache.append(data)
workflow_cache[self._action_id] = action_cache
self.systemconfigoper.set(workflow_key, workflow_cache)
@abstractmethod
def execute(self, workflow_id: int, params: ActionParams, context: ActionContext) -> ActionContext:
"""

View File

@@ -17,6 +17,7 @@ class AddDownloadParams(ActionParams):
"""
downloader: Optional[str] = Field(None, description="下载器")
save_path: Optional[str] = Field(None, description="保存路径")
labels: Optional[str] = Field(None, description="标签(,分隔)")
only_lack: Optional[bool] = Field(False, description="仅下载缺失的资源")
@@ -29,24 +30,24 @@ class AddDownloadAction(BaseAction):
_added_downloads = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.downloadchain = DownloadChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "添加下载"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "根据资源列表添加下载任务"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return AddDownloadParams().dict()
@property
@@ -58,23 +59,29 @@ class AddDownloadAction(BaseAction):
将上下文中的torrents添加到下载任务中
"""
params = AddDownloadParams(**params)
_started = False
for t in context.torrents:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{t.torrent_info.site}-{t.torrent_info.title}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{t.torrent_info.title} 已添加过下载,跳过")
continue
if not t.meta_info:
t.meta_info = MetaInfo(title=t.title, subtitle=t.description)
t.meta_info = MetaInfo(title=t.torrent_info.title, subtitle=t.torrent_info.description)
if not t.media_info:
t.media_info = self.mediachain.recognize_media(meta=t.meta_info)
if not t.media_info:
self._has_error = True
logger.warning(f"{t.title} 未识别到媒体信息,无法下载")
logger.warning(f"{t.torrent_info.title} 未识别到媒体信息,无法下载")
continue
if params.only_lack:
exists_info = self.downloadchain.media_exists(t.media_info)
if exists_info:
if t.media_info.type == MediaType.MOVIE:
# 电影
logger.warning(f"{t.title} 媒体库中已存在,跳过")
logger.warning(f"{t.torrent_info.title} 媒体库中已存在,跳过")
continue
else:
# 电视剧
@@ -90,19 +97,23 @@ class AddDownloadAction(BaseAction):
logger.warning(f"{t.meta_info.title}{t.meta_info.begin_season} 季第 {t.meta_info.episode_list} 集已存在,跳过")
continue
_started = True
did = self.downloadchain.download_single(context=t,
downloader=params.downloader,
save_path=params.save_path)
save_path=params.save_path,
label=params.labels)
if did:
self._added_downloads.append(did)
else:
self._has_error = True
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._added_downloads:
logger.info(f"已添加 {len(self._added_downloads)} 个下载任务")
context.downloads.extend(
[DownloadTask(download_id=did, downloader=params.downloader) for did in self._added_downloads]
)
elif _started:
self._has_error = True
self.job_done(f"已添加 {len(self._added_downloads)} 个下载任务")
return context

View File

@@ -22,24 +22,24 @@ class AddSubscribeAction(BaseAction):
_added_subscribes = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.subscribechain = SubscribeChain()
self.subscribeoper = SubscribeOper()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "添加订阅"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "根据媒体列表添加订阅"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return AddSubscribeParams().dict()
@property
@@ -50,15 +50,22 @@ class AddSubscribeAction(BaseAction):
"""
将medias中的信息添加订阅如果订阅不存在的话
"""
_started = False
for media in context.medias:
if global_vars.is_workflow_stopped(workflow_id):
break
# 检查缓存
cache_key = f"{media.type}-{media.title}-{media.year}-{media.season}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{media.title} {media.year} 已添加过订阅,跳过")
continue
mediainfo = MediaInfo()
mediainfo.from_dict(media.dict())
if self.subscribechain.exists(mediainfo):
logger.info(f"{media.title} 已存在订阅")
continue
# 添加订阅
_started = True
sid, message = self.subscribechain.add(mtype=mediainfo.type,
title=mediainfo.title,
year=mediainfo.year,
@@ -69,13 +76,15 @@ class AddSubscribeAction(BaseAction):
username=settings.SUPERUSER)
if sid:
self._added_subscribes.append(sid)
else:
self._has_error = True
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._added_subscribes:
logger.info(f"已添加 {len(self._added_subscribes)} 个订阅")
for sid in self._added_subscribes:
context.subscribes.append(self.subscribeoper.get(sid))
elif _started:
self._has_error = True
self.job_done(f"已添加 {len(self._added_subscribes)} 个订阅")
return context

View File

@@ -18,23 +18,23 @@ class FetchDownloadsAction(BaseAction):
_downloads = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取下载任务"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "获取下载队列中的任务状态"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchDownloadsParams().dict()
@property

View File

@@ -28,11 +28,11 @@ class FetchMediasAction(BaseAction):
"""
_inner_sources = []
_medias = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.__inner_sources = [
{
@@ -100,22 +100,22 @@ class FetchMediasAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取媒体数据"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "获取榜单等媒体数据列表"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchMediasParams().dict()
@property
def success(self) -> bool:
return True if self._medias else False
return not self._has_error
def __get_source(self, source: str):
"""
@@ -131,37 +131,41 @@ class FetchMediasAction(BaseAction):
获取媒体数据填充到medias
"""
params = FetchMediasParams(**params)
if params.source_type == "ranking":
for name in params.sources:
if global_vars.is_workflow_stopped(workflow_id):
break
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}{params.api_path}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{params.api_path} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
try:
if params.source_type == "ranking":
for name in params.sources:
if global_vars.is_workflow_stopped(workflow_id):
break
source = self.__get_source(name)
if not source:
continue
logger.info(f"获取媒体数据 {source} ...")
results = []
if source.get("func"):
results = source['func']()
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}/api/v1/{source['api_path']}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{name} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
else:
logger.error(f"{name} 获取数据失败")
else:
# 调用内部API获取数据
api_url = f"http://127.0.0.1:{settings.PORT}{params.api_path}?token={settings.API_TOKEN}"
res = RequestUtils(timeout=15).post_res(api_url)
if res:
results = res.json()
if results:
logger.info(f"{params.api_path} 获取到 {len(results)} 条数据")
self._medias.extend([MediaInfo(**r) for r in results])
except Exception as e:
logger.error(f"获取媒体数据失败: {e}")
self._has_error = True
if self._medias:
context.medias.extend(self._medias)

View File

@@ -21,6 +21,7 @@ class FetchRssParams(ActionParams):
content_type: Optional[str] = Field(None, description="Content-Type")
referer: Optional[str] = Field(None, description="Referer")
ua: Optional[str] = Field(None, description="User-Agent")
match_media: Optional[str] = Field(None, description="匹配媒体信息")
class FetchRssAction(BaseAction):
@@ -31,24 +32,24 @@ class FetchRssAction(BaseAction):
_rss_torrents = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.rsshelper = RssHelper()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "获取RSS资源"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "订阅RSS地址获取资源"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchRssParams().dict()
@property
@@ -98,10 +99,12 @@ class FetchRssAction(BaseAction):
pubdate=item["pubdate"].strftime("%Y-%m-%d %H:%M:%S") if item.get("pubdate") else None,
)
meta = MetaInfo(title=torrentinfo.title, subtitle=torrentinfo.description)
mediainfo = self.chain.recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue
mediainfo = None
if params.match_media:
mediainfo = self.chain.recognize_media(meta)
if not mediainfo:
logger.warning(f"{torrentinfo.title} 未识别到媒体信息")
continue
self._rss_torrents.append(Context(meta_info=meta, media_info=mediainfo, torrent_info=torrentinfo))
if self._rss_torrents:

View File

@@ -21,6 +21,7 @@ class FetchTorrentsParams(ActionParams):
type: Optional[str] = Field(None, description="资源类型 (电影/电视剧)")
season: Optional[int] = Field(None, description="季度")
sites: Optional[List[int]] = Field([], description="站点列表")
match_media: Optional[bool] = Field(False, description="匹配媒体信息")
class FetchTorrentsAction(BaseAction):
@@ -30,23 +31,23 @@ class FetchTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.searchchain = SearchChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "搜索站点资源"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "搜索站点种子资源列表"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FetchTorrentsParams().dict()
@property
@@ -71,10 +72,11 @@ class FetchTorrentsAction(BaseAction):
if params.season and torrent.meta_info.begin_season != params.season:
continue
# 识别媒体信息
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
if params.match_media:
torrent.media_info = self.searchchain.recognize_media(torrent.meta_info)
if not torrent.media_info:
logger.warning(f"{torrent.torrent_info.title} 未识别到媒体信息")
continue
self._torrents.append(torrent)
else:
# 搜索媒体列表
@@ -88,8 +90,8 @@ class FetchTorrentsAction(BaseAction):
for torrent in torrents:
self._torrents.append(torrent)
# 随机休眠 10-60秒
sleep_time = random.randint(10, 60)
# 随机休眠 5-30秒
sleep_time = random.randint(5, 30)
logger.info(f"随机休眠 {sleep_time} 秒 ...")
time.sleep(sleep_time)

View File

@@ -4,6 +4,7 @@ from pydantic import Field
from app.actions import BaseAction
from app.core.config import global_vars
from app.log import logger
from app.schemas import ActionParams, ActionContext
@@ -12,7 +13,6 @@ class FilterMediasParams(ActionParams):
过滤媒体数据参数
"""
type: Optional[str] = Field(None, description="媒体类型 (电影/电视剧)")
category: Optional[str] = Field(None, description="媒体类别 (二级分类)")
vote: Optional[int] = Field(0, description="评分")
year: Optional[str] = Field(None, description="年份")
@@ -26,17 +26,17 @@ class FilterMediasAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "过滤媒体数据"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "对媒体数据列表进行过滤"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FilterMediasParams().dict()
@property
@@ -53,16 +53,15 @@ class FilterMediasAction(BaseAction):
break
if params.type and media.type != params.type:
continue
if params.category and media.category != params.category:
continue
if params.vote and media.vote_average < params.vote:
continue
if params.year and media.year != params.year:
continue
self._medias.append(media)
if self._medias:
context.medias = self._medias
logger.info(f"过滤后剩余 {len(self._medias)} 条媒体数据")
context.medias = self._medias
self.job_done(f"过滤后剩余 {len(self._medias)} 条媒体数据")
return context

View File

@@ -5,6 +5,7 @@ from pydantic import Field
from app.actions import BaseAction, ActionChain
from app.core.config import global_vars
from app.helper.torrent import TorrentHelper
from app.log import logger
from app.schemas import ActionParams, ActionContext
@@ -28,24 +29,24 @@ class FilterTorrentsAction(BaseAction):
_torrents = []
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.torrenthelper = TorrentHelper()
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "过滤资源"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "对资源列表数据进行过滤"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return FilterTorrentsParams().dict()
@property
@@ -78,6 +79,8 @@ class FilterTorrentsAction(BaseAction):
):
self._torrents.append(torrent)
logger.info(f"过滤后剩余 {len(self._torrents)} 个资源")
context.torrents = self._torrents
self.job_done(f"过滤后剩余 {len(self._torrents)} 个资源")

View File

@@ -27,23 +27,23 @@ class ScanFileAction(BaseAction):
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "扫描目录"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "扫描目录文件到队列"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return ScanFileParams().dict()
@property
@@ -68,7 +68,14 @@ class ScanFileAction(BaseAction):
break
if not file.extension or f".{file.extension.lower()}" not in settings.RMT_MEDIAEXT:
continue
# 检查缓存
cache_key = f"{file.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{file.path} 已处理过,跳过")
continue
self._fileitems.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)

View File

@@ -24,24 +24,24 @@ class ScrapeFileAction(BaseAction):
_scraped_files = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.storagechain = StorageChain()
self.mediachain = MediaChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "刮削文件"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "刮削媒体信息和图片"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return ScrapeFileParams().dict()
@property
@@ -52,6 +52,8 @@ class ScrapeFileAction(BaseAction):
"""
刮削fileitems中的所有文件
"""
# 失败次数
_failed_count = 0
for fileitem in context.fileitems:
if global_vars.is_workflow_stopped(workflow_id):
break
@@ -59,14 +61,24 @@ class ScrapeFileAction(BaseAction):
continue
if not self.storagechain.exists(fileitem):
continue
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已刮削过,跳过")
continue
meta = MetaInfoPath(Path(fileitem.path))
mediainfo = self.mediachain.recognize_media(meta)
if not mediainfo:
self._has_error = True
_failed_count += 1
logger.info(f"{fileitem.path} 未识别到媒体信息,无法刮削")
continue
self.mediachain.scrape_metadata(fileitem=fileitem, meta=meta, mediainfo=mediainfo)
self._scraped_files.append(fileitem)
# 保存缓存
self.save_cache(workflow_id, cache_key)
self.job_done(f"成功刮削了 {len(self._scraped_files)} 个文件")
if not self._scraped_files and _failed_count:
self._has_error = True
self.job_done(f"成功刮削 {len(self._scraped_files)} 个文件,失败 {_failed_count}")
return context

View File

@@ -18,17 +18,17 @@ class SendEventAction(BaseAction):
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "发送事件"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "发送任务执行事件"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return SendEventParams().dict()
@property

View File

@@ -19,23 +19,23 @@ class SendMessageAction(BaseAction):
发送消息
"""
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.chain = ActionChain()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "发送消息"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "发送任务执行消息"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return SendMessageParams().dict()
@property

View File

@@ -29,25 +29,25 @@ class TransferFileAction(BaseAction):
_fileitems = []
_has_error = False
def __init__(self):
super().__init__()
def __init__(self, action_id: str):
super().__init__(action_id)
self.transferchain = TransferChain()
self.storagechain = StorageChain()
self.transferhis = TransferHistoryOper()
@classmethod
@property
def name(cls) -> str:
def name(cls) -> str: # noqa
return "整理文件"
@classmethod
@property
def description(cls) -> str:
def description(cls) -> str: # noqa
return "整理队列中的文件"
@classmethod
@property
def data(cls) -> dict:
def data(cls) -> dict: # noqa
return TransferFileParams().dict()
@property
@@ -68,6 +68,8 @@ class TransferFileAction(BaseAction):
return True
params = TransferFileParams(**params)
# 失败次数
_failed_count = 0
if params.source == "downloads":
# 从下载任务中整理文件
for download in context.downloads:
@@ -76,6 +78,11 @@ class TransferFileAction(BaseAction):
if not download.completed:
logger.info(f"下载任务 {download.download_id} 未完成")
continue
# 检查缓存
cache_key = f"{download.download_id}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{download.path} 已整理过,跳过")
continue
fileitem = self.storagechain.get_file_item(storage="local", path=Path(download.path))
if not fileitem:
logger.info(f"文件 {download.path} 不存在")
@@ -87,16 +94,22 @@ class TransferFileAction(BaseAction):
logger.info(f"开始整理文件 {download.path} ...")
state, errmsg = self.transferchain.do_transfer(fileitem, background=False)
if not state:
self._has_error = True
_failed_count += 1
logger.error(f"整理文件 {download.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {download.path} 完成")
self._fileitems.append(fileitem)
self.save_cache(workflow_id, cache_key)
else:
# 从 fileitems 中整理文件
for fileitem in copy.deepcopy(context.fileitems):
if not check_continue():
break
# 检查缓存
cache_key = f"{fileitem.path}"
if self.check_cache(workflow_id, cache_key):
logger.info(f"{fileitem.path} 已整理过,跳过")
continue
transferd = self.transferhis.get_by_src(fileitem.path, storage=fileitem.storage)
if transferd:
# 已经整理过的文件不再整理
@@ -105,16 +118,20 @@ class TransferFileAction(BaseAction):
state, errmsg = self.transferchain.do_transfer(fileitem, background=False,
continue_callback=check_continue)
if not state:
self._has_error = True
_failed_count += 1
logger.error(f"整理文件 {fileitem.path} 失败: {errmsg}")
continue
logger.info(f"整理文件 {fileitem.path} 完成")
# 从 fileitems 中移除已整理的文件
context.fileitems.remove(fileitem)
self._fileitems.append(fileitem)
# 记录已整理的文件
self.save_cache(workflow_id, cache_key)
if self._fileitems:
context.fileitems.extend(self._fileitems)
elif _failed_count:
self._has_error = True
self.job_done()
self.job_done(f"整理成功 {len(self._fileitems)} 个文件,失败 {_failed_count}")
return context

View File

@@ -9,6 +9,7 @@ from app.core.config import global_vars
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
@@ -84,8 +85,12 @@ def delete_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 删除工作流
Workflow.delete(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True, message="删除成功")
@@ -112,8 +117,9 @@ def start_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 添加定时任务
Scheduler().update_workflow_job(workflow)
global_vars.workflow_resume(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@@ -128,7 +134,29 @@ def pause_workflow(workflow_id: int,
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 删除定时任务
Scheduler().remove_workflow_job(workflow)
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 更新状态
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)
@router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response)
def reset_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
重置工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 重置工作流
workflow.reset(db, workflow_id)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True)

View File

@@ -347,7 +347,7 @@ class ChainBase(metaclass=ABCMeta):
torrent_list=torrent_list, mediainfo=mediainfo)
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None
) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
@@ -357,11 +357,12 @@ class ChainBase(metaclass=ABCMeta):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 种子分类
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
return self.run_module("download", content=content, download_dir=download_dir,
cookie=cookie, episodes=episodes, category=category,
cookie=cookie, episodes=episodes, category=category, label=label,
downloader=downloader)
def download_added(self, context: Context, download_dir: Path, torrent_path: Path = None) -> None:

View File

@@ -209,7 +209,8 @@ class DownloadChain(ChainBase):
save_path: str = None,
userid: Union[str, int] = None,
username: str = None,
media_category: str = None) -> Optional[str]:
media_category: str = None,
label: str = None) -> Optional[str]:
"""
下载及发送通知
:param context: 资源上下文
@@ -222,6 +223,7 @@ class DownloadChain(ChainBase):
:param userid: 用户ID
:param username: 调用下载的用户名/插件名
:param media_category: 自定义媒体类别
:param label: 自定义标签
"""
# 发送资源下载事件,允许外部拦截下载
event_data = ResourceDownloadEventData(
@@ -310,6 +312,7 @@ class DownloadChain(ChainBase):
episodes=episodes,
download_dir=download_dir,
category=_media.category,
label=label,
downloader=downloader or _site_downloader)
if result:
_downloader, _hash, _layout, error_msg = result

View File

@@ -66,6 +66,7 @@ class WorkflowExecutor:
# 初始上下文
if workflow.current_action and workflow.context:
logger.info(f"工作流已执行动作:{workflow.current_action}")
# Base64解码
decoded_data = base64.b64decode(workflow.context["content"])
# 反序列化数据
@@ -73,7 +74,9 @@ class WorkflowExecutor:
else:
self.context = ActionContext()
# 初始化队列入度为0的节点
# 恢复工作流
global_vars.workflow_resume(self.workflow.id)
# 初始化队列添加入度为0的节点
for action_id in self.actions:
if self.indegree[action_id] == 0:
self.queue.append(action_id)
@@ -91,7 +94,7 @@ class WorkflowExecutor:
if not self.success:
break
if not self.queue:
sleep(1)
sleep(0.1)
continue
# 取出队首节点
node_id = self.queue.popleft()

View File

@@ -64,7 +64,7 @@ class WorkFlowManager(metaclass=Singleton):
context = ActionContext()
if action.type in self._actions:
# 实例化
action_obj = self._actions[action.type]()
action_obj = self._actions[action.type](action.id)
# 执行
logger.info(f"执行动作: {action.id} - {action.name}")
try:

View File

@@ -88,6 +88,7 @@ class Workflow(Base):
"state": 'W',
"result": None,
"current_action": None,
"run_count": 0,
})
return True
@@ -95,7 +96,7 @@ class Workflow(Base):
@db_update
def update_current_action(db, wid: int, action_id: str, context: dict):
db.query(Workflow).filter(Workflow.id == wid).update({
"current_action": f"{Workflow.current_action},{action_id}" if Workflow.current_action else action_id,
"current_action": Workflow.current_action + f",{action_id}" if Workflow.current_action else action_id,
"context": context
})
return True

View File

@@ -10,6 +10,7 @@ from app.log import logger
from app.modules import _ModuleBase
from app.modules.indexer.parser import SiteParserBase
from app.modules.indexer.spider.haidan import HaiDanSpider
from app.modules.indexer.spider.hddolby import HddolbySpider
from app.modules.indexer.spider.mtorrent import MTorrentSpider
from app.modules.indexer.spider.tnode import TNodeSpider
from app.modules.indexer.spider.torrentleech import TorrentLeech
@@ -153,6 +154,12 @@ class IndexerModule(_ModuleBase):
keyword=search_word,
mtype=mtype
)
elif site.get('parser') == "HDDolby":
error_flag, result = HddolbySpider(site).search(
keyword=search_word,
mtype=mtype,
page=page
)
else:
error_flag, result = self.__spider_search(
search_word=search_word,

View File

@@ -32,6 +32,7 @@ class SiteSchema(Enum):
TNode = "TNode"
MTorrent = "MTorrent"
Yema = "Yema"
HDDolby = "HDDolby"
class SiteParserBase(metaclass=ABCMeta):
@@ -155,11 +156,17 @@ class SiteParserBase(metaclass=ABCMeta):
解析站点信息
:return:
"""
# 获取站点首页html
self._index_html = self._get_page_content(url=self._site_url)
# 检查是否已经登录
if not self._parse_logged_in(self._index_html):
return
# Cookie模式时获取站点首页html
if self.request_mode == "apikey":
if not self.apikey and not self.token:
logger.warn(f"{self._site_name} 未设置cookie 或 apikey/token跳过后续操作")
return
self._index_html = {}
else:
# 检查是否已经登录
self._index_html = self._get_page_content(url=self._site_url)
if not self._parse_logged_in(self._index_html):
return
# 解析站点页面
self._parse_site_page(self._index_html)
# 解析用户基础信息
@@ -293,9 +300,13 @@ class SiteParserBase(metaclass=ABCMeta):
req_headers = None
proxies = settings.PROXY if self._proxy else None
if self._ua or headers or self._addition_headers:
req_headers = {
"User-Agent": f"{self._ua}"
}
if self.request_mode == "apikey":
req_headers = {}
else:
req_headers = {
"User-Agent": f"{self._ua}"
}
if headers:
req_headers.update(headers)

View File

@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
import json
from typing import Optional, Tuple
from app.modules.indexer.parser import SiteParserBase, SiteSchema
from app.utils.string import StringUtils
class HDDolbySiteUserInfo(SiteParserBase):
schema = SiteSchema.HDDolby
request_mode = "apikey"
# 用户级别字典
HDDolby_sysRoleList = {
"0": "Peasant",
"1": "User",
"2": "Power User",
"3": "Elite User",
"4": "Crazy User",
"5": "Insane User",
"6": "Veteran User",
"7": "Extreme User",
"8": "Ultimate User",
"9": "Nexus Master",
"10": "VIP",
"11": "Retiree",
"12": "Helper",
"13": "Seeder",
"14": "Transferrer",
"15": "Uploader",
"16": "Torrent Manager",
"17": "Forum Moderator",
"18": "Coder",
"19": "Moderator",
"20": "Administrator",
"21": "Sysop",
"22": "Staff Leader",
}
def _parse_site_page(self, html_text: str):
"""
获取站点页面地址
"""
# 更换api地址
self._base_url = f"https://api.{StringUtils.get_url_domain(self._base_url)}"
self._user_traffic_page = None
self._user_detail_page = None
self._user_basic_page = "api/v1/user/data"
self._user_basic_params = {}
self._user_basic_headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*"
}
self._sys_mail_unread_page = None
self._user_mail_unread_page = None
self._mail_unread_params = {}
self._torrent_seeding_page = "api/v1/user/peers"
self._torrent_seeding_params = {}
self._torrent_seeding_headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*"
}
self._addition_headers = {
"x-api-key": self.apikey,
}
def _parse_logged_in(self, html_text):
"""
判断是否登录成功, 通过判断是否存在用户信息
暂时跳过检测,待后续优化
:param html_text:
:return:
"""
return True
def _parse_user_base_info(self, html_text: str):
"""
解析用户基本信息这里把_parse_user_traffic_info和_parse_user_detail_info合并到这里
"""
if not html_text:
return None
detail = json.loads(html_text)
if not detail or detail.get("status") != 0:
return
user_infos = detail.get("data")
"""
{
"id": "1",
"added": "2019-03-03 15:30:36",
"last_access": "2025-02-18 19:48:04",
"class": "22",
"uploaded": "852071699418375",
"downloaded": "1885536536176",
"seedbonus": "99774808.0",
"sebonus": "3739023.7",
"unread_messages": "0",
}
"""
if not user_infos:
return
user_info = user_infos[0]
self.userid = user_info.get("id")
self.username = user_info.get("username")
self.user_level = self.HDDolby_sysRoleList.get(user_info.get("class") or "1")
self.join_at = user_info.get("added")
self.upload = int(user_info.get("uploaded") or '0')
self.download = int(user_info.get("downloaded") or '0')
self.ratio = round(self.upload / self.download, 2) if self.download else 0
self.bonus = float(user_info.get("seedbonus") or "0")
self.message_unread = int(user_info.get("unread_messages") or '0')
def _parse_user_traffic_info(self, html_text: str):
"""
解析用户流量信息
"""
pass
def _parse_user_detail_info(self, html_text: str):
"""
解析用户详细信息
"""
pass
def _parse_user_torrent_seeding_info(self, html_text: str, multi_page: bool = False) -> Optional[str]:
"""
解析用户做种信息
"""
if not html_text:
return None
seeding_info = json.loads(html_text)
if not seeding_info or seeding_info.get("status") != 0:
return None
torrents = seeding_info.get("data", [])
page_seeding_size = 0
page_seeding_info = []
for info in torrents:
size = info.get("size")
seeder = info.get("seeders") or 1
page_seeding_size += size
page_seeding_info.append([seeder, size])
self.seeding += len(torrents)
self.seeding_size += page_seeding_size
self.seeding_info.extend(page_seeding_info)
return None
def _parse_message_unread_links(self, html_text: str, msg_links: list) -> Optional[str]:
"""
解析未读消息链接,这里直接读出详情
"""
pass
def _parse_message_content(self, html_text) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
解析消息内容
"""
pass

View File

@@ -0,0 +1,211 @@
from typing import Tuple, List
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.schemas import MediaType
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
class HddolbySpider:
"""
HDDolby API
"""
_indexerid = None
_domain = None
_domain_host = None
_name = ""
_proxy = None
_cookie = None
_ua = None
_apikey = None
_size = 40
_pageurl = None
_timeout = 15
_searchurl = None
# 分类
_movie_category = [401, 405]
_tv_category = [402, 403, 404, 405]
# 标签
_labels = {
"gf": "官方",
"gy": "国语",
"yy": "粤语",
"ja": "日语",
"ko": "韩语",
"zz": "中文字幕",
"jz": "禁转",
"xz": "限转",
"diy": "DIY",
"sf": "首发",
"yq": "应求",
"m0": "零魔",
"yc": "原创",
"gz": "官字",
"db": "Dolby Vision",
"hdr10": "HDR10",
"hdrm": "HDR10+",
"tx": "特效",
"lz": "连载",
"wj": "完结",
"hdrv": "HDR Vivid",
"hlg": "HLG",
"hq": "高码率",
"hfr": "高帧率",
}
def __init__(self, indexer: dict):
self.systemconfig = SystemConfigOper()
if indexer:
self._indexerid = indexer.get('id')
self._domain = indexer.get('domain')
self._domain_host = StringUtils.get_url_domain(self._domain)
self._name = indexer.get('name')
if indexer.get('proxy'):
self._proxy = settings.PROXY
self._cookie = indexer.get('cookie')
self._ua = indexer.get('ua')
self._apikey = indexer.get('apikey')
self._timeout = indexer.get('timeout') or 15
self._searchurl = f"https://api.{self._domain_host}/api/v1/torrent/search"
self._pageurl = f"{self._domain}details.php?id=%s&hit=1"
def search(self, keyword: str, mtype: MediaType = None, page: int = 0) -> Tuple[bool, List[dict]]:
"""
搜索
"""
if mtype == MediaType.TV:
categories = self._tv_category
elif mtype == MediaType.MOVIE:
categories = self._movie_category
else:
categories = list(set(self._movie_category + self._tv_category))
# 输入参数
params = {
"keyword": keyword,
"page_number": page,
"page_size": 100,
"categories": categories,
"visible": 1,
}
res = RequestUtils(
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/plain, */*",
"x-api-key": self._apikey
},
cookies=self._cookie,
proxies=self._proxy,
referer=f"{self._domain}",
timeout=self._timeout
).post_res(url=self._searchurl, json=params)
torrents = []
if res and res.status_code == 200:
results = res.json().get('data', []) or []
for result in results:
"""
{
"id": 120202,
"promotion_time_type": 0,
"promotion_until": "0000-00-00 00:00:00",
"category": 402,
"medium": 6,
"codec": 1,
"standard": 2,
"team": 10,
"audiocodec": 14,
"leechers": 0,
"seeders": 1,
"name": "[DBY] Lost S06 2010 Complete 1080p Netflix WEB-DL AVC DDP5.1-DBTV",
"small_descr": "lost ",
"times_completed": 0,
"size": 33665425886,
"added": "2025-02-18 19:47:56",
"url": 0,
"hr": 0,
"tmdb_type": "tv",
"tmdb_id": 4607,
"imdb_id": null,
"tags": "gf"
}
"""
# 类别
category_value = result.get('category')
if category_value in self._tv_category:
category = MediaType.TV.value
elif category_value in self._movie_category:
category = MediaType.MOVIE.value
else:
category = MediaType.UNKNOWN.value
# 标签
torrentLabelIds = result.get('tags', "").split(";") or []
torrentLabels = []
for labelId in torrentLabelIds:
if self._labels.get(labelId) is not None:
torrentLabels.append(self._labels.get(labelId))
# 种子信息
torrent = {
'title': result.get('name'),
'description': result.get('small_descr'),
'enclosure': self.__get_download_url(result.get('id'), result.get('downhash')),
'pubdate': result.get('added'),
'size': result.get('size'),
'seeders': result.get('seeders'),
'peers': result.get('leechers'),
'grabs': result.get('times_completed'),
'downloadvolumefactor': self.__get_downloadvolumefactor(result.get('promotion_time_type')),
'uploadvolumefactor': self.__get_uploadvolumefactor(result.get('promotion_time_type')),
'freedate': result.get('promotion_until'),
'page_url': self._pageurl % (self._domain, result.get('id')),
'labels': torrentLabels,
'category': category
}
torrents.append(torrent)
elif res is not None:
logger.warn(f"{self._name} 搜索失败,错误码:{res.status_code}")
return True, []
else:
logger.warn(f"{self._name} 搜索失败,无法连接 {self._domain}")
return True, []
return False, torrents
@staticmethod
def __get_downloadvolumefactor(discount: int) -> float:
"""
获取下载系数
"""
discount_dict = {
2: 0,
5: 0.5,
6: 1,
7: 0.3
}
if discount:
return discount_dict.get(discount, 1)
return 1
@staticmethod
def __get_uploadvolumefactor(discount: int) -> float:
"""
获取上传系数
"""
discount_dict = {
3: 2,
4: 2,
6: 2
}
if discount:
return discount_dict.get(discount, 1)
return 1
def __get_download_url(self, torrent_id: int, downhash: str) -> str:
"""
获取下载链接返回base64编码的json字符串及URL
"""
return f"{self._domain}download.php?id={torrent_id}&downhash={downhash}"

View File

@@ -78,7 +78,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
server.reconnect()
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
根据种子文件,选择并添加下载任务
@@ -87,6 +87,7 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 分类
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
@@ -118,7 +119,9 @@ class QbittorrentModule(_ModuleBase, _DownloaderBase[Qbittorrent]):
# 生成随机Tag
tag = StringUtils.generate_random_str(10)
if settings.TORRENT_TAG:
if label:
tags = label.split(',') + [tag]
elif settings.TORRENT_TAG:
tags = [tag, settings.TORRENT_TAG]
else:
tags = [tag]

View File

@@ -79,7 +79,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
server.reconnect()
def download(self, content: Union[Path, str], download_dir: Path, cookie: str,
episodes: Set[int] = None, category: str = None,
episodes: Set[int] = None, category: str = None, label: str = None,
downloader: str = None) -> Optional[Tuple[Optional[str], Optional[str], Optional[str], str]]:
"""
根据种子文件,选择并添加下载任务
@@ -88,6 +88,7 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
:param cookie: cookie
:param episodes: 需要下载的集数
:param category: 分类TR中未使用
:param label: 标签
:param downloader: 下载器
:return: 下载器名称、种子Hash、种子文件布局、错误原因
"""
@@ -118,8 +119,11 @@ class TransmissionModule(_ModuleBase, _DownloaderBase[Transmission]):
# 如果要选择文件则先暂停
is_paused = True if episodes else False
# 标签
if settings.TORRENT_TAG:
if label:
labels = label.split(',')
elif settings.TORRENT_TAG:
labels = [settings.TORRENT_TAG]
else:
labels = None

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.3.2'
FRONTEND_VERSION = 'v2.3.2'
APP_VERSION = 'v2.3.3'
FRONTEND_VERSION = 'v2.3.3'