diff --git a/electron/main.ts b/electron/main.ts index c8c1cdb..4f9aed9 100644 --- a/electron/main.ts +++ b/electron/main.ts @@ -100,6 +100,7 @@ interface ExportTaskControlState { } const exportTaskControlMap = new Map() +const pendingExportTaskControlMap = new Map() const getTaskControlState = (taskId?: string): ExportTaskControlState | null => { const normalized = typeof taskId === 'string' ? taskId.trim() : '' @@ -110,7 +111,12 @@ const getTaskControlState = (taskId?: string): ExportTaskControlState | null => const createTaskControlState = (taskId?: string): string | null => { const normalized = typeof taskId === 'string' ? taskId.trim() : '' if (!normalized) return null - exportTaskControlMap.set(normalized, { pauseRequested: false, stopRequested: false }) + const pending = pendingExportTaskControlMap.get(normalized) + exportTaskControlMap.set(normalized, { + pauseRequested: Boolean(pending?.pauseRequested), + stopRequested: Boolean(pending?.stopRequested) + }) + pendingExportTaskControlMap.delete(normalized) return normalized } @@ -118,6 +124,16 @@ const clearTaskControlState = (taskId?: string): void => { const normalized = typeof taskId === 'string' ? taskId.trim() : '' if (!normalized) return exportTaskControlMap.delete(normalized) + pendingExportTaskControlMap.delete(normalized) +} + +const queueTaskControlRequest = (taskId: string, action: 'pause' | 'stop'): void => { + const normalized = taskId.trim() + if (!normalized) return + const existing = pendingExportTaskControlMap.get(normalized) || { pauseRequested: false, stopRequested: false } + if (action === 'pause') existing.pauseRequested = true + if (action === 'stop') existing.stopRequested = true + pendingExportTaskControlMap.set(normalized, existing) } function createWindow(options: { autoShow?: boolean } = {}) { @@ -1297,7 +1313,8 @@ function registerIpcHandlers() { ipcMain.handle('export:pauseTask', async (_, taskId: string) => { const state = getTaskControlState(taskId) if (!state) { - return { success: false, error: '任务未在执行中或已结束' } + queueTaskControlRequest(taskId, 'pause') + return { success: true, queued: true } } state.pauseRequested = true return { success: true } @@ -1306,7 +1323,8 @@ function registerIpcHandlers() { ipcMain.handle('export:stopTask', async (_, taskId: string) => { const state = getTaskControlState(taskId) if (!state) { - return { success: false, error: '任务未在执行中或已结束' } + queueTaskControlRequest(taskId, 'stop') + return { success: true, queued: true } } state.stopRequested = true return { success: true } @@ -1398,8 +1416,15 @@ function registerIpcHandlers() { return groupAnalyticsService.getGroupMembers(chatroomId) }) - ipcMain.handle('groupAnalytics:getGroupMembersPanelData', async (_, chatroomId: string, forceRefresh?: boolean) => { - return groupAnalyticsService.getGroupMembersPanelData(chatroomId, forceRefresh) + ipcMain.handle( + 'groupAnalytics:getGroupMembersPanelData', + async (_, chatroomId: string, options?: { forceRefresh?: boolean; includeMessageCounts?: boolean } | boolean) => { + const normalizedOptions = typeof options === 'boolean' + ? { forceRefresh: options } + : options + return groupAnalyticsService.getGroupMembersPanelData(chatroomId, normalizedOptions) + } + ) }) ipcMain.handle('groupAnalytics:getGroupMessageRanking', async (_, chatroomId: string, limit?: number, startTime?: number, endTime?: number) => { diff --git a/electron/preload.ts b/electron/preload.ts index 0b52a1b..cdcde72 100644 --- a/electron/preload.ts +++ b/electron/preload.ts @@ -237,8 +237,10 @@ contextBridge.exposeInMainWorld('electronAPI', { groupAnalytics: { getGroupChats: () => ipcRenderer.invoke('groupAnalytics:getGroupChats'), getGroupMembers: (chatroomId: string) => ipcRenderer.invoke('groupAnalytics:getGroupMembers', chatroomId), - getGroupMembersPanelData: (chatroomId: string, forceRefresh?: boolean) => - ipcRenderer.invoke('groupAnalytics:getGroupMembersPanelData', chatroomId, forceRefresh), + getGroupMembersPanelData: ( + chatroomId: string, + options?: { forceRefresh?: boolean; includeMessageCounts?: boolean } + ) => ipcRenderer.invoke('groupAnalytics:getGroupMembersPanelData', chatroomId, options), getGroupMessageRanking: (chatroomId: string, limit?: number, startTime?: number, endTime?: number) => ipcRenderer.invoke('groupAnalytics:getGroupMessageRanking', chatroomId, limit, startTime, endTime), getGroupActiveHours: (chatroomId: string, startTime?: number, endTime?: number) => ipcRenderer.invoke('groupAnalytics:getGroupActiveHours', chatroomId, startTime, endTime), getGroupMediaStats: (chatroomId: string, startTime?: number, endTime?: number) => ipcRenderer.invoke('groupAnalytics:getGroupMediaStats', chatroomId, startTime, endTime), diff --git a/electron/services/exportService.ts b/electron/services/exportService.ts index 1ebd075..ea4964b 100644 --- a/electron/services/exportService.ts +++ b/electron/services/exportService.ts @@ -119,6 +119,11 @@ export interface ExportProgress { phaseLabel?: string } +interface ExportTaskControl { + shouldPause?: () => boolean + shouldStop?: () => boolean +} + // 并发控制:限制同时执行的 Promise 数量 async function parallelLimit( items: T[], @@ -149,6 +154,7 @@ class ExportService { private contactCache: LRUCache private inlineEmojiCache: LRUCache private htmlStyleCache: string | null = null + private readonly STOP_ERROR_CODE = 'WEFLOW_EXPORT_STOP_REQUESTED' constructor() { this.configService = new ConfigService() @@ -157,6 +163,30 @@ class ExportService { this.inlineEmojiCache = new LRUCache(100) // 最多缓存100个表情 } + private createStopError(): Error { + const error = new Error('导出任务已停止') + ;(error as Error & { code?: string }).code = this.STOP_ERROR_CODE + return error + } + + private isStopError(error: unknown): boolean { + if (!error) return false + if (typeof error === 'string') { + return error.includes(this.STOP_ERROR_CODE) || error.includes('导出任务已停止') + } + if (error instanceof Error) { + const code = (error as Error & { code?: string }).code + return code === this.STOP_ERROR_CODE || error.message.includes(this.STOP_ERROR_CODE) || error.message.includes('导出任务已停止') + } + return false + } + + private throwIfStopRequested(control?: ExportTaskControl): void { + if (control?.shouldStop?.()) { + throw this.createStopError() + } + } + private getClampedConcurrency(value: number | undefined, fallback = 2, max = 6): number { if (typeof value !== 'number' || !Number.isFinite(value)) return fallback const raw = Math.floor(value) @@ -1994,7 +2024,9 @@ class ExportService { dateRange?: { start: number; end: number } | null, senderUsernameFilter?: string, collectMode: MessageCollectMode = 'full', - targetMediaTypes?: Set + targetMediaTypes?: Set, + control?: ExportTaskControl, + onCollectProgress?: (payload: { fetched: number }) => void ): Promise<{ rows: any[]; memberSet: Map; firstTime: number | null; lastTime: number | null }> { const rows: any[] = [] const memberSet = new Map() @@ -2010,6 +2042,7 @@ class ExportService { const endTime = dateRange?.end && dateRange.end > 0 ? dateRange.end : 0 const batchSize = (collectMode === 'text-fast' || collectMode === 'media-fast') ? 2000 : 500 + this.throwIfStopRequested(control) const cursor = collectMode === 'media-fast' ? await wcdbService.openMessageCursorLite( sessionId, @@ -2034,6 +2067,7 @@ class ExportService { let hasMore = true let batchCount = 0 while (hasMore) { + this.throwIfStopRequested(control) const batch = await wcdbService.fetchMessageBatch(cursor.cursor) batchCount++ @@ -2044,7 +2078,11 @@ class ExportService { if (!batch.rows) break + let rowIndex = 0 for (const row of batch.rows) { + if ((rowIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const createTime = parseInt(row.create_time || '0', 10) if (dateRange) { if (createTime < dateRange.start || createTime > dateRange.end) continue @@ -2149,10 +2187,12 @@ class ExportService { if (firstTime === null || createTime < firstTime) firstTime = createTime if (lastTime === null || createTime > lastTime) lastTime = createTime } + onCollectProgress?.({ fetched: rows.length }) hasMore = batch.hasMore === true } } catch (err) { + if (this.isStopError(err)) throw err console.error(`[Export] 收集消息异常:`, err) } finally { try { @@ -2162,10 +2202,12 @@ class ExportService { } } + this.throwIfStopRequested(control) if (collectMode === 'media-fast' && mediaTypeFilter && rows.length > 0) { - await this.backfillMediaFieldsFromMessageDetail(sessionId, rows, mediaTypeFilter) + await this.backfillMediaFieldsFromMessageDetail(sessionId, rows, mediaTypeFilter, control) } + this.throwIfStopRequested(control) if (senderSet.size > 0) { const usernames = Array.from(senderSet) const [nameResult, avatarResult] = await Promise.all([ @@ -2196,7 +2238,8 @@ class ExportService { private async backfillMediaFieldsFromMessageDetail( sessionId: string, rows: any[], - targetMediaTypes: Set + targetMediaTypes: Set, + control?: ExportTaskControl ): Promise { const needsBackfill = rows.filter((msg) => { if (!targetMediaTypes.has(msg.localType)) return false @@ -2209,6 +2252,7 @@ class ExportService { const DETAIL_CONCURRENCY = 6 await parallelLimit(needsBackfill, DETAIL_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const localId = Number(msg.localId || 0) if (!Number.isFinite(localId) || localId <= 0) return @@ -2788,9 +2832,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -2813,7 +2859,8 @@ class ExportService { options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control ) const allMessages = collected.rows @@ -2831,6 +2878,7 @@ class ExportService { } if (isGroup) { + this.throwIfStopRequested(control) await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true) } @@ -2889,6 +2937,7 @@ class ExportService { const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency) let mediaExported = 0 await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -2933,6 +2982,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -2957,7 +3007,11 @@ class ExportService { }) const chatLabMessages: ChatLabMessage[] = [] + let messageIndex = 0 for (const msg of allMessages) { + if ((messageIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const memberInfo = collected.memberSet.get(msg.senderUsername)?.member || { platformId: msg.senderUsername, accountName: msg.senderUsername, @@ -3150,13 +3204,17 @@ class ExportService { meta: chatLabExport.meta })) for (const member of chatLabExport.members) { + this.throwIfStopRequested(control) lines.push(JSON.stringify({ _type: 'member', ...member })) } for (const message of chatLabExport.messages) { + this.throwIfStopRequested(control) lines.push(JSON.stringify({ _type: 'message', ...message })) } + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, lines.join('\n'), 'utf-8') } else { + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, JSON.stringify(chatLabExport, null, 2), 'utf-8') } @@ -3169,6 +3227,9 @@ class ExportService { return { success: true } } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } return { success: false, error: String(e) } } } @@ -3180,9 +3241,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -3210,13 +3273,27 @@ class ExportService { }) const collectParams = this.resolveCollectParams(options) + let collectProgressLastReportAt = 0 const collected = await this.collectMessages( sessionId, cleanedMyWxid, options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control, + ({ fetched }) => { + const now = Date.now() + if (now - collectProgressLastReportAt < 350) return + collectProgressLastReportAt = now + onProgress?.({ + current: 5, + total: 100, + currentSession: sessionInfo.displayName, + phase: 'preparing', + phaseLabel: `收集消息 ${fetched.toLocaleString()} 条` + }) + } ) // 如果没有消息,不创建文件 @@ -3233,7 +3310,11 @@ class ExportService { } const senderUsernames = new Set() + let senderScanIndex = 0 for (const msg of collected.rows) { + if ((senderScanIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } if (msg.senderUsername) senderUsernames.add(msg.senderUsername) } senderUsernames.add(sessionId) @@ -3272,6 +3353,7 @@ class ExportService { const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency) let mediaExported = 0 await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -3315,6 +3397,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -3360,7 +3443,11 @@ class ExportService { const transferCandidates: Array<{ xml: string; messageRef: any }> = [] let needSort = false let lastCreateTime = Number.NEGATIVE_INFINITY + let messageIndex = 0 for (const msg of collected.rows) { + if ((messageIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const senderInfo = senderInfoMap.get(msg.senderUsername) || { displayName: msg.senderUsername || '' } const sourceMatch = /[\s\S]*?<\/msgsource>/i.exec(msg.content || '') const source = sourceMatch ? sourceMatch[0] : '' @@ -3470,6 +3557,7 @@ class ExportService { const transferConcurrency = this.getClampedConcurrency(options.exportConcurrency, 4, 8) await parallelLimit(transferCandidates, transferConcurrency, async (item) => { + this.throwIfStopRequested(control) const transferDesc = await this.resolveTransferDesc( item.xml, cleanedMyWxid, @@ -3516,6 +3604,7 @@ class ExportService { const weflow = this.getWeflowHeader() if (options.format === 'arkme-json' && isGroup) { + this.throwIfStopRequested(control) await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true) } @@ -3587,6 +3676,7 @@ class ExportService { } const compactMessages = allMessages.map((message) => { + this.throwIfStopRequested(control) const senderID = ensureSenderId(String(message.senderUsername || '')) const compactMessage: any = { localId: message.localId, @@ -3646,6 +3736,7 @@ class ExportService { groupMembers = [] for (const memberWxid of memberUsernames) { + this.throwIfStopRequested(control) const member = collected.memberSet.get(memberWxid)?.member const contactResult = await getContactCached(memberWxid) const contact = contactResult.success ? contactResult.contact : null @@ -3712,6 +3803,7 @@ class ExportService { arkmeExport.groupMembers = groupMembers } + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, JSON.stringify(arkmeExport, null, 2), 'utf-8') } else { const detailedExport: any = { @@ -3734,6 +3826,7 @@ class ExportService { } } + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, JSON.stringify(detailedExport, null, 2), 'utf-8') } @@ -3746,6 +3839,9 @@ class ExportService { return { success: true } } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } return { success: false, error: String(e) } } } @@ -3757,9 +3853,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -3798,7 +3896,8 @@ class ExportService { options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control ) // 如果没有消息,不创建文件 @@ -3815,7 +3914,11 @@ class ExportService { } const senderUsernames = new Set() + let senderScanIndex = 0 for (const msg of collected.rows) { + if ((senderScanIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } if (msg.senderUsername) senderUsernames.add(msg.senderUsername) } senderUsernames.add(sessionId) @@ -3976,6 +4079,7 @@ class ExportService { const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency) let mediaExported = 0 await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -4019,6 +4123,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -4043,6 +4148,9 @@ class ExportService { // ========== 写入 Excel 行 ========== for (let i = 0; i < sortedMessages.length; i++) { + if ((i & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const msg = sortedMessages[i] // 确定发送者信息 @@ -4194,6 +4302,7 @@ class ExportService { }) // 写入文件 + this.throwIfStopRequested(control) await workbook.xlsx.writeFile(outputPath) onProgress?.({ @@ -4205,6 +4314,9 @@ class ExportService { return { success: true } } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } // 处理文件被占用的错误 if (e instanceof Error) { if (e.message.includes('EBUSY') || e.message.includes('resource busy') || e.message.includes('locked')) { @@ -4258,9 +4370,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -4293,7 +4407,8 @@ class ExportService { options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control ) // 如果没有消息,不创建文件 @@ -4310,7 +4425,11 @@ class ExportService { } const senderUsernames = new Set() + let senderScanIndex = 0 for (const msg of collected.rows) { + if ((senderScanIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } if (msg.senderUsername) senderUsernames.add(msg.senderUsername) } senderUsernames.add(sessionId) @@ -4357,6 +4476,7 @@ class ExportService { const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency) let mediaExported = 0 await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -4399,6 +4519,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -4424,6 +4545,9 @@ class ExportService { const lines: string[] = [] for (let i = 0; i < sortedMessages.length; i++) { + if ((i & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const msg = sortedMessages[i] const mediaKey = `${msg.localType}_${msg.localId}` const mediaItem = mediaCache.get(mediaKey) @@ -4524,6 +4648,7 @@ class ExportService { phase: 'writing' }) + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, lines.join('\n'), 'utf-8') onProgress?.({ @@ -4535,6 +4660,9 @@ class ExportService { return { success: true } } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } return { success: false, error: String(e) } } } @@ -4546,9 +4674,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -4581,14 +4711,19 @@ class ExportService { options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control ) if (collected.rows.length === 0) { return { success: false, error: '该会话在指定时间范围内没有消息' } } const senderUsernames = new Set() + let senderScanIndex = 0 for (const msg of collected.rows) { + if ((senderScanIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } if (msg.senderUsername) senderUsernames.add(msg.senderUsername) } senderUsernames.add(sessionId) @@ -4642,6 +4777,7 @@ class ExportService { const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency) let mediaExported = 0 await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -4684,6 +4820,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -4710,6 +4847,9 @@ class ExportService { lines.push('id,MsgSvrID,type_name,is_sender,talker,msg,src,CreateTime') for (let i = 0; i < sortedMessages.length; i++) { + if ((i & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const msg = sortedMessages[i] const mediaKey = `${msg.localType}_${msg.localId}` const mediaItem = mediaCache.get(mediaKey) || null @@ -4787,6 +4927,7 @@ class ExportService { phase: 'writing' }) + this.throwIfStopRequested(control) fs.writeFileSync(outputPath, `\uFEFF${lines.join('\r\n')}`, 'utf-8') onProgress?.({ @@ -4798,6 +4939,9 @@ class ExportService { return { success: true } } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } return { success: false, error: String(e) } } } @@ -4893,9 +5037,11 @@ class ExportService { sessionId: string, outputPath: string, options: ExportOptions, - onProgress?: (progress: ExportProgress) => void + onProgress?: (progress: ExportProgress) => void, + control?: ExportTaskControl ): Promise<{ success: boolean; error?: string }> { try { + this.throwIfStopRequested(control) const conn = await this.ensureConnected() if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error } @@ -4931,7 +5077,8 @@ class ExportService { options.dateRange, options.senderUsername, collectParams.mode, - collectParams.targetMediaTypes + collectParams.targetMediaTypes, + control ) // 如果没有消息,不创建文件 @@ -4940,7 +5087,11 @@ class ExportService { } const senderUsernames = new Set() + let senderScanIndex = 0 for (const msg of collected.rows) { + if ((senderScanIndex++ & 0x7f) === 0) { + this.throwIfStopRequested(control) + } if (msg.senderUsername) senderUsernames.add(msg.senderUsername) } senderUsernames.add(sessionId) @@ -4958,6 +5109,7 @@ class ExportService { : new Map() if (isGroup) { + this.throwIfStopRequested(control) await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true) } const sortedMessages = collected.rows.sort((a, b) => a.createTime - b.createTime) @@ -4989,6 +5141,7 @@ class ExportService { const MEDIA_CONCURRENCY = 6 let mediaExported = 0 await parallelLimit(mediaMessages, MEDIA_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const mediaKey = `${msg.localType}_${msg.localId}` if (!mediaCache.has(mediaKey)) { const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, { @@ -5036,6 +5189,7 @@ class ExportService { const VOICE_CONCURRENCY = 4 let voiceTranscribed = 0 await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => { + this.throwIfStopRequested(control) const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername) voiceTranscriptMap.set(msg.localId, transcript) voiceTranscribed++ @@ -5079,6 +5233,7 @@ class ExportService { const writePromise = (str: string) => { return new Promise((resolve, reject) => { + this.throwIfStopRequested(control) if (!stream.write(str)) { stream.once('drain', resolve) } else { @@ -5145,6 +5300,9 @@ class ExportService { let writeBuf: string[] = [] for (let i = 0; i < sortedMessages.length; i++) { + if ((i & 0x7f) === 0) { + this.throwIfStopRequested(control) + } const msg = sortedMessages[i] const mediaKey = `${msg.localType}_${msg.localId}` const mediaItem = mediaCache.get(mediaKey) || null @@ -5378,6 +5536,9 @@ class ExportService { }) } catch (e) { + if (this.isStopError(e)) { + return { success: false, error: '导出任务已停止' } + } return { success: false, error: String(e) } } } @@ -5467,10 +5628,7 @@ class ExportService { outputDir: string, options: ExportOptions, onProgress?: (progress: ExportProgress) => void, - control?: { - shouldPause?: () => boolean - shouldStop?: () => boolean - } + control?: ExportTaskControl ): Promise<{ success: boolean successCount: number @@ -5537,7 +5695,8 @@ class ExportService { let pauseRequested = false let stopRequested = false - const runOne = async (sessionId: string) => { + const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => { + this.throwIfStopRequested(control) const sessionInfo = await this.getContactInfo(sessionId) if (emptySessionIds.has(sessionId)) { @@ -5552,13 +5711,17 @@ class ExportService { phase: 'exporting', phaseLabel: '该会话没有消息,已跳过' }) - return + return 'done' } const sessionProgress = (progress: ExportProgress) => { + const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100 + const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0 + const ratio = Math.max(0, Math.min(1, phaseCurrent / phaseTotal)) + const aggregateCurrent = Math.min(sessionIds.length, completedCount + ratio) onProgress?.({ ...progress, - current: completedCount, + current: aggregateCurrent, total: sessionIds.length, currentSession: sessionInfo.displayName, currentSessionId: sessionId @@ -5594,21 +5757,25 @@ class ExportService { let result: { success: boolean; error?: string } if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') { - result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') { - result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else if (effectiveOptions.format === 'excel') { - result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else if (effectiveOptions.format === 'txt') { - result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else if (effectiveOptions.format === 'weclone') { - result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else if (effectiveOptions.format === 'html') { - result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress) + result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control) } else { result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` } } + if (!result.success && this.isStopError(result.error)) { + return 'stopped' + } + if (result.success) { successCount++ successSessionIds.push(sessionId) @@ -5625,6 +5792,7 @@ class ExportService { currentSession: sessionInfo.displayName, phase: 'exporting' }) + return 'done' } const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => { @@ -5640,7 +5808,12 @@ class ExportService { const sessionId = queue.shift() if (!sessionId) break - await runOne(sessionId) + const runState = await runOne(sessionId) + if (runState === 'stopped') { + stopRequested = true + queue.unshift(sessionId) + break + } } }) await Promise.all(workers) diff --git a/electron/services/groupAnalyticsService.ts b/electron/services/groupAnalyticsService.ts index 463b80a..0688970 100644 --- a/electron/services/groupAnalyticsService.ts +++ b/electron/services/groupAnalyticsService.ts @@ -468,10 +468,11 @@ class GroupAnalyticsService { return fallback } - private buildGroupMembersPanelCacheKey(chatroomId: string): string { + private buildGroupMembersPanelCacheKey(chatroomId: string, includeMessageCounts: boolean): string { const dbPath = String(this.configService.get('dbPath') || '').trim() const wxid = this.cleanAccountDirName(String(this.configService.get('myWxid') || '').trim()) - return `${dbPath}::${wxid}::${chatroomId}` + const mode = includeMessageCounts ? 'full' : 'members' + return `${dbPath}::${wxid}::${chatroomId}::${mode}` } private pruneGroupMembersPanelCache(maxEntries: number = 80): void { @@ -495,7 +496,11 @@ class GroupAnalyticsService { if (batch.length === 0) continue const inList = batch.map((username) => `'${username.replace(/'/g, "''")}'`).join(',') - const sql = `SELECT * FROM contact WHERE username IN (${inList})` + const sql = ` + SELECT username, user_name, encrypt_username, encrypt_user_name, remark, nick_name, alias, local_type + FROM contact + WHERE username IN (${inList}) + ` const result = await wcdbService.execQuery('contact', null, sql) if (!result.success || !result.rows) continue @@ -790,7 +795,8 @@ class GroupAnalyticsService { } private async loadGroupMembersPanelDataFresh( - chatroomId: string + chatroomId: string, + includeMessageCounts: boolean ): Promise<{ success: boolean; data?: GroupMembersPanelEntry[]; error?: string }> { const membersResult = await wcdbService.getGroupMembers(chatroomId) if (!membersResult.success || !membersResult.members) { @@ -813,7 +819,9 @@ class GroupAnalyticsService { const displayNamesPromise = wcdbService.getDisplayNames(usernames) const contactLookupPromise = this.buildGroupMemberContactLookup(usernames) const ownerPromise = this.detectGroupOwnerUsername(chatroomId, members) - const messageCountLookupPromise = this.buildGroupMessageCountLookup(chatroomId) + const messageCountLookupPromise = includeMessageCounts + ? this.buildGroupMessageCountLookup(chatroomId) + : Promise.resolve(new Map()) const [displayNames, contactLookup, ownerUsername, messageCountLookup] = await Promise.all([ displayNamesPromise, @@ -879,13 +887,15 @@ class GroupAnalyticsService { async getGroupMembersPanelData( chatroomId: string, - forceRefresh: boolean = false + options?: { forceRefresh?: boolean; includeMessageCounts?: boolean } ): Promise<{ success: boolean; data?: GroupMembersPanelEntry[]; error?: string; fromCache?: boolean; updatedAt?: number }> { try { const normalizedChatroomId = String(chatroomId || '').trim() if (!normalizedChatroomId) return { success: false, error: '群聊ID不能为空' } - const cacheKey = this.buildGroupMembersPanelCacheKey(normalizedChatroomId) + const forceRefresh = Boolean(options?.forceRefresh) + const includeMessageCounts = options?.includeMessageCounts !== false + const cacheKey = this.buildGroupMembersPanelCacheKey(normalizedChatroomId, includeMessageCounts) const now = Date.now() const cached = this.groupMembersPanelCache.get(cacheKey) if (!forceRefresh && cached && now - cached.updatedAt < this.groupMembersPanelCacheTtlMs) { @@ -901,7 +911,7 @@ class GroupAnalyticsService { const conn = await this.ensureConnected() if (!conn.success) return { success: false, error: conn.error } - const fresh = await this.loadGroupMembersPanelDataFresh(normalizedChatroomId) + const fresh = await this.loadGroupMembersPanelDataFresh(normalizedChatroomId, includeMessageCounts) if (!fresh.success || !fresh.data) { return { success: false, error: fresh.error || '获取群成员面板数据失败' } } diff --git a/src/pages/ChatPage.tsx b/src/pages/ChatPage.tsx index b06d78d..e17bb34 100644 --- a/src/pages/ChatPage.tsx +++ b/src/pages/ChatPage.tsx @@ -281,6 +281,7 @@ interface SessionPreviewCachePayload { interface GroupMembersPanelCacheEntry { updatedAt: number members: GroupPanelMember[] + includeMessageCounts: boolean } // 全局头像加载队列管理器已移至 src/utils/AvatarLoadQueue.ts @@ -1033,6 +1034,88 @@ function ChatPage(_props: ChatPageProps) { } }, [applySessionDetailStats, currentSessionId, isLoadingRelationStats]) + const normalizeGroupPanelMembers = useCallback((payload: GroupPanelMember[]): GroupPanelMember[] => { + const membersPayload = Array.isArray(payload) ? payload : [] + return membersPayload + .map((member: GroupPanelMember): GroupPanelMember | null => { + const username = String(member.username || '').trim() + if (!username) return null + const preferredName = String( + member.groupNickname || + member.remark || + member.displayName || + member.nickname || + username + ) + + return { + username, + displayName: preferredName, + avatarUrl: member.avatarUrl, + nickname: member.nickname, + alias: member.alias, + remark: member.remark, + groupNickname: member.groupNickname, + isOwner: Boolean(member.isOwner), + isFriend: Boolean(member.isFriend), + messageCount: Number.isFinite(member.messageCount) ? Math.max(0, Math.floor(member.messageCount)) : 0 + } + }) + .filter((member: GroupPanelMember | null): member is GroupPanelMember => Boolean(member)) + .sort((a: GroupPanelMember, b: GroupPanelMember) => { + const ownerDiff = Number(Boolean(b.isOwner)) - Number(Boolean(a.isOwner)) + if (ownerDiff !== 0) return ownerDiff + + const friendDiff = Number(b.isFriend) - Number(a.isFriend) + if (friendDiff !== 0) return friendDiff + + if (a.messageCount !== b.messageCount) return b.messageCount - a.messageCount + return a.displayName.localeCompare(b.displayName, 'zh-Hans-CN') + }) + }, []) + + const updateGroupMembersPanelCache = useCallback(( + chatroomId: string, + members: GroupPanelMember[], + includeMessageCounts: boolean + ) => { + groupMembersPanelCacheRef.current.set(chatroomId, { + updatedAt: Date.now(), + members, + includeMessageCounts + }) + if (groupMembersPanelCacheRef.current.size > 80) { + const oldestEntry = Array.from(groupMembersPanelCacheRef.current.entries()) + .sort((a, b) => a[1].updatedAt - b[1].updatedAt)[0] + if (oldestEntry) { + groupMembersPanelCacheRef.current.delete(oldestEntry[0]) + } + } + }, []) + + const getGroupMembersPanelDataWithTimeout = useCallback(async ( + chatroomId: string, + options: { forceRefresh?: boolean; includeMessageCounts?: boolean }, + timeoutMs: number + ) => { + let timeoutTimer: number | null = null + try { + const timeoutPromise = new Promise<{ success: false; error: string }>((resolve) => { + timeoutTimer = window.setTimeout(() => { + resolve({ success: false, error: '加载群成员超时,请稍后重试' }) + }, timeoutMs) + }) + return await Promise.race([ + window.electronAPI.groupAnalytics.getGroupMembersPanelData(chatroomId, options), + timeoutPromise + ]) + } finally { + if (timeoutTimer) { + window.clearTimeout(timeoutTimer) + } + } + }, []) + const loadGroupMembersPanel = useCallback(async (chatroomId: string) => { if (!chatroomId || !isGroupChatSession(chatroomId)) return @@ -1041,14 +1124,52 @@ function ChatPage(_props: ChatPageProps) { const cached = groupMembersPanelCacheRef.current.get(chatroomId) const cacheFresh = Boolean(cached && now - cached.updatedAt < GROUP_MEMBERS_PANEL_CACHE_TTL_MS) const hasCachedMembers = Boolean(cached && cached.members.length > 0) + const hasFreshMessageCounts = Boolean(cacheFresh && cached?.includeMessageCounts) + let startedBackgroundRefresh = false + + const refreshMessageCountsInBackground = (forceRefresh: boolean) => { + startedBackgroundRefresh = true + setIsRefreshingGroupMembers(true) + void (async () => { + try { + const countsResult = await getGroupMembersPanelDataWithTimeout( + chatroomId, + { forceRefresh, includeMessageCounts: true }, + 25000 + ) + if (requestSeq !== groupMembersRequestSeqRef.current) return + if (!countsResult.success || !Array.isArray(countsResult.data)) { + setGroupMembersError('成员列表已加载,发言统计稍后再试') + return + } + + const membersWithCounts = normalizeGroupPanelMembers(countsResult.data as GroupPanelMember[]) + setGroupPanelMembers(membersWithCounts) + setGroupMembersError(null) + updateGroupMembersPanelCache(chatroomId, membersWithCounts, true) + hasInitializedGroupMembersRef.current = true + } catch { + if (requestSeq !== groupMembersRequestSeqRef.current) return + setGroupMembersError('成员列表已加载,发言统计稍后再试') + } finally { + if (requestSeq === groupMembersRequestSeqRef.current) { + setIsRefreshingGroupMembers(false) + } + } + })() + } if (cacheFresh && cached) { setGroupPanelMembers(cached.members) setGroupMembersError(null) setGroupMembersLoadingHint('') - setIsRefreshingGroupMembers(false) setIsLoadingGroupMembers(false) hasInitializedGroupMembersRef.current = true + if (!hasFreshMessageCounts) { + refreshMessageCountsInBackground(false) + } else { + setIsRefreshingGroupMembers(false) + } return } @@ -1070,7 +1191,11 @@ function ChatPage(_props: ChatPageProps) { } try { - const membersResult = await window.electronAPI.groupAnalytics.getGroupMembersPanelData(chatroomId) + const membersResult = await getGroupMembersPanelDataWithTimeout( + chatroomId, + { includeMessageCounts: false, forceRefresh: false }, + 12000 + ) if (requestSeq !== groupMembersRequestSeqRef.current) return if (!membersResult.success || !Array.isArray(membersResult.data)) { @@ -1081,58 +1206,12 @@ function ChatPage(_props: ChatPageProps) { return } - const membersPayload = membersResult.data as GroupPanelMember[] - const members: GroupPanelMember[] = membersPayload - .map((member: GroupPanelMember): GroupPanelMember | null => { - const username = String(member.username || '').trim() - if (!username) return null - const preferredName = String( - member.groupNickname || - member.remark || - member.displayName || - member.nickname || - username - ) - - return { - username, - displayName: preferredName, - avatarUrl: member.avatarUrl, - nickname: member.nickname, - alias: member.alias, - remark: member.remark, - groupNickname: member.groupNickname, - isOwner: Boolean(member.isOwner), - isFriend: Boolean(member.isFriend), - messageCount: Number.isFinite(member.messageCount) ? Math.max(0, Math.floor(member.messageCount)) : 0 - } - }) - .filter((member: GroupPanelMember | null): member is GroupPanelMember => Boolean(member)) - .sort((a: GroupPanelMember, b: GroupPanelMember) => { - const ownerDiff = Number(Boolean(b.isOwner)) - Number(Boolean(a.isOwner)) - if (ownerDiff !== 0) return ownerDiff - - const friendDiff = Number(b.isFriend) - Number(a.isFriend) - if (friendDiff !== 0) return friendDiff - - if (a.messageCount !== b.messageCount) return b.messageCount - a.messageCount - return a.displayName.localeCompare(b.displayName, 'zh-Hans-CN') - }) - + const members = normalizeGroupPanelMembers(membersResult.data as GroupPanelMember[]) setGroupPanelMembers(members) setGroupMembersError(null) - groupMembersPanelCacheRef.current.set(chatroomId, { - updatedAt: Date.now(), - members - }) - if (groupMembersPanelCacheRef.current.size > 80) { - const oldestEntry = Array.from(groupMembersPanelCacheRef.current.entries()) - .sort((a, b) => a[1].updatedAt - b[1].updatedAt)[0] - if (oldestEntry) { - groupMembersPanelCacheRef.current.delete(oldestEntry[0]) - } - } + updateGroupMembersPanelCache(chatroomId, members, false) hasInitializedGroupMembersRef.current = true + refreshMessageCountsInBackground(false) } catch (e) { if (requestSeq !== groupMembersRequestSeqRef.current) return if (!hasCachedMembers) { @@ -1142,11 +1221,18 @@ function ChatPage(_props: ChatPageProps) { } finally { if (requestSeq === groupMembersRequestSeqRef.current) { setIsLoadingGroupMembers(false) - setIsRefreshingGroupMembers(false) setGroupMembersLoadingHint('') + if (!startedBackgroundRefresh) { + setIsRefreshingGroupMembers(false) + } } } - }, [isGroupChatSession]) + }, [ + getGroupMembersPanelDataWithTimeout, + isGroupChatSession, + normalizeGroupPanelMembers, + updateGroupMembersPanelCache + ]) const toggleGroupMembersPanel = useCallback(() => { if (!currentSessionId || !isGroupChatSession(currentSessionId)) return @@ -3367,7 +3453,7 @@ function ChatPage(_props: ChatPageProps) { {isRefreshingGroupMembers && (
- 正在更新群成员数据... + 正在统计成员发言数...
)} {groupMembersError && groupPanelMembers.length > 0 && ( diff --git a/src/pages/ExportPage.tsx b/src/pages/ExportPage.tsx index 248a63d..2b5812e 100644 --- a/src/pages/ExportPage.tsx +++ b/src/pages/ExportPage.tsx @@ -3250,6 +3250,13 @@ function ExportPage() { const topSessions = isPerfExpanded ? getTaskPerformanceTopSessions(task.performance, nowTick, 5) : [] + const normalizedProgressTotal = task.progress.total > 0 ? task.progress.total : 0 + const normalizedProgressCurrent = normalizedProgressTotal > 0 + ? Math.max(0, Math.min(normalizedProgressTotal, task.progress.current)) + : 0 + const currentSessionRatio = task.progress.phaseTotal > 0 + ? Math.max(0, Math.min(1, task.progress.phaseProgress / task.progress.phaseTotal)) + : null return (
@@ -3263,13 +3270,16 @@ function ExportPage() {
0 ? (task.progress.current / task.progress.total) * 100 : 0}%` }} + style={{ width: `${normalizedProgressTotal > 0 ? (normalizedProgressCurrent / normalizedProgressTotal) * 100 : 0}%` }} />
- {task.progress.total > 0 - ? `${task.progress.current} / ${task.progress.total}` + {normalizedProgressTotal > 0 + ? `${Math.floor(normalizedProgressCurrent)} / ${normalizedProgressTotal}` : '处理中'} + {task.status === 'running' && currentSessionRatio !== null + ? `(当前会话 ${Math.round(currentSessionRatio * 100)}%)` + : ''} {task.progress.phaseLabel ? ` · ${task.progress.phaseLabel}` : ''}
diff --git a/src/types/electron.d.ts b/src/types/electron.d.ts index 128bf40..e49273e 100644 --- a/src/types/electron.d.ts +++ b/src/types/electron.d.ts @@ -351,7 +351,10 @@ export interface ElectronAPI { }> error?: string }> - getGroupMembersPanelData: (chatroomId: string, forceRefresh?: boolean) => Promise<{ + getGroupMembersPanelData: ( + chatroomId: string, + options?: { forceRefresh?: boolean; includeMessageCounts?: boolean } + ) => Promise<{ success: boolean data?: Array<{ username: string