fix: 修复更新弹窗无响应、内存泄漏、SQL注入、文件句柄泄漏及并发安全问题;优化导出功能

This commit is contained in:
你的名字
2026-02-23 09:55:33 +08:00
parent b7852a8c07
commit ab2c086e93
9 changed files with 289 additions and 45 deletions

View File

@@ -79,14 +79,14 @@ class AnalyticsService {
const chunkSize = 200
for (let i = 0; i < usernames.length; i += chunkSize) {
const chunk = usernames.slice(i, i + chunkSize)
const inList = chunk.map((u) => `'${this.escapeSqlValue(u)}'`).join(',')
if (!inList) continue
// 使用参数化查询防止SQL注入
const placeholders = chunk.map(() => '?').join(',')
const sql = `
SELECT username, alias
FROM contact
WHERE username IN (${inList})
WHERE username IN (${placeholders})
`
const result = await wcdbService.execQuery('contact', null, sql)
const result = await wcdbService.execQuery('contact', null, sql, chunk)
if (!result.success || !result.rows) continue
for (const row of result.rows as Record<string, any>[]) {
const username = row.username || ''

View File

@@ -13,6 +13,7 @@ import { wcdbService } from './wcdbService'
import { MessageCacheService } from './messageCacheService'
import { ContactCacheService, ContactCacheEntry } from './contactCacheService'
import { voiceTranscribeService } from './voiceTranscribeService'
import { LRUCache } from '../utils/LRUCache.js'
type HardlinkState = {
db: Database.Database
@@ -114,6 +115,7 @@ class ChatService {
private configService: ConfigService
private connected = false
private messageCursors: Map<string, { cursor: number; fetched: number; batchSize: number; startTime?: number; endTime?: number; ascending?: boolean; bufferedMessages?: any[] }> = new Map()
private messageCursorMutex: boolean = false
private readonly messageBatchDefault = 50
private avatarCache: Map<string, ContactCacheEntry>
private readonly avatarCacheTtlMs = 10 * 60 * 1000
@@ -121,8 +123,8 @@ class ChatService {
private hardlinkCache = new Map<string, HardlinkState>()
private readonly contactCacheService: ContactCacheService
private readonly messageCacheService: MessageCacheService
private voiceWavCache = new Map<string, Buffer>()
private voiceTranscriptCache = new Map<string, string>()
private voiceWavCache: LRUCache<string, Buffer>
private voiceTranscriptCache: LRUCache<string, string>
private voiceTranscriptPending = new Map<string, Promise<{ success: boolean; transcript?: string; error?: string }>>()
private transcriptCacheLoaded = false
private transcriptCacheDirty = false
@@ -149,6 +151,9 @@ class ChatService {
const persisted = this.contactCacheService.getAllEntries()
this.avatarCache = new Map(Object.entries(persisted))
this.messageCacheService = new MessageCacheService(this.configService.getCacheBasePath())
// 初始化LRU缓存限制大小防止内存泄漏
this.voiceWavCache = new LRUCache(this.voiceWavCacheMaxEntries)
this.voiceTranscriptCache = new LRUCache(1000) // 最多缓存1000条转写记录
}
/**
@@ -728,8 +733,15 @@ class ChatService {
}
const batchSize = Math.max(1, limit || this.messageBatchDefault)
// 使用互斥锁保护游标状态访问
while (this.messageCursorMutex) {
await new Promise(resolve => setTimeout(resolve, 1))
}
this.messageCursorMutex = true
let state = this.messageCursors.get(sessionId)
// 只在以下情况重新创建游标:
// 1. 没有游标状态
// 2. offset 为 0 (重新加载会话)
@@ -765,7 +777,8 @@ class ChatService {
state = { cursor: cursorResult.cursor, fetched: 0, batchSize, startTime, endTime, ascending }
this.messageCursors.set(sessionId, state)
this.messageCursorMutex = false
// 如果需要跳过消息(offset > 0),逐批获取但不返回
// 注意:仅在 offset === 0 时重建游标最安全;
// 当 startTime/endTime 变化导致重建时offset 应由前端重置为 0
@@ -866,9 +879,12 @@ class ChatService {
}
state.fetched += rows.length
this.messageCursorMutex = false
this.messageCacheService.set(sessionId, normalized)
return { success: true, messages: normalized, hasMore }
} catch (e) {
this.messageCursorMutex = false
console.error('ChatService: 获取消息失败:', e)
return { success: false, error: String(e) }
}
@@ -3698,10 +3714,7 @@ class ChatService {
private cacheVoiceWav(cacheKey: string, wavData: Buffer): void {
this.voiceWavCache.set(cacheKey, wavData)
if (this.voiceWavCache.size > this.voiceWavCacheMaxEntries) {
const oldestKey = this.voiceWavCache.keys().next().value
if (oldestKey) this.voiceWavCache.delete(oldestKey)
}
// LRU缓存会自动处理大小限制无需手动清理
}
/** 获取持久化转写缓存文件路径 */

View File

@@ -12,6 +12,7 @@ import { chatService } from './chatService'
import { videoService } from './videoService'
import { voiceTranscribeService } from './voiceTranscribeService'
import { EXPORT_HTML_STYLES } from './exportHtmlStyles'
import { LRUCache } from '../utils/LRUCache.js'
// ChatLab 格式类型定义
interface ChatLabHeader {
@@ -140,12 +141,15 @@ async function parallelLimit<T, R>(
class ExportService {
private configService: ConfigService
private contactCache: Map<string, { displayName: string; avatarUrl?: string }> = new Map()
private inlineEmojiCache: Map<string, string> = new Map()
private contactCache: LRUCache<string, { displayName: string; avatarUrl?: string }>
private inlineEmojiCache: LRUCache<string, string>
private htmlStyleCache: string | null = null
constructor() {
this.configService = new ConfigService()
// 限制缓存大小,防止内存泄漏
this.contactCache = new LRUCache(500) // 最多缓存500个联系人
this.inlineEmojiCache = new LRUCache(100) // 最多缓存100个表情
}
private getClampedConcurrency(value: number | undefined, fallback = 2, max = 6): number {
@@ -219,9 +223,9 @@ class ExportService {
*/
async getGroupNicknamesForRoom(chatroomId: string, candidates: string[] = []): Promise<Map<string, string>> {
try {
const escapedChatroomId = chatroomId.replace(/'/g, "''")
const sql = `SELECT ext_buffer FROM chat_room WHERE username='${escapedChatroomId}' LIMIT 1`
const result = await wcdbService.execQuery('contact', null, sql)
// 使用参数化查询防止SQL注入
const sql = 'SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1'
const result = await wcdbService.execQuery('contact', null, sql, [chatroomId])
if (!result.success || !result.rows || result.rows.length === 0) {
return new Map<string, string>()
}
@@ -1467,6 +1471,7 @@ class ExportService {
})
if (!result.success || !result.localPath) {
console.log(`[Export] 图片解密失败 (localId=${msg.localId}): imageMd5=${imageMd5}, imageDatName=${imageDatName}, error=${result.error || '未知'}`)
// 尝试获取缩略图
const thumbResult = await imageDecryptService.resolveCachedImage({
sessionId,
@@ -1474,8 +1479,10 @@ class ExportService {
imageDatName
})
if (!thumbResult.success || !thumbResult.localPath) {
console.log(`[Export] 缩略图也获取失败 (localId=${msg.localId}): error=${thumbResult.error || '未知'} → 将显示 [图片] 占位符`)
return null
}
console.log(`[Export] 使用缩略图替代 (localId=${msg.localId}): ${thumbResult.localPath}`)
result.localPath = thumbResult.localPath
}
@@ -1503,7 +1510,10 @@ class ExportService {
}
// 复制文件
if (!fs.existsSync(sourcePath)) return null
if (!fs.existsSync(sourcePath)) {
console.log(`[Export] 源图片文件不存在 (localId=${msg.localId}): ${sourcePath} → 将显示 [图片] 占位符`)
return null
}
const ext = path.extname(sourcePath) || '.jpg'
const fileName = `${messageId}_${imageKey}${ext}`
const destPath = path.join(imagesDir, fileName)
@@ -1517,6 +1527,7 @@ class ExportService {
kind: 'image'
}
} catch (e) {
console.error(`[Export] 导出图片异常 (localId=${msg.localId}):`, e, `→ 将显示 [图片] 占位符`)
return null
}
}
@@ -1785,7 +1796,14 @@ class ExportService {
fileStream.close()
resolve(true)
})
fileStream.on('error', () => {
fileStream.on('error', (err) => {
// 确保在错误情况下销毁流,释放文件句柄
fileStream.destroy()
resolve(false)
})
response.on('error', (err) => {
// 确保在响应错误时也关闭文件句柄
fileStream.destroy()
resolve(false)
})
})
@@ -1812,22 +1830,43 @@ class ExportService {
let firstTime: number | null = null
let lastTime: number | null = null
// 修复时间范围0 表示不限制,而不是时间戳 0
const beginTime = dateRange?.start || 0
const endTime = dateRange?.end && dateRange.end > 0 ? dateRange.end : 0
console.log(`[Export] 收集消息: sessionId=${sessionId}, 时间范围: ${beginTime} ~ ${endTime || '无限制'}`)
const cursor = await wcdbService.openMessageCursor(
sessionId,
500,
true,
dateRange?.start || 0,
dateRange?.end || 0
beginTime,
endTime
)
if (!cursor.success || !cursor.cursor) {
console.error(`[Export] 打开游标失败: ${cursor.error || '未知错误'}`)
return { rows, memberSet, firstTime, lastTime }
}
try {
let hasMore = true
let batchCount = 0
while (hasMore) {
const batch = await wcdbService.fetchMessageBatch(cursor.cursor)
if (!batch.success || !batch.rows) break
batchCount++
if (!batch.success) {
console.error(`[Export] 获取批次 ${batchCount} 失败: ${batch.error}`)
break
}
if (!batch.rows) {
console.warn(`[Export] 批次 ${batchCount} 无数据`)
break
}
console.log(`[Export] 批次 ${batchCount}: 收到 ${batch.rows.length} 条消息`)
for (const row of batch.rows) {
const createTime = parseInt(row.create_time || '0', 10)
if (dateRange) {
@@ -1918,8 +1957,17 @@ class ExportService {
}
hasMore = batch.hasMore === true
}
console.log(`[Export] 收集完成: 共 ${rows.length} 条消息, ${batchCount} 个批次`)
} catch (err) {
console.error(`[Export] 收集消息异常:`, err)
} finally {
await wcdbService.closeMessageCursor(cursor.cursor)
try {
await wcdbService.closeMessageCursor(cursor.cursor)
console.log(`[Export] 游标已关闭`)
} catch (err) {
console.error(`[Export] 关闭游标失败:`, err)
}
}
if (senderSet.size > 0) {
@@ -4562,6 +4610,12 @@ class ExportService {
</html>`);
return new Promise((resolve, reject) => {
stream.on('error', (err) => {
// 确保在流错误时销毁流,释放文件句柄
stream.destroy()
reject(err)
})
stream.end(() => {
onProgress?.({
current: 100,

View File

@@ -100,6 +100,7 @@ class HttpService {
private port: number = 5031
private running: boolean = false
private connections: Set<import('net').Socket> = new Set()
private connectionMutex: boolean = false
constructor() {
this.configService = ConfigService.getInstance()
@@ -120,9 +121,20 @@ class HttpService {
// 跟踪所有连接,以便关闭时能强制断开
this.server.on('connection', (socket) => {
this.connections.add(socket)
// 使用互斥锁防止并发修改
if (!this.connectionMutex) {
this.connectionMutex = true
this.connections.add(socket)
this.connectionMutex = false
}
socket.on('close', () => {
this.connections.delete(socket)
// 使用互斥锁防止并发修改
if (!this.connectionMutex) {
this.connectionMutex = true
this.connections.delete(socket)
this.connectionMutex = false
}
})
})
@@ -150,11 +162,20 @@ class HttpService {
async stop(): Promise<void> {
return new Promise((resolve) => {
if (this.server) {
// 强制关闭所有活动连接
for (const socket of this.connections) {
socket.destroy()
}
// 使用互斥锁保护连接集合操作
this.connectionMutex = true
const socketsToClose = Array.from(this.connections)
this.connections.clear()
this.connectionMutex = false
// 强制关闭所有活动连接
for (const socket of socketsToClose) {
try {
socket.destroy()
} catch (err) {
console.error('[HttpService] Error destroying socket:', err)
}
}
this.server.close(() => {
this.running = false

View File

@@ -458,8 +458,18 @@ export class VoiceTranscribeService {
writer.on('error', (err) => {
clearInterval(speedInterval)
// 确保在错误情况下也关闭文件句柄
writer.destroy()
reject(err)
})
response.on('error', (err) => {
clearInterval(speedInterval)
// 确保在响应错误时也关闭文件句柄
writer.destroy()
reject(err)
})
response.pipe(writer)
})
request.on('error', reject)

View File

@@ -361,10 +361,10 @@ export class WcdbService {
}
/**
* 执行 SQL 查询
* 执行 SQL 查询(支持参数化查询)
*/
async execQuery(kind: string, path: string | null, sql: string): Promise<{ success: boolean; rows?: any[]; error?: string }> {
return this.callWorker('execQuery', { kind, path, sql })
async execQuery(kind: string, path: string | null, sql: string, params: any[] = []): Promise<{ success: boolean; rows?: any[]; error?: string }> {
return this.callWorker('execQuery', { kind, path, sql, params })
}
/**