diff --git a/electron/services/chatService.ts b/electron/services/chatService.ts index 6c393a0..5c9a21d 100644 --- a/electron/services/chatService.ts +++ b/electron/services/chatService.ts @@ -4914,7 +4914,7 @@ class ChatService { /** * HTTP API 复用消息解析逻辑,确保和应用内展示一致。 */ - mapRowsToMessagesForApi(rows: Record[], sessionId: string): Message[] { + mapRowsToMessagesForApi(rows: Record[], sessionId: string = ''): Message[] { return this.mapRowsToMessages(rows, sessionId) } diff --git a/electron/services/httpService.ts b/electron/services/httpService.ts index 52eece8..4f6264c 100644 --- a/electron/services/httpService.ts +++ b/electron/services/httpService.ts @@ -126,6 +126,7 @@ class HttpService { private port: number = 5031 private host: string = '127.0.0.1' private running: boolean = false + private dbWarmed: boolean = false private connections: Set = new Set() private messagePushClients: Set = new Set() private messagePushReplayBuffer: MessagePushReplayEvent[] = [] @@ -184,6 +185,9 @@ class HttpService { this.server.listen(this.port, this.host, () => { this.running = true + // 主动预热数据库连接与消息库索引:HTTP 服务可能经 http:start 独立启动 + // (未走 GUI 的 connect/warmup),避免首批请求因原生消息库缓存为空而整页丢消息。 + void this.ensureDbReady().catch((e) => console.warn('[HttpService] warmup on start failed:', e)) this.startMessagePushHeartbeat() console.log(`[HttpService] HTTP API server started on http://${this.host}:${this.port}`) resolve({ success: true, port: this.port }) @@ -595,9 +599,38 @@ class HttpService { } } + /** + * 确保账号数据库已连接并完成一次消息库索引预热。 + * HTTP 读消息不像 GUI 那样自带 ensureConnected/warmup:当应用冷启动、或 HTTP 服务经 + * http:start 独立启动时,原生消息库缓存可能尚未建立,lite 游标会返回 -3 + * (WCDB_STATUS_NO_MESSAGE_DB) 而整页丢消息(见 upstream issue #926 / #1029)。 + * connect() 在已连接时会短路秒返;warmup 只在成功前重试,成功后本进程不再重复。 + */ + private async ensureDbReady(): Promise<{ success: boolean; error?: string }> { + const connected = await chatService.connect() + if (!connected.success) { + return { success: false, error: connected.error || '数据库未连接' } + } + if (!this.dbWarmed) { + try { + const warm = await chatService.warmupMessageDbSnapshot() + if (warm.success) this.dbWarmed = true + } catch (e) { + console.warn('[HttpService] warmupMessageDbSnapshot failed:', e) + } + } + return { success: true } + } + /** * 批量获取消息(循环游标直到满足 limit) - * 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标 + * 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标。 + * + * 健壮性(修复"前几次请求丢消息"): + * - 先经 ensureDbReady() 确保账号库已连接并预热,消除冷启动/空闲后缓存为空导致的 -3。 + * - lite 游标打开失败时回退到 GUI 同款 full openMessageCursor(自带 -3 forceReopen 自愈)。 + * - lite 打开成功但 offset=0 首页为空且无更多时,疑似 lite 冷缓存误判(heapSize=0), + * 再用 full 游标复核一次;full 也为空才认定确实没有数据。既保留 lite 性能又不丢数据。 */ private async fetchMessagesBatch( talker: string, @@ -608,25 +641,24 @@ class HttpService { ascending: boolean, useLiteMapping: boolean = true ): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> { + const ready = await this.ensureDbReady() + if (!ready.success) { + return { success: false, error: ready.error || '数据库未连接' } + } + try { // 深分页时放大 batch,避免 offset 很大时出现大量小批次循环。 const batchSize = Math.min(2000, Math.max(500, limit)) const beginTimestamp = startTime > 10000000000 ? Math.floor(startTime / 1000) : startTime const endTimestamp = endTime > 10000000000 ? Math.floor(endTime / 1000) : endTime - const cursorResult = await wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp) - if (!cursorResult.success || !cursorResult.cursor) { - return { success: false, error: cursorResult.error || '打开消息游标失败' } - } - - const cursor = cursorResult.cursor - try { + // 在单个游标上循环累积:处理 offset 跳过 + limit 累积 + const collectFromCursor = async (cursor: number): Promise<{ rows: Record[]; hasMore: boolean }> => { const collectedRows: Record[] = [] let hasMore = true let skipped = 0 let reachedLimit = false - // 循环获取消息,处理 offset 跳过 + limit 累积 while (collectedRows.length < limit && hasMore) { const batch = await wcdbService.fetchMessageBatch(cursor) if (!batch.success || !batch.rows || batch.rows.length === 0) { @@ -658,15 +690,50 @@ class HttpService { collectedRows.push(...rows) } - const finalHasMore = hasMore || reachedLimit - const messages = useLiteMapping - ? chatService.mapRowsToMessagesLiteForApi(collectedRows) - : chatService.mapRowsToMessagesForApi(collectedRows) - await this.backfillMissingSenderUsernames(talker, messages) - return { success: true, messages, hasMore: finalHasMore } - } finally { - await wcdbService.closeMessageCursor(cursor) + return { rows: collectedRows, hasMore: hasMore || reachedLimit } } + + // 打开游标 -> 累积 -> 关闭;打开失败返回 null(错误经 lastCursorError 透出) + let lastCursorError = '打开消息游标失败' + const runWithCursor = async ( + open: () => Promise<{ success: boolean; cursor?: number; error?: string }> + ): Promise<{ rows: Record[]; hasMore: boolean } | null> => { + const cursorResult = await open() + if (!cursorResult.success || !cursorResult.cursor) { + lastCursorError = cursorResult.error || lastCursorError + return null + } + const cursor = cursorResult.cursor + try { + return await collectFromCursor(cursor) + } finally { + await wcdbService.closeMessageCursor(cursor) + } + } + + const openLite = () => wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp) + const openFull = () => wcdbService.openMessageCursor(talker, batchSize, ascending, beginTimestamp, endTimestamp) + + // 1) lite 游标快路径 + let collected = await runWithCursor(openLite) + + if (!collected) { + // 2) lite 打开失败 -> 回退 full 游标 + collected = await runWithCursor(openFull) + if (!collected) { + return { success: false, error: lastCursorError } + } + } else if (collected.rows.length === 0 && offset === 0 && !collected.hasMore) { + // 3) lite 首页空且无更多 -> 用 full 复核一次(防 lite 冷缓存误判) + const recheck = await runWithCursor(openFull) + if (recheck && recheck.rows.length > 0) { + collected = recheck + } + } + + const messages = await this.mapRowsToMessagesYielding(collected.rows, useLiteMapping, talker) + await this.backfillMissingSenderUsernames(talker, messages) + return { success: true, messages, hasMore: collected.hasMore } } catch (e) { console.error('[HttpService] fetchMessagesBatch error:', e) return { success: false, error: String(e) } @@ -682,6 +749,63 @@ class HttpService { return Math.min(Math.max(parsed, min), max) } + /** + * 让出主进程事件循环一拍。 + * 大批量消息的解码/映射(decodeMessageContent 的 hex/zlib 解压、toApiMessage 的 XML 解析) + * 是 CPU 密集且同步的——一次性处理会长时间独占主进程,阻塞所有依赖主进程 IPC 的 GUI 交互, + * 表现为「获取消息时本体卡住」。用 setImmediate 让出(drain 的是宏任务而非仅微任务), + * 使主进程能在分片之间处理 GUI 的 IPC 与 Worker 回调。 + */ + private yieldToEventLoop(): Promise { + return new Promise((resolve) => setImmediate(resolve)) + } + + /** + * 分片映射数据库行 -> Message[],按时间片(~24ms)让出事件循环,避免阻塞主进程。 + * 两个底层 mapper 都是逐行独立映射、无跨行状态,故分片输出与一次性调用完全一致。 + */ + private async mapRowsToMessagesYielding( + rows: Record[], + useLiteMapping: boolean, + sessionId: string + ): Promise { + const out: Message[] = [] + const STEP = 16 + let sliceStart = Date.now() + for (let i = 0; i < rows.length; i += STEP) { + const slice = rows.slice(i, i + STEP) + const mapped = useLiteMapping + ? chatService.mapRowsToMessagesLiteForApi(slice) + : chatService.mapRowsToMessagesForApi(slice, sessionId) + for (let k = 0; k < mapped.length; k++) out.push(mapped[k]) + if (i + STEP < rows.length && Date.now() - sliceStart >= 24) { + await this.yieldToEventLoop() + sliceStart = Date.now() + } + } + return out + } + + /** + * 分片把 Message[] 转为 API JSON 行,按时间片让出事件循环,避免阻塞主进程。 + */ + private async toApiMessagesYielding( + messages: Message[], + mediaMap: Map + ): Promise[]> { + const out: Record[] = [] + let sliceStart = Date.now() + for (let i = 0; i < messages.length; i++) { + const msg = messages[i] + out.push(this.toApiMessage(msg, mediaMap.get(msg.localId))) + if ((i & 15) === 15 && Date.now() - sliceStart >= 24) { + await this.yieldToEventLoop() + sliceStart = Date.now() + } + } + return out + } + private async backfillMissingSenderUsernames(talker: string, messages: Message[]): Promise { if (!talker.endsWith('@chatroom')) return @@ -721,7 +845,7 @@ class HttpService { try { const detail = await wcdbService.getMessageById(talker, localId) if (detail.success && detail.message) { - const hydrated = chatService.mapRowsToMessagesForApi([detail.message])[0] + const hydrated = chatService.mapRowsToMessagesForApi([detail.message], talker)[0] if (hydrated?.senderUsername) { msg.senderUsername = hydrated.senderUsername } @@ -835,6 +959,11 @@ class HttpService { let hasMore = false if (keyword) { + const ready = await this.ensureDbReady() + if (!ready.success) { + this.sendError(res, 500, ready.error || 'Failed to get messages') + return + } const searchLimit = Math.max(1, limit) + 1 const searchResult = await chatService.searchMessages( keyword, @@ -888,7 +1017,7 @@ class HttpService { return } - const apiMessages = messages.map((msg) => this.toApiMessage(msg, mediaMap.get(msg.localId))) + const apiMessages = await this.toApiMessagesYielding(messages, mediaMap) this.sendJson(res, { success: true, talker, diff --git a/electron/services/messagePushService.ts b/electron/services/messagePushService.ts index 8a8c888..029a72b 100644 --- a/electron/services/messagePushService.ts +++ b/electron/services/messagePushService.ts @@ -606,7 +606,7 @@ class MessagePushService { ].join(' ') const result = await wcdbService.execQuery('message', table.dbPath, sql) if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue - messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record[])) + messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record[], sessionId)) } return messages @@ -630,7 +630,7 @@ class MessagePushService { ].join(' ') const result = await wcdbService.execQuery('message', table.dbPath, sql) if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue - messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record[])) + messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record[], sessionId)) } return messages.sort((left, right) => this.compareMessagePosition(left, right)) @@ -666,7 +666,7 @@ class MessagePushService { ].filter(Boolean).join(' ') const result = await wcdbService.execQuery('message', table.dbPath, sql) if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue - const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record[]) + const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record[], sessionId) if (message && !this.isRevokeSystemMessage(message)) return message }