feat: add configurable data cleanup settings

Add a global cleanup switch, per-table retention periods, and scheduler config reload support so data cleanup can be managed and applied without restarting.
This commit is contained in:
jxxghp
2026-05-09 21:22:02 +08:00
parent bc6c61bc45
commit 4027ae2641
3 changed files with 244 additions and 79 deletions

View File

@@ -139,6 +139,18 @@ class ConfigModel(BaseModel):
# PostgreSQL connection pool max-overflow size
DB_POSTGRESQL_MAX_OVERFLOW: int = 50
# ==================== Data cleanup settings ====================
# Master switch for scheduled data-table cleanup
DATA_CLEANUP_ENABLE: bool = False
# Message table retention in days; 0 disables cleanup for this table
DATA_CLEANUP_MESSAGE_DAYS: int = 90
# Download-history table retention in days; 0 disables cleanup for this table
DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS: int = 180
# Site user-data table retention in days; 0 disables cleanup for this table
DATA_CLEANUP_SITE_USERDATA_DAYS: int = 180
# Transfer-history table retention in days; 0 disables cleanup for this table
DATA_CLEANUP_TRANSFER_HISTORY_DAYS: int = 365 * 3
# ==================== Cache settings ====================
# Cache backend type, supports cachetools and redis; defaults to cachetools
CACHE_BACKEND_TYPE: str = "cachetools"

View File

@@ -53,95 +53,48 @@ class SchedulerChain(ChainBase):
"""
# 每批处理的记录数,避免一次性删除过多数据导致性能问题
DEFAULT_BATCH_SIZE = 500
# 消息保留期,单位:天
MESSAGE_RETENTION_DAYS = 90
# 下载历史保留期,单位:天
DOWNLOAD_HISTORY_RETENTION_DAYS = 180
# 站点用户数据保留期,单位:天
SITE_USERDATA_RETENTION_DAYS = 180
# 下载转移历史保留期,单位:天
TRANSFER_HISTORY_RETENTION_DAYS = 365 * 3
def cleanup(self, batch_size: Optional[int] = None) -> Dict[str, Any]:
"""
预设保留期执行分批清理。
配置保留期执行分批清理。
"""
started_at = datetime.now()
batch_size = batch_size or self.DEFAULT_BATCH_SIZE
if batch_size <= 0:
batch_size = self.DEFAULT_BATCH_SIZE
message_cutoff = (
started_at - timedelta(days=self.MESSAGE_RETENTION_DAYS)
).strftime("%Y-%m-%d %H:%M:%S")
download_history_cutoff = (
started_at - timedelta(days=self.DOWNLOAD_HISTORY_RETENTION_DAYS)
).strftime("%Y-%m-%d %H:%M:%S")
site_userdata_cutoff = (
started_at - timedelta(days=self.SITE_USERDATA_RETENTION_DAYS)
).strftime("%Y-%m-%d")
transfer_history_cutoff = (
started_at - timedelta(days=self.TRANSFER_HISTORY_RETENTION_DAYS)
).strftime("%Y-%m-%d %H:%M:%S")
report: Dict[str, Any] = {
"started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
"batch_size": batch_size,
"enabled": bool(settings.DATA_CLEANUP_ENABLE),
"tables": {},
"total_deleted": 0,
}
if not settings.DATA_CLEANUP_ENABLE:
report["skipped_reason"] = "disabled"
logger.info("数据表清理总开关未开启,跳过执行")
return report
errors = []
plans = [
{
"name": "message",
"cutoff": message_cutoff,
"handler": lambda db: Message.delete_before(
db=db,
before_time=message_cutoff,
limit=batch_size,
),
},
{
"name": "downloadhistory",
"cutoff": download_history_cutoff,
"handler": lambda db: DownloadHistory.delete_before(
db=db,
before_time=download_history_cutoff,
limit=batch_size,
),
},
{
"name": "downloadfiles",
"cutoff": "follow-parent-history",
"handler": lambda db: DownloadFiles.delete_orphans(
db=db,
limit=batch_size,
),
},
{
"name": "siteuserdata",
"cutoff": site_userdata_cutoff,
"handler": lambda db: SiteUserData.delete_before(
db=db,
before_day=site_userdata_cutoff,
limit=batch_size,
),
},
{
"name": "transferhistory",
"cutoff": transfer_history_cutoff,
"handler": lambda db: TransferHistory.delete_before(
db=db,
before_time=transfer_history_cutoff,
limit=batch_size,
),
},
]
plans = self._build_cleanup_plans(started_at=started_at, batch_size=batch_size)
with SessionFactory() as db:
for plan in plans:
name = plan["name"]
retention_days = plan["retention_days"]
if retention_days <= 0:
report["tables"][name] = {
"deleted": 0,
"batches": 0,
"cutoff": None,
"retention_days": retention_days,
"skipped": True,
"reason": "retention_days<=0",
}
continue
try:
table_report = self._cleanup_in_batches(
db=db,
@@ -149,6 +102,7 @@ class SchedulerChain(ChainBase):
delete_batch=plan["handler"],
)
table_report["cutoff"] = plan["cutoff"]
table_report["retention_days"] = retention_days
report["tables"][name] = table_report
report["total_deleted"] += table_report["deleted"]
except Exception as err:
@@ -158,6 +112,7 @@ class SchedulerChain(ChainBase):
"deleted": 0,
"batches": 0,
"cutoff": plan["cutoff"],
"retention_days": retention_days,
"error": str(err),
}
@@ -171,6 +126,95 @@ class SchedulerChain(ChainBase):
logger.info(f"数据表清理完成:{json.dumps(report, ensure_ascii=False)}")
return report
@staticmethod
def _normalize_retention_days(retention_days: Any) -> int:
try:
normalized_days = int(retention_days or 0)
except (TypeError, ValueError):
return 0
return max(normalized_days, 0)
def _build_cleanup_plans(
    self,
    started_at: datetime,
    batch_size: int,
) -> List[Dict[str, Any]]:
    """
    Build the ordered list of per-table cleanup plans from current settings.

    Each plan carries the table name, its configured retention period in
    days, a formatted cutoff string, and a handler that deletes one batch
    of expired rows from the given session.
    """
    ts_fmt = "%Y-%m-%d %H:%M:%S"
    day_fmt = "%Y-%m-%d"

    def cutoff_for(days: int, fmt: str) -> str:
        # Cutoff is relative to the moment the cleanup run started.
        return (started_at - timedelta(days=days)).strftime(fmt)

    message_days = self._normalize_retention_days(settings.DATA_CLEANUP_MESSAGE_DAYS)
    download_days = self._normalize_retention_days(
        settings.DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS
    )
    userdata_days = self._normalize_retention_days(
        settings.DATA_CLEANUP_SITE_USERDATA_DAYS
    )
    transfer_days = self._normalize_retention_days(
        settings.DATA_CLEANUP_TRANSFER_HISTORY_DAYS
    )

    message_cutoff = cutoff_for(message_days, ts_fmt)
    download_cutoff = cutoff_for(download_days, ts_fmt)
    # SiteUserData is keyed by day, so its cutoff drops the time part.
    userdata_cutoff = cutoff_for(userdata_days, day_fmt)
    transfer_cutoff = cutoff_for(transfer_days, ts_fmt)

    return [
        {
            "name": "message",
            "retention_days": message_days,
            "cutoff": message_cutoff,
            "handler": lambda db: Message.delete_before(
                db=db,
                before_time=message_cutoff,
                limit=batch_size,
            ),
        },
        {
            "name": "downloadhistory",
            "retention_days": download_days,
            "cutoff": download_history_cutoff_label(download_cutoff),
            "handler": lambda db: DownloadHistory.delete_before(
                db=db,
                before_time=download_cutoff,
                limit=batch_size,
            ),
        },
        {
            # Orphaned file rows inherit the parent history's retention, so
            # they are skipped whenever download-history cleanup is disabled.
            "name": "downloadfiles",
            "retention_days": download_days,
            "cutoff": "follow-parent-history",
            "handler": lambda db: DownloadFiles.delete_orphans(
                db=db,
                limit=batch_size,
            ),
        },
        {
            "name": "siteuserdata",
            "retention_days": userdata_days,
            "cutoff": userdata_cutoff,
            "handler": lambda db: SiteUserData.delete_before(
                db=db,
                before_day=userdata_cutoff,
                limit=batch_size,
            ),
        },
        {
            "name": "transferhistory",
            "retention_days": transfer_days,
            "cutoff": transfer_cutoff,
            "handler": lambda db: TransferHistory.delete_before(
                db=db,
                before_time=transfer_cutoff,
                limit=batch_size,
            ),
        },
    ]
@staticmethod
def _cleanup_in_batches(
db: Session,
@@ -215,6 +259,11 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
"SITEDATA_REFRESH_INTERVAL",
"AI_AGENT_ENABLE",
"AI_AGENT_JOB_INTERVAL",
"DATA_CLEANUP_ENABLE",
"DATA_CLEANUP_MESSAGE_DAYS",
"DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS",
"DATA_CLEANUP_SITE_USERDATA_DAYS",
"DATA_CLEANUP_TRANSFER_HISTORY_DAYS",
}
def __init__(self):
@@ -507,15 +556,16 @@ class Scheduler(ConfigReloadMixin, metaclass=SingletonClass):
)
# 数据表清理服务,每天凌晨执行一次
self._scheduler.add_job(
self.start,
"cron",
id="data_cleanup",
name="数据表清理",
hour=3,
minute=30,
kwargs={"job_id": "data_cleanup"},
)
if settings.DATA_CLEANUP_ENABLE:
self._scheduler.add_job(
self.start,
"cron",
id="data_cleanup",
name="数据表清理",
hour=3,
minute=30,
kwargs={"job_id": "data_cleanup"},
)
# 定时检查用户认证每隔10分钟
self._scheduler.add_job(

View File

@@ -12,6 +12,7 @@ from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
from app.db.models.message import Message
from app.db.models.siteuserdata import SiteUserData
from app.db.models.transferhistory import TransferHistory
from app.core.config import settings
from app.scheduler import SchedulerChain
@@ -31,6 +32,18 @@ class DataCleanupChainTest(unittest.TestCase):
self.engine.dispose()
self.temp_dir.cleanup()
@staticmethod
def _cleanup_settings(**overrides):
    """
    Patch the cleanup-related settings for the duration of a test.

    Starts from enabled defaults mirroring production values; keyword
    overrides replace individual settings. Returns the patcher for use
    as a context manager.
    """
    values = dict(
        DATA_CLEANUP_ENABLE=True,
        DATA_CLEANUP_MESSAGE_DAYS=90,
        DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS=180,
        DATA_CLEANUP_SITE_USERDATA_DAYS=180,
        DATA_CLEANUP_TRANSFER_HISTORY_DAYS=365 * 3,
    )
    values.update(overrides)
    return patch.multiple(settings, **values)
def test_cleanup_removes_expired_rows_in_batches(self):
"""
指定表应按保留期分批删除,并保留仍在有效期内的数据。
@@ -134,7 +147,7 @@ class DataCleanupChainTest(unittest.TestCase):
)
db.commit()
with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory):
with self._cleanup_settings(), patch("app.scheduler.SessionFactory", self.SessionFactory):
report = SchedulerChain().cleanup(batch_size=1)
self.assertEqual(report["tables"]["message"]["deleted"], 3)
@@ -171,10 +184,100 @@ class DataCleanupChainTest(unittest.TestCase):
)
db.commit()
with patch("app.chain.data_cleanup.SessionFactory", self.SessionFactory):
with self._cleanup_settings(), patch("app.scheduler.SessionFactory", self.SessionFactory):
report = SchedulerChain().cleanup(batch_size=10)
self.assertEqual(report["tables"]["transferhistory"]["deleted"], 0)
with self.SessionFactory() as db:
self.assertEqual(db.query(TransferHistory).count(), 1)
def test_cleanup_skips_when_disabled(self):
    """
    When the master switch is off, cleanup must report "disabled" and
    leave all rows untouched.
    """
    stale_time = (datetime.now() - timedelta(days=120)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    with self.SessionFactory() as db:
        db.add(Message(reg_time=stale_time, title="old"))
        db.commit()
    factory_patch = patch("app.scheduler.SessionFactory", self.SessionFactory)
    with self._cleanup_settings(DATA_CLEANUP_ENABLE=False), factory_patch:
        report = SchedulerChain().cleanup(batch_size=10)
    # Nothing may be deleted and the report must explain why.
    self.assertFalse(report["enabled"])
    self.assertEqual(report["skipped_reason"], "disabled")
    self.assertEqual(report["total_deleted"], 0)
    with self.SessionFactory() as db:
        self.assertEqual(db.query(Message).count(), 1)
def test_cleanup_respects_per_table_retention_days(self):
    """
    A per-table retention override must drive which rows are deleted.
    """
    now = datetime.now()
    ts_fmt = "%Y-%m-%d %H:%M:%S"
    expired_time = (now - timedelta(days=10)).strftime(ts_fmt)
    fresh_time = (now - timedelta(days=2)).strftime(ts_fmt)
    with self.SessionFactory() as db:
        db.add_all(
            [
                Message(reg_time=expired_time, title="old"),
                Message(reg_time=fresh_time, title="keep"),
            ]
        )
        db.commit()
    factory_patch = patch("app.scheduler.SessionFactory", self.SessionFactory)
    with self._cleanup_settings(DATA_CLEANUP_MESSAGE_DAYS=7), factory_patch:
        report = SchedulerChain().cleanup(batch_size=10)
    # The 10-day-old row falls outside the 7-day window; the 2-day-old stays.
    self.assertEqual(report["tables"]["message"]["retention_days"], 7)
    self.assertEqual(report["tables"]["message"]["deleted"], 1)
    with self.SessionFactory() as db:
        self.assertEqual(db.query(Message).count(), 1)
def test_cleanup_skips_table_when_retention_days_is_zero(self):
    """
    A zero retention period must skip the table itself and its dependent
    orphan-record cleanup (downloadfiles follows downloadhistory).
    """
    stale_download_time = (datetime.now() - timedelta(days=240)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    with self.SessionFactory() as db:
        history_row = DownloadHistory(
            path="/downloads/old",
            type="电影",
            title="old",
            download_hash="hash-old",
            date=stale_download_time,
        )
        orphan_row = DownloadFiles(
            download_hash="hash-orphan",
            fullpath="/downloads/orphan/file.mkv",
            savepath="/downloads/orphan",
            filepath="file.mkv",
        )
        db.add(history_row)
        db.add(orphan_row)
        db.commit()
    factory_patch = patch("app.scheduler.SessionFactory", self.SessionFactory)
    with self._cleanup_settings(DATA_CLEANUP_DOWNLOAD_HISTORY_DAYS=0), factory_patch:
        report = SchedulerChain().cleanup(batch_size=10)
    # Both the parent table and its orphan cleanup must be marked skipped.
    self.assertTrue(report["tables"]["downloadhistory"]["skipped"])
    self.assertTrue(report["tables"]["downloadfiles"]["skipped"])
    with self.SessionFactory() as db:
        self.assertEqual(db.query(DownloadHistory).count(), 1)
        self.assertEqual(db.query(DownloadFiles).count(), 1)