From 81bc5aefff7c830d6149a044fa16c2d868d653df Mon Sep 17 00:00:00 2001 From: tisonhuang Date: Mon, 2 Mar 2026 18:52:02 +0800 Subject: [PATCH] fix(export): improve batch text progress and precheck interruptibility --- electron/services/exportService.ts | 345 +++++++++++++++++++---------- 1 file changed, 230 insertions(+), 115 deletions(-) diff --git a/electron/services/exportService.ts b/electron/services/exportService.ts index ea4964b..cc9bf00 100644 --- a/electron/services/exportService.ts +++ b/electron/services/exportService.ts @@ -255,6 +255,27 @@ class ExportService { return { mode } } + private createCollectProgressReporter( + sessionName: string, + onProgress?: (progress: ExportProgress) => void, + progressCurrent = 5 + ): ((payload: { fetched: number }) => void) | undefined { + if (!onProgress) return undefined + let lastReportAt = 0 + return ({ fetched }) => { + const now = Date.now() + if (now - lastReportAt < 350) return + lastReportAt = now + onProgress({ + current: progressCurrent, + total: 100, + currentSession: sessionName, + phase: 'preparing', + phaseLabel: `收集消息 ${fetched.toLocaleString()} 条` + }) + } + } + private shouldDecodeMessageContentInFastMode(localType: number): boolean { // 这些类型在文本导出里只需要占位符,无需解码完整 XML / 压缩内容 if (localType === 3 || localType === 34 || localType === 42 || localType === 43 || localType === 47) { @@ -2853,6 +2874,7 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -2860,7 +2882,8 @@ class ExportService { options.senderUsername, collectParams.mode, collectParams.targetMediaTypes, - control + control, + collectProgressReporter ) const allMessages = collected.rows @@ -3273,7 +3296,7 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) - let collectProgressLastReportAt = 0 + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -3282,18 +3305,7 @@ class ExportService { collectParams.mode, collectParams.targetMediaTypes, control, - ({ fetched }) => { - const now = Date.now() - if (now - collectProgressLastReportAt < 350) return - collectProgressLastReportAt = now - onProgress?.({ - current: 5, - total: 100, - currentSession: sessionInfo.displayName, - phase: 'preparing', - phaseLabel: `收集消息 ${fetched.toLocaleString()} 条` - }) - } + collectProgressReporter ) // 如果没有消息,不创建文件 @@ -3890,6 +3902,7 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -3897,7 +3910,8 @@ class ExportService { options.senderUsername, collectParams.mode, collectParams.targetMediaTypes, - control + control, + collectProgressReporter ) // 如果没有消息,不创建文件 @@ -4401,6 +4415,7 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -4408,7 +4423,8 @@ class ExportService { options.senderUsername, collectParams.mode, collectParams.targetMediaTypes, - control + control, + collectProgressReporter ) // 如果没有消息,不创建文件 @@ -4705,6 +4721,7 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -4712,7 +4729,8 @@ class ExportService { options.senderUsername, collectParams.mode, collectParams.targetMediaTypes, - control + control, + collectProgressReporter ) if (collected.rows.length === 0) { return { success: false, error: '该会话在指定时间范围内没有消息' } @@ -5071,6 +5089,7 @@ class ExportService { } const collectParams = this.resolveCollectParams(options) + const collectProgressReporter = this.createCollectProgressReporter(sessionInfo.displayName, onProgress, 5) const collected = await this.collectMessages( sessionId, cleanedMyWxid, @@ -5078,7 +5097,8 @@ class ExportService { options.senderUsername, collectParams.mode, collectParams.targetMediaTypes, - control + control, + collectProgressReporter ) // 如果没有消息,不创建文件 @@ -5671,128 +5691,223 @@ class ExportService { ? (effectiveOptions.sessionLayout ?? 'per-session') : 'shared' let completedCount = 0 + const activeSessionRatios = new Map() + const computeAggregateCurrent = () => { + let activeRatioSum = 0 + for (const ratio of activeSessionRatios.values()) { + activeRatioSum += Math.max(0, Math.min(1, ratio)) + } + return Math.min(sessionIds.length, completedCount + activeRatioSum) + } const defaultConcurrency = exportMediaEnabled ? 2 : 4 const rawConcurrency = typeof effectiveOptions.exportConcurrency === 'number' ? Math.floor(effectiveOptions.exportConcurrency) : defaultConcurrency const clampedConcurrency = Math.max(1, Math.min(rawConcurrency, 6)) const sessionConcurrency = clampedConcurrency + const queue = [...sessionIds] + let pauseRequested = false + let stopRequested = false const emptySessionIds = new Set() const canFastSkipEmptySessions = this.isUnboundedDateRange(effectiveOptions.dateRange) && !String(effectiveOptions.senderUsername || '').trim() if (canFastSkipEmptySessions && sessionIds.length > 0) { - const countsResult = await wcdbService.getMessageCounts(sessionIds) - if (countsResult.success && countsResult.counts) { - for (const sessionId of sessionIds) { - const count = countsResult.counts[sessionId] - if (typeof count === 'number' && Number.isFinite(count) && count <= 0) { - emptySessionIds.add(sessionId) + const EMPTY_SESSION_PRECHECK_LIMIT = 1200 + if (sessionIds.length <= EMPTY_SESSION_PRECHECK_LIMIT) { + let checkedCount = 0 + onProgress?.({ + current: computeAggregateCurrent(), + total: sessionIds.length, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseProgress: 0, + phaseTotal: sessionIds.length, + phaseLabel: `预检查空会话 0/${sessionIds.length}` + }) + + const PRECHECK_BATCH_SIZE = 160 + for (let i = 0; i < sessionIds.length; i += PRECHECK_BATCH_SIZE) { + if (control?.shouldStop?.()) { + stopRequested = true + break } + if (control?.shouldPause?.()) { + pauseRequested = true + break + } + + const batchSessionIds = sessionIds.slice(i, i + PRECHECK_BATCH_SIZE) + const countsResult = await wcdbService.getMessageCounts(batchSessionIds) + if (countsResult.success && countsResult.counts) { + for (const batchSessionId of batchSessionIds) { + const count = countsResult.counts[batchSessionId] + if (typeof count === 'number' && Number.isFinite(count) && count <= 0) { + emptySessionIds.add(batchSessionId) + } + } + } + + checkedCount = Math.min(sessionIds.length, checkedCount + batchSessionIds.length) + onProgress?.({ + current: computeAggregateCurrent(), + total: sessionIds.length, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseProgress: checkedCount, + phaseTotal: sessionIds.length, + phaseLabel: `预检查空会话 ${checkedCount}/${sessionIds.length}` + }) } + } else { + onProgress?.({ + current: computeAggregateCurrent(), + total: sessionIds.length, + currentSession: '', + currentSessionId: '', + phase: 'preparing', + phaseLabel: `会话较多,已跳过空会话预检查(${sessionIds.length} 个)` + }) + } + } + + if (stopRequested) { + return { + success: true, + successCount, + failCount, + stopped: true, + pendingSessionIds: [...queue], + successSessionIds, + failedSessionIds + } + } + if (pauseRequested) { + return { + success: true, + successCount, + failCount, + paused: true, + pendingSessionIds: [...queue], + successSessionIds, + failedSessionIds } } - const queue = [...sessionIds] - let pauseRequested = false - let stopRequested = false const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => { - this.throwIfStopRequested(control) - const sessionInfo = await this.getContactInfo(sessionId) + try { + this.throwIfStopRequested(control) + const sessionInfo = await this.getContactInfo(sessionId) - if (emptySessionIds.has(sessionId)) { - failCount++ - failedSessionIds.push(sessionId) + if (emptySessionIds.has(sessionId)) { + failCount++ + failedSessionIds.push(sessionId) + activeSessionRatios.delete(sessionId) + completedCount++ + onProgress?.({ + current: computeAggregateCurrent(), + total: sessionIds.length, + currentSession: sessionInfo.displayName, + currentSessionId: sessionId, + phase: 'exporting', + phaseLabel: '该会话没有消息,已跳过' + }) + return 'done' + } + + const sessionProgress = (progress: ExportProgress) => { + const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100 + const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0 + const ratio = progress.phase === 'complete' + ? 1 + : Math.max(0, Math.min(1, phaseCurrent / phaseTotal)) + activeSessionRatios.set(sessionId, ratio) + onProgress?.({ + ...progress, + current: computeAggregateCurrent(), + total: sessionIds.length, + currentSession: sessionInfo.displayName, + currentSessionId: sessionId + }) + } + + sessionProgress({ + current: 0, + total: 100, + currentSession: sessionInfo.displayName, + phase: 'preparing', + phaseLabel: '准备导出' + }) + + const sanitizeName = (value: string) => value.replace(/[<>:"\/\\|?*]/g, '_').replace(/\.+$/, '').trim() + const baseName = sanitizeName(sessionInfo.displayName || sessionId) || sanitizeName(sessionId) || 'session' + const suffix = sanitizeName(effectiveOptions.fileNameSuffix || '') + const safeName = suffix ? `${baseName}_${suffix}` : baseName + const fileNameWithPrefix = `${await this.getSessionFilePrefix(sessionId)}${safeName}` + const useSessionFolder = sessionLayout === 'per-session' + const sessionDir = useSessionFolder ? path.join(exportBaseDir, safeName) : exportBaseDir + + if (useSessionFolder && !fs.existsSync(sessionDir)) { + fs.mkdirSync(sessionDir, { recursive: true }) + } + + let ext = '.json' + if (effectiveOptions.format === 'chatlab-jsonl') ext = '.jsonl' + else if (effectiveOptions.format === 'excel') ext = '.xlsx' + else if (effectiveOptions.format === 'txt') ext = '.txt' + else if (effectiveOptions.format === 'weclone') ext = '.csv' + else if (effectiveOptions.format === 'html') ext = '.html' + const outputPath = path.join(sessionDir, `${fileNameWithPrefix}${ext}`) + + let result: { success: boolean; error?: string } + if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') { + result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') { + result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else if (effectiveOptions.format === 'excel') { + result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else if (effectiveOptions.format === 'txt') { + result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else if (effectiveOptions.format === 'weclone') { + result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else if (effectiveOptions.format === 'html') { + result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control) + } else { + result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` } + } + + if (!result.success && this.isStopError(result.error)) { + activeSessionRatios.delete(sessionId) + return 'stopped' + } + + if (result.success) { + successCount++ + successSessionIds.push(sessionId) + } else { + failCount++ + failedSessionIds.push(sessionId) + console.error(`导出 ${sessionId} 失败:`, result.error) + } + + activeSessionRatios.delete(sessionId) completedCount++ onProgress?.({ - current: completedCount, + current: computeAggregateCurrent(), total: sessionIds.length, currentSession: sessionInfo.displayName, currentSessionId: sessionId, - phase: 'exporting', - phaseLabel: '该会话没有消息,已跳过' + phase: 'exporting' }) return 'done' + } catch (error) { + if (this.isStopError(error)) { + activeSessionRatios.delete(sessionId) + return 'stopped' + } + throw error } - - const sessionProgress = (progress: ExportProgress) => { - const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100 - const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0 - const ratio = Math.max(0, Math.min(1, phaseCurrent / phaseTotal)) - const aggregateCurrent = Math.min(sessionIds.length, completedCount + ratio) - onProgress?.({ - ...progress, - current: aggregateCurrent, - total: sessionIds.length, - currentSession: sessionInfo.displayName, - currentSessionId: sessionId - }) - } - - sessionProgress({ - current: completedCount, - total: sessionIds.length, - currentSession: sessionInfo.displayName, - phase: 'exporting' - }) - - const sanitizeName = (value: string) => value.replace(/[<>:"\/\\|?*]/g, '_').replace(/\.+$/, '').trim() - const baseName = sanitizeName(sessionInfo.displayName || sessionId) || sanitizeName(sessionId) || 'session' - const suffix = sanitizeName(effectiveOptions.fileNameSuffix || '') - const safeName = suffix ? `${baseName}_${suffix}` : baseName - const fileNameWithPrefix = `${await this.getSessionFilePrefix(sessionId)}${safeName}` - const useSessionFolder = sessionLayout === 'per-session' - const sessionDir = useSessionFolder ? path.join(exportBaseDir, safeName) : exportBaseDir - - if (useSessionFolder && !fs.existsSync(sessionDir)) { - fs.mkdirSync(sessionDir, { recursive: true }) - } - - let ext = '.json' - if (effectiveOptions.format === 'chatlab-jsonl') ext = '.jsonl' - else if (effectiveOptions.format === 'excel') ext = '.xlsx' - else if (effectiveOptions.format === 'txt') ext = '.txt' - else if (effectiveOptions.format === 'weclone') ext = '.csv' - else if (effectiveOptions.format === 'html') ext = '.html' - const outputPath = path.join(sessionDir, `${fileNameWithPrefix}${ext}`) - - let result: { success: boolean; error?: string } - if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') { - result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') { - result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else if (effectiveOptions.format === 'excel') { - result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else if (effectiveOptions.format === 'txt') { - result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else if (effectiveOptions.format === 'weclone') { - result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else if (effectiveOptions.format === 'html') { - result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control) - } else { - result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` } - } - - if (!result.success && this.isStopError(result.error)) { - return 'stopped' - } - - if (result.success) { - successCount++ - successSessionIds.push(sessionId) - } else { - failCount++ - failedSessionIds.push(sessionId) - console.error(`导出 ${sessionId} 失败:`, result.error) - } - - completedCount++ - onProgress?.({ - current: completedCount, - total: sessionIds.length, - currentSession: sessionInfo.displayName, - phase: 'exporting' - }) - return 'done' } const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => {