feat(export): 添加导出暂停取消控制

This commit is contained in:
姜北尘
2026-04-25 23:23:42 +08:00
parent 32aab8d490
commit 1b75986987
8 changed files with 800 additions and 90 deletions

View File

@@ -200,6 +200,8 @@ interface MediaSourceResolution {
interface ExportTaskControl {
shouldPause?: () => boolean
shouldStop?: () => boolean
recordCreatedFile?: (filePath: string) => void
recordCreatedDir?: (dirPath: string) => void
}
interface ExportStatsResult {
@@ -279,6 +281,7 @@ class ExportService {
private readonly exportAggregatedSessionStatsCacheTtlMs = 60 * 1000
private readonly exportStatsCacheMaxEntries = 16
private readonly STOP_ERROR_CODE = 'WEFLOW_EXPORT_STOP_REQUESTED'
private readonly PAUSE_ERROR_CODE = 'WEFLOW_EXPORT_PAUSE_REQUESTED'
private mediaFileCachePopulatePending = new Map<string, Promise<string | null>>()
private mediaFileCacheReadyDirs = new Set<string>()
private mediaExportTelemetry: MediaExportTelemetry | null = null
@@ -311,6 +314,12 @@ class ExportService {
return error
}
private createPauseError(): Error {
const error = new Error('导出任务已暂停')
;(error as Error & { code?: string }).code = this.PAUSE_ERROR_CODE
return error
}
setRuntimeConfig(config: { dbPath?: string; decryptKey?: string; myWxid?: string } | null): void {
this.runtimeConfig = config
}
@@ -453,10 +462,42 @@ class ExportService {
return false
}
private isPauseError(error: unknown): boolean {
if (!error) return false
if (typeof error === 'string') {
return error.includes(this.PAUSE_ERROR_CODE) || error.includes('导出任务已暂停')
}
if (error instanceof Error) {
const code = (error as Error & { code?: string }).code
return code === this.PAUSE_ERROR_CODE || error.message.includes(this.PAUSE_ERROR_CODE) || error.message.includes('导出任务已暂停')
}
return false
}
private throwIfStopRequested(control?: ExportTaskControl): void {
if (control?.shouldStop?.()) {
throw this.createStopError()
}
if (control?.shouldPause?.()) {
throw this.createPauseError()
}
}
private async ensureExportDir(dirPath: string, control?: ExportTaskControl, dirCache?: Set<string>): Promise<void> {
if (dirCache?.has(dirPath)) return
const existed = await this.pathExists(dirPath)
await fs.promises.mkdir(dirPath, { recursive: true })
dirCache?.add(dirPath)
if (!existed) {
control?.recordCreatedDir?.(dirPath)
}
}
private async recordCreatedFileBeforeWrite(filePath: string, control?: ExportTaskControl): Promise<void> {
if (!control?.recordCreatedFile) return
if (!await this.pathExists(filePath)) {
control.recordCreatedFile(filePath)
}
}
private getClampedConcurrency(value: number | undefined, fallback = 2, max = 6): number {
@@ -850,8 +891,10 @@ class ExportService {
private async copyMediaWithCacheAndDedup(
kind: 'image' | 'video' | 'emoji',
sourcePath: string,
destPath: string
destPath: string,
control?: ExportTaskControl
): Promise<{ success: boolean; code?: string }> {
const existedBeforeCopy = await this.pathExists(destPath)
const resolved = await this.resolvePreferredMediaSource(kind, sourcePath)
if (resolved.cacheHit) {
this.noteMediaTelemetry({ cacheHitFiles: 1 })
@@ -870,6 +913,9 @@ class ExportService {
dedupReuseFiles: 1,
bytesWritten: resolved.fileStat?.size || 0
})
if (!existedBeforeCopy) {
control?.recordCreatedFile?.(destPath)
}
return { success: true }
}
}
@@ -886,6 +932,9 @@ class ExportService {
doneFiles: 1,
bytesWritten: resolved.fileStat?.size || 0
})
if (!existedBeforeCopy) {
control?.recordCreatedFile?.(destPath)
}
return { success: true }
}
@@ -3962,6 +4011,7 @@ class ExportService {
includeVideoPoster?: boolean
includeVoiceWithTranscript?: boolean
dirCache?: Set<string>
control?: ExportTaskControl
}
): Promise<MediaExportItem | null> {
const localType = msg.localType
@@ -3973,7 +4023,8 @@ class ExportService {
sessionId,
mediaRootDir,
mediaRelativePrefix,
options.dirCache
options.dirCache,
options.control
)
if (result) {
}
@@ -3983,7 +4034,7 @@ class ExportService {
// 语音消息
if (localType === 34) {
if (options.exportVoices) {
return this.exportVoice(msg, sessionId, mediaRootDir, mediaRelativePrefix, options.dirCache)
return this.exportVoice(msg, sessionId, mediaRootDir, mediaRelativePrefix, options.dirCache, options.control)
}
if (options.exportVoiceAsText) {
return null
@@ -3992,7 +4043,7 @@ class ExportService {
// 动画表情
if (localType === 47 && options.exportEmojis) {
const result = await this.exportEmoji(msg, sessionId, mediaRootDir, mediaRelativePrefix, options.dirCache)
const result = await this.exportEmoji(msg, sessionId, mediaRootDir, mediaRelativePrefix, options.dirCache, options.control)
if (result) {
}
return result
@@ -4005,7 +4056,8 @@ class ExportService {
mediaRootDir,
mediaRelativePrefix,
options.dirCache,
options.includeVideoPoster === true
options.includeVideoPoster === true,
options.control
)
}
@@ -4015,7 +4067,8 @@ class ExportService {
mediaRootDir,
mediaRelativePrefix,
options.maxFileSizeMb,
options.dirCache
options.dirCache,
options.control
)
}
@@ -4030,14 +4083,12 @@ class ExportService {
sessionId: string,
mediaRootDir: string,
mediaRelativePrefix: string,
dirCache?: Set<string>
dirCache?: Set<string>,
control?: ExportTaskControl
): Promise<MediaExportItem | null> {
try {
const imagesDir = path.join(mediaRootDir, mediaRelativePrefix, 'images')
if (!dirCache?.has(imagesDir)) {
await fs.promises.mkdir(imagesDir, { recursive: true })
dirCache?.add(imagesDir)
}
await this.ensureExportDir(imagesDir, control, dirCache)
const tryResolveImagePath = async (imageMd5?: string, imageDatName?: string): Promise<string | null> => {
if (!imageMd5 && !imageDatName) return null
@@ -4123,6 +4174,7 @@ class ExportService {
const destPath = path.join(imagesDir, fileName)
const buffer = Buffer.from(base64Data, 'base64')
await this.recordCreatedFileBeforeWrite(destPath, control)
await fs.promises.writeFile(destPath, buffer)
this.noteMediaTelemetry({
doneFiles: 1,
@@ -4142,7 +4194,7 @@ class ExportService {
const ext = path.extname(sourcePath) || '.jpg'
const fileName = `${messageId}_${imageKey}${ext}`
const destPath = path.join(imagesDir, fileName)
const copied = await this.copyMediaWithCacheAndDedup('image', sourcePath, destPath)
const copied = await this.copyMediaWithCacheAndDedup('image', sourcePath, destPath, control)
if (!copied.success) {
if (copied.code === 'ENOENT') {
console.log(`[Export] 源图片文件不存在 (localId=${msg.localId}): ${sourcePath} → 将显示 [图片] 占位符`)
@@ -4261,14 +4313,12 @@ class ExportService {
sessionId: string,
mediaRootDir: string,
mediaRelativePrefix: string,
dirCache?: Set<string>
dirCache?: Set<string>,
control?: ExportTaskControl
): Promise<MediaExportItem | null> {
try {
const voicesDir = path.join(mediaRootDir, mediaRelativePrefix, 'voices')
if (!dirCache?.has(voicesDir)) {
await fs.promises.mkdir(voicesDir, { recursive: true })
dirCache?.add(voicesDir)
}
await this.ensureExportDir(voicesDir, control, dirCache)
const msgId = String(msg.localId)
const safeSession = this.cleanAccountDirName(sessionId)
@@ -4300,6 +4350,7 @@ class ExportService {
// voiceResult.data 是 base64 编码的 wav 数据
const wavBuffer = Buffer.from(voiceResult.data, 'base64')
await this.recordCreatedFileBeforeWrite(destPath, control)
await fs.promises.writeFile(destPath, wavBuffer)
this.noteMediaTelemetry({
doneFiles: 1,
@@ -4338,14 +4389,12 @@ class ExportService {
sessionId: string,
mediaRootDir: string,
mediaRelativePrefix: string,
dirCache?: Set<string>
dirCache?: Set<string>,
control?: ExportTaskControl
): Promise<MediaExportItem | null> {
try {
const emojisDir = path.join(mediaRootDir, mediaRelativePrefix, 'emojis')
if (!dirCache?.has(emojisDir)) {
await fs.promises.mkdir(emojisDir, { recursive: true })
dirCache?.add(emojisDir)
}
await this.ensureExportDir(emojisDir, control, dirCache)
// 使用 chatService 下载表情包 (利用其重试和 fallback 逻辑)
const localPath = await chatService.downloadEmojiFile(msg)
@@ -4359,7 +4408,7 @@ class ExportService {
const key = msg.emojiMd5 || String(msg.localId)
const fileName = `${key}${ext}`
const destPath = path.join(emojisDir, fileName)
const copied = await this.copyMediaWithCacheAndDedup('emoji', localPath, destPath)
const copied = await this.copyMediaWithCacheAndDedup('emoji', localPath, destPath, control)
if (!copied.success) return null
return {
@@ -4381,7 +4430,8 @@ class ExportService {
mediaRootDir: string,
mediaRelativePrefix: string,
dirCache?: Set<string>,
includePoster = false
includePoster = false,
control?: ExportTaskControl
): Promise<MediaExportItem | null> {
try {
let videoMd5 = String(msg.videoMd5 || '').trim().toLowerCase()
@@ -4404,16 +4454,13 @@ class ExportService {
if (!videoInfo) return null
const videosDir = path.join(mediaRootDir, mediaRelativePrefix, 'videos')
if (!dirCache?.has(videosDir)) {
await fs.promises.mkdir(videosDir, { recursive: true })
dirCache?.add(videosDir)
}
await this.ensureExportDir(videosDir, control, dirCache)
const sourcePath = videoInfo.videoUrl
const fileName = path.basename(sourcePath)
const destPath = path.join(videosDir, fileName)
const copied = await this.copyMediaWithCacheAndDedup('video', sourcePath, destPath)
const copied = await this.copyMediaWithCacheAndDedup('video', sourcePath, destPath, control)
if (!copied.success) return null
return {
@@ -4864,7 +4911,8 @@ class ExportService {
mediaRootDir: string,
mediaRelativePrefix: string,
maxFileSizeMb?: number,
dirCache?: Set<string>
dirCache?: Set<string>,
control?: ExportTaskControl
): Promise<MediaExportItem | null> {
try {
const fileNameRaw = String(msg?.fileName || '').trim()
@@ -4872,10 +4920,7 @@ class ExportService {
const fileExtDir = this.resolveFileAttachmentExtensionDir(msg, fileNameRaw)
const fileDir = path.join(mediaRootDir, mediaRelativePrefix, 'file', fileExtDir)
if (!dirCache?.has(fileDir)) {
await fs.promises.mkdir(fileDir, { recursive: true })
dirCache?.add(fileDir)
}
await this.ensureExportDir(fileDir, control, dirCache)
const candidates = await this.resolveFileAttachmentCandidates(msg)
if (candidates.length === 0) {
@@ -4919,6 +4964,7 @@ class ExportService {
const messageId = String(msg?.localId || Date.now())
const destFileName = `${messageId}_${safeBaseName}`
const destPath = path.join(fileDir, destFileName)
const existedBeforeCopy = await this.pathExists(destPath)
const copied = await this.copyFileOptimized(selected.sourcePath, destPath)
if (!copied.success) {
this.recordFileAttachmentMiss(msg, '附件复制失败', {
@@ -4929,6 +4975,9 @@ class ExportService {
return null
}
if (!existedBeforeCopy) {
control?.recordCreatedFile?.(destPath)
}
this.noteMediaTelemetry({ doneFiles: 1, bytesWritten: stat.size })
return {
relativePath: path.posix.join(mediaRelativePrefix, 'file', fileExtDir, destFileName),
@@ -5884,16 +5933,15 @@ class ExportService {
*/
private async exportAvatarsToFiles(
members: Array<{ username: string; avatarUrl?: string }>,
outputDir: string
outputDir: string,
control?: ExportTaskControl
): Promise<Map<string, string>> {
const result = new Map<string, string>()
if (members.length === 0) return result
// 创建 avatars 子目录
const avatarsDir = path.join(outputDir, 'avatars')
if (!fs.existsSync(avatarsDir)) {
fs.mkdirSync(avatarsDir, { recursive: true })
}
await this.ensureExportDir(avatarsDir, control)
const AVATAR_CONCURRENCY = 8
await parallelLimit(members, AVATAR_CONCURRENCY, async (member) => {
@@ -5934,6 +5982,7 @@ class ExportService {
try {
await fs.promises.access(avatarPath)
} catch {
await this.recordCreatedFileBeforeWrite(avatarPath, control)
await fs.promises.writeFile(avatarPath, data)
}
@@ -6202,7 +6251,8 @@ class ExportService {
maxFileSizeMb: options.maxFileSizeMb,
exportVoiceAsText: options.exportVoiceAsText,
includeVideoPoster: options.format === 'html',
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -6551,9 +6601,11 @@ class ExportService {
lines.push(JSON.stringify({ _type: 'message', ...message }))
}
this.throwIfStopRequested(control)
await this.recordCreatedFileBeforeWrite(outputPath, control)
await fs.promises.writeFile(outputPath, lines.join('\n'), 'utf-8')
} else {
this.throwIfStopRequested(control)
await this.recordCreatedFileBeforeWrite(outputPath, control)
await fs.promises.writeFile(outputPath, JSON.stringify(chatLabExport, null, 2), 'utf-8')
}
@@ -6573,6 +6625,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
return { success: false, error: String(e) }
}
}
@@ -6706,7 +6761,8 @@ class ExportService {
maxFileSizeMb: options.maxFileSizeMb,
exportVoiceAsText: options.exportVoiceAsText,
includeVideoPoster: options.format === 'html',
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -7256,6 +7312,7 @@ class ExportService {
}
this.throwIfStopRequested(control)
await this.recordCreatedFileBeforeWrite(outputPath, control)
await fs.promises.writeFile(outputPath, JSON.stringify(arkmeExport, null, 2), 'utf-8')
} else {
const detailedExport: any = {
@@ -7279,6 +7336,7 @@ class ExportService {
}
this.throwIfStopRequested(control)
await this.recordCreatedFileBeforeWrite(outputPath, control)
await fs.promises.writeFile(outputPath, JSON.stringify(detailedExport, null, 2), 'utf-8')
}
@@ -7298,6 +7356,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
return { success: false, error: String(e) }
}
}
@@ -7571,7 +7632,8 @@ class ExportService {
maxFileSizeMb: options.maxFileSizeMb,
exportVoiceAsText: options.exportVoiceAsText,
includeVideoPoster: options.format === 'html',
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -7835,6 +7897,7 @@ class ExportService {
// 写入文件
this.throwIfStopRequested(control)
await this.recordCreatedFileBeforeWrite(outputPath, control)
await workbook.xlsx.writeFile(outputPath)
onProgress?.({
@@ -7853,6 +7916,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
// 处理文件被占用的错误
if (e instanceof Error) {
if (e.message.includes('EBUSY') || e.message.includes('resource busy') || e.message.includes('locked')) {
@@ -8134,6 +8200,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
if (e instanceof Error) {
if (e.message.includes('EBUSY') || e.message.includes('resource busy') || e.message.includes('locked')) {
return { success: false, error: '文件已经打开,请关闭后再导出' }
@@ -8315,7 +8384,8 @@ class ExportService {
maxFileSizeMb: options.maxFileSizeMb,
exportVoiceAsText: options.exportVoiceAsText,
includeVideoPoster: options.format === 'html',
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -8382,6 +8452,7 @@ class ExportService {
exportedMessages: 0
})
await this.recordCreatedFileBeforeWrite(outputPath, control)
const stream = fs.createWriteStream(outputPath, { encoding: 'utf-8' })
const writeChunk = async (chunk: string): Promise<void> => {
await new Promise<void>((resolve, _reject) => {
@@ -8567,6 +8638,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
return { success: false, error: String(e) }
}
}
@@ -8710,7 +8784,8 @@ class ExportService {
maxFileSizeMb: options.maxFileSizeMb,
exportVoiceAsText: options.exportVoiceAsText,
includeVideoPoster: options.format === 'html',
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -8777,6 +8852,7 @@ class ExportService {
exportedMessages: 0
})
await this.recordCreatedFileBeforeWrite(outputPath, control)
const stream = fs.createWriteStream(outputPath, { encoding: 'utf-8' })
const writeChunk = async (chunk: string): Promise<void> => {
await new Promise<void>((resolve, _reject) => {
@@ -8929,6 +9005,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
return { success: false, error: String(e) }
}
}
@@ -9153,7 +9232,8 @@ class ExportService {
includeVideoPoster: options.format === 'html',
includeVoiceWithTranscript: true,
exportVideos: options.exportVideos,
dirCache: mediaDirCache
dirCache: mediaDirCache,
control
})
mediaCache.set(mediaKey, mediaItem)
}
@@ -9224,7 +9304,8 @@ class ExportService {
{ username: sessionId, avatarUrl: sessionInfo.avatarUrl },
{ username: cleanedMyWxid, avatarUrl: myInfo.avatarUrl }
],
path.dirname(outputPath)
path.dirname(outputPath),
control
)
: new Map<string, string>()
@@ -9241,6 +9322,7 @@ class ExportService {
// ================= BEGIN STREAM WRITING =================
const exportMeta = this.getExportMeta(sessionId, sessionInfo, isGroup)
const htmlStyles = this.loadExportHtmlStyles()
await this.recordCreatedFileBeforeWrite(outputPath, control)
const stream = fs.createWriteStream(outputPath, { encoding: 'utf-8' })
const writePromise = (str: string) => {
@@ -9605,6 +9687,9 @@ class ExportService {
if (this.isStopError(e)) {
return { success: false, error: '导出任务已停止' }
}
if (this.isPauseError(e)) {
return { success: false, error: '导出任务已暂停' }
}
return { success: false, error: String(e) }
}
}
@@ -9908,7 +9993,7 @@ class ExportService {
const reservedOutputPaths = new Set<string>()
const ensureTaskDir = async (dirPath: string) => {
if (createdTaskDirs.has(dirPath)) return
await fs.promises.mkdir(dirPath, { recursive: true })
await this.ensureExportDir(dirPath, control)
createdTaskDirs.add(dirPath)
}
await ensureTaskDir(exportBaseDir)
@@ -10085,7 +10170,7 @@ class ExportService {
}
}
const runOne = async (sessionId: string): Promise<'done' | 'stopped'> => {
const runOne = async (sessionId: string): Promise<'done' | 'stopped' | 'paused'> => {
try {
this.throwIfStopRequested(control)
const sessionInfo = await this.getContactInfo(sessionId)
@@ -10234,6 +10319,10 @@ class ExportService {
activeSessionRatios.delete(sessionId)
return 'stopped'
}
if (!result.success && this.isPauseError(result.error)) {
activeSessionRatios.delete(sessionId)
return 'paused'
}
if (result.success) {
successCount++
@@ -10269,6 +10358,10 @@ class ExportService {
activeSessionRatios.delete(sessionId)
return 'stopped'
}
if (this.isPauseError(error)) {
activeSessionRatios.delete(sessionId)
return 'paused'
}
throw error
}
}
@@ -10294,6 +10387,11 @@ class ExportService {
queue.unshift(sessionId)
break
}
if (runState === 'paused') {
pauseRequested = true
queue.unshift(sessionId)
break
}
}
} else {
const workers = Array.from({ length: Math.min(sessionConcurrency, queue.length) }, async () => {
@@ -10315,6 +10413,11 @@ class ExportService {
queue.unshift(sessionId)
break
}
if (runState === 'paused') {
pauseRequested = true
queue.unshift(sessionId)
break
}
}
})
await Promise.all(workers)
@@ -10333,7 +10436,7 @@ class ExportService {
sessionOutputPaths
}
}
if (pauseRequested && pendingSessionIds.length > 0) {
if (pauseRequested) {
return {
success: true,
successCount,

View File

@@ -0,0 +1,210 @@
import * as path from 'path'
import { rm, rmdir } from 'fs/promises'
export type ExportTaskControlState = 'running' | 'pause_requested' | 'cancel_requested'
export interface ExportTaskControlHooks {
shouldPause: () => boolean
shouldStop: () => boolean
recordCreatedFile: (filePath: string) => void
recordCreatedDir: (dirPath: string) => void
}
interface ExportTaskManifest {
outputDir: string
files: Set<string>
dirs: Set<string>
}
interface ExportTaskControlRecord {
state: ExportTaskControlState
manifest: ExportTaskManifest
createdAt: number
updatedAt: number
}
export interface ExportTaskCleanupResult {
success: boolean
filesDeleted: number
dirsDeleted: number
error?: string
}
class ExportTaskControlService {
private tasks = new Map<string, ExportTaskControlRecord>()
createControl(taskId: string, outputDir: string): ExportTaskControlHooks {
this.registerTask(taskId, outputDir)
return {
shouldPause: () => this.getState(taskId) === 'pause_requested',
shouldStop: () => this.getState(taskId) === 'cancel_requested',
recordCreatedFile: (filePath: string) => this.recordCreatedFile(taskId, filePath),
recordCreatedDir: (dirPath: string) => this.recordCreatedDir(taskId, dirPath)
}
}
registerTask(taskId: string, outputDir: string): void {
const normalizedTaskId = this.normalizeTaskId(taskId)
if (!normalizedTaskId) return
const normalizedOutputDir = path.resolve(String(outputDir || '').trim() || '.')
const existing = this.tasks.get(normalizedTaskId)
if (existing) {
existing.state = 'running'
existing.updatedAt = Date.now()
if (!existing.manifest.outputDir) {
existing.manifest.outputDir = normalizedOutputDir
}
return
}
this.tasks.set(normalizedTaskId, {
state: 'running',
manifest: {
outputDir: normalizedOutputDir,
files: new Set<string>(),
dirs: new Set<string>()
},
createdAt: Date.now(),
updatedAt: Date.now()
})
}
pauseTask(taskId: string): boolean {
return this.setState(taskId, 'pause_requested')
}
resumeTask(taskId: string): boolean {
return this.setState(taskId, 'running')
}
cancelTask(taskId: string): boolean {
return this.setState(taskId, 'cancel_requested')
}
getState(taskId: string): ExportTaskControlState | null {
const normalizedTaskId = this.normalizeTaskId(taskId)
if (!normalizedTaskId) return null
return this.tasks.get(normalizedTaskId)?.state || null
}
releaseTask(taskId: string): void {
const normalizedTaskId = this.normalizeTaskId(taskId)
if (!normalizedTaskId) return
this.tasks.delete(normalizedTaskId)
}
recordCreatedFile(taskId: string, filePath: string): void {
const task = this.getTaskForManifestWrite(taskId, filePath)
if (!task) return
task.manifest.files.add(path.resolve(filePath))
task.updatedAt = Date.now()
}
recordCreatedDir(taskId: string, dirPath: string): void {
const task = this.getTaskForManifestWrite(taskId, dirPath)
if (!task) return
task.manifest.dirs.add(path.resolve(dirPath))
task.updatedAt = Date.now()
}
async cleanupTask(taskId: string): Promise<ExportTaskCleanupResult> {
const normalizedTaskId = this.normalizeTaskId(taskId)
const task = normalizedTaskId ? this.tasks.get(normalizedTaskId) : undefined
if (!task) {
return { success: true, filesDeleted: 0, dirsDeleted: 0 }
}
const outputDir = task.manifest.outputDir
let filesDeleted = 0
let dirsDeleted = 0
const errors: string[] = []
const files = Array.from(task.manifest.files)
.filter(filePath => this.isInsideOutputDir(filePath, outputDir))
.sort((a, b) => b.length - a.length)
for (const filePath of files) {
try {
await rm(filePath, { force: true, recursive: false })
filesDeleted++
} catch (error) {
const code = (error as NodeJS.ErrnoException | undefined)?.code
if (code !== 'ENOENT') {
errors.push(`${filePath}: ${error instanceof Error ? error.message : String(error)}`)
}
}
}
const dirs = Array.from(task.manifest.dirs)
.filter(dirPath => this.isInsideOutputDir(dirPath, outputDir) || this.isSamePath(dirPath, outputDir))
.sort((a, b) => b.length - a.length)
for (const dirPath of dirs) {
try {
await rmdir(dirPath)
dirsDeleted++
} catch (error) {
const code = (error as NodeJS.ErrnoException | undefined)?.code
if (code !== 'ENOENT' && code !== 'ENOTEMPTY' && code !== 'EEXIST') {
errors.push(`${dirPath}: ${error instanceof Error ? error.message : String(error)}`)
}
}
}
if (errors.length === 0) {
this.releaseTask(normalizedTaskId)
return { success: true, filesDeleted, dirsDeleted }
}
return {
success: false,
filesDeleted,
dirsDeleted,
error: errors.slice(0, 3).join('; ')
}
}
private setState(taskId: string, state: ExportTaskControlState): boolean {
const normalizedTaskId = this.normalizeTaskId(taskId)
if (!normalizedTaskId) return false
const task = this.tasks.get(normalizedTaskId)
if (!task) return false
task.state = state
task.updatedAt = Date.now()
return true
}
private getTaskForManifestWrite(taskId: string, targetPath: string): ExportTaskControlRecord | null {
const normalizedTaskId = this.normalizeTaskId(taskId)
if (!normalizedTaskId) return null
const task = this.tasks.get(normalizedTaskId)
if (!task) return null
if (!this.isInsideOutputDir(targetPath, task.manifest.outputDir) && !this.isSamePath(targetPath, task.manifest.outputDir)) {
return null
}
return task
}
private isInsideOutputDir(targetPath: string, outputDir: string): boolean {
const resolvedTarget = path.resolve(targetPath)
const resolvedOutputDir = path.resolve(outputDir)
const relativePath = path.relative(resolvedOutputDir, resolvedTarget)
return Boolean(relativePath) && !relativePath.startsWith('..') && !path.isAbsolute(relativePath)
}
private isSamePath(left: string, right: string): boolean {
const resolvedLeft = path.resolve(left)
const resolvedRight = path.resolve(right)
if (process.platform === 'win32') {
return resolvedLeft.toLowerCase() === resolvedRight.toLowerCase()
}
return resolvedLeft === resolvedRight
}
private normalizeTaskId(taskId: string): string {
return String(taskId || '').trim()
}
}
export const exportTaskControlService = new ExportTaskControlService()

View File

@@ -1340,6 +1340,8 @@ class SnsService {
}, progressCallback?: (progress: { current: number; total: number; status: string }) => void, control?: {
shouldPause?: () => boolean
shouldStop?: () => boolean
recordCreatedFile?: (filePath: string) => void
recordCreatedDir?: (dirPath: string) => void
}): Promise<{ success: boolean; filePath?: string; postCount?: number; mediaCount?: number; paused?: boolean; stopped?: boolean; error?: string }> {
const { outputDir, format, usernames, keyword, startTime, endTime } = options
const hasExplicitMediaSelection =
@@ -1361,6 +1363,18 @@ class SnsService {
if (control?.shouldPause?.()) return 'paused'
return null
}
const ensureExportDir = (dirPath: string) => {
const existed = existsSync(dirPath)
if (!existed) {
mkdirSync(dirPath, { recursive: true })
control?.recordCreatedDir?.(dirPath)
}
}
const recordCreatedFileBeforeWrite = (filePath: string) => {
if (!existsSync(filePath)) {
control?.recordCreatedFile?.(filePath)
}
}
const buildInterruptedResult = (state: 'paused' | 'stopped', postCount: number, mediaCount: number) => (
state === 'stopped'
? { success: true, stopped: true, filePath: '', postCount, mediaCount }
@@ -1369,9 +1383,7 @@ class SnsService {
try {
// 确保输出目录存在
if (!existsSync(outputDir)) {
mkdirSync(outputDir, { recursive: true })
}
ensureExportDir(outputDir)
// 1. 分页加载全部帖子
const allPosts: SnsPost[] = []
@@ -1414,9 +1426,7 @@ class SnsService {
const mediaDir = join(outputDir, 'media')
if (shouldExportMedia) {
if (!existsSync(mediaDir)) {
mkdirSync(mediaDir, { recursive: true })
}
ensureExportDir(mediaDir)
// 收集所有媒体下载任务
const mediaTasks: Array<{
@@ -1485,6 +1495,7 @@ class SnsService {
} else {
const result = await this.fetchAndDecryptImage(task.url, task.key)
if (result.success && result.data) {
recordCreatedFileBeforeWrite(filePath)
await writeFile(filePath, result.data)
if (task.kind === 'livephoto') {
if (media.livePhoto) (media.livePhoto as any).localPath = `media/${fileName}`
@@ -1494,6 +1505,7 @@ class SnsService {
mediaCount++
} else if (result.success && result.cachePath) {
const cachedData = await readFile(result.cachePath)
recordCreatedFileBeforeWrite(filePath)
await writeFile(filePath, cachedData)
if (task.kind === 'livephoto') {
if (media.livePhoto) (media.livePhoto as any).localPath = `media/${fileName}`
@@ -1531,7 +1543,7 @@ class SnsService {
// 2.5 下载头像
const avatarMap = new Map<string, string>()
if (format === 'html') {
if (!existsSync(mediaDir)) mkdirSync(mediaDir, { recursive: true })
ensureExportDir(mediaDir)
const uniqueUsers = [...new Map(allPosts.filter(p => p.avatarUrl).map(p => [p.username, p])).values()]
let avatarDone = 0
const avatarQueue = [...uniqueUsers]
@@ -1548,6 +1560,7 @@ class SnsService {
} else {
const result = await this.fetchAndDecryptImage(post.avatarUrl!)
if (result.success && result.data) {
recordCreatedFileBeforeWrite(filePath)
await writeFile(filePath, result.data)
avatarMap.set(post.username, `media/${fileName}`)
}
@@ -1602,6 +1615,7 @@ class SnsService {
linkUrl: (p as any).linkUrl
}))
}
recordCreatedFileBeforeWrite(outputFilePath)
await writeFile(outputFilePath, JSON.stringify(exportData, null, 2), 'utf-8')
} else if (format === 'arkmejson') {
outputFilePath = join(outputDir, `朋友圈导出_${timestamp}.json`)
@@ -1689,11 +1703,13 @@ class SnsService {
},
posts
}
recordCreatedFileBeforeWrite(outputFilePath)
await writeFile(outputFilePath, JSON.stringify(exportData, null, 2), 'utf-8')
} else {
// HTML 格式
outputFilePath = join(outputDir, `朋友圈导出_${timestamp}.html`)
const html = this.generateHtml(allPosts, { usernames, keyword }, avatarMap)
recordCreatedFileBeforeWrite(outputFilePath)
await writeFile(outputFilePath, html, 'utf-8')
}