fix(export): improve progress visibility and hard-stop control

This commit is contained in:
tisonhuang
2026-03-02 18:37:52 +08:00
parent ac481c6b18
commit ce683a539d
7 changed files with 411 additions and 102 deletions

View File

@@ -119,6 +119,11 @@ export interface ExportProgress {
phaseLabel?: string
}
interface ExportTaskControl {
shouldPause?: () => boolean
shouldStop?: () => boolean
}
// 并发控制:限制同时执行的 Promise 数量
async function parallelLimit<T, R>(
items: T[],
@@ -149,6 +154,7 @@ class ExportService {
private contactCache: LRUCache<string, { displayName: string; avatarUrl?: string }>
private inlineEmojiCache: LRUCache<string, string>
private htmlStyleCache: string | null = null
private readonly STOP_ERROR_CODE = 'WEFLOW_EXPORT_STOP_REQUESTED'
constructor() {
this.configService = new ConfigService()
@@ -157,6 +163,30 @@ class ExportService {
this.inlineEmojiCache = new LRUCache(100) // 最多缓存100个表情
}
private createStopError(): Error {
const error = new Error('导出任务已停止')
;(error as Error & { code?: string }).code = this.STOP_ERROR_CODE
return error
}
private isStopError(error: unknown): boolean {
if (!error) return false
if (typeof error === 'string') {
return error.includes(this.STOP_ERROR_CODE) || error.includes('导出任务已停止')
}
if (error instanceof Error) {
const code = (error as Error & { code?: string }).code
return code === this.STOP_ERROR_CODE || error.message.includes(this.STOP_ERROR_CODE) || error.message.includes('导出任务已停止')
}
return false
}
private throwIfStopRequested(control?: ExportTaskControl): void {
if (control?.shouldStop?.()) {
throw this.createStopError()
}
}
private getClampedConcurrency(value: number | undefined, fallback = 2, max = 6): number {
if (typeof value !== 'number' || !Number.isFinite(value)) return fallback
const raw = Math.floor(value)
@@ -1994,7 +2024,9 @@ class ExportService {
dateRange?: { start: number; end: number } | null,
senderUsernameFilter?: string,
collectMode: MessageCollectMode = 'full',
targetMediaTypes?: Set<number>
targetMediaTypes?: Set<number>,
control?: ExportTaskControl,
onCollectProgress?: (payload: { fetched: number }) => void
): Promise<{ rows: any[]; memberSet: Map<string, { member: ChatLabMember; avatarUrl?: string }>; firstTime: number | null; lastTime: number | null }> {
const rows: any[] = []
const memberSet = new Map<string, { member: ChatLabMember; avatarUrl?: string }>()
@@ -2010,6 +2042,7 @@ class ExportService {
const endTime = dateRange?.end && dateRange.end > 0 ? dateRange.end : 0
const batchSize = (collectMode === 'text-fast' || collectMode === 'media-fast') ? 2000 : 500
this.throwIfStopRequested(control)
const cursor = collectMode === 'media-fast'
? await wcdbService.openMessageCursorLite(
sessionId,
@@ -2034,6 +2067,7 @@ class ExportService {
let hasMore = true
let batchCount = 0
while (hasMore) {
this.throwIfStopRequested(control)
const batch = await wcdbService.fetchMessageBatch(cursor.cursor)
batchCount++
@@ -2044,7 +2078,11 @@ class ExportService {
if (!batch.rows) break
let rowIndex = 0
for (const row of batch.rows) {
if ((rowIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const createTime = parseInt(row.create_time || '0', 10)
if (dateRange) {
if (createTime < dateRange.start || createTime > dateRange.end) continue
@@ -2149,10 +2187,12 @@ class ExportService {
if (firstTime === null || createTime < firstTime) firstTime = createTime
if (lastTime === null || createTime > lastTime) lastTime = createTime
}
onCollectProgress?.({ fetched: rows.length })
hasMore = batch.hasMore === true
}
} catch (err) {
if (this.isStopError(err)) throw err
console.error(`[Export] 收集消息异常:`, err)
} finally {
try {
@@ -2162,10 +2202,12 @@ class ExportService {
}
}
this.throwIfStopRequested(control)
if (collectMode === 'media-fast' && mediaTypeFilter && rows.length > 0) {
await this.backfillMediaFieldsFromMessageDetail(sessionId, rows, mediaTypeFilter)
await this.backfillMediaFieldsFromMessageDetail(sessionId, rows, mediaTypeFilter, control)
}
this.throwIfStopRequested(control)
if (senderSet.size > 0) {
const usernames = Array.from(senderSet)
const [nameResult, avatarResult] = await Promise.all([
@@ -2196,7 +2238,8 @@ class ExportService {
private async backfillMediaFieldsFromMessageDetail(
sessionId: string,
rows: any[],
targetMediaTypes: Set<number>
targetMediaTypes: Set<number>,
control?: ExportTaskControl
): Promise<void> {
const needsBackfill = rows.filter((msg) => {
if (!targetMediaTypes.has(msg.localType)) return false
@@ -2209,6 +2252,7 @@ class ExportService {
const DETAIL_CONCURRENCY = 6
await parallelLimit(needsBackfill, DETAIL_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const localId = Number(msg.localId || 0)
if (!Number.isFinite(localId) || localId <= 0) return
@@ -2788,9 +2832,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -2813,7 +2859,8 @@ class ExportService {
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control
)
const allMessages = collected.rows
@@ -2831,6 +2878,7 @@ class ExportService {
}
if (isGroup) {
this.throwIfStopRequested(control)
await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true)
}
@@ -2889,6 +2937,7 @@ class ExportService {
const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency)
let mediaExported = 0
await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -2933,6 +2982,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -2957,7 +3007,11 @@ class ExportService {
})
const chatLabMessages: ChatLabMessage[] = []
let messageIndex = 0
for (const msg of allMessages) {
if ((messageIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const memberInfo = collected.memberSet.get(msg.senderUsername)?.member || {
platformId: msg.senderUsername,
accountName: msg.senderUsername,
@@ -3150,13 +3204,17 @@ class ExportService {
meta: chatLabExport.meta
}))
for (const member of chatLabExport.members) {
this.throwIfStopRequested(control)
lines.push(JSON.stringify({ _type: 'member', ...member }))
}
for (const message of chatLabExport.messages) {
this.throwIfStopRequested(control)
lines.push(JSON.stringify({ _type: 'message', ...message }))
}
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, lines.join('\n'), 'utf-8')
} else {
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, JSON.stringify(chatLabExport, null, 2), 'utf-8')
}
@@ -3169,6 +3227,9 @@ class ExportService {
return { success: true }
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
return { success: false, error: String(e) }
}
}
@@ -3180,9 +3241,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -3210,13 +3273,27 @@ class ExportService {
})
const collectParams = this.resolveCollectParams(options)
let collectProgressLastReportAt = 0
const collected = await this.collectMessages(
sessionId,
cleanedMyWxid,
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control,
({ fetched }) => {
const now = Date.now()
if (now - collectProgressLastReportAt < 350) return
collectProgressLastReportAt = now
onProgress?.({
current: 5,
total: 100,
currentSession: sessionInfo.displayName,
phase: 'preparing',
phaseLabel: `收集消息 ${fetched.toLocaleString()}`
})
}
)
// 如果没有消息,不创建文件
@@ -3233,7 +3310,11 @@ class ExportService {
}
const senderUsernames = new Set<string>()
let senderScanIndex = 0
for (const msg of collected.rows) {
if ((senderScanIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
if (msg.senderUsername) senderUsernames.add(msg.senderUsername)
}
senderUsernames.add(sessionId)
@@ -3272,6 +3353,7 @@ class ExportService {
const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency)
let mediaExported = 0
await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -3315,6 +3397,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -3360,7 +3443,11 @@ class ExportService {
const transferCandidates: Array<{ xml: string; messageRef: any }> = []
let needSort = false
let lastCreateTime = Number.NEGATIVE_INFINITY
let messageIndex = 0
for (const msg of collected.rows) {
if ((messageIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const senderInfo = senderInfoMap.get(msg.senderUsername) || { displayName: msg.senderUsername || '' }
const sourceMatch = /<msgsource>[\s\S]*?<\/msgsource>/i.exec(msg.content || '')
const source = sourceMatch ? sourceMatch[0] : ''
@@ -3470,6 +3557,7 @@ class ExportService {
const transferConcurrency = this.getClampedConcurrency(options.exportConcurrency, 4, 8)
await parallelLimit(transferCandidates, transferConcurrency, async (item) => {
this.throwIfStopRequested(control)
const transferDesc = await this.resolveTransferDesc(
item.xml,
cleanedMyWxid,
@@ -3516,6 +3604,7 @@ class ExportService {
const weflow = this.getWeflowHeader()
if (options.format === 'arkme-json' && isGroup) {
this.throwIfStopRequested(control)
await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true)
}
@@ -3587,6 +3676,7 @@ class ExportService {
}
const compactMessages = allMessages.map((message) => {
this.throwIfStopRequested(control)
const senderID = ensureSenderId(String(message.senderUsername || ''))
const compactMessage: any = {
localId: message.localId,
@@ -3646,6 +3736,7 @@ class ExportService {
groupMembers = []
for (const memberWxid of memberUsernames) {
this.throwIfStopRequested(control)
const member = collected.memberSet.get(memberWxid)?.member
const contactResult = await getContactCached(memberWxid)
const contact = contactResult.success ? contactResult.contact : null
@@ -3712,6 +3803,7 @@ class ExportService {
arkmeExport.groupMembers = groupMembers
}
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, JSON.stringify(arkmeExport, null, 2), 'utf-8')
} else {
const detailedExport: any = {
@@ -3734,6 +3826,7 @@ class ExportService {
}
}
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, JSON.stringify(detailedExport, null, 2), 'utf-8')
}
@@ -3746,6 +3839,9 @@ class ExportService {
return { success: true }
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
return { success: false, error: String(e) }
}
}
@@ -3757,9 +3853,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -3798,7 +3896,8 @@ class ExportService {
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control
)
// 如果没有消息,不创建文件
@@ -3815,7 +3914,11 @@ class ExportService {
}
const senderUsernames = new Set<string>()
let senderScanIndex = 0
for (const msg of collected.rows) {
if ((senderScanIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
if (msg.senderUsername) senderUsernames.add(msg.senderUsername)
}
senderUsernames.add(sessionId)
@@ -3976,6 +4079,7 @@ class ExportService {
const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency)
let mediaExported = 0
await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -4019,6 +4123,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -4043,6 +4148,9 @@ class ExportService {
// ========== 写入 Excel 行 ==========
for (let i = 0; i < sortedMessages.length; i++) {
if ((i & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const msg = sortedMessages[i]
// 确定发送者信息
@@ -4194,6 +4302,7 @@ class ExportService {
})
// 写入文件
this.throwIfStopRequested(control)
await workbook.xlsx.writeFile(outputPath)
onProgress?.({
@@ -4205,6 +4314,9 @@ class ExportService {
return { success: true }
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
// 处理文件被占用的错误
if (e instanceof Error) {
if (e.message.includes('EBUSY') || e.message.includes('resource busy') || e.message.includes('locked')) {
@@ -4258,9 +4370,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -4293,7 +4407,8 @@ class ExportService {
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control
)
// 如果没有消息,不创建文件
@@ -4310,7 +4425,11 @@ class ExportService {
}
const senderUsernames = new Set<string>()
let senderScanIndex = 0
for (const msg of collected.rows) {
if ((senderScanIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
if (msg.senderUsername) senderUsernames.add(msg.senderUsername)
}
senderUsernames.add(sessionId)
@@ -4357,6 +4476,7 @@ class ExportService {
const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency)
let mediaExported = 0
await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -4399,6 +4519,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -4424,6 +4545,9 @@ class ExportService {
const lines: string[] = []
for (let i = 0; i < sortedMessages.length; i++) {
if ((i & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const msg = sortedMessages[i]
const mediaKey = `${msg.localType}_${msg.localId}`
const mediaItem = mediaCache.get(mediaKey)
@@ -4524,6 +4648,7 @@ class ExportService {
phase: 'writing'
})
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, lines.join('\n'), 'utf-8')
onProgress?.({
@@ -4535,6 +4660,9 @@ class ExportService {
return { success: true }
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
return { success: false, error: String(e) }
}
}
@@ -4546,9 +4674,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -4581,14 +4711,19 @@ class ExportService {
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control
)
if (collected.rows.length === 0) {
return { success: false, error: '该会话在指定时间范围内没有消息' }
}
const senderUsernames = new Set<string>()
let senderScanIndex = 0
for (const msg of collected.rows) {
if ((senderScanIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
if (msg.senderUsername) senderUsernames.add(msg.senderUsername)
}
senderUsernames.add(sessionId)
@@ -4642,6 +4777,7 @@ class ExportService {
const mediaConcurrency = this.getClampedConcurrency(options.exportConcurrency)
let mediaExported = 0
await parallelLimit(mediaMessages, mediaConcurrency, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -4684,6 +4820,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -4710,6 +4847,9 @@ class ExportService {
lines.push('id,MsgSvrID,type_name,is_sender,talker,msg,src,CreateTime')
for (let i = 0; i < sortedMessages.length; i++) {
if ((i & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const msg = sortedMessages[i]
const mediaKey = `${msg.localType}_${msg.localId}`
const mediaItem = mediaCache.get(mediaKey) || null
@@ -4787,6 +4927,7 @@ class ExportService {
phase: 'writing'
})
this.throwIfStopRequested(control)
fs.writeFileSync(outputPath, `\uFEFF${lines.join('\r\n')}`, 'utf-8')
onProgress?.({
@@ -4798,6 +4939,9 @@ class ExportService {
return { success: true }
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
return { success: false, error: String(e) }
}
}
@@ -4893,9 +5037,11 @@ class ExportService {
sessionId: string,
outputPath: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void
onProgress?: (progress: ExportProgress) => void,
control?: ExportTaskControl
): Promise<{ success: boolean; error?: string }> {
try {
this.throwIfStopRequested(control)
const conn = await this.ensureConnected()
if (!conn.success || !conn.cleanedWxid) return { success: false, error: conn.error }
@@ -4931,7 +5077,8 @@ class ExportService {
options.dateRange,
options.senderUsername,
collectParams.mode,
collectParams.targetMediaTypes
collectParams.targetMediaTypes,
control
)
// 如果没有消息,不创建文件
@@ -4940,7 +5087,11 @@ class ExportService {
}
const senderUsernames = new Set<string>()
let senderScanIndex = 0
for (const msg of collected.rows) {
if ((senderScanIndex++ & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
if (msg.senderUsername) senderUsernames.add(msg.senderUsername)
}
senderUsernames.add(sessionId)
@@ -4958,6 +5109,7 @@ class ExportService {
: new Map<string, string>()
if (isGroup) {
this.throwIfStopRequested(control)
await this.mergeGroupMembers(sessionId, collected.memberSet, options.exportAvatars === true)
}
const sortedMessages = collected.rows.sort((a, b) => a.createTime - b.createTime)
@@ -4989,6 +5141,7 @@ class ExportService {
const MEDIA_CONCURRENCY = 6
let mediaExported = 0
await parallelLimit(mediaMessages, MEDIA_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const mediaKey = `${msg.localType}_${msg.localId}`
if (!mediaCache.has(mediaKey)) {
const mediaItem = await this.exportMediaForMessage(msg, sessionId, mediaRootDir, mediaRelativePrefix, {
@@ -5036,6 +5189,7 @@ class ExportService {
const VOICE_CONCURRENCY = 4
let voiceTranscribed = 0
await parallelLimit(voiceMessages, VOICE_CONCURRENCY, async (msg) => {
this.throwIfStopRequested(control)
const transcript = await this.transcribeVoice(sessionId, String(msg.localId), msg.createTime, msg.senderUsername)
voiceTranscriptMap.set(msg.localId, transcript)
voiceTranscribed++
@@ -5079,6 +5233,7 @@ class ExportService {
const writePromise = (str: string) => {
return new Promise<void>((resolve, reject) => {
this.throwIfStopRequested(control)
if (!stream.write(str)) {
stream.once('drain', resolve)
} else {
@@ -5145,6 +5300,9 @@ class ExportService {
let writeBuf: string[] = []
for (let i = 0; i < sortedMessages.length; i++) {
if ((i & 0x7f) === 0) {
this.throwIfStopRequested(control)
}
const msg = sortedMessages[i]
const mediaKey = `${msg.localType}_${msg.localId}`
const mediaItem = mediaCache.get(mediaKey) || null
@@ -5378,6 +5536,9 @@ class ExportService {
})
} catch (e) {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
return { success: false, error: String(e) }
}
}
@@ -5467,10 +5628,7 @@ class ExportService {
outputDir: string,
options: ExportOptions,
onProgress?: (progress: ExportProgress) => void,
control?: {
shouldPause?: () => boolean
shouldStop?: () => boolean
}
control?: ExportTaskControl
): Promise<{
success: boolean
successCount: number
@@ -5537,7 +5695,8 @@ class ExportService {
let pauseRequested = false
let stopRequested = false
const runOne = async (sessionId: string) => {
const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => {
this.throwIfStopRequested(control)
const sessionInfo = await this.getContactInfo(sessionId)
if (emptySessionIds.has(sessionId)) {
@@ -5552,13 +5711,17 @@ class ExportService {
phase: 'exporting',
phaseLabel: '该会话没有消息,已跳过'
})
return
return 'done'
}
const sessionProgress = (progress: ExportProgress) => {
const phaseTotal = Number.isFinite(progress.total) && progress.total > 0 ? progress.total : 100
const phaseCurrent = Number.isFinite(progress.current) ? progress.current : 0
const ratio = Math.max(0, Math.min(1, phaseCurrent / phaseTotal))
const aggregateCurrent = Math.min(sessionIds.length, completedCount + ratio)
onProgress?.({
...progress,
current: completedCount,
current: aggregateCurrent,
total: sessionIds.length,
currentSession: sessionInfo.displayName,
currentSessionId: sessionId
@@ -5594,21 +5757,25 @@ class ExportService {
let result: { success: boolean; error?: string }
if (effectiveOptions.format === 'json' || effectiveOptions.format === 'arkme-json') {
result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToDetailedJson(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'chatlab' || effectiveOptions.format === 'chatlab-jsonl') {
result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToChatLab(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'excel') {
result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToExcel(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'txt') {
result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToTxt(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'weclone') {
result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToWeCloneCsv(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else if (effectiveOptions.format === 'html') {
result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress)
result = await this.exportSessionToHtml(sessionId, outputPath, effectiveOptions, sessionProgress, control)
} else {
result = { success: false, error: `不支持的格式: ${effectiveOptions.format}` }
}
if (!result.success && this.isStopError(result.error)) {
return 'stopped'
}
if (result.success) {
successCount++
successSessionIds.push(sessionId)
@@ -5625,6 +5792,7 @@ class ExportService {
currentSession: sessionInfo.displayName,
phase: 'exporting'
})
return 'done'
}
const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => {
@@ -5640,7 +5808,12 @@ class ExportService {
const sessionId = queue.shift()
if (!sessionId) break
await runOne(sessionId)
const runState = await runOne(sessionId)
if (runState === 'stopped') {
stopRequested = true
queue.unshift(sessionId)
break
}
}
})
await Promise.all(workers)

View File

@@ -468,10 +468,11 @@ class GroupAnalyticsService {
return fallback
}
private buildGroupMembersPanelCacheKey(chatroomId: string): string {
private buildGroupMembersPanelCacheKey(chatroomId: string, includeMessageCounts: boolean): string {
const dbPath = String(this.configService.get('dbPath') || '').trim()
const wxid = this.cleanAccountDirName(String(this.configService.get('myWxid') || '').trim())
return `${dbPath}::${wxid}::${chatroomId}`
const mode = includeMessageCounts ? 'full' : 'members'
return `${dbPath}::${wxid}::${chatroomId}::${mode}`
}
private pruneGroupMembersPanelCache(maxEntries: number = 80): void {
@@ -495,7 +496,11 @@ class GroupAnalyticsService {
if (batch.length === 0) continue
const inList = batch.map((username) => `'${username.replace(/'/g, "''")}'`).join(',')
const sql = `SELECT * FROM contact WHERE username IN (${inList})`
const sql = `
SELECT username, user_name, encrypt_username, encrypt_user_name, remark, nick_name, alias, local_type
FROM contact
WHERE username IN (${inList})
`
const result = await wcdbService.execQuery('contact', null, sql)
if (!result.success || !result.rows) continue
@@ -790,7 +795,8 @@ class GroupAnalyticsService {
}
private async loadGroupMembersPanelDataFresh(
chatroomId: string
chatroomId: string,
includeMessageCounts: boolean
): Promise<{ success: boolean; data?: GroupMembersPanelEntry[]; error?: string }> {
const membersResult = await wcdbService.getGroupMembers(chatroomId)
if (!membersResult.success || !membersResult.members) {
@@ -813,7 +819,9 @@ class GroupAnalyticsService {
const displayNamesPromise = wcdbService.getDisplayNames(usernames)
const contactLookupPromise = this.buildGroupMemberContactLookup(usernames)
const ownerPromise = this.detectGroupOwnerUsername(chatroomId, members)
const messageCountLookupPromise = this.buildGroupMessageCountLookup(chatroomId)
const messageCountLookupPromise = includeMessageCounts
? this.buildGroupMessageCountLookup(chatroomId)
: Promise.resolve(new Map<string, number>())
const [displayNames, contactLookup, ownerUsername, messageCountLookup] = await Promise.all([
displayNamesPromise,
@@ -879,13 +887,15 @@ class GroupAnalyticsService {
async getGroupMembersPanelData(
chatroomId: string,
forceRefresh: boolean = false
options?: { forceRefresh?: boolean; includeMessageCounts?: boolean }
): Promise<{ success: boolean; data?: GroupMembersPanelEntry[]; error?: string; fromCache?: boolean; updatedAt?: number }> {
try {
const normalizedChatroomId = String(chatroomId || '').trim()
if (!normalizedChatroomId) return { success: false, error: '群聊ID不能为空' }
const cacheKey = this.buildGroupMembersPanelCacheKey(normalizedChatroomId)
const forceRefresh = Boolean(options?.forceRefresh)
const includeMessageCounts = options?.includeMessageCounts !== false
const cacheKey = this.buildGroupMembersPanelCacheKey(normalizedChatroomId, includeMessageCounts)
const now = Date.now()
const cached = this.groupMembersPanelCache.get(cacheKey)
if (!forceRefresh && cached && now - cached.updatedAt < this.groupMembersPanelCacheTtlMs) {
@@ -901,7 +911,7 @@ class GroupAnalyticsService {
const conn = await this.ensureConnected()
if (!conn.success) return { success: false, error: conn.error }
const fresh = await this.loadGroupMembersPanelDataFresh(normalizedChatroomId)
const fresh = await this.loadGroupMembersPanelDataFresh(normalizedChatroomId, includeMessageCounts)
if (!fresh.success || !fresh.data) {
return { success: false, error: fresh.error || '获取群成员面板数据失败' }
}