diff --git a/app/core/config.py b/app/core/config.py index 56d72cff..5cd319fa 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -139,6 +139,18 @@ class ConfigModel(BaseModel): # PostgreSQL 连接池溢出数量 DB_POSTGRESQL_MAX_OVERFLOW: int = 50 + # ==================== 数据清理配置 ==================== + # 是否启用数据表定时清理 + DATA_CLEANUP_ENABLE: bool = False + # 消息表保留天数,0为不清理 + DATA_CLEANUP_MESSAGE_DAYS: int = 90 + # 下载历史表保留天数,0为不清理 + DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS: int = 180 + # 站点用户数据表保留天数,0为不清理 + DATA_CLEANUP_SITE_USERDATA_DAYS: int = 180 + # 整理历史表保留天数,0为不清理 + DATA_CLEANUP_TRANSFER_HISTORY_DAYS: int = 365 * 3 + # ==================== 缓存配置 ==================== # 缓存类型,支持 cachetools 和 redis,默认使用 cachetools CACHE_BACKEND_TYPE: str = "cachetools" diff --git a/app/scheduler.py b/app/scheduler.py index 56b9cac1..cc25a8b7 100644 --- a/app/scheduler.py +++ b/app/scheduler.py @@ -53,95 +53,48 @@ class SchedulerChain(ChainBase): """ # 每批处理的记录数,避免一次性删除过多数据导致性能问题 DEFAULT_BATCH_SIZE = 500 - # 消息保留期,单位:天 - MESSAGE_RETENTION_DAYS = 90 - # 下载历史保留期,单位:天 - DOWNLOAD_HISTORY_RETENTION_DAYS = 180 - # 站点用户数据保留期,单位:天 - SITE_USERDATA_RETENTION_DAYS = 180 - # 下载转移历史保留期,单位:天 - TRANSFER_HISTORY_RETENTION_DAYS = 365 * 3 def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]: """ - 按预设保留期执行分批清理。 + 按配置保留期执行分批清理。 """ started_at = datetime.now() batch_size = batch_size or self.DEFAULT_BATCH_SIZE if batch_size <= 0: batch_size = self.DEFAULT_BATCH_SIZE - message_cutoff = ( - started_at - timedelta(days=self.MESSAGE_RETENTION_DAYS) - ).strftime("%Y-%m-%d %H:%M:%S") - download_history_cutoff = ( - started_at - timedelta(days=self.DOWNLOAD_HISTORY_RETENTION_DAYS) - ).strftime("%Y-%m-%d %H:%M:%S") - site_userdata_cutoff = ( - started_at - timedelta(days=self.SITE_USERDATA_RETENTION_DAYS) - ).strftime("%Y-%m-%d") - transfer_history_cutoff = ( - started_at - timedelta(days=self.TRANSFER_HISTORY_RETENTION_DAYS) - ).strftime("%Y-%m-%d %H:%M:%S") - report: Dict[str, Any] = { "started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"), "batch_size": batch_size, + "enabled": bool(settings.DATA_CLEANUP_ENABLE), "tables": {}, "total_deleted": 0, } + + if not settings.DATA_CLEANUP_ENABLE: + report["skipped_reason"] = "disabled" + logger.info("数据表清理总开关未开启,跳过执行") + return report + errors = [] - plans = [ - { - "name": "message", - "cutoff": message_cutoff, - "handler": lambda db: Message.delete_before( - db=db, - before_time=message_cutoff, - limit=batch_size, - ), - }, - { - "name": "downloadhistory", - "cutoff": download_history_cutoff, - "handler": lambda db: DownloadHistory.delete_before( - db=db, - before_time=download_history_cutoff, - limit=batch_size, - ), - }, - { - "name": "downloadfiles", - "cutoff": "follow-parent-history", - "handler": lambda db: DownloadFiles.delete_orphans( - db=db, - limit=batch_size, - ), - }, - { - "name": "siteuserdata", - "cutoff": site_userdata_cutoff, - "handler": lambda db: SiteUserData.delete_before( - db=db, - before_day=site_userdata_cutoff, - limit=batch_size, - ), - }, - { - "name": "transferhistory", - "cutoff": transfer_history_cutoff, - "handler": lambda db: TransferHistory.delete_before( - db=db, - before_time=transfer_history_cutoff, - limit=batch_size, - ), - }, - ] + plans = self._build_cleanup_plans(started_at=started_at, batch_size=batch_size) with SessionFactory() as db: for plan in plans: name = plan["name"] + retention_days = plan["retention_days"] + if retention_days <= 0: + report["tables"][name] = { + "deleted": 0, + "batches": 0, + "cutoff": None, + "retention_days": retention_days, + "skipped": True, + "reason": "retention_days<=0", + } + continue + try: table_report = self._cleanup_in_batches( db=db, @@ -149,6 +102,7 @@ class SchedulerChain(ChainBase): delete_batch=plan["handler"], ) table_report["cutoff"] = plan["cutoff"] + table_report["retention_days"] = retention_days report["tables"][name] = table_report report["total_deleted"] += table_report["deleted"] except Exception as err: @@ -158,6 +112,7 @@ class SchedulerChain(ChainBase): "deleted": 0, "batches": 0, "cutoff": plan["cutoff"], + "retention_days": retention_days, "error": str(err), } @@ -171,6 +126,95 @@ class SchedulerChain(ChainBase): logger.info(f"数据表清理完成:{json.dumps(report, ensure_ascii=False)}") return report + @staticmethod + def _normalize_retention_days(retention_days: Any) -> int: + try: + normalized_days = int(retention_days or 0) + except (TypeError, ValueError): + return 0 + return max(normalized_days, 0) + + def _build_cleanup_plans( + self, + started_at: datetime, + batch_size: int, + ) -> List[Dict[str, Any]]: + message_days = self._normalize_retention_days(settings.DATA_CLEANUP_MESSAGE_DAYS) + download_history_days = self._normalize_retention_days( + settings.DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS + ) + site_userdata_days = self._normalize_retention_days( + settings.DATA_CLEANUP_SITE_USERDATA_DAYS + ) + transfer_history_days = self._normalize_retention_days( + settings.DATA_CLEANUP_TRANSFER_HISTORY_DAYS + ) + + message_cutoff = ( + started_at - timedelta(days=message_days) + ).strftime("%Y-%m-%d %H:%M:%S") + download_history_cutoff = ( + started_at - timedelta(days=download_history_days) + ).strftime("%Y-%m-%d %H:%M:%S") + site_userdata_cutoff = ( + started_at - timedelta(days=site_userdata_days) + ).strftime("%Y-%m-%d") + transfer_history_cutoff = ( + started_at - timedelta(days=transfer_history_days) + ).strftime("%Y-%m-%d %H:%M:%S") + + return [ + { + "name": "message", + "retention_days": message_days, + "cutoff": message_cutoff, + "handler": lambda db: Message.delete_before( + db=db, + before_time=message_cutoff, + limit=batch_size, + ), + }, + { + "name": "downloadhistory", + "retention_days": download_history_days, + "cutoff": download_history_cutoff, + "handler": lambda db: DownloadHistory.delete_before( + db=db, + before_time=download_history_cutoff, + limit=batch_size, + ), + }, + { + "name": "downloadfiles", + "retention_days": download_history_days, + "cutoff": "follow-parent-history", + "handler": lambda db: DownloadFiles.delete_orphans( + db=db, + limit=batch_size, + ), + }, + { + "name": "siteuserdata", + "retention_days": site_userdata_days, + "cutoff": site_userdata_cutoff, + "handler": lambda db: SiteUserData.delete_before( + db=db, + before_day=site_userdata_cutoff, + limit=batch_size, + ), + }, + { + "name": "transferhistory", + "retention_days": transfer_history_days, + "cutoff": transfer_history_cutoff, + "handler": lambda db: TransferHistory.delete_before( + db=db, + before_time=transfer_history_cutoff, + limit=batch_size, + ), + }, + ] + @staticmethod def _cleanup_in_batches( db: Session, @@ -215,6 +259,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): "SITEDATA_REFRESH_INTERVAL", "AI_AGENT_ENABLE", "AI_AGENT_JOB_INTERVAL", + "DATA_CLEANUP_ENABLE", + "DATA_CLEANUP_MESSAGE_DAYS", + "DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS", + "DATA_CLEANUP_SITE_USERDATA_DAYS", + "DATA_CLEANUP_TRANSFER_HISTORY_DAYS", } def __init__(self): @@ -507,15 +556,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass): ) # 数据表清理服务,每天凌晨执行一次 - self._scheduler.add_job( - self.start, - "cron", - id="data_cleanup", - name="数据表清理", - hour=3, - minute=30, - kwargs={"job_id": "data_cleanup"}, - ) + if settings.DATA_CLEANUP_ENABLE: + self._scheduler.add_job( + self.start, + "cron", + id="data_cleanup", + name="数据表清理", + hour=3, + minute=30, + kwargs={"job_id": "data_cleanup"}, + ) # 定时检查用户认证,每隔10分钟 self._scheduler.add_job( diff --git a/tests/test_data_cleanup_chain.py b/tests/test_data_cleanup_chain.py index b64810bd..3e5e609f 100644 --- a/tests/test_data_cleanup_chain.py +++ b/tests/test_data_cleanup_chain.py @@ -12,6 +12,7 @@ from app.db.models.downloadhistory import DownloadHistory, DownloadFiles from app.db.models.message import Message from app.db.models.siteuserdata import SiteUserData from app.db.models.transferhistory import TransferHistory +from app.core.config import settings from app.scheduler import SchedulerChain @@ -31,6 +32,18 @@ class DataCleanupChainTest(unittest.TestCase): self.engine.dispose() self.temp_dir.cleanup() + @staticmethod + def _cleanup_settings(**overrides): + defaults = { + "DATA_CLEANUP_ENABLE": True, + "DATA_CLEANUP_MESSAGE_DAYS": 90, + "DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS": 180, + "DATA_CLEANUP_SITE_USERDATA_DAYS": 180, + "DATA_CLEANUP_TRANSFER_HISTORY_DAYS": 365 * 3, + } + defaults.update(overrides) + return patch.multiple(settings, **defaults) + def test_cleanup_removes_expired_rows_in_batches(self): """ 指定表应按保留期分批删除,并保留仍在有效期内的数据。 @@ -134,7 +147,7 @@ class DataCleanupChainTest(unittest.TestCase): ) db.commit() - with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory): + with self._cleanup_settings(), patch("app.scheduler.SessionFactory", self.SessionFactory): report = SchedulerChain().cleanup(batch_size=1) self.assertEqual(report["tables"]["message"]["deleted"], 3) @@ -171,10 +184,100 @@ class DataCleanupChainTest(unittest.TestCase): ) db.commit() - with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory): + with self._cleanup_settings(), patch("app.scheduler.SessionFactory", self.SessionFactory): report = SchedulerChain().cleanup(batch_size=10) self.assertEqual(report["tables"]["transferhistory"]["deleted"], 0) with self.SessionFactory() as db: self.assertEqual(db.query(TransferHistory).count(), 1) + + def test_cleanup_skips_when_disabled(self): + """ + 总开关关闭时应跳过清理。 + """ + now = datetime.now() + old_message_time = (now - timedelta(days=120)).strftime("%Y-%m-%d %H:%M:%S") + + with self.SessionFactory() as db: + db.add(Message(reg_time=old_message_time, title="old")) + db.commit() + + with self._cleanup_settings(DATA_CLEANUP_ENABLE=False), patch( + "app.scheduler.SessionFactory", self.SessionFactory + ): + report = SchedulerChain().cleanup(batch_size=10) + + self.assertFalse(report["enabled"]) + self.assertEqual(report["skipped_reason"], "disabled") + self.assertEqual(report["total_deleted"], 0) + + with self.SessionFactory() as db: + self.assertEqual(db.query(Message).count(), 1) + + def test_cleanup_respects_per_table_retention_days(self): + """ + 各表保留期应使用当前配置值。 + """ + now = datetime.now() + old_message_time = (now - timedelta(days=10)).strftime("%Y-%m-%d %H:%M:%S") + keep_message_time = (now - timedelta(days=2)).strftime("%Y-%m-%d %H:%M:%S") + + with self.SessionFactory() as db: + db.add_all( + [ + Message(reg_time=old_message_time, title="old"), + Message(reg_time=keep_message_time, title="keep"), + ] + ) + db.commit() + + with self._cleanup_settings(DATA_CLEANUP_MESSAGE_DAYS=7), patch( + "app.scheduler.SessionFactory", self.SessionFactory + ): + report = SchedulerChain().cleanup(batch_size=10) + + self.assertEqual(report["tables"]["message"]["retention_days"], 7) + self.assertEqual(report["tables"]["message"]["deleted"], 1) + + with self.SessionFactory() as db: + self.assertEqual(db.query(Message).count(), 1) + + def test_cleanup_skips_table_when_retention_days_is_zero(self): + """ + 单表保留期为 0 时应跳过该表及其附属孤儿记录清理。 + """ + now = datetime.now() + old_download_time = (now - timedelta(days=240)).strftime("%Y-%m-%d %H:%M:%S") + + with self.SessionFactory() as db: + db.add( + DownloadHistory( + path="/downloads/old", + type="电影", + title="old", + download_hash="hash-old", + date=old_download_time, + ) + ) + db.add( + DownloadFiles( + download_hash="hash-orphan", + fullpath="/downloads/orphan/file.mkv", + savepath="/downloads/orphan", + filepath="file.mkv", + ) + ) + db.commit() + + with self._cleanup_settings(DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS=0), patch( + "app.scheduler.SessionFactory", self.SessionFactory + ): + report = SchedulerChain().cleanup(batch_size=10) + + self.assertTrue(report["tables"]["downloadhistory"]["skipped"]) + self.assertTrue(report["tables"]["downloadfiles"]["skipped"]) + + with self.SessionFactory() as db: + self.assertEqual(db.query(DownloadHistory).count(), 1) + self.assertEqual(db.query(DownloadFiles).count(), 1)