perf(export): reuse pre-estimate cache during export run

This commit is contained in:
tisonhuang
2026-03-04 18:05:31 +08:00
parent 7b4aa23f35
commit 7a7e54ea5b

View File

@@ -125,6 +125,46 @@ interface ExportTaskControl {
shouldStop?: () => boolean
}
/**
 * Aggregate pre-export statistics returned by `getExportStats` and stored in the
 * export stats cache.
 */
interface ExportStatsResult {
// Sum of the per-session message counts.
totalMessages: number
// Number of voice messages (localType 34 in the per-message scan path).
voiceMessages: number
// Voice messages that already have a cached transcript.
cachedVoiceCount: number
// Voice messages still to be transcribed: max(0, voiceMessages - cachedVoiceCount).
needTranscribeCount: number
// Media message count; the per-message scan path sums voice + image + video + emoji.
mediaMessages: number
// Rough duration estimate: needTranscribeCount * 2 (about two seconds per voice message).
estimatedSeconds: number
// Per-session breakdown, one item per session that was counted.
sessions: Array<{ sessionId: string; displayName: string; totalCount: number; voiceCount: number }>
}
/**
 * Per-session counters captured while computing export stats. They are stored in the
 * stats cache entry, keyed by session id, so a later export run can reuse them as hints.
 */
interface ExportStatsSessionSnapshot {
// Total number of messages counted for the session.
totalCount: number
// Number of voice messages in the session.
voiceCount: number
// Number of image messages in the session.
imageCount: number
// Number of video messages in the session.
videoCount: number
// Number of emoji messages in the session.
emojiCount: number
// Voice messages in the session that already have a cached transcript.
cachedVoiceCount: number
// Timestamp of the newest message seen; left undefined when no positive timestamp was found.
lastTimestamp?: number
}
/**
 * One entry of the export stats cache: the aggregate result plus per-session snapshots.
 */
interface ExportStatsCacheEntry {
// Creation time from Date.now() (milliseconds); used for TTL expiry and oldest-first eviction.
createdAt: number
// Aggregate stats result; callers store a clone so later mutation does not leak into the cache.
result: ExportStatsResult
// Per-session snapshots keyed by session id.
sessions: Record<string, ExportStatsSessionSnapshot>
}
/**
 * Per-session metric shape that the data returned by `chatService.getExportSessionStats`
 * is cast to. Every field is optional; consumers validate each one with Number.isFinite
 * before using it.
 */
interface ExportAggregatedSessionMetric {
// Total message count for the session, when reported.
totalMessages?: number
// Voice message count, when reported.
voiceMessages?: number
// Image message count, when reported.
imageMessages?: number
// Video message count, when reported.
videoMessages?: number
// Emoji message count, when reported.
emojiMessages?: number
// Timestamp of the latest message, when reported.
lastTimestamp?: number
}
/**
 * One entry of the aggregated session stats cache.
 */
interface ExportAggregatedSessionStatsCacheEntry {
// Creation time from Date.now() (milliseconds); used for TTL expiry and oldest-first eviction.
createdAt: number
// Aggregated metrics keyed by session id.
data: Record<string, ExportAggregatedSessionMetric>
}
// 并发控制:限制同时执行的 Promise 数量
async function parallelLimit<T, R>(
items: T[],
@@ -155,6 +195,11 @@ class ExportService {
private contactCache: LRUCache<string, { displayName: string; avatarUrl?: string }>
private inlineEmojiCache: LRUCache<string, string>
private htmlStyleCache: string | null = null
// Cache of computed export stats, keyed by the string built in buildExportStatsCacheKey.
private exportStatsCache = new Map<string, ExportStatsCacheEntry>()
// Cache of aggregated per-session metrics, keyed by the same cache key string.
private exportAggregatedSessionStatsCache = new Map<string, ExportAggregatedSessionStatsCacheEntry>()
// Lifetime of an export stats entry: 2 minutes, in milliseconds.
private readonly exportStatsCacheTtlMs = 2 * 60 * 1000
// Lifetime of an aggregated session stats entry: 1 minute, in milliseconds.
private readonly exportAggregatedSessionStatsCacheTtlMs = 60 * 1000
// Maximum number of entries kept per cache; this limit is applied to both caches.
private readonly exportStatsCacheMaxEntries = 16
private readonly STOP_ERROR_CODE = 'WEFLOW_EXPORT_STOP_REQUESTED'
constructor() {
@@ -170,6 +215,107 @@ class ExportService {
return error
}
/**
 * Normalizes a list of session ids.
 *
 * Each id is coerced to a string and trimmed; blank ids are dropped and duplicates
 * are removed, keeping the first occurrence so the original order is preserved.
 *
 * @param sessionIds Raw session ids; a missing list is treated as empty.
 * @returns The trimmed, de-duplicated ids.
 */
private normalizeSessionIds(sessionIds: string[]): string[] {
  const uniqueIds = new Set<string>()
  ;(sessionIds || []).forEach((rawId) => {
    const trimmedId = String(rawId || '').trim()
    if (trimmedId) {
      uniqueIds.add(trimmedId)
    }
  })
  return Array.from(uniqueIds)
}
/**
 * Builds the date-range part of an export stats cache key.
 *
 * @param dateRange Optional range; when absent the token is `all`.
 * @returns `all`, or `<start>-<end>` where each bound is floored and clamped to be
 *          non-negative (a non-finite bound becomes 0).
 */
private getExportStatsDateRangeToken(dateRange?: { start: number; end: number } | null): string {
  if (!dateRange) {
    return 'all'
  }
  const toBound = (value: number): number => {
    if (!Number.isFinite(value)) return 0
    return Math.max(0, Math.floor(value))
  }
  const lowerBound = toBound(dateRange.start)
  const upperBound = toBound(dateRange.end)
  return [lowerBound, upperBound].join('-')
}
/**
 * Builds the key used for the export stats caches.
 *
 * The key joins, with `::`, the configured database path, the account id token, the
 * date-range token, the trimmed sender filter and the sorted, normalized session ids
 * (themselves joined with the U+001F unit separator). Sorting makes the key
 * independent of the order in which session ids were supplied.
 *
 * @param sessionIds Session ids covered by the stats request.
 * @param options Export options; only `dateRange` and `senderUsername` affect the key.
 * @param cleanedWxid Account id to use; when empty it is derived from the configured `myWxid`.
 * @returns The composed cache key.
 */
private buildExportStatsCacheKey(
  sessionIds: string[],
  options: Pick<ExportOptions, 'dateRange' | 'senderUsername'>,
  cleanedWxid?: string
): string {
  const sortedIds = this.normalizeSessionIds(sessionIds).sort()
  const senderPart = String(options.senderUsername || '').trim()
  const rangePart = this.getExportStatsDateRangeToken(options.dateRange)
  const dbPathPart = String(this.configService.get('dbPath') || '').trim()
  // Only consult the configured account id when the caller did not provide one.
  const wxidSource = cleanedWxid
    ? cleanedWxid
    : this.cleanAccountDirName(String(this.configService.get('myWxid') || ''))
  const wxidPart = String(wxidSource || '').trim()
  const idsPart = sortedIds.join('\u001f')
  return [dbPathPart, wxidPart, rangePart, senderPart, idsPart].join('::')
}
/**
 * Copies a stats result so that the copy and the source can be mutated independently.
 *
 * Top-level fields are shallow-copied, and each item of `sessions` is copied into a
 * new object inside a new array.
 *
 * @param result The result to copy.
 * @returns A new result object with its own `sessions` array and items.
 */
private cloneExportStatsResult(result: ExportStatsResult): ExportStatsResult {
  const copy: ExportStatsResult = { ...result }
  copy.sessions = result.sessions.map((session) => {
    return { ...session }
  })
  return copy
}
/**
 * Removes expired entries from both the export stats cache and the aggregated
 * session stats cache, each against its own TTL. A single `Date.now()` reading is
 * used for the whole pass.
 */
private pruneExportStatsCaches(): void {
  const now = Date.now()
  // Shared eviction routine; deleting from a Map while iterating it is safe.
  const evictExpired = <T extends { createdAt: number }>(cache: Map<string, T>, ttlMs: number): void => {
    cache.forEach((entry, cacheKey) => {
      if (now - entry.createdAt > ttlMs) {
        cache.delete(cacheKey)
      }
    })
  }
  evictExpired(this.exportStatsCache, this.exportStatsCacheTtlMs)
  evictExpired(this.exportAggregatedSessionStatsCache, this.exportAggregatedSessionStatsCacheTtlMs)
}
/**
 * Looks up a non-expired export stats cache entry.
 *
 * Both caches are pruned first. The entry's age is then re-checked against the TTL,
 * and an entry found to be expired is deleted.
 *
 * @param key Cache key, as produced by buildExportStatsCacheKey.
 * @returns The cached entry, or null when it is missing or expired.
 */
private getExportStatsCacheEntry(key: string): ExportStatsCacheEntry | null {
  this.pruneExportStatsCaches()
  const cached = this.exportStatsCache.get(key)
  if (!cached) {
    return null
  }
  const ageMs = Date.now() - cached.createdAt
  const isExpired = ageMs > this.exportStatsCacheTtlMs
  if (!isExpired) {
    return cached
  }
  this.exportStatsCache.delete(key)
  return null
}
/**
 * Stores an export stats cache entry and enforces the size limit.
 *
 * Expired entries are pruned before the insert. If the cache then holds more than
 * `exportStatsCacheMaxEntries` entries, the ones with the smallest `createdAt` are
 * removed until the limit is met.
 *
 * @param key Cache key, as produced by buildExportStatsCacheKey.
 * @param entry Entry to store.
 */
private setExportStatsCacheEntry(key: string, entry: ExportStatsCacheEntry): void {
  this.pruneExportStatsCaches()
  this.exportStatsCache.set(key, entry)
  const overflow = this.exportStatsCache.size - this.exportStatsCacheMaxEntries
  if (overflow <= 0) {
    return
  }
  const oldestFirst = Array.from(this.exportStatsCache.entries())
  oldestFirst.sort(([, left], [, right]) => left.createdAt - right.createdAt)
  oldestFirst.slice(0, overflow).forEach(([staleKey]) => {
    this.exportStatsCache.delete(staleKey)
  })
}
/**
 * Looks up non-expired aggregated session metrics.
 *
 * Both caches are pruned first. The entry's age is then re-checked against the
 * aggregated-stats TTL, and an entry found to be expired is deleted.
 *
 * @param key Cache key, as produced by buildExportStatsCacheKey.
 * @returns The cached metrics map (the stored object itself, not a copy), or null
 *          when it is missing or expired.
 */
private getAggregatedSessionStatsCache(key: string): Record<string, ExportAggregatedSessionMetric> | null {
  this.pruneExportStatsCaches()
  const cached = this.exportAggregatedSessionStatsCache.get(key)
  if (!cached) {
    return null
  }
  const ageMs = Date.now() - cached.createdAt
  const isExpired = ageMs > this.exportAggregatedSessionStatsCacheTtlMs
  if (!isExpired) {
    return cached.data
  }
  this.exportAggregatedSessionStatsCache.delete(key)
  return null
}
/**
 * Stores aggregated session metrics, stamped with the current time, and enforces the
 * size limit.
 *
 * Expired entries are pruned before the insert. If the cache then holds more than
 * `exportStatsCacheMaxEntries` entries (the same limit the export stats cache uses),
 * the ones with the smallest `createdAt` are removed until the limit is met.
 *
 * @param key Cache key, as produced by buildExportStatsCacheKey.
 * @param data Metrics keyed by session id; stored by reference.
 */
private setAggregatedSessionStatsCache(
  key: string,
  data: Record<string, ExportAggregatedSessionMetric>
): void {
  this.pruneExportStatsCaches()
  const freshEntry = { createdAt: Date.now(), data }
  this.exportAggregatedSessionStatsCache.set(key, freshEntry)
  const overflow = this.exportAggregatedSessionStatsCache.size - this.exportStatsCacheMaxEntries
  if (overflow <= 0) {
    return
  }
  const oldestFirst = Array.from(this.exportAggregatedSessionStatsCache.entries())
  oldestFirst.sort(([, left], [, right]) => left.createdAt - right.createdAt)
  oldestFirst.slice(0, overflow).forEach(([staleKey]) => {
    this.exportAggregatedSessionStatsCache.delete(staleKey)
  })
}
private isStopError(error: unknown): boolean {
if (!error) return false
if (typeof error === 'string') {
@@ -5716,28 +5862,34 @@ class ExportService {
async getExportStats(
sessionIds: string[],
options: ExportOptions
): Promise<{
totalMessages: number
voiceMessages: number
cachedVoiceCount: number
needTranscribeCount: number
mediaMessages: number
estimatedSeconds: number
sessions: Array<{ sessionId: string; displayName: string; totalCount: number; voiceCount: number }>
}> {
): Promise<ExportStatsResult> {
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) {
return { totalMessages: 0, voiceMessages: 0, cachedVoiceCount: 0, needTranscribeCount: 0, mediaMessages: 0, estimatedSeconds: 0, sessions: [] }
}
const normalizedSessionIds = Array.from(
new Set((sessionIds || []).map((id) => String(id || '').trim()).filter(Boolean))
)
const normalizedSessionIds = this.normalizeSessionIds(sessionIds)
if (normalizedSessionIds.length === 0) {
return { totalMessages: 0, voiceMessages: 0, cachedVoiceCount: 0, needTranscribeCount: 0, mediaMessages: 0, estimatedSeconds: 0, sessions: [] }
}
const cacheKey = this.buildExportStatsCacheKey(normalizedSessionIds, options, conn.cleanedWxid)
const cachedStats = this.getExportStatsCacheEntry(cacheKey)
if (cachedStats) {
const cachedResult = this.cloneExportStatsResult(cachedStats.result)
const orderedSessions: Array<{ sessionId: string; displayName: string; totalCount: number; voiceCount: number }> = []
const sessionMap = new Map(cachedResult.sessions.map((item) => [item.sessionId, item] as const))
for (const sessionId of normalizedSessionIds) {
const cachedSession = sessionMap.get(sessionId)
if (cachedSession) orderedSessions.push(cachedSession)
}
if (orderedSessions.length === cachedResult.sessions.length) {
cachedResult.sessions = orderedSessions
}
return cachedResult
}
const cleanedMyWxid = conn.cleanedWxid
const sessionsStats: Array<{ sessionId: string; displayName: string; totalCount: number; voiceCount: number }> = []
const sessionSnapshotMap: Record<string, ExportStatsSessionSnapshot> = {}
let totalMessages = 0
let voiceMessages = 0
let cachedVoiceCount = 0
@@ -5749,11 +5901,18 @@ class ExportService {
// 快速路径:直接复用 ChatService 聚合统计,避免逐会话 collectMessages 扫全量消息。
if (canUseAggregatedStats) {
try {
const statsResult = await chatService.getExportSessionStats(normalizedSessionIds, {
includeRelations: false,
allowStaleCache: true
})
if (statsResult.success && statsResult.data) {
let aggregatedData = this.getAggregatedSessionStatsCache(cacheKey)
if (!aggregatedData) {
const statsResult = await chatService.getExportSessionStats(normalizedSessionIds, {
includeRelations: false,
allowStaleCache: true
})
if (statsResult.success && statsResult.data) {
aggregatedData = statsResult.data as Record<string, ExportAggregatedSessionMetric>
this.setAggregatedSessionStatsCache(cacheKey, aggregatedData)
}
}
if (aggregatedData) {
const cachedVoiceCountMap = chatService.getCachedVoiceTranscriptCountMap(normalizedSessionIds)
const fastRows = await parallelLimit(
normalizedSessionIds,
@@ -5774,7 +5933,7 @@ class ExportService {
// 预估阶段显示名获取失败不阻塞统计
}
const metric = statsResult.data?.[sessionId]
const metric = aggregatedData?.[sessionId]
const totalCount = Number.isFinite(metric?.totalMessages)
? Math.max(0, Math.floor(metric!.totalMessages))
: 0
@@ -5790,12 +5949,25 @@ class ExportService {
const emojiCount = Number.isFinite(metric?.emojiMessages)
? Math.max(0, Math.floor(metric!.emojiMessages))
: 0
const lastTimestamp = Number.isFinite(metric?.lastTimestamp)
? Math.max(0, Math.floor(metric!.lastTimestamp))
: undefined
const cachedCountRaw = Number(cachedVoiceCountMap[sessionId] || 0)
const sessionCachedVoiceCount = Math.min(
voiceCount,
Number.isFinite(cachedCountRaw) ? Math.max(0, Math.floor(cachedCountRaw)) : 0
)
sessionSnapshotMap[sessionId] = {
totalCount,
voiceCount,
imageCount,
videoCount,
emojiCount,
cachedVoiceCount: sessionCachedVoiceCount,
lastTimestamp
}
return {
sessionId,
displayName,
@@ -5822,7 +5994,7 @@ class ExportService {
const needTranscribeCount = Math.max(0, voiceMessages - cachedVoiceCount)
const estimatedSeconds = needTranscribeCount * 2
return {
const result: ExportStatsResult = {
totalMessages,
voiceMessages,
cachedVoiceCount,
@@ -5831,6 +6003,12 @@ class ExportService {
estimatedSeconds,
sessions: sessionsStats
}
this.setExportStatsCacheEntry(cacheKey, {
createdAt: Date.now(),
result: this.cloneExportStatsResult(result),
sessions: { ...sessionSnapshotMap }
})
return result
}
} catch (error) {
// 聚合统计失败时自动回退到慢路径,保证功能正确。
@@ -5848,36 +6026,56 @@ class ExportService {
'text-fast'
)
const msgs = collected.rows
const voiceMsgs = msgs.filter(m => m.localType === 34)
const mediaMsgs = msgs.filter(m => {
const t = m.localType
return (t === 3) || (t === 47) || (t === 43) || (t === 34)
})
let voiceCount = 0
let imageCount = 0
let videoCount = 0
let emojiCount = 0
let latestTimestamp = 0
let cached = 0
for (const msg of voiceMsgs) {
if (chatService.hasTranscriptCache(sessionId, String(msg.localId), msg.createTime)) {
cached++
for (const msg of msgs) {
if (msg.createTime > latestTimestamp) {
latestTimestamp = msg.createTime
}
const localType = msg.localType
if (localType === 34) {
voiceCount++
if (chatService.hasTranscriptCache(sessionId, String(msg.localId), msg.createTime)) {
cached++
}
continue
}
if (localType === 3) imageCount++
if (localType === 43) videoCount++
if (localType === 47) emojiCount++
}
const mediaCount = voiceCount + imageCount + videoCount + emojiCount
totalMessages += msgs.length
voiceMessages += voiceMsgs.length
voiceMessages += voiceCount
cachedVoiceCount += cached
mediaMessages += mediaMsgs.length
mediaMessages += mediaCount
sessionSnapshotMap[sessionId] = {
totalCount: msgs.length,
voiceCount,
imageCount,
videoCount,
emojiCount,
cachedVoiceCount: cached,
lastTimestamp: latestTimestamp > 0 ? latestTimestamp : undefined
}
sessionsStats.push({
sessionId,
displayName: sessionInfo.displayName,
totalCount: msgs.length,
voiceCount: voiceMsgs.length
voiceCount
})
}
const needTranscribeCount = voiceMessages - cachedVoiceCount
const needTranscribeCount = Math.max(0, voiceMessages - cachedVoiceCount)
// 预估:每条语音转文字约 2 秒
const estimatedSeconds = needTranscribeCount * 2
return {
const result: ExportStatsResult = {
totalMessages,
voiceMessages,
cachedVoiceCount,
@@ -5886,6 +6084,12 @@ class ExportService {
estimatedSeconds,
sessions: sessionsStats
}
this.setExportStatsCacheEntry(cacheKey, {
createdAt: Date.now(),
result: this.cloneExportStatsResult(result),
sessions: { ...sessionSnapshotMap }
})
return result
}
/**
@@ -5961,6 +6165,21 @@ class ExportService {
const emptySessionIds = new Set<string>()
const sessionMessageCountHints = new Map<string, number>()
const sessionLatestTimestampHints = new Map<string, number>()
const exportStatsCacheKey = this.buildExportStatsCacheKey(sessionIds, effectiveOptions, conn.cleanedWxid)
const cachedStatsEntry = this.getExportStatsCacheEntry(exportStatsCacheKey)
if (cachedStatsEntry?.sessions) {
for (const sessionId of sessionIds) {
const snapshot = cachedStatsEntry.sessions[sessionId]
if (!snapshot) continue
sessionMessageCountHints.set(sessionId, Math.max(0, Math.floor(snapshot.totalCount || 0)))
if (Number.isFinite(snapshot.lastTimestamp) && Number(snapshot.lastTimestamp) > 0) {
sessionLatestTimestampHints.set(sessionId, Math.floor(Number(snapshot.lastTimestamp)))
}
if (snapshot.totalCount <= 0) {
emptySessionIds.add(sessionId)
}
}
}
const canUseSessionSnapshotHints = isTextContentBatchExport &&
this.isUnboundedDateRange(effectiveOptions.dateRange) &&
!String(effectiveOptions.senderUsername || '').trim()
@@ -5968,9 +6187,12 @@ class ExportService {
this.isUnboundedDateRange(effectiveOptions.dateRange) &&
!String(effectiveOptions.senderUsername || '').trim()
const canTrySkipUnchangedTextSessions = canUseSessionSnapshotHints
if (canFastSkipEmptySessions && sessionIds.length > 0) {
const precheckSessionIds = canFastSkipEmptySessions
? sessionIds.filter((sessionId) => !sessionMessageCountHints.has(sessionId))
: []
if (canFastSkipEmptySessions && precheckSessionIds.length > 0) {
const EMPTY_SESSION_PRECHECK_LIMIT = 1200
if (sessionIds.length <= EMPTY_SESSION_PRECHECK_LIMIT) {
if (precheckSessionIds.length <= EMPTY_SESSION_PRECHECK_LIMIT) {
let checkedCount = 0
onProgress?.({
current: computeAggregateCurrent(),
@@ -5979,12 +6201,12 @@ class ExportService {
currentSessionId: '',
phase: 'preparing',
phaseProgress: 0,
phaseTotal: sessionIds.length,
phaseLabel: `预检查空会话 0/${sessionIds.length}`
phaseTotal: precheckSessionIds.length,
phaseLabel: `预检查空会话 0/${precheckSessionIds.length}`
})
const PRECHECK_BATCH_SIZE = 160
for (let i = 0; i < sessionIds.length; i += PRECHECK_BATCH_SIZE) {
for (let i = 0; i < precheckSessionIds.length; i += PRECHECK_BATCH_SIZE) {
if (control?.shouldStop?.()) {
stopRequested = true
break
@@ -5994,7 +6216,7 @@ class ExportService {
break
}
const batchSessionIds = sessionIds.slice(i, i + PRECHECK_BATCH_SIZE)
const batchSessionIds = precheckSessionIds.slice(i, i + PRECHECK_BATCH_SIZE)
const countsResult = await wcdbService.getMessageCounts(batchSessionIds)
if (countsResult.success && countsResult.counts) {
for (const batchSessionId of batchSessionIds) {
@@ -6008,7 +6230,7 @@ class ExportService {
}
}
checkedCount = Math.min(sessionIds.length, checkedCount + batchSessionIds.length)
checkedCount = Math.min(precheckSessionIds.length, checkedCount + batchSessionIds.length)
onProgress?.({
current: computeAggregateCurrent(),
total: sessionIds.length,
@@ -6016,8 +6238,8 @@ class ExportService {
currentSessionId: '',
phase: 'preparing',
phaseProgress: checkedCount,
phaseTotal: sessionIds.length,
phaseLabel: `预检查空会话 ${checkedCount}/${sessionIds.length}`
phaseTotal: precheckSessionIds.length,
phaseLabel: `预检查空会话 ${checkedCount}/${precheckSessionIds.length}`
})
}
} else {
@@ -6027,26 +6249,39 @@ class ExportService {
currentSession: '',
currentSessionId: '',
phase: 'preparing',
phaseLabel: `会话较多,已跳过空会话预检查(${sessionIds.length} 个)`
phaseLabel: `会话较多,已跳过空会话预检查(${precheckSessionIds.length} 个)`
})
}
}
if (canUseSessionSnapshotHints && sessionIds.length > 0) {
const sessionSet = new Set(sessionIds)
const sessionsResult = await chatService.getSessions()
if (sessionsResult.success && Array.isArray(sessionsResult.sessions)) {
for (const item of sessionsResult.sessions) {
const username = String(item?.username || '').trim()
if (!username) continue
if (!sessionSet.has(username)) continue
const messageCountHint = Number(item?.messageCountHint)
if (Number.isFinite(messageCountHint) && messageCountHint >= 0) {
sessionMessageCountHints.set(username, Math.floor(messageCountHint))
}
const lastTimestamp = Number(item?.lastTimestamp)
if (Number.isFinite(lastTimestamp) && lastTimestamp > 0) {
sessionLatestTimestampHints.set(username, Math.floor(lastTimestamp))
const missingHintSessionIds = sessionIds.filter((sessionId) => (
!sessionMessageCountHints.has(sessionId) || !sessionLatestTimestampHints.has(sessionId)
))
if (missingHintSessionIds.length > 0) {
const sessionSet = new Set(missingHintSessionIds)
const sessionsResult = await chatService.getSessions()
if (sessionsResult.success && Array.isArray(sessionsResult.sessions)) {
for (const item of sessionsResult.sessions) {
const username = String(item?.username || '').trim()
if (!username) continue
if (!sessionSet.has(username)) continue
const messageCountHint = Number(item?.messageCountHint)
if (
!sessionMessageCountHints.has(username) &&
Number.isFinite(messageCountHint) &&
messageCountHint >= 0
) {
sessionMessageCountHints.set(username, Math.floor(messageCountHint))
}
const lastTimestamp = Number(item?.lastTimestamp)
if (
!sessionLatestTimestampHints.has(username) &&
Number.isFinite(lastTimestamp) &&
lastTimestamp > 0
) {
sessionLatestTimestampHints.set(username, Math.floor(lastTimestamp))
}
}
}
}