mirror of
https://github.com/hicccc77/WeFlow.git
synced 2026-06-01 23:26:55 +00:00
优化api性能
This commit is contained in:
@@ -4914,7 +4914,7 @@ class ChatService {
|
||||
/**
|
||||
* HTTP API 复用消息解析逻辑,确保和应用内展示一致。
|
||||
*/
|
||||
mapRowsToMessagesForApi(rows: Record<string, any>[], sessionId: string): Message[] {
|
||||
mapRowsToMessagesForApi(rows: Record<string, any>[], sessionId: string = ''): Message[] {
|
||||
return this.mapRowsToMessages(rows, sessionId)
|
||||
}
|
||||
|
||||
|
||||
@@ -126,6 +126,7 @@ class HttpService {
|
||||
private port: number = 5031
|
||||
private host: string = '127.0.0.1'
|
||||
private running: boolean = false
|
||||
private dbWarmed: boolean = false
|
||||
private connections: Set<import('net').Socket> = new Set()
|
||||
private messagePushClients: Set<http.ServerResponse> = new Set()
|
||||
private messagePushReplayBuffer: MessagePushReplayEvent[] = []
|
||||
@@ -184,6 +185,9 @@ class HttpService {
|
||||
|
||||
this.server.listen(this.port, this.host, () => {
|
||||
this.running = true
|
||||
// 主动预热数据库连接与消息库索引:HTTP 服务可能经 http:start 独立启动
|
||||
// (未走 GUI 的 connect/warmup),避免首批请求因原生消息库缓存为空而整页丢消息。
|
||||
void this.ensureDbReady().catch((e) => console.warn('[HttpService] warmup on start failed:', e))
|
||||
this.startMessagePushHeartbeat()
|
||||
console.log(`[HttpService] HTTP API server started on http://${this.host}:${this.port}`)
|
||||
resolve({ success: true, port: this.port })
|
||||
@@ -595,9 +599,38 @@ class HttpService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 确保账号数据库已连接并完成一次消息库索引预热。
|
||||
* HTTP 读消息不像 GUI 那样自带 ensureConnected/warmup:当应用冷启动、或 HTTP 服务经
|
||||
* http:start 独立启动时,原生消息库缓存可能尚未建立,lite 游标会返回 -3
|
||||
* (WCDB_STATUS_NO_MESSAGE_DB) 而整页丢消息(见 upstream issue #926 / #1029)。
|
||||
* connect() 在已连接时会短路秒返;warmup 只在成功前重试,成功后本进程不再重复。
|
||||
*/
|
||||
private async ensureDbReady(): Promise<{ success: boolean; error?: string }> {
|
||||
const connected = await chatService.connect()
|
||||
if (!connected.success) {
|
||||
return { success: false, error: connected.error || '数据库未连接' }
|
||||
}
|
||||
if (!this.dbWarmed) {
|
||||
try {
|
||||
const warm = await chatService.warmupMessageDbSnapshot()
|
||||
if (warm.success) this.dbWarmed = true
|
||||
} catch (e) {
|
||||
console.warn('[HttpService] warmupMessageDbSnapshot failed:', e)
|
||||
}
|
||||
}
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量获取消息(循环游标直到满足 limit)
|
||||
* 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标
|
||||
* 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标。
|
||||
*
|
||||
* 健壮性(修复"前几次请求丢消息"):
|
||||
* - 先经 ensureDbReady() 确保账号库已连接并预热,消除冷启动/空闲后缓存为空导致的 -3。
|
||||
* - lite 游标打开失败时回退到 GUI 同款 full openMessageCursor(自带 -3 forceReopen 自愈)。
|
||||
* - lite 打开成功但 offset=0 首页为空且无更多时,疑似 lite 冷缓存误判(heapSize=0),
|
||||
* 再用 full 游标复核一次;full 也为空才认定确实没有数据。既保留 lite 性能又不丢数据。
|
||||
*/
|
||||
private async fetchMessagesBatch(
|
||||
talker: string,
|
||||
@@ -608,25 +641,24 @@ class HttpService {
|
||||
ascending: boolean,
|
||||
useLiteMapping: boolean = true
|
||||
): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> {
|
||||
const ready = await this.ensureDbReady()
|
||||
if (!ready.success) {
|
||||
return { success: false, error: ready.error || '数据库未连接' }
|
||||
}
|
||||
|
||||
try {
|
||||
// 深分页时放大 batch,避免 offset 很大时出现大量小批次循环。
|
||||
const batchSize = Math.min(2000, Math.max(500, limit))
|
||||
const beginTimestamp = startTime > 10000000000 ? Math.floor(startTime / 1000) : startTime
|
||||
const endTimestamp = endTime > 10000000000 ? Math.floor(endTime / 1000) : endTime
|
||||
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return { success: false, error: cursorResult.error || '打开消息游标失败' }
|
||||
}
|
||||
|
||||
const cursor = cursorResult.cursor
|
||||
try {
|
||||
// 在单个游标上循环累积:处理 offset 跳过 + limit 累积
|
||||
const collectFromCursor = async (cursor: number): Promise<{ rows: Record<string, any>[]; hasMore: boolean }> => {
|
||||
const collectedRows: Record<string, any>[] = []
|
||||
let hasMore = true
|
||||
let skipped = 0
|
||||
let reachedLimit = false
|
||||
|
||||
// 循环获取消息,处理 offset 跳过 + limit 累积
|
||||
while (collectedRows.length < limit && hasMore) {
|
||||
const batch = await wcdbService.fetchMessageBatch(cursor)
|
||||
if (!batch.success || !batch.rows || batch.rows.length === 0) {
|
||||
@@ -658,15 +690,50 @@ class HttpService {
|
||||
collectedRows.push(...rows)
|
||||
}
|
||||
|
||||
const finalHasMore = hasMore || reachedLimit
|
||||
const messages = useLiteMapping
|
||||
? chatService.mapRowsToMessagesLiteForApi(collectedRows)
|
||||
: chatService.mapRowsToMessagesForApi(collectedRows)
|
||||
await this.backfillMissingSenderUsernames(talker, messages)
|
||||
return { success: true, messages, hasMore: finalHasMore }
|
||||
} finally {
|
||||
await wcdbService.closeMessageCursor(cursor)
|
||||
return { rows: collectedRows, hasMore: hasMore || reachedLimit }
|
||||
}
|
||||
|
||||
// 打开游标 -> 累积 -> 关闭;打开失败返回 null(错误经 lastCursorError 透出)
|
||||
let lastCursorError = '打开消息游标失败'
|
||||
const runWithCursor = async (
|
||||
open: () => Promise<{ success: boolean; cursor?: number; error?: string }>
|
||||
): Promise<{ rows: Record<string, any>[]; hasMore: boolean } | null> => {
|
||||
const cursorResult = await open()
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
lastCursorError = cursorResult.error || lastCursorError
|
||||
return null
|
||||
}
|
||||
const cursor = cursorResult.cursor
|
||||
try {
|
||||
return await collectFromCursor(cursor)
|
||||
} finally {
|
||||
await wcdbService.closeMessageCursor(cursor)
|
||||
}
|
||||
}
|
||||
|
||||
const openLite = () => wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp)
|
||||
const openFull = () => wcdbService.openMessageCursor(talker, batchSize, ascending, beginTimestamp, endTimestamp)
|
||||
|
||||
// 1) lite 游标快路径
|
||||
let collected = await runWithCursor(openLite)
|
||||
|
||||
if (!collected) {
|
||||
// 2) lite 打开失败 -> 回退 full 游标
|
||||
collected = await runWithCursor(openFull)
|
||||
if (!collected) {
|
||||
return { success: false, error: lastCursorError }
|
||||
}
|
||||
} else if (collected.rows.length === 0 && offset === 0 && !collected.hasMore) {
|
||||
// 3) lite 首页空且无更多 -> 用 full 复核一次(防 lite 冷缓存误判)
|
||||
const recheck = await runWithCursor(openFull)
|
||||
if (recheck && recheck.rows.length > 0) {
|
||||
collected = recheck
|
||||
}
|
||||
}
|
||||
|
||||
const messages = await this.mapRowsToMessagesYielding(collected.rows, useLiteMapping, talker)
|
||||
await this.backfillMissingSenderUsernames(talker, messages)
|
||||
return { success: true, messages, hasMore: collected.hasMore }
|
||||
} catch (e) {
|
||||
console.error('[HttpService] fetchMessagesBatch error:', e)
|
||||
return { success: false, error: String(e) }
|
||||
@@ -682,6 +749,63 @@ class HttpService {
|
||||
return Math.min(Math.max(parsed, min), max)
|
||||
}
|
||||
|
||||
/**
|
||||
* 让出主进程事件循环一拍。
|
||||
* 大批量消息的解码/映射(decodeMessageContent 的 hex/zlib 解压、toApiMessage 的 XML 解析)
|
||||
* 是 CPU 密集且同步的——一次性处理会长时间独占主进程,阻塞所有依赖主进程 IPC 的 GUI 交互,
|
||||
* 表现为「获取消息时本体卡住」。用 setImmediate 让出(drain 的是宏任务而非仅微任务),
|
||||
* 使主进程能在分片之间处理 GUI 的 IPC 与 Worker 回调。
|
||||
*/
|
||||
private yieldToEventLoop(): Promise<void> {
|
||||
return new Promise((resolve) => setImmediate(resolve))
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片映射数据库行 -> Message[],按时间片(~24ms)让出事件循环,避免阻塞主进程。
|
||||
* 两个底层 mapper 都是逐行独立映射、无跨行状态,故分片输出与一次性调用完全一致。
|
||||
*/
|
||||
private async mapRowsToMessagesYielding(
|
||||
rows: Record<string, any>[],
|
||||
useLiteMapping: boolean,
|
||||
sessionId: string
|
||||
): Promise<Message[]> {
|
||||
const out: Message[] = []
|
||||
const STEP = 16
|
||||
let sliceStart = Date.now()
|
||||
for (let i = 0; i < rows.length; i += STEP) {
|
||||
const slice = rows.slice(i, i + STEP)
|
||||
const mapped = useLiteMapping
|
||||
? chatService.mapRowsToMessagesLiteForApi(slice)
|
||||
: chatService.mapRowsToMessagesForApi(slice, sessionId)
|
||||
for (let k = 0; k < mapped.length; k++) out.push(mapped[k])
|
||||
if (i + STEP < rows.length && Date.now() - sliceStart >= 24) {
|
||||
await this.yieldToEventLoop()
|
||||
sliceStart = Date.now()
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
/**
|
||||
* 分片把 Message[] 转为 API JSON 行,按时间片让出事件循环,避免阻塞主进程。
|
||||
*/
|
||||
private async toApiMessagesYielding(
|
||||
messages: Message[],
|
||||
mediaMap: Map<number, ApiExportedMedia>
|
||||
): Promise<Record<string, any>[]> {
|
||||
const out: Record<string, any>[] = []
|
||||
let sliceStart = Date.now()
|
||||
for (let i = 0; i < messages.length; i++) {
|
||||
const msg = messages[i]
|
||||
out.push(this.toApiMessage(msg, mediaMap.get(msg.localId)))
|
||||
if ((i & 15) === 15 && Date.now() - sliceStart >= 24) {
|
||||
await this.yieldToEventLoop()
|
||||
sliceStart = Date.now()
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
private async backfillMissingSenderUsernames(talker: string, messages: Message[]): Promise<void> {
|
||||
if (!talker.endsWith('@chatroom')) return
|
||||
|
||||
@@ -721,7 +845,7 @@ class HttpService {
|
||||
try {
|
||||
const detail = await wcdbService.getMessageById(talker, localId)
|
||||
if (detail.success && detail.message) {
|
||||
const hydrated = chatService.mapRowsToMessagesForApi([detail.message])[0]
|
||||
const hydrated = chatService.mapRowsToMessagesForApi([detail.message], talker)[0]
|
||||
if (hydrated?.senderUsername) {
|
||||
msg.senderUsername = hydrated.senderUsername
|
||||
}
|
||||
@@ -835,6 +959,11 @@ class HttpService {
|
||||
let hasMore = false
|
||||
|
||||
if (keyword) {
|
||||
const ready = await this.ensureDbReady()
|
||||
if (!ready.success) {
|
||||
this.sendError(res, 500, ready.error || 'Failed to get messages')
|
||||
return
|
||||
}
|
||||
const searchLimit = Math.max(1, limit) + 1
|
||||
const searchResult = await chatService.searchMessages(
|
||||
keyword,
|
||||
@@ -888,7 +1017,7 @@ class HttpService {
|
||||
return
|
||||
}
|
||||
|
||||
const apiMessages = messages.map((msg) => this.toApiMessage(msg, mediaMap.get(msg.localId)))
|
||||
const apiMessages = await this.toApiMessagesYielding(messages, mediaMap)
|
||||
this.sendJson(res, {
|
||||
success: true,
|
||||
talker,
|
||||
|
||||
@@ -606,7 +606,7 @@ class MessagePushService {
|
||||
].join(' ')
|
||||
const result = await wcdbService.execQuery('message', table.dbPath, sql)
|
||||
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
|
||||
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[]))
|
||||
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId))
|
||||
}
|
||||
|
||||
return messages
|
||||
@@ -630,7 +630,7 @@ class MessagePushService {
|
||||
].join(' ')
|
||||
const result = await wcdbService.execQuery('message', table.dbPath, sql)
|
||||
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
|
||||
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[]))
|
||||
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId))
|
||||
}
|
||||
|
||||
return messages.sort((left, right) => this.compareMessagePosition(left, right))
|
||||
@@ -666,7 +666,7 @@ class MessagePushService {
|
||||
].filter(Boolean).join(' ')
|
||||
const result = await wcdbService.execQuery('message', table.dbPath, sql)
|
||||
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
|
||||
const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[])
|
||||
const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId)
|
||||
if (message && !this.isRevokeSystemMessage(message)) return message
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user