mirror of
https://github.com/hicccc77/WeFlow.git
synced 2026-03-24 23:06:51 +00:00
fix(report): speed up available years loading
This commit is contained in:
@@ -86,6 +86,11 @@ export interface AnnualReportData {
|
||||
}
|
||||
|
||||
class AnnualReportService {
|
||||
private readonly availableYearsCacheTtlMs = 10 * 60 * 1000
|
||||
private readonly availableYearsScanConcurrency = 4
|
||||
private readonly availableYearsColumnCache = new Map<string, string>()
|
||||
private readonly availableYearsCache = new Map<string, { years: number[]; updatedAt: number }>()
|
||||
|
||||
constructor() {
|
||||
}
|
||||
|
||||
@@ -181,6 +186,188 @@ class AnnualReportService {
|
||||
}
|
||||
}
|
||||
|
||||
private quoteSqlIdentifier(identifier: string): string {
|
||||
return `"${String(identifier || '').replace(/"/g, '""')}"`
|
||||
}
|
||||
|
||||
private toUnixTimestamp(value: any): number {
|
||||
const n = Number(value)
|
||||
if (!Number.isFinite(n) || n <= 0) return 0
|
||||
// 兼容毫秒级时间戳
|
||||
const seconds = n > 1e12 ? Math.floor(n / 1000) : Math.floor(n)
|
||||
return seconds > 0 ? seconds : 0
|
||||
}
|
||||
|
||||
private addYearsFromRange(years: Set<number>, firstTs: number, lastTs: number): void {
|
||||
const currentYear = new Date().getFullYear()
|
||||
const minTs = firstTs > 0 ? firstTs : lastTs
|
||||
const maxTs = lastTs > 0 ? lastTs : firstTs
|
||||
if (minTs <= 0 || maxTs <= 0) return
|
||||
|
||||
const minYear = new Date(minTs * 1000).getFullYear()
|
||||
const maxYear = new Date(maxTs * 1000).getFullYear()
|
||||
for (let y = minYear; y <= maxYear; y++) {
|
||||
if (y >= 2010 && y <= currentYear) years.add(y)
|
||||
}
|
||||
}
|
||||
|
||||
private async forEachWithConcurrency<T>(
|
||||
items: T[],
|
||||
concurrency: number,
|
||||
handler: (item: T, index: number) => Promise<void>
|
||||
): Promise<void> {
|
||||
if (!items.length) return
|
||||
const workerCount = Math.max(1, Math.min(concurrency, items.length))
|
||||
let nextIndex = 0
|
||||
const workers: Promise<void>[] = []
|
||||
|
||||
for (let i = 0; i < workerCount; i++) {
|
||||
workers.push((async () => {
|
||||
while (true) {
|
||||
const current = nextIndex
|
||||
nextIndex += 1
|
||||
if (current >= items.length) break
|
||||
await handler(items[current], current)
|
||||
}
|
||||
})())
|
||||
}
|
||||
|
||||
await Promise.all(workers)
|
||||
}
|
||||
|
||||
private async detectTimeColumn(dbPath: string, tableName: string): Promise<string | null> {
|
||||
const cacheKey = `${dbPath}\u0001${tableName}`
|
||||
if (this.availableYearsColumnCache.has(cacheKey)) {
|
||||
const cached = this.availableYearsColumnCache.get(cacheKey) || ''
|
||||
return cached || null
|
||||
}
|
||||
|
||||
const result = await wcdbService.execQuery('message', dbPath, `PRAGMA table_info(${this.quoteSqlIdentifier(tableName)})`)
|
||||
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) {
|
||||
this.availableYearsColumnCache.set(cacheKey, '')
|
||||
return null
|
||||
}
|
||||
|
||||
const candidates = ['create_time', 'createtime', 'msg_create_time', 'msg_time', 'msgtime', 'time']
|
||||
const columns = new Set<string>()
|
||||
for (const row of result.rows as Record<string, any>[]) {
|
||||
const name = String(row.name || row.column_name || row.columnName || '').trim().toLowerCase()
|
||||
if (name) columns.add(name)
|
||||
}
|
||||
|
||||
for (const candidate of candidates) {
|
||||
if (columns.has(candidate)) {
|
||||
this.availableYearsColumnCache.set(cacheKey, candidate)
|
||||
return candidate
|
||||
}
|
||||
}
|
||||
|
||||
this.availableYearsColumnCache.set(cacheKey, '')
|
||||
return null
|
||||
}
|
||||
|
||||
private async getTableTimeRange(dbPath: string, tableName: string): Promise<{ first: number; last: number } | null> {
|
||||
const cacheKey = `${dbPath}\u0001${tableName}`
|
||||
const cachedColumn = this.availableYearsColumnCache.get(cacheKey)
|
||||
const initialColumn = cachedColumn && cachedColumn.length > 0 ? cachedColumn : 'create_time'
|
||||
const tried = new Set<string>()
|
||||
|
||||
const queryByColumn = async (column: string): Promise<{ first: number; last: number } | null> => {
|
||||
const sql = `SELECT MIN(${this.quoteSqlIdentifier(column)}) AS first_ts, MAX(${this.quoteSqlIdentifier(column)}) AS last_ts FROM ${this.quoteSqlIdentifier(tableName)}`
|
||||
const result = await wcdbService.execQuery('message', dbPath, sql)
|
||||
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) return null
|
||||
const row = result.rows[0] as Record<string, any>
|
||||
const first = this.toUnixTimestamp(row.first_ts ?? row.firstTs ?? row.min_ts ?? row.minTs)
|
||||
const last = this.toUnixTimestamp(row.last_ts ?? row.lastTs ?? row.max_ts ?? row.maxTs)
|
||||
return { first, last }
|
||||
}
|
||||
|
||||
tried.add(initialColumn)
|
||||
const quick = await queryByColumn(initialColumn)
|
||||
if (quick) {
|
||||
if (!cachedColumn) this.availableYearsColumnCache.set(cacheKey, initialColumn)
|
||||
return quick
|
||||
}
|
||||
|
||||
const detectedColumn = await this.detectTimeColumn(dbPath, tableName)
|
||||
if (!detectedColumn || tried.has(detectedColumn)) {
|
||||
return null
|
||||
}
|
||||
|
||||
return queryByColumn(detectedColumn)
|
||||
}
|
||||
|
||||
private async getAvailableYearsByTableScan(sessionIds: string[]): Promise<number[]> {
|
||||
const years = new Set<number>()
|
||||
|
||||
await this.forEachWithConcurrency(sessionIds, this.availableYearsScanConcurrency, async (sessionId) => {
|
||||
const tableStats = await wcdbService.getMessageTableStats(sessionId)
|
||||
if (!tableStats.success || !Array.isArray(tableStats.tables) || tableStats.tables.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
for (const table of tableStats.tables as Record<string, any>[]) {
|
||||
const tableName = String(table.table_name || table.name || '').trim()
|
||||
const dbPath = String(table.db_path || table.dbPath || '').trim()
|
||||
if (!tableName || !dbPath) continue
|
||||
|
||||
const range = await this.getTableTimeRange(dbPath, tableName)
|
||||
if (!range) continue
|
||||
this.addYearsFromRange(years, range.first, range.last)
|
||||
}
|
||||
})
|
||||
|
||||
return Array.from(years).sort((a, b) => b - a)
|
||||
}
|
||||
|
||||
private async getAvailableYearsByEdgeScan(sessionIds: string[]): Promise<number[]> {
|
||||
const years = new Set<number>()
|
||||
for (const sessionId of sessionIds) {
|
||||
const first = await this.getEdgeMessageTime(sessionId, true)
|
||||
const last = await this.getEdgeMessageTime(sessionId, false)
|
||||
this.addYearsFromRange(years, first || 0, last || 0)
|
||||
}
|
||||
return Array.from(years).sort((a, b) => b - a)
|
||||
}
|
||||
|
||||
private buildAvailableYearsCacheKey(dbPath: string, cleanedWxid: string): string {
|
||||
return `${dbPath}\u0001${cleanedWxid}`
|
||||
}
|
||||
|
||||
private getCachedAvailableYears(cacheKey: string): number[] | null {
|
||||
const cached = this.availableYearsCache.get(cacheKey)
|
||||
if (!cached) return null
|
||||
if (Date.now() - cached.updatedAt > this.availableYearsCacheTtlMs) {
|
||||
this.availableYearsCache.delete(cacheKey)
|
||||
return null
|
||||
}
|
||||
return [...cached.years]
|
||||
}
|
||||
|
||||
private setCachedAvailableYears(cacheKey: string, years: number[]): void {
|
||||
const normalized = Array.from(new Set(years))
|
||||
.filter((y) => Number.isFinite(y))
|
||||
.map((y) => Math.floor(y))
|
||||
.sort((a, b) => b - a)
|
||||
|
||||
this.availableYearsCache.set(cacheKey, {
|
||||
years: normalized,
|
||||
updatedAt: Date.now()
|
||||
})
|
||||
|
||||
if (this.availableYearsCache.size > 8) {
|
||||
let oldestKey = ''
|
||||
let oldestTime = Number.POSITIVE_INFINITY
|
||||
for (const [key, val] of this.availableYearsCache) {
|
||||
if (val.updatedAt < oldestTime) {
|
||||
oldestTime = val.updatedAt
|
||||
oldestKey = key
|
||||
}
|
||||
}
|
||||
if (oldestKey) this.availableYearsCache.delete(oldestKey)
|
||||
}
|
||||
}
|
||||
|
||||
private decodeMessageContent(messageContent: any, compressContent: any): string {
|
||||
let content = this.decodeMaybeCompressed(compressContent)
|
||||
if (!content || content.length === 0) {
|
||||
@@ -363,32 +550,25 @@ class AnnualReportService {
|
||||
try {
|
||||
const conn = await this.ensureConnectedWithConfig(params.dbPath, params.decryptKey, params.wxid)
|
||||
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
|
||||
const cacheKey = this.buildAvailableYearsCacheKey(params.dbPath, conn.cleanedWxid)
|
||||
const cached = this.getCachedAvailableYears(cacheKey)
|
||||
if (cached) {
|
||||
return { success: true, data: cached }
|
||||
}
|
||||
|
||||
const sessionIds = await this.getPrivateSessions(conn.cleanedWxid)
|
||||
if (sessionIds.length === 0) {
|
||||
return { success: false, error: '未找到消息会话' }
|
||||
}
|
||||
|
||||
const fastYears = await wcdbService.getAvailableYears(sessionIds)
|
||||
if (fastYears.success && fastYears.data) {
|
||||
return { success: true, data: fastYears.data }
|
||||
let years = await this.getAvailableYearsByTableScan(sessionIds)
|
||||
if (years.length === 0) {
|
||||
// 扫表失败时,再降级到游标首尾扫描,保证兼容性。
|
||||
years = await this.getAvailableYearsByEdgeScan(sessionIds)
|
||||
}
|
||||
|
||||
const years = new Set<number>()
|
||||
for (const sessionId of sessionIds) {
|
||||
const first = await this.getEdgeMessageTime(sessionId, true)
|
||||
const last = await this.getEdgeMessageTime(sessionId, false)
|
||||
if (!first && !last) continue
|
||||
|
||||
const minYear = new Date((first || last || 0) * 1000).getFullYear()
|
||||
const maxYear = new Date((last || first || 0) * 1000).getFullYear()
|
||||
for (let y = minYear; y <= maxYear; y++) {
|
||||
if (y >= 2010 && y <= new Date().getFullYear()) years.add(y)
|
||||
}
|
||||
}
|
||||
|
||||
const sortedYears = Array.from(years).sort((a, b) => b - a)
|
||||
return { success: true, data: sortedYears }
|
||||
this.setCachedAvailableYears(cacheKey, years)
|
||||
return { success: true, data: years }
|
||||
} catch (e) {
|
||||
return { success: false, error: String(e) }
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user