diff --git a/app/chain/data_cleanup.py b/app/chain/data_cleanup.py
deleted file mode 100644
index 69f15dca..00000000
--- a/app/chain/data_cleanup.py
+++ /dev/null
@@ -1,160 +0,0 @@
-import json
-from datetime import datetime, timedelta
-from typing import Callable, Optional, Dict, Any
-
-from sqlalchemy.orm import Session
-
-from app.db import SessionFactory
-from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
-from app.db.models.message import Message
-from app.db.models.siteuserdata import SiteUserData
-from app.db.models.transferhistory import TransferHistory
-from app.log import logger
-
-
-class DataCleanupChain:
-    """
-    System data cleanup chain.
-    """
-
-    DEFAULT_BATCH_SIZE = 500
-    MESSAGE_RETENTION_DAYS = 90
-    DOWNLOAD_HISTORY_RETENTION_DAYS = 180
-    SITE_USERDATA_RETENTION_DAYS = 180
-    TRANSFER_HISTORY_RETENTION_DAYS = 365 * 3
-
-    def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]:
-        """
-        Run batched cleanup using the preset retention periods.
-        """
-        started_at = datetime.now()
-        batch_size = batch_size or self.DEFAULT_BATCH_SIZE
-        if batch_size <= 0:
-            batch_size = self.DEFAULT_BATCH_SIZE
-
-        message_cutoff = (
-            started_at - timedelta(days=self.MESSAGE_RETENTION_DAYS)
-        ).strftime("%Y-%m-%d %H:%M:%S")
-        download_history_cutoff = (
-            started_at - timedelta(days=self.DOWNLOAD_HISTORY_RETENTION_DAYS)
-        ).strftime("%Y-%m-%d %H:%M:%S")
-        site_userdata_cutoff = (
-            started_at - timedelta(days=self.SITE_USERDATA_RETENTION_DAYS)
-        ).strftime("%Y-%m-%d")
-        transfer_history_cutoff = (
-            started_at - timedelta(days=self.TRANSFER_HISTORY_RETENTION_DAYS)
-        ).strftime("%Y-%m-%d %H:%M:%S")
-
-        report: Dict[str, Any] = {
-            "started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
-            "batch_size": batch_size,
-            "tables": {},
-            "total_deleted": 0,
-        }
-        errors = []
-
-        plans = [
-            {
-                "name": "message",
-                "cutoff": message_cutoff,
-                "handler": lambda db: Message.delete_before(
-                    db=db,
-                    before_time=message_cutoff,
-                    limit=batch_size,
-                ),
-            },
-            {
-                "name": "downloadhistory",
-                "cutoff": download_history_cutoff,
-                "handler": lambda db: DownloadHistory.delete_before(
-                    db=db,
-                    before_time=download_history_cutoff,
-                    limit=batch_size,
-                ),
-            },
-            {
-                "name": "downloadfiles",
-                "cutoff": "follow-parent-history",
-                "handler": lambda db: DownloadFiles.delete_orphans(
-                    db=db,
-                    limit=batch_size,
-                ),
-            },
-            {
-                "name": "siteuserdata",
-                "cutoff": site_userdata_cutoff,
-                "handler": lambda db: SiteUserData.delete_before(
-                    db=db,
-                    before_day=site_userdata_cutoff,
-                    limit=batch_size,
-                ),
-            },
-            {
-                "name": "transferhistory",
-                "cutoff": transfer_history_cutoff,
-                "handler": lambda db: TransferHistory.delete_before(
-                    db=db,
-                    before_time=transfer_history_cutoff,
-                    limit=batch_size,
-                ),
-            },
-        ]
-
-        with SessionFactory() as db:
-            for plan in plans:
-                name = plan["name"]
-                try:
-                    table_report = self._cleanup_in_batches(
-                        db=db,
-                        table_name=name,
-                        delete_batch=plan["handler"],
-                    )
-                    table_report["cutoff"] = plan["cutoff"]
-                    report["tables"][name] = table_report
-                    report["total_deleted"] += table_report["deleted"]
-                except Exception as err:
-                    errors.append(f"{name}: {str(err)}")
-                    logger.error(f"Cleanup of table {name} failed: {str(err)}")
-                    report["tables"][name] = {
-                        "deleted": 0,
-                        "batches": 0,
-                        "cutoff": plan["cutoff"],
-                        "error": str(err),
-                    }
-
-        if errors:
-            report["errors"] = errors
-            logger.error(
-                f"Data table cleanup partially failed: {json.dumps(report, ensure_ascii=False)}"
-            )
-            raise RuntimeError(";".join(errors))
-
-        logger.info(f"Data table cleanup finished: {json.dumps(report, ensure_ascii=False)}")
-        return report
-
-    @staticmethod
-    def _cleanup_in_batches(
-            db: Session,
-            table_name: str,
-            delete_batch: Callable[[Session], int],
-    ) -> Dict[str, int]:
-        """
-        Repeatedly delete batches from a single table until nothing is left to delete.
-        """
-        total_deleted = 0
-        batches = 0
-
-        while True:
-            deleted = delete_batch(db) or 0
-            if deleted <= 0:
-                break
-            batches += 1
-            total_deleted += deleted
-            logger.info(
-                f"Table {table_name}: batch {batches} finished, {deleted} records deleted"
-            )
-
-        return {
-            "deleted": total_deleted,
-            "batches": batches,
-        }
diff --git a/app/scheduler.py b/app/scheduler.py
index 74a60472..56b9cac1 100644
--- a/app/scheduler.py
+++ b/app/scheduler.py
@@ -1,21 +1,22 @@
 import asyncio
 import gc
 import inspect
+import json
 import multiprocessing
 import threading
 import traceback
 from datetime import datetime, timedelta
-from typing import List, Optional
+from typing import Any, Callable, Dict, List, Optional
 
 import pytz
 from apscheduler.executors.pool import ThreadPoolExecutor
 from apscheduler.jobstores.base import JobLookupError
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.triggers.cron import CronTrigger
+from sqlalchemy.orm import Session
 
 from app import schemas
 from app.chain import ChainBase
-from app.chain.data_cleanup import DataCleanupChain
 from app.chain.mediaserver import MediaServerChain
 from app.chain.recommend import RecommendChain
 from app.chain.site import SiteChain
@@ -25,10 +27,15 @@ from app.chain.workflow import WorkflowChain
 from app.core.config import settings, global_vars
 from app.core.event import eventmanager
 from app.core.plugin import PluginManager
+from app.db import SessionFactory
+from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
+from app.db.models.message import Message
+from app.db.models.siteuserdata import SiteUserData
+from app.db.models.transferhistory import TransferHistory
 from app.db.systemconfig_oper import SystemConfigOper
+from app.helper.image import WallpaperHelper
 from app.helper.message import MessageHelper
 from app.helper.sites import SitesHelper  # noqa
-from app.helper.image import WallpaperHelper
 from app.log import logger
 from app.schemas import Notification, NotificationType, Workflow
 from app.schemas.types import EventType, SystemConfigKey
@@ -41,7 +48,155 @@ lock = threading.Lock()
 
 
 class SchedulerChain(ChainBase):
-    pass
+    """
+    Scheduler chain: runs the various scheduled jobs, including data cleanup.
+    """
+    # Records per batch, to avoid the performance hit of deleting too much data at once
+    DEFAULT_BATCH_SIZE = 500
+    # Message retention period, in days
+    MESSAGE_RETENTION_DAYS = 90
+    # Download history retention period, in days
+    DOWNLOAD_HISTORY_RETENTION_DAYS = 180
+    # Site user data retention period, in days
+    SITE_USERDATA_RETENTION_DAYS = 180
+    # Transfer history retention period, in days
+    TRANSFER_HISTORY_RETENTION_DAYS = 365 * 3
+
+    def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]:
+        """
+        Run batched cleanup using the preset retention periods.
+        """
+        started_at = datetime.now()
+        batch_size = batch_size or self.DEFAULT_BATCH_SIZE
+        if batch_size <= 0:
+            batch_size = self.DEFAULT_BATCH_SIZE
+
+        message_cutoff = (
+            started_at - timedelta(days=self.MESSAGE_RETENTION_DAYS)
+        ).strftime("%Y-%m-%d %H:%M:%S")
+        download_history_cutoff = (
+            started_at - timedelta(days=self.DOWNLOAD_HISTORY_RETENTION_DAYS)
+        ).strftime("%Y-%m-%d %H:%M:%S")
+        site_userdata_cutoff = (
+            started_at - timedelta(days=self.SITE_USERDATA_RETENTION_DAYS)
+        ).strftime("%Y-%m-%d")
+        transfer_history_cutoff = (
+            started_at - timedelta(days=self.TRANSFER_HISTORY_RETENTION_DAYS)
+        ).strftime("%Y-%m-%d %H:%M:%S")
+
+        report: Dict[str, Any] = {
+            "started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
+            "batch_size": batch_size,
+            "tables": {},
+            "total_deleted": 0,
+        }
+        errors = []
+
+        plans = [
+            {
"message", + "cutoff": message_cutoff, + "handler": lambda db: Message.delete_before( + db=db, + before_time=message_cutoff, + limit=batch_size, + ), + }, + { + "name": "downloadhistory", + "cutoff": download_history_cutoff, + "handler": lambda db: DownloadHistory.delete_before( + db=db, + before_time=download_history_cutoff, + limit=batch_size, + ), + }, + { + "name": "downloadfiles", + "cutoff": "follow-parent-history", + "handler": lambda db: DownloadFiles.delete_orphans( + db=db, + limit=batch_size, + ), + }, + { + "name": "siteuserdata", + "cutoff": site_userdata_cutoff, + "handler": lambda db: SiteUserData.delete_before( + db=db, + before_day=site_userdata_cutoff, + limit=batch_size, + ), + }, + { + "name": "transferhistory", + "cutoff": transfer_history_cutoff, + "handler": lambda db: TransferHistory.delete_before( + db=db, + before_time=transfer_history_cutoff, + limit=batch_size, + ), + }, + ] + + with SessionFactory() as db: + for plan in plans: + name = plan["name"] + try: + table_report = self._cleanup_in_batches( + db=db, + table_name=name, + delete_batch=plan["handler"], + ) + table_report["cutoff"] = plan["cutoff"] + report["tables"][name] = table_report + report["total_deleted"] += table_report["deleted"] + except Exception as err: + errors.append(f"{name}: {str(err)}") + logger.error(f"数据表 {name} 清理失败:{str(err)}") + report["tables"][name] = { + "deleted": 0, + "batches": 0, + "cutoff": plan["cutoff"], + "error": str(err), + } + + if errors: + report["errors"] = errors + logger.error( + f"数据表清理部分失败:{json.dumps(report, ensure_ascii=False)}" + ) + raise RuntimeError(";".join(errors)) + + logger.info(f"数据表清理完成:{json.dumps(report, ensure_ascii=False)}") + return report + + @staticmethod + def _cleanup_in_batches( + db: Session, + table_name: str, + delete_batch: Callable[[Session], int], + ) -> Dict[str, int]: + """ + 循环执行单表分批删除,直到没有可删除数据。 + """ + total_deleted = 0 + batches = 0 + + while True: + deleted = delete_batch(db) or 0 + if deleted <= 0: + break + batches += 1 + total_deleted += deleted + logger.info( + f"数据表 {table_name} 清理第 {batches} 批完成,删除 {deleted} 条记录" + ) + + return { + "deleted": total_deleted, + "batches": batches, + } class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): @@ -148,7 +303,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): }, "data_cleanup": { "name": "数据表清理", - "func": DataCleanupChain().cleanup, + "func": SchedulerChain().cleanup, "running": False, }, "user_auth": { @@ -207,8 +362,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): # CookieCloud定时同步 if ( - settings.COOKIECLOUD_INTERVAL - and str(settings.COOKIECLOUD_INTERVAL).isdigit() + settings.COOKIECLOUD_INTERVAL + and str(settings.COOKIECLOUD_INTERVAL).isdigit() ): self._scheduler.add_job( self.start, @@ -217,14 +372,14 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): name="同步CookieCloud站点", minutes=int(settings.COOKIECLOUD_INTERVAL), next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(minutes=5), + + timedelta(minutes=5), kwargs={"job_id": "cookiecloud"}, ) # 媒体服务器同步 if ( - settings.MEDIASERVER_SYNC_INTERVAL - and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit() + settings.MEDIASERVER_SYNC_INTERVAL + and str(settings.MEDIASERVER_SYNC_INTERVAL).isdigit() ): self._scheduler.add_job( self.start, @@ -233,7 +388,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): name="同步媒体服务器", hours=int(settings.MEDIASERVER_SYNC_INTERVAL), next_run_time=datetime.now(pytz.timezone(settings.TZ)) - + timedelta(minutes=10), + 
+                              + timedelta(minutes=10),
                 kwargs={"job_id": "mediaserver_sync"},
             )
 
@@ -284,8 +439,8 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
         else:
             # RSS subscription mode
             if (
-                not settings.SUBSCRIBE_RSS_INTERVAL
-                or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit()
+                    not settings.SUBSCRIBE_RSS_INTERVAL
+                    or not str(settings.SUBSCRIBE_RSS_INTERVAL).isdigit()
             ):
                 settings.SUBSCRIBE_RSS_INTERVAL = 30
             elif int(settings.SUBSCRIBE_RSS_INTERVAL) < 5:
@@ -327,7 +482,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
                 name="Wallpaper cache",
                 minutes=30,
                 next_run_time=datetime.now(pytz.timezone(settings.TZ))
-                + timedelta(seconds=1),
+                              + timedelta(seconds=1),
                 kwargs={"job_id": "random_wallpager"},
             )
 
@@ -391,7 +546,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
                 name="Recommendation cache",
                 hours=24,
                 next_run_time=datetime.now(pytz.timezone(settings.TZ))
-                + timedelta(seconds=5),
+                              + timedelta(seconds=5),
                 kwargs={"job_id": "recommend_refresh"},
            )
 
@@ -413,7 +568,7 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
                 name="Subscription calendar cache",
                 hours=6,
                 next_run_time=datetime.now(pytz.timezone(settings.TZ))
-                + timedelta(minutes=2),
+                              + timedelta(minutes=2),
                 kwargs={"job_id": "subscribe_calendar_cache"},
             )
 
diff --git a/tests/test_data_cleanup_chain.py b/tests/test_data_cleanup_chain.py
index c2b4ecea..b64810bd 100644
--- a/tests/test_data_cleanup_chain.py
+++ b/tests/test_data_cleanup_chain.py
@@ -7,12 +7,12 @@ from unittest.mock import patch
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 
-from app.chain.data_cleanup import DataCleanupChain
 from app.db import Base
 from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
 from app.db.models.message import Message
 from app.db.models.siteuserdata import SiteUserData
 from app.db.models.transferhistory import TransferHistory
+from app.scheduler import SchedulerChain
 
 
 class DataCleanupChainTest(unittest.TestCase):
@@ -135,7 +135,7 @@ class DataCleanupChainTest(unittest.TestCase):
         db.commit()
 
-        with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory):
-            report = DataCleanupChain().cleanup(batch_size=1)
+        with patch("app.scheduler.SessionFactory", self.SessionFactory):
+            report = SchedulerChain().cleanup(batch_size=1)
 
         self.assertEqual(report["tables"]["message"]["deleted"], 3)
         self.assertEqual(report["tables"]["message"]["batches"], 3)
@@ -172,7 +172,7 @@ class DataCleanupChainTest(unittest.TestCase):
         db.commit()
 
-        with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory):
-            report = DataCleanupChain().cleanup(batch_size=10)
+        with patch("app.scheduler.SessionFactory", self.SessionFactory):
+            report = SchedulerChain().cleanup(batch_size=10)
 
         self.assertEqual(report["tables"]["transferhistory"]["deleted"], 0)
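
Reviewer note: `SchedulerChain.cleanup` and `_cleanup_in_batches` rely on a simple contract from the model helpers they call: each `delete_before(...)`/`delete_orphans(...)` call deletes at most `limit` rows and returns the number actually deleted, so the batch loop stops once a call returns 0. Those helpers live on the model classes and are not shown in this diff. The sketch below only illustrates that assumed contract; it is not the project's actual implementation, and the `reg_time` column and integer `id` primary key are assumptions.

```python
# Illustrative sketch of the assumed batch-delete contract (not part of this
# patch): delete at most `limit` rows older than the cutoff, return the count.
from typing import List

from sqlalchemy import delete, select
from sqlalchemy.orm import Session

from app.db.models.message import Message


def delete_before(db: Session, before_time: str, limit: int = 500) -> int:
    """Delete at most `limit` Message rows older than `before_time`.

    Returning 0 is what ends the loop in _cleanup_in_batches.
    """
    # Collect the primary keys of the oldest matching rows first, so the batch
    # stays bounded even on backends without DELETE ... LIMIT (e.g. SQLite).
    ids: List[int] = db.scalars(
        select(Message.id)  # assumed integer primary key
        .where(Message.reg_time < before_time)  # assumed "%Y-%m-%d %H:%M:%S" text column
        .order_by(Message.id)
        .limit(limit)
    ).all()
    if not ids:
        return 0
    db.execute(delete(Message).where(Message.id.in_(ids)))
    db.commit()
    return len(ids)
```

With helpers of this shape, `SchedulerChain().cleanup(batch_size=1)` deletes one row per batch, which is exactly what the first test asserts (`deleted == 3`, `batches == 3`).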