perf(export): speed up session message count aggregation

This commit is contained in:
tisonhuang
2026-03-04 14:39:14 +08:00
parent 756a83191d
commit 4b8c8155fa

View File

@@ -235,8 +235,15 @@ class ChatService {
private readonly sessionTablesCacheTtl = 300000 // 5分钟 private readonly sessionTablesCacheTtl = 300000 // 5分钟
private sessionMessageCountCache = new Map<string, { count: number; updatedAt: number }>() private sessionMessageCountCache = new Map<string, { count: number; updatedAt: number }>()
private sessionMessageCountHintCache = new Map<string, number>() private sessionMessageCountHintCache = new Map<string, number>()
private sessionMessageCountBatchCache: {
dbSignature: string
sessionIdsKey: string
counts: Record<string, number>
updatedAt: number
} | null = null
private sessionMessageCountCacheScope = '' private sessionMessageCountCacheScope = ''
private readonly sessionMessageCountCacheTtlMs = 10 * 60 * 1000 private readonly sessionMessageCountCacheTtlMs = 10 * 60 * 1000
private readonly sessionMessageCountBatchCacheTtlMs = 5 * 60 * 1000
private sessionDetailFastCache = new Map<string, { detail: SessionDetailFast; updatedAt: number }>() private sessionDetailFastCache = new Map<string, { detail: SessionDetailFast; updatedAt: number }>()
private sessionDetailExtraCache = new Map<string, { detail: SessionDetailExtra; updatedAt: number }>() private sessionDetailExtraCache = new Map<string, { detail: SessionDetailExtra; updatedAt: number }>()
private readonly sessionDetailFastCacheTtlMs = 60 * 1000 private readonly sessionDetailFastCacheTtlMs = 60 * 1000
@@ -896,6 +903,209 @@ class ChatService {
return this.getContactTypeCounts() return this.getContactTypeCounts()
} }
private async listMessageDbPathsForCount(): Promise<{ success: boolean; dbPaths?: string[]; error?: string }> {
try {
const result = await wcdbService.listMessageDbs()
if (!result.success) {
return { success: false, error: result.error || '获取消息数据库列表失败' }
}
const normalized = Array.from(new Set(
(result.data || [])
.map(pathItem => String(pathItem || '').trim())
.filter(Boolean)
))
return { success: true, dbPaths: normalized }
} catch (e) {
return { success: false, error: String(e) }
}
}
private buildMessageDbSignature(dbPaths: string[]): string {
if (!Array.isArray(dbPaths) || dbPaths.length === 0) return 'empty'
const parts: string[] = []
const sortedPaths = [...dbPaths].sort()
for (const dbPath of sortedPaths) {
try {
const stat = statSync(dbPath)
parts.push(`${dbPath}:${stat.size}:${Math.floor(stat.mtimeMs)}`)
} catch {
parts.push(`${dbPath}:missing`)
}
}
return parts.join('|')
}
private buildSessionHashLookup(sessionIds: string[]): {
full32: Map<string, string>
short16: Map<string, string | null>
} {
const full32 = new Map<string, string>()
const short16 = new Map<string, string | null>()
for (const sessionId of sessionIds) {
const hash = crypto.createHash('md5').update(sessionId).digest('hex').toLowerCase()
full32.set(hash, sessionId)
const shortHash = hash.slice(0, 16)
const existing = short16.get(shortHash)
if (existing === undefined) {
short16.set(shortHash, sessionId)
} else if (existing !== sessionId) {
short16.set(shortHash, null)
}
}
return { full32, short16 }
}
private matchSessionIdByTableName(
tableName: string,
hashLookup: {
full32: Map<string, string>
short16: Map<string, string | null>
}
): string | null {
const normalized = String(tableName || '').trim().toLowerCase()
if (!normalized.startsWith('msg_')) return null
const suffix = normalized.slice(4)
const directFull = hashLookup.full32.get(suffix)
if (directFull) return directFull
if (suffix.length >= 16) {
const shortCandidate = hashLookup.short16.get(suffix.slice(0, 16))
if (typeof shortCandidate === 'string') return shortCandidate
}
const hashMatch = normalized.match(/[a-f0-9]{32}|[a-f0-9]{16}/i)
if (!hashMatch || !hashMatch[0]) return null
const matchedHash = hashMatch[0].toLowerCase()
if (matchedHash.length >= 32) {
const full = hashLookup.full32.get(matchedHash)
if (full) return full
}
const short = hashLookup.short16.get(matchedHash.slice(0, 16))
return typeof short === 'string' ? short : null
}
private quoteSqlIdentifier(identifier: string): string {
return `"${String(identifier || '').replace(/"/g, '""')}"`
}
private async countSessionMessageCountsByTableScan(
sessionIds: string[],
traceId?: string
): Promise<{
success: boolean
counts?: Record<string, number>
error?: string
dbSignature?: string
}> {
const normalizedSessionIds = Array.from(new Set(
(sessionIds || [])
.map(id => String(id || '').trim())
.filter(Boolean)
))
if (normalizedSessionIds.length === 0) {
return { success: true, counts: {}, dbSignature: 'empty' }
}
const dbPathsResult = await this.listMessageDbPathsForCount()
if (!dbPathsResult.success) {
return { success: false, error: dbPathsResult.error || '获取消息数据库列表失败' }
}
const dbPaths = dbPathsResult.dbPaths || []
const dbSignature = this.buildMessageDbSignature(dbPaths)
if (dbPaths.length === 0) {
const emptyCounts = normalizedSessionIds.reduce<Record<string, number>>((acc, sessionId) => {
acc[sessionId] = 0
return acc
}, {})
return { success: true, counts: emptyCounts, dbSignature }
}
const hashLookup = this.buildSessionHashLookup(normalizedSessionIds)
const counts = normalizedSessionIds.reduce<Record<string, number>>((acc, sessionId) => {
acc[sessionId] = 0
return acc
}, {})
const unionChunkSize = 48
const queryCountKeys = ['count', 'COUNT(*)', 'cnt', 'CNT', 'table_count', 'tableCount']
for (const dbPath of dbPaths) {
const tablesResult = await wcdbService.execQuery(
'message',
dbPath,
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
)
if (!tablesResult.success || !tablesResult.rows || tablesResult.rows.length === 0) {
continue
}
const tableToSessionId = new Map<string, string>()
for (const row of tablesResult.rows as Record<string, any>[]) {
const tableName = String(this.getRowField(row, ['name', 'table_name', 'tableName']) || '').trim()
if (!tableName) continue
const sessionId = this.matchSessionIdByTableName(tableName, hashLookup)
if (!sessionId) continue
tableToSessionId.set(tableName, sessionId)
}
if (tableToSessionId.size === 0) {
continue
}
const matchedTables = Array.from(tableToSessionId.keys())
for (let i = 0; i < matchedTables.length; i += unionChunkSize) {
const chunk = matchedTables.slice(i, i + unionChunkSize)
if (chunk.length === 0) continue
const unionSql = chunk.map((tableName) => {
const tableAlias = tableName.replace(/'/g, "''")
return `SELECT '${tableAlias}' AS table_name, COUNT(*) AS count FROM ${this.quoteSqlIdentifier(tableName)}`
}).join(' UNION ALL ')
const unionResult = await wcdbService.execQuery('message', dbPath, unionSql)
if (unionResult.success && unionResult.rows) {
for (const row of unionResult.rows as Record<string, any>[]) {
const tableName = String(this.getRowField(row, ['table_name', 'tableName', 'name']) || '').trim()
const sessionId = tableToSessionId.get(tableName)
if (!sessionId) continue
const countValue = Math.max(0, Math.floor(this.getRowInt(row, queryCountKeys, 0)))
counts[sessionId] = (counts[sessionId] || 0) + countValue
}
continue
}
// 回退到逐表查询,避免单个 UNION 查询失败导致整批丢失。
for (const tableName of chunk) {
const sessionId = tableToSessionId.get(tableName)
if (!sessionId) continue
const countSql = `SELECT COUNT(*) AS count FROM ${this.quoteSqlIdentifier(tableName)}`
const singleResult = await wcdbService.execQuery('message', dbPath, countSql)
if (!singleResult.success || !singleResult.rows || singleResult.rows.length === 0) {
continue
}
const countValue = Math.max(0, Math.floor(this.getRowInt(singleResult.rows[0], queryCountKeys, 0)))
counts[sessionId] = (counts[sessionId] || 0) + countValue
}
}
}
this.logExportDiag({
traceId,
level: 'debug',
source: 'backend',
stepId: 'backend-get-session-message-counts-table-scan',
stepName: '会话消息总数表扫描',
status: 'done',
message: '按 Msg 表聚合统计完成',
data: {
dbCount: dbPaths.length,
requestedSessions: normalizedSessionIds.length
}
})
return { success: true, counts, dbSignature }
}
/** /**
* 批量获取会话消息总数(轻量接口,用于列表优先排序) * 批量获取会话消息总数(轻量接口,用于列表优先排序)
*/ */
@@ -949,6 +1159,7 @@ class ChatService {
const counts: Record<string, number> = {} const counts: Record<string, number> = {}
const now = Date.now() const now = Date.now()
const pendingSessionIds: string[] = [] const pendingSessionIds: string[] = []
const sessionIdsKey = [...normalizedSessionIds].sort().join('\u0001')
for (const sessionId of normalizedSessionIds) { for (const sessionId of normalizedSessionIds) {
if (!bypassSessionCache) { if (!bypassSessionCache) {
@@ -974,40 +1185,106 @@ class ChatService {
pendingSessionIds.push(sessionId) pendingSessionIds.push(sessionId)
} }
const batchSize = 320 if (pendingSessionIds.length > 0) {
for (let i = 0; i < pendingSessionIds.length; i += batchSize) { let tableScanSucceeded = false
const batch = pendingSessionIds.slice(i, i + batchSize) const cachedBatch = this.sessionMessageCountBatchCache
this.logExportDiag({ const cachedBatchFresh = cachedBatch &&
traceId, now - cachedBatch.updatedAt <= this.sessionMessageCountBatchCacheTtlMs
level: 'debug',
source: 'backend', if (cachedBatchFresh && cachedBatch.sessionIdsKey === sessionIdsKey) {
stepId: 'backend-get-session-message-counts-batch', const dbPathsResult = await this.listMessageDbPathsForCount()
stepName: '会话消息总数批次查询', if (dbPathsResult.success) {
status: 'running', const currentDbSignature = this.buildMessageDbSignature(dbPathsResult.dbPaths || [])
message: `开始查询批次 ${Math.floor(i / batchSize) + 1}/${Math.ceil(pendingSessionIds.length / batchSize) || 1}`, if (currentDbSignature === cachedBatch.dbSignature) {
data: { for (const sessionId of pendingSessionIds) {
batchSize: batch.length const nextCountRaw = cachedBatch.counts[sessionId]
const nextCount = Number.isFinite(nextCountRaw) ? Math.max(0, Math.floor(nextCountRaw)) : 0
counts[sessionId] = nextCount
this.sessionMessageCountCache.set(sessionId, {
count: nextCount,
updatedAt: now
})
}
tableScanSucceeded = true
}
} }
})
let batchCounts: Record<string, number> = {}
try {
const result = await wcdbService.getMessageCounts(batch)
if (result.success && result.counts) {
batchCounts = result.counts
}
} catch {
// noop
} }
const nowTs = Date.now() if (!tableScanSucceeded) {
for (const sessionId of batch) { const tableScanResult = await this.countSessionMessageCountsByTableScan(pendingSessionIds, traceId)
const nextCountRaw = batchCounts[sessionId] if (tableScanResult.success && tableScanResult.counts) {
const nextCount = Number.isFinite(nextCountRaw) ? Math.max(0, Math.floor(nextCountRaw)) : 0 const nowTs = Date.now()
counts[sessionId] = nextCount for (const sessionId of pendingSessionIds) {
this.sessionMessageCountCache.set(sessionId, { const nextCountRaw = tableScanResult.counts[sessionId]
count: nextCount, const nextCount = Number.isFinite(nextCountRaw) ? Math.max(0, Math.floor(nextCountRaw)) : 0
updatedAt: nowTs counts[sessionId] = nextCount
}) this.sessionMessageCountCache.set(sessionId, {
count: nextCount,
updatedAt: nowTs
})
}
if (tableScanResult.dbSignature) {
this.sessionMessageCountBatchCache = {
dbSignature: tableScanResult.dbSignature,
sessionIdsKey,
counts: { ...counts },
updatedAt: nowTs
}
}
tableScanSucceeded = true
} else {
this.logExportDiag({
traceId,
level: 'warn',
source: 'backend',
stepId: 'backend-get-session-message-counts-table-scan',
stepName: '会话消息总数表扫描',
status: 'failed',
message: '按 Msg 表聚合统计失败,回退逐会话统计',
data: {
error: tableScanResult.error || '未知错误'
}
})
}
}
if (!tableScanSucceeded) {
const batchSize = 320
for (let i = 0; i < pendingSessionIds.length; i += batchSize) {
const batch = pendingSessionIds.slice(i, i + batchSize)
this.logExportDiag({
traceId,
level: 'debug',
source: 'backend',
stepId: 'backend-get-session-message-counts-batch',
stepName: '会话消息总数批次查询',
status: 'running',
message: `开始查询批次 ${Math.floor(i / batchSize) + 1}/${Math.ceil(pendingSessionIds.length / batchSize) || 1}`,
data: {
batchSize: batch.length
}
})
let batchCounts: Record<string, number> = {}
try {
const result = await wcdbService.getMessageCounts(batch)
if (result.success && result.counts) {
batchCounts = result.counts
}
} catch {
// noop
}
const nowTs = Date.now()
for (const sessionId of batch) {
const nextCountRaw = batchCounts[sessionId]
const nextCount = Number.isFinite(nextCountRaw) ? Math.max(0, Math.floor(nextCountRaw)) : 0
counts[sessionId] = nextCount
this.sessionMessageCountCache.set(sessionId, {
count: nextCount,
updatedAt: nowTs
})
}
}
} }
} }
@@ -1752,6 +2029,7 @@ class ChatService {
this.sessionMessageCountCacheScope = scope this.sessionMessageCountCacheScope = scope
this.sessionMessageCountCache.clear() this.sessionMessageCountCache.clear()
this.sessionMessageCountHintCache.clear() this.sessionMessageCountHintCache.clear()
this.sessionMessageCountBatchCache = null
this.sessionDetailFastCache.clear() this.sessionDetailFastCache.clear()
this.sessionDetailExtraCache.clear() this.sessionDetailExtraCache.clear()
this.sessionStatusCache.clear() this.sessionStatusCache.clear()