feat(report): stream available years loading

This commit is contained in:
tisonhuang
2026-03-04 16:33:20 +08:00
parent d476fbbdae
commit 28d68d8a8e
5 changed files with 242 additions and 40 deletions

View File

@@ -198,23 +198,36 @@ class AnnualReportService {
return seconds > 0 ? seconds : 0
}
private addYearsFromRange(years: Set<number>, firstTs: number, lastTs: number): void {
private addYearsFromRange(years: Set<number>, firstTs: number, lastTs: number): boolean {
let changed = false
const currentYear = new Date().getFullYear()
const minTs = firstTs > 0 ? firstTs : lastTs
const maxTs = lastTs > 0 ? lastTs : firstTs
if (minTs <= 0 || maxTs <= 0) return
if (minTs <= 0 || maxTs <= 0) return changed
const minYear = new Date(minTs * 1000).getFullYear()
const maxYear = new Date(maxTs * 1000).getFullYear()
for (let y = minYear; y <= maxYear; y++) {
if (y >= 2010 && y <= currentYear) years.add(y)
if (y >= 2010 && y <= currentYear && !years.has(y)) {
years.add(y)
changed = true
}
}
return changed
}
private normalizeAvailableYears(years: Iterable<number>): number[] {
return Array.from(new Set(Array.from(years)))
.filter((y) => Number.isFinite(y))
.map((y) => Math.floor(y))
.sort((a, b) => b - a)
}
private async forEachWithConcurrency<T>(
items: T[],
concurrency: number,
handler: (item: T, index: number) => Promise<void>
handler: (item: T, index: number) => Promise<void>,
shouldStop?: () => boolean
): Promise<void> {
if (!items.length) return
const workerCount = Math.max(1, Math.min(concurrency, items.length))
@@ -224,6 +237,7 @@ class AnnualReportService {
for (let i = 0; i < workerCount; i++) {
workers.push((async () => {
while (true) {
if (shouldStop?.()) break
const current = nextIndex
nextIndex += 1
if (current >= items.length) break
@@ -297,37 +311,72 @@ class AnnualReportService {
return queryByColumn(detectedColumn)
}
private async getAvailableYearsByTableScan(sessionIds: string[]): Promise<number[]> {
private async getAvailableYearsByTableScan(
sessionIds: string[],
options?: { onProgress?: (years: number[]) => void; shouldCancel?: () => boolean }
): Promise<number[]> {
const years = new Set<number>()
let lastEmittedSize = 0
const emitIfChanged = (force = false) => {
if (!options?.onProgress) return
const next = this.normalizeAvailableYears(years)
if (!force && next.length === lastEmittedSize) return
options.onProgress(next)
lastEmittedSize = next.length
}
const shouldCancel = () => options?.shouldCancel?.() === true
await this.forEachWithConcurrency(sessionIds, this.availableYearsScanConcurrency, async (sessionId) => {
if (shouldCancel()) return
const tableStats = await wcdbService.getMessageTableStats(sessionId)
if (!tableStats.success || !Array.isArray(tableStats.tables) || tableStats.tables.length === 0) {
return
}
for (const table of tableStats.tables as Record<string, any>[]) {
if (shouldCancel()) return
const tableName = String(table.table_name || table.name || '').trim()
const dbPath = String(table.db_path || table.dbPath || '').trim()
if (!tableName || !dbPath) continue
const range = await this.getTableTimeRange(dbPath, tableName)
if (!range) continue
this.addYearsFromRange(years, range.first, range.last)
const changed = this.addYearsFromRange(years, range.first, range.last)
if (changed) emitIfChanged()
}
})
}, shouldCancel)
return Array.from(years).sort((a, b) => b - a)
emitIfChanged(true)
return this.normalizeAvailableYears(years)
}
private async getAvailableYearsByEdgeScan(sessionIds: string[]): Promise<number[]> {
private async getAvailableYearsByEdgeScan(
sessionIds: string[],
options?: { onProgress?: (years: number[]) => void; shouldCancel?: () => boolean }
): Promise<number[]> {
const years = new Set<number>()
let lastEmittedSize = 0
const shouldCancel = () => options?.shouldCancel?.() === true
const emitIfChanged = (force = false) => {
if (!options?.onProgress) return
const next = this.normalizeAvailableYears(years)
if (!force && next.length === lastEmittedSize) return
options.onProgress(next)
lastEmittedSize = next.length
}
for (const sessionId of sessionIds) {
if (shouldCancel()) break
const first = await this.getEdgeMessageTime(sessionId, true)
const last = await this.getEdgeMessageTime(sessionId, false)
this.addYearsFromRange(years, first || 0, last || 0)
const changed = this.addYearsFromRange(years, first || 0, last || 0)
if (changed) emitIfChanged()
}
return Array.from(years).sort((a, b) => b - a)
emitIfChanged(true)
return this.normalizeAvailableYears(years)
}
private buildAvailableYearsCacheKey(dbPath: string, cleanedWxid: string): string {
@@ -345,10 +394,7 @@ class AnnualReportService {
}
private setCachedAvailableYears(cacheKey: string, years: number[]): void {
const normalized = Array.from(new Set(years))
.filter((y) => Number.isFinite(y))
.map((y) => Math.floor(y))
.sort((a, b) => b - a)
const normalized = this.normalizeAvailableYears(years)
this.availableYearsCache.set(cacheKey, {
years: normalized,
@@ -546,13 +592,22 @@ class AnnualReportService {
return { sessionId: bestSessionId, days: bestDays, start: bestStart, end: bestEnd }
}
async getAvailableYears(params: { dbPath: string; decryptKey: string; wxid: string }): Promise<{ success: boolean; data?: number[]; error?: string }> {
async getAvailableYears(params: {
dbPath: string
decryptKey: string
wxid: string
onProgress?: (years: number[]) => void
shouldCancel?: () => boolean
}): Promise<{ success: boolean; data?: number[]; error?: string }> {
try {
const isCancelled = () => params.shouldCancel?.() === true
const conn = await this.ensureConnectedWithConfig(params.dbPath, params.decryptKey, params.wxid)
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
if (isCancelled()) return { success: false, error: '已取消加载年份数据' }
const cacheKey = this.buildAvailableYearsCacheKey(params.dbPath, conn.cleanedWxid)
const cached = this.getCachedAvailableYears(cacheKey)
if (cached) {
params.onProgress?.(cached)
return { success: true, data: cached }
}
@@ -560,14 +615,24 @@ class AnnualReportService {
if (sessionIds.length === 0) {
return { success: false, error: '未找到消息会话' }
}
if (isCancelled()) return { success: false, error: '已取消加载年份数据' }
let years = await this.getAvailableYearsByTableScan(sessionIds)
let years = await this.getAvailableYearsByTableScan(sessionIds, {
onProgress: params.onProgress,
shouldCancel: params.shouldCancel
})
if (isCancelled()) return { success: false, error: '已取消加载年份数据' }
if (years.length === 0) {
// 扫表失败时,再降级到游标首尾扫描,保证兼容性。
years = await this.getAvailableYearsByEdgeScan(sessionIds)
years = await this.getAvailableYearsByEdgeScan(sessionIds, {
onProgress: params.onProgress,
shouldCancel: params.shouldCancel
})
}
if (isCancelled()) return { success: false, error: '已取消加载年份数据' }
this.setCachedAvailableYears(cacheKey, years)
params.onProgress?.(years)
return { success: true, data: years }
} catch (e) {
return { success: false, error: String(e) }