perf(export): speed up batch text export pipeline

This commit is contained in:
tisonhuang
2026-03-02 17:05:00 +08:00
parent 04d1b0c694
commit 5cb364f754
4 changed files with 82 additions and 53 deletions

View File

@@ -105,7 +105,7 @@ export class ConfigService {
whisperDownloadSource: 'tsinghua',
autoTranscribeVoice: false,
transcribeLanguages: ['zh'],
exportDefaultConcurrency: 2,
exportDefaultConcurrency: 4,
analyticsExcludedUsernames: [],
authEnabled: false,
authPassword: '',
@@ -671,4 +671,4 @@ export class ConfigService {
this.unlockedKeys.clear()
this.unlockPassword = null
}
}
}

View File

@@ -253,6 +253,22 @@ class ExportService {
})
}
private async preloadContactInfos(
usernames: Iterable<string>,
limit = 8
): Promise<Map<string, { displayName: string; avatarUrl?: string }>> {
const infoMap = new Map<string, { displayName: string; avatarUrl?: string }>()
const unique = Array.from(new Set(Array.from(usernames).filter(Boolean)))
if (unique.length === 0) return infoMap
await parallelLimit(unique, limit, async (username) => {
const info = await this.getContactInfo(username)
infoMap.set(username, info)
})
return infoMap
}
/**
* 通过 contact.chat_room.ext_buffer 解析群昵称(纯 SQL
*/
@@ -1901,8 +1917,6 @@ class ExportService {
const beginTime = dateRange?.start || 0
const endTime = dateRange?.end && dateRange.end > 0 ? dateRange.end : 0
console.log(`[Export] 收集消息: sessionId=${sessionId}, 时间范围: ${beginTime} ~ ${endTime || '无限制'}`)
const cursor = await wcdbService.openMessageCursor(
sessionId,
500,
@@ -1927,12 +1941,7 @@ class ExportService {
break
}
if (!batch.rows) {
console.warn(`[Export] 批次 ${batchCount} 无数据`)
break
}
console.log(`[Export] 批次 ${batchCount}: 收到 ${batch.rows.length} 条消息`)
if (!batch.rows) break
for (const row of batch.rows) {
const createTime = parseInt(row.create_time || '0', 10)
@@ -2025,13 +2034,11 @@ class ExportService {
hasMore = batch.hasMore === true
}
console.log(`[Export] 收集完成: 共 ${rows.length} 条消息, ${batchCount} 个批次`)
} catch (err) {
console.error(`[Export] 收集消息异常:`, err)
} finally {
try {
await wcdbService.closeMessageCursor(cursor.cursor)
console.log(`[Export] 游标已关闭`)
} catch (err) {
console.error(`[Export] 关闭游标失败:`, err)
}
@@ -3039,6 +3046,10 @@ class ExportService {
}
senderUsernames.add(sessionId)
await this.preloadContacts(senderUsernames, contactCache)
const senderInfoMap = await this.preloadContactInfos([
...Array.from(senderUsernames.values()),
cleanedMyWxid
])
const { exportMediaEnabled, mediaRootDir, mediaRelativePrefix } = this.getMediaLayout(outputPath, options)
@@ -3154,8 +3165,11 @@ class ExportService {
remark: string
groupNickname: string
}>()
const transferCandidates: Array<{ xml: string; messageRef: any }> = []
let needSort = false
let lastCreateTime = Number.NEGATIVE_INFINITY
for (const msg of collected.rows) {
const senderInfo = await this.getContactInfo(msg.senderUsername)
const senderInfo = senderInfoMap.get(msg.senderUsername) || { displayName: msg.senderUsername || '' }
const sourceMatch = /<msgsource>[\s\S]*?<\/msgsource>/i.exec(msg.content || '')
const source = sourceMatch ? sourceMatch[0] : ''
@@ -3179,28 +3193,11 @@ class ExportService {
)
}
// 转账消息:追加 "谁转账给谁" 信息
if (content && this.isTransferExportContent(content) && msg.content) {
const transferDesc = await this.resolveTransferDesc(
msg.content,
cleanedMyWxid,
groupNicknamesMap,
async (username) => {
const c = await getContactCached(username)
if (c.success && c.contact) {
return c.contact.remark || c.contact.nickName || c.contact.alias || username
}
return username
}
)
if (transferDesc) {
content = this.appendTransferDesc(content, transferDesc)
}
}
// 获取发送者信息用于名称显示
const senderWxid = msg.senderUsername
const contact = await getContactCached(senderWxid)
const contact = senderWxid
? (contactCache.get(senderWxid) ?? { success: false as const })
: { success: false as const }
const senderNickname = contact.success && contact.contact?.nickName
? contact.contact.nickName
: (senderInfo.displayName || senderWxid)
@@ -3223,20 +3220,6 @@ class ExportService {
remark: senderRemark,
groupNickname: senderGroupNickname
})
} else {
if (!existingSenderProfile.displayName && senderDisplayName) {
existingSenderProfile.displayName = senderDisplayName
}
if (!existingSenderProfile.nickname && senderNickname) {
existingSenderProfile.nickname = senderNickname
}
if (!existingSenderProfile.remark && senderRemark) {
existingSenderProfile.remark = senderRemark
}
if (!existingSenderProfile.groupNickname && senderGroupNickname) {
existingSenderProfile.groupNickname = senderGroupNickname
}
senderProfileMap.set(senderWxid, existingSenderProfile)
}
const msgObj: any = {
@@ -3253,6 +3236,10 @@ class ExportService {
senderAvatarKey: msg.senderUsername
}
if (content && this.isTransferExportContent(content) && msg.content) {
transferCandidates.push({ xml: msg.content, messageRef: msgObj })
}
// 位置消息:附加结构化位置字段
if (msg.localType === 48) {
if (msg.locationLat != null) msgObj.locationLat = msg.locationLat
@@ -3262,9 +3249,50 @@ class ExportService {
}
allMessages.push(msgObj)
if (msg.createTime < lastCreateTime) needSort = true
lastCreateTime = msg.createTime
}
allMessages.sort((a, b) => a.createTime - b.createTime)
if (transferCandidates.length > 0) {
const transferNameCache = new Map<string, string>()
const transferNamePromiseCache = new Map<string, Promise<string>>()
const resolveDisplayNameByUsername = async (username: string): Promise<string> => {
if (!username) return username
const cachedName = transferNameCache.get(username)
if (cachedName) return cachedName
const pending = transferNamePromiseCache.get(username)
if (pending) return pending
const task = (async () => {
const contactResult = contactCache.get(username) ?? await getContactCached(username)
if (contactResult.success && contactResult.contact) {
return contactResult.contact.remark || contactResult.contact.nickName || contactResult.contact.alias || username
}
return username
})()
transferNamePromiseCache.set(username, task)
const resolved = await task
transferNamePromiseCache.delete(username)
transferNameCache.set(username, resolved)
return resolved
}
const transferConcurrency = this.getClampedConcurrency(options.exportConcurrency, 4, 8)
await parallelLimit(transferCandidates, transferConcurrency, async (item) => {
const transferDesc = await this.resolveTransferDesc(
item.xml,
cleanedMyWxid,
groupNicknamesMap,
resolveDisplayNameByUsername
)
if (transferDesc && typeof item.messageRef.content === 'string') {
item.messageRef.content = this.appendTransferDesc(item.messageRef.content, transferDesc)
}
})
}
if (needSort) {
allMessages.sort((a, b) => a.createTime - b.createTime)
}
onProgress?.({
current: 70,
@@ -3274,7 +3302,7 @@ class ExportService {
})
// 获取会话的昵称和备注信息
const sessionContact = await getContactCached(sessionId)
const sessionContact = contactCache.get(sessionId) ?? await getContactCached(sessionId)
const sessionNickname = sessionContact.success && sessionContact.contact?.nickName
? sessionContact.contact.nickName
: sessionInfo.displayName
@@ -5251,9 +5279,10 @@ class ExportService {
? (options.sessionLayout ?? 'per-session')
: 'shared'
let completedCount = 0
const defaultConcurrency = exportMediaEnabled ? 2 : 4
const rawConcurrency = typeof options.exportConcurrency === 'number'
? Math.floor(options.exportConcurrency)
: 2
: defaultConcurrency
const clampedConcurrency = Math.max(1, Math.min(rawConcurrency, 6))
const sessionConcurrency = (exportMediaEnabled && sessionLayout === 'shared')
? 1