fix(export): improve batch text progress and precheck interruptibility

This commit is contained in:
tisonhuang
2026-03-02 18:52:02 +08:00
parent 698d2c96d7
commit 81bc5aefff

View File

@@ -255,6 +255,27 @@ class ExportService {
return { mode } return { mode }
} }
private createCollectProgressReporter(
sessionName: string,
onProgress?: (progress: ExportProgress) => void,
progressCurrent = 5
): ((payload: { fetched: number }) => void) | undefined {
if (!onProgress) return undefined
let lastReportAt = 0
return ({ fetched }) => {
const now = Date.now()
if (now - lastReportAt < 350) return
lastReportAt = now
onProgress({
current: progressCurrent,
total: 100,
currentSession: sessionName,
phase: 'preparing',
phaseLabel: `收集消息 ${fetched.toLocaleString()}`
})
}
}
private shouldDecodeMessageContentInFastMode(localType: number): boolean { private shouldDecodeMessageContentInFastMode(localType: number): boolean {
// 这些类型在文本导出里只需要占位符,无需解码完整 XML / 压缩内容 // 这些类型在文本导出里只需要占位符,无需解码完整 XML / 压缩内容
if (localType === 3 || localType === 34 || localType === 42 || localType === 43 || localType === 47) { if (localType === 3 || localType === 34 || localType === 42 || localType === 43 || localType === 47) {
@@ -2853,6 +2874,7 @@ class ExportService {
}) })
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -2860,7 +2882,8 @@ class ExportService {
options.senderUsername, options.senderUsername,
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control control,
collectProgressReporter
) )
const allMessages = collected.rows const allMessages = collected.rows
@@ -3273,7 +3296,7 @@ class ExportService {
}) })
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
let collectProgressLastReportAt = 0 const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -3282,18 +3305,7 @@ class ExportService {
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control, control,
({ fetched }) => { collectProgressReporter
const now = Date.now()
if (now - collectProgressLastReportAt < 350) return
collectProgressLastReportAt = now
onProgress?.({
current: 5,
total: 100,
currentSession: sessionInfo.displayName,
phase: 'preparing',
phaseLabel: `收集消息 ${fetched.toLocaleString()}`
})
}
) )
// 如果没有消息,不创建文件 // 如果没有消息,不创建文件
@@ -3890,6 +3902,7 @@ class ExportService {
}) })
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -3897,7 +3910,8 @@ class ExportService {
options.senderUsername, options.senderUsername,
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control control,
collectProgressReporter
) )
// 如果没有消息,不创建文件 // 如果没有消息,不创建文件
@@ -4401,6 +4415,7 @@ class ExportService {
}) })
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -4408,7 +4423,8 @@ class ExportService {
options.senderUsername, options.senderUsername,
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control control,
collectProgressReporter
) )
// 如果没有消息,不创建文件 // 如果没有消息,不创建文件
@@ -4705,6 +4721,7 @@ class ExportService {
}) })
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -4712,7 +4729,8 @@ class ExportService {
options.senderUsername, options.senderUsername,
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control control,
collectProgressReporter
) )
if (collected.rows.length === 0) { if (collected.rows.length === 0) {
return { success: false, error: '该会话在指定时间范围内没有消息' } return { success: false, error: '该会话在指定时间范围内没有消息' }
@@ -5071,6 +5089,7 @@ class ExportService {
} }
const collectParams = this.resolveCollectParams(options) const collectParams = this.resolveCollectParams(options)
const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5)
const collected = await this.collectMessages( const collected = await this.collectMessages(
sessionId, sessionId,
cleanedMyWxid, cleanedMyWxid,
@@ -5078,7 +5097,8 @@ class ExportService {
options.senderUsername, options.senderUsername,
collectParams.mode, collectParams.mode,
collectParams.targetMediaTypes, collectParams.targetMediaTypes,
control control,
collectProgressReporter
) )
// 如果没有消息,不创建文件 // 如果没有消息,不创建文件
@@ -5671,128 +5691,223 @@ class ExportService {
? (effectiveOptions.sessionLayout ?? 'per-session') ? (effectiveOptions.sessionLayout ?? 'per-session')
: 'shared' : 'shared'
let completedCount = 0 let completedCount = 0
const activeSessionRatios = new Map<string, number>()
const computeAggregateCurrent = () => {
let activeRatioSum = 0
for (const ratio of activeSessionRatios.values()) {
activeRatioSum += Math.max(0, Math.min(1, ratio))
}
return Math.min(sessionIds.length, completedCount + activeRatioSum)
}
const defaultConcurrency = exportMediaEnabled ? 2 : 4 const defaultConcurrency = exportMediaEnabled ? 2 : 4
const rawConcurrency = typeof effectiveOptions.exportConcurrency === 'number' const rawConcurrency = typeof effectiveOptions.exportConcurrency === 'number'
? Math.floor(effectiveOptions.exportConcurrency) ? Math.floor(effectiveOptions.exportConcurrency)
: defaultConcurrency : defaultConcurrency
const clampedConcurrency = Math.max(1, Math.min(rawConcurrency, 6)) const clampedConcurrency = Math.max(1, Math.min(rawConcurrency, 6))
const sessionConcurrency = clampedConcurrency const sessionConcurrency = clampedConcurrency
const queue = [...sessionIds]
let pauseRequested = false
let stopRequested = false
const emptySessionIds = new Set<string>() const emptySessionIds = new Set<string>()
const canFastSkipEmptySessions = this.isUnboundedDateRange(effectiveOptions.dateRange) && const canFastSkipEmptySessions = this.isUnboundedDateRange(effectiveOptions.dateRange) &&
!String(effectiveOptions.senderUsername || '').trim() !String(effectiveOptions.senderUsername || '').trim()
if (canFastSkipEmptySessions && sessionIds.length > 0) { if (canFastSkipEmptySessions && sessionIds.length > 0) {
const countsResult = await wcdbService.getMessageCounts(sessionIds) const EMPTY_SESSION_PRECHECK_LIMIT = 1200
if (countsResult.success && countsResult.counts) { if (sessionIds.length <= EMPTY_SESSION_PRECHECK_LIMIT) {
for (const sessionId of sessionIds) { let checkedCount = 0
const count = countsResult.counts[sessionId] onProgress?.({
if (typeof count === 'number' && Number.isFinite(count) && count <= 0) { current: computeAggregateCurrent(),
emptySessionIds.add(sessionId) total: sessionIds.length,
currentSession: '',
currentSessionId: '',
phase: 'preparing',
phaseProgress: 0,
phaseTotal: sessionIds.length,
phaseLabel: `预检查空会话 0/${sessionIds.length}`
})
const PRECHECK_BATCH_SIZE = 160
for (let i = 0; i < sessionIds.length; i += PRECHECK_BATCH_SIZE) {
if (control?.shouldStop?.()) {
stopRequested = true
break
} }
if (control?.shouldPause?.()) {
pauseRequested = true
break
}
const batchSessionIds = sessionIds.slice(i, i + PRECHECK_BATCH_SIZE)
const countsResult = await wcdbService.getMessageCounts(batchSessionIds)
if (countsResult.success && countsResult.counts) {
for (const batchSessionId of batchSessionIds) {
const count = countsResult.counts[batchSessionId]
if (typeof count === 'number' && Number.isFinite(count) && count <= 0) {
emptySessionIds.add(batchSessionId)
}
}
}
checkedCount = Math.min(sessionIds.length, checkedCount + batchSessionIds.length)
onProgress?.({
current: computeAggregateCurrent(),
total: sessionIds.length,
currentSession: '',
currentSessionId: '',
phase: 'preparing',
phaseProgress: checkedCount,
phaseTotal: sessionIds.length,
phaseLabel: `预检查空会话 ${checkedCount}/${sessionIds.length}`
})
} }
} else {
onProgress?.({
current: computeAggregateCurrent(),
total: sessionIds.length,
currentSession: '',
currentSessionId: '',
phase: 'preparing',
phaseLabel: `会话较多,已跳过空会话预检查(${sessionIds.length} 个)`
})
}
}
if (stopRequested) {
return {
success: true,
successCount,
failCount,
stopped: true,
pendingSessionIds: [...queue],
successSessionIds,
failedSessionIds
}
}
if (pauseRequested) {
return {
success: true,
successCount,
failCount,
paused: true,
pendingSessionIds: [...queue],
successSessionIds,
failedSessionIds
} }
} }
const queue = [...sessionIds]
let pauseRequested = false
let stopRequested = false
const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => { const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => {
this.throwIfStopRequested(control) try {
const sessionInfo = await this.getContactInfo(sessionId) this.throwIfStopRequested(control)
const sessionInfo = await this.getContactInfo(sessionId)
if (emptySessionIds.has(sessionId)) { if (emptySessionIds.has(sessionId)) {
failCount++ failCount++
failedSessionIds.push(sessionId) failedSessionIds.push(sessionId)
activeSessionRatios.delete(sessionId)
completedCount++
onProgress?.({
current: computeAggregateCurrent(),
total: sessionIds.length,
currentSession: sessionInfo.displayName,
currentSessionId: sessionId,
phase: 'exporting',
phaseLabel: '该会话没有消息,已跳过'
})
return 'done'
}
const sessionProgress = (progress: ExportProgress) => {
const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100
const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0
const ratio = progress.phase === 'complete'
? 1
: Math.max(0, Math.min(1, phaseCurrent / phaseTotal))
activeSessionRatios.set(sessionId, ratio)
onProgress?.({
...progress,
current: computeAggregateCurrent(),
total: sessionIds.length,
currentSession: sessionInfo.displayName,
currentSessionId: sessionId
})
}
sessionProgress({
current: 0,
total: 100,
currentSession: sessionInfo.displayName,
phase: 'preparing',
phaseLabel: '准备导出'
})
const sanitizeName = (value: string) => value.replace(/[<>:"\/\\|?*]/g, '_').replace(/\.+$/, '').trim()
const baseName = sanitizeName(sessionInfo.displayName || sessionId) || sanitizeName(sessionId) || 'session'
const suffix = sanitizeName(effectiveOptions.fileNameSuffix || '')
const safeName = suffix ? `${baseName}_${suffix}` : baseName
const fileNameWithPrefix = `${await this.getSessionFilePrefix(sessionId)}${safeName}`
const useSessionFolder = sessionLayout === 'per-session'
const sessionDir = useSessionFolder ? path.join(exportBaseDir, safeName) : exportBaseDir
if (useSessionFolder && !fs.existsSync(sessionDir)) {
fs.mkdirSync(sessionDir, { recursive: true })
}
let ext = '.json'
if (effectiveOptions.format === 'chatlab-jsonl') ext = '.jsonl'
else if (effectiveOptions.format === 'excel') ext = '.xlsx'
else if (effectiveOptions.format === 'txt') ext = '.txt'
else if (effectiveOptions.format === 'weclone') ext = '.csv'
else if (effectiveOptions.format === 'html') ext = '.html'
const outputPath = path.join(sessionDir, `${fileNameWithPrefix}${ext}`)
let result: { success: boolean; error?: string }
if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') {
result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') {
result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'excel') {
result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'txt') {
result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'weclone') {
result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'html') {
result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else {
result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` }
}
if (!result.success && this.isStopError(result.error)) {
activeSessionRatios.delete(sessionId)
return 'stopped'
}
if (result.success) {
successCount++
successSessionIds.push(sessionId)
} else {
failCount++
failedSessionIds.push(sessionId)
console.error(`导出 ${sessionId} 失败:`, result.error)
}
activeSessionRatios.delete(sessionId)
completedCount++ completedCount++
onProgress?.({ onProgress?.({
current: completedCount, current: computeAggregateCurrent(),
total: sessionIds.length, total: sessionIds.length,
currentSession: sessionInfo.displayName, currentSession: sessionInfo.displayName,
currentSessionId: sessionId, currentSessionId: sessionId,
phase: 'exporting', phase: 'exporting'
phaseLabel: '该会话没有消息,已跳过'
}) })
return 'done' return 'done'
} catch (error) {
if (this.isStopError(error)) {
activeSessionRatios.delete(sessionId)
return 'stopped'
}
throw error
} }
const sessionProgress = (progress: ExportProgress) => {
const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100
const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0
const ratio = Math.max(0, Math.min(1, phaseCurrent / phaseTotal))
const aggregateCurrent = Math.min(sessionIds.length, completedCount + ratio)
onProgress?.({
...progress,
current: aggregateCurrent,
total: sessionIds.length,
currentSession: sessionInfo.displayName,
currentSessionId: sessionId
})
}
sessionProgress({
current: completedCount,
total: sessionIds.length,
currentSession: sessionInfo.displayName,
phase: 'exporting'
})
const sanitizeName = (value: string) => value.replace(/[<>:"\/\\|?*]/g, '_').replace(/\.+$/, '').trim()
const baseName = sanitizeName(sessionInfo.displayName || sessionId) || sanitizeName(sessionId) || 'session'
const suffix = sanitizeName(effectiveOptions.fileNameSuffix || '')
const safeName = suffix ? `${baseName}_${suffix}` : baseName
const fileNameWithPrefix = `${await this.getSessionFilePrefix(sessionId)}${safeName}`
const useSessionFolder = sessionLayout === 'per-session'
const sessionDir = useSessionFolder ? path.join(exportBaseDir, safeName) : exportBaseDir
if (useSessionFolder && !fs.existsSync(sessionDir)) {
fs.mkdirSync(sessionDir, { recursive: true })
}
let ext = '.json'
if (effectiveOptions.format === 'chatlab-jsonl') ext = '.jsonl'
else if (effectiveOptions.format === 'excel') ext = '.xlsx'
else if (effectiveOptions.format === 'txt') ext = '.txt'
else if (effectiveOptions.format === 'weclone') ext = '.csv'
else if (effectiveOptions.format === 'html') ext = '.html'
const outputPath = path.join(sessionDir, `${fileNameWithPrefix}${ext}`)
let result: { success: boolean; error?: string }
if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') {
result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') {
result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'excel') {
result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'txt') {
result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'weclone') {
result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'html') {
result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else {
result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` }
}
if (!result.success && this.isStopError(result.error)) {
return 'stopped'
}
if (result.success) {
successCount++
successSessionIds.push(sessionId)
} else {
failCount++
failedSessionIds.push(sessionId)
console.error(`导出 ${sessionId} 失败:`, result.error)
}
completedCount++
onProgress?.({
current: completedCount,
total: sessionIds.length,
currentSession: sessionInfo.displayName,
phase: 'exporting'
})
return 'done'
} }
const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => { const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => {