From d1741c931f60276160a5ca0efd146cc226771c3f Mon Sep 17 00:00:00 2001 From: xuncha <1658671838@qq.com> Date: Fri, 24 Apr 2026 22:42:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dsse=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E4=B8=A2=E6=B6=88=E6=81=AF=20=E7=BB=99=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=86id=20[Bug]:SSE=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=8E=A8=E9=80=81=20=E4=B8=A2=E6=B6=88=E6=81=AF=20Fixes=20#832?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- electron/services/httpService.ts | 69 +++++++++- electron/services/messagePushService.ts | 173 ++++++++++++++++++++++-- 2 files changed, 227 insertions(+), 15 deletions(-) diff --git a/electron/services/httpService.ts b/electron/services/httpService.ts index 492024e..24e8825 100644 --- a/electron/services/httpService.ts +++ b/electron/services/httpService.ts @@ -76,6 +76,12 @@ interface ApiExportedMedia { relativePath: string } +interface MessagePushReplayEvent { + id: number + body: string + createdAt: number +} + // ChatLab 消息类型映射 const ChatLabType = { TEXT: 0, @@ -107,8 +113,12 @@ class HttpService { private running: boolean = false private connections: Set = new Set() private messagePushClients: Set = new Set() + private messagePushReplayBuffer: MessagePushReplayEvent[] = [] private messagePushHeartbeatTimer: ReturnType | null = null private connectionMutex: boolean = false + private messagePushEventId = 0 + private readonly messagePushReplayLimit = 1000 + private readonly messagePushReplayTtlMs = 10 * 60 * 1000 constructor() { this.configService = ConfigService.getInstance() @@ -178,6 +188,7 @@ class HttpService { } catch {} } this.messagePushClients.clear() + this.messagePushReplayBuffer = [] if (this.messagePushHeartbeatTimer) { clearInterval(this.messagePushHeartbeatTimer) this.messagePushHeartbeatTimer = null @@ -232,9 +243,56 @@ class HttpService { return `http://${this.host}:${this.port}/api/v1/push/messages` } + private nextMessagePushEventId(): number { + this.messagePushEventId += 1 + if (!Number.isSafeInteger(this.messagePushEventId) || this.messagePushEventId <= 0) { + this.messagePushEventId = 1 + } + return this.messagePushEventId + } + + private rememberMessagePushEvent(id: number, body: string): void { + this.pruneMessagePushReplayBuffer() + this.messagePushReplayBuffer.push({ id, body, createdAt: Date.now() }) + if (this.messagePushReplayBuffer.length > this.messagePushReplayLimit) { + this.messagePushReplayBuffer.splice(0, this.messagePushReplayBuffer.length - this.messagePushReplayLimit) + } + } + + private pruneMessagePushReplayBuffer(): void { + const cutoff = Date.now() - this.messagePushReplayTtlMs + while (this.messagePushReplayBuffer.length > 0 && this.messagePushReplayBuffer[0].createdAt < cutoff) { + this.messagePushReplayBuffer.shift() + } + } + + private parseMessagePushLastEventId(req: http.IncomingMessage, url?: URL): number { + const queryValue = url?.searchParams.get('lastEventId') || url?.searchParams.get('last_event_id') || '' + const headerValue = Array.isArray(req.headers['last-event-id']) + ? req.headers['last-event-id'][0] + : req.headers['last-event-id'] + const parsed = Number.parseInt(String(queryValue || headerValue || '0').trim(), 10) + return Number.isFinite(parsed) && parsed > 0 ? parsed : 0 + } + + private replayMessagePushEvents(res: http.ServerResponse, lastEventId: number): void { + this.pruneMessagePushReplayBuffer() + const events = lastEventId > 0 + ? this.messagePushReplayBuffer.filter((event) => event.id > lastEventId) + : this.messagePushReplayBuffer + + for (const event of events) { + if (res.writableEnded || res.destroyed) return + res.write(event.body) + } + } + broadcastMessagePush(payload: Record): void { - if (!this.running || this.messagePushClients.size === 0) return - const eventBody = `event: message.new\ndata: ${JSON.stringify(payload)}\n\n` + if (!this.running) return + const eventId = this.nextMessagePushEventId() + const eventBody = `id: ${eventId}\nevent: message.new\ndata: ${JSON.stringify(payload)}\n\n` + this.rememberMessagePushEvent(eventId, eventBody) + if (this.messagePushClients.size === 0) return for (const client of Array.from(this.messagePushClients)) { try { @@ -365,7 +423,7 @@ class HttpService { if (pathname === '/health' || pathname === '/api/v1/health') { this.sendJson(res, { status: 'ok' }) } else if (pathname === '/api/v1/push/messages') { - this.handleMessagePushStream(req, res) + this.handleMessagePushStream(req, res, url) } else if (pathname === '/api/v1/messages') { await this.handleMessages(url, res) } else if (pathname === '/api/v1/sessions') { @@ -440,7 +498,7 @@ class HttpService { }, 25000) } - private handleMessagePushStream(req: http.IncomingMessage, res: http.ServerResponse): void { + private handleMessagePushStream(req: http.IncomingMessage, res: http.ServerResponse, url: URL): void { if (this.configService.get('messagePushEnabled') !== true) { this.sendError(res, 403, 'Message push is disabled') return @@ -453,9 +511,10 @@ class HttpService { 'X-Accel-Buffering': 'no' }) res.flushHeaders?.() - res.write(`event: ready\ndata: ${JSON.stringify({ success: true, stream: this.getMessagePushStreamUrl() })}\n\n`) this.messagePushClients.add(res) + res.write(`event: ready\ndata: ${JSON.stringify({ success: true, stream: this.getMessagePushStreamUrl() })}\n\n`) + this.replayMessagePushEvents(res, this.parseMessagePushLastEventId(req, url)) const cleanup = () => { this.messagePushClients.delete(res) diff --git a/electron/services/messagePushService.ts b/electron/services/messagePushService.ts index 6617526..8b4fd71 100644 --- a/electron/services/messagePushService.ts +++ b/electron/services/messagePushService.ts @@ -12,6 +12,15 @@ interface SessionBaseline { unreadCount: number } +interface PushSessionResult { + fetched: boolean + maxFetchedTimestamp: number + incomingCandidateCount: number + observedIncomingCount: number + expectedIncomingCount: number + retry: boolean +} + interface MessagePushPayload { event: 'message.new' sessionId: string @@ -37,10 +46,13 @@ class MessagePushService { private readonly configService: ConfigService private readonly sessionBaseline = new Map() private readonly recentMessageKeys = new Map() + private readonly seenMessageKeys = new Map() + private readonly seenPrimedSessions = new Set() private readonly groupNicknameCache = new Map; updatedAt: number }>() private readonly pushAvatarCacheDir: string private readonly pushAvatarDataCache = new Map() private readonly debounceMs = 350 + private readonly lookbackSeconds = 2 private readonly recentMessageTtlMs = 10 * 60 * 1000 private readonly groupNicknameCacheTtlMs = 5 * 60 * 1000 private debounceTimer: ReturnType | null = null @@ -111,6 +123,8 @@ class MessagePushService { private resetRuntimeState(): void { this.sessionBaseline.clear() this.recentMessageKeys.clear() + this.seenMessageKeys.clear() + this.seenPrimedSessions.clear() this.groupNicknameCache.clear() this.baselineReady = false this.messageTableScanRequested = false @@ -190,7 +204,6 @@ class MessagePushService { } const previousBaseline = new Map(this.sessionBaseline) - this.setBaseline(sessions) const candidates = sessions.filter((session) => { const previous = previousBaseline.get(session.username) @@ -199,11 +212,24 @@ class MessagePushService { } return scanMessageBackedSessions && this.shouldScanMessageBackedSession(previous, session) }) + const candidateIds = new Set() for (const session of candidates) { - await this.pushSessionMessages( + const sessionId = String(session.username || '').trim() + if (sessionId) candidateIds.add(sessionId) + const result = await this.pushSessionMessages( session, previousBaseline.get(session.username) || this.sessionBaseline.get(session.username) ) + this.updateInspectedBaseline(session, previousBaseline.get(session.username), result) + if (result.retry) { + this.rerunRequested = true + } + } + + for (const session of sessions) { + const sessionId = String(session.username || '').trim() + if (!sessionId || candidateIds.has(sessionId)) continue + this.updateObservedBaseline(session, previousBaseline.get(sessionId)) } } finally { this.processing = false @@ -235,6 +261,40 @@ class MessagePushService { } } + private updateObservedBaseline(session: ChatSession, previous: SessionBaseline | undefined): void { + const username = String(session.username || '').trim() + if (!username) return + + const sessionTimestamp = Number(session.lastTimestamp || 0) + const previousTimestamp = Number(previous?.lastTimestamp || 0) + this.sessionBaseline.set(username, { + lastTimestamp: Math.max(sessionTimestamp, previousTimestamp), + unreadCount: Number(session.unreadCount ?? previous?.unreadCount ?? 0) + }) + } + + private updateInspectedBaseline( + session: ChatSession, + previous: SessionBaseline | undefined, + result: PushSessionResult + ): void { + const username = String(session.username || '').trim() + if (!username) return + + const previousTimestamp = Number(previous?.lastTimestamp || 0) + const current = this.sessionBaseline.get(username) || previous || { lastTimestamp: 0, unreadCount: 0 } + const nextTimestamp = result.retry + ? previousTimestamp + : Math.max(previousTimestamp, current.lastTimestamp, result.maxFetchedTimestamp) + + this.sessionBaseline.set(username, { + lastTimestamp: nextTimestamp, + unreadCount: result.retry + ? Number(previous?.unreadCount || 0) + : Number(session.unreadCount || 0) + }) + } + private shouldInspectSession(previous: SessionBaseline | undefined, session: ChatSession): boolean { const sessionId = String(session.username || '').trim() if (!sessionId || sessionId.toLowerCase().includes('placeholder_foldgroup')) { @@ -275,26 +335,84 @@ class MessagePushService { return Boolean(previous) || Number(session.lastTimestamp || 0) > 0 } - private async pushSessionMessages(session: ChatSession, previous: SessionBaseline | undefined): Promise { - const since = Math.max(0, Number(previous?.lastTimestamp || 0)) + private async pushSessionMessages(session: ChatSession, previous: SessionBaseline | undefined): Promise { + const previousTimestamp = Math.max(0, Number(previous?.lastTimestamp || 0)) + const previousUnreadCount = Math.max(0, Number(previous?.unreadCount || 0)) + const currentUnreadCount = Math.max(0, Number(session.unreadCount || 0)) + const expectedIncomingCount = previous + ? Math.max(0, currentUnreadCount - previousUnreadCount) + : 0 + const since = previous + ? Math.max(0, previousTimestamp - this.lookbackSeconds) + : 0 const newMessagesResult = await chatService.getNewMessages(session.username, since, 1000) if (!newMessagesResult.success || !newMessagesResult.messages || newMessagesResult.messages.length === 0) { - return + return { + fetched: false, + maxFetchedTimestamp: previousTimestamp, + incomingCandidateCount: 0, + observedIncomingCount: 0, + expectedIncomingCount, + retry: expectedIncomingCount > 0 + } } + const sessionId = String(session.username || '').trim() + const maxFetchedTimestamp = newMessagesResult.messages.reduce((max, message) => { + const createTime = Number(message.createTime || 0) + return Number.isFinite(createTime) && createTime > max ? createTime : max + }, previousTimestamp) + const seenPrimed = sessionId ? this.seenPrimedSessions.has(sessionId) : false + const sameTimestampIncoming: Message[] = [] + const candidateMessages: Message[] = [] + let observedIncomingCount = 0 + for (const message of newMessagesResult.messages) { const messageKey = String(message.messageKey || '').trim() if (!messageKey) continue + const createTime = Number(message.createTime || 0) + const seen = this.isSeenMessage(messageKey) + const recent = this.isRecentMessage(messageKey) + + if (message.isSend !== 1) { + if (!previous || createTime > previousTimestamp || (seenPrimed && createTime === previousTimestamp)) { + observedIncomingCount += 1 + } + } + + if (previous && !seenPrimed && createTime < previousTimestamp) { + this.rememberSeenMessageKey(messageKey) + continue + } + + if (seen || recent) { + continue + } if (message.isSend === 1) continue - - if (previous && Number(message.createTime || 0) <= Number(previous.lastTimestamp || 0)) { + if (previous && !seenPrimed && createTime === previousTimestamp) { + sameTimestampIncoming.push(message) continue } - if (this.isRecentMessage(messageKey)) { - continue - } + candidateMessages.push(message) + } + const futureIncomingCount = candidateMessages.filter((message) => { + const createTime = Number(message.createTime || 0) + return !previous || createTime > previousTimestamp || seenPrimed + }).length + const sameTimestampAllowance = previous && !seenPrimed + ? Math.max(0, expectedIncomingCount - futureIncomingCount) + : 0 + const selectedSameTimestamp = sameTimestampAllowance > 0 + ? sameTimestampIncoming.slice(-sameTimestampAllowance) + : [] + const messagesToPush = [...selectedSameTimestamp, ...candidateMessages] + const incomingCandidateCount = messagesToPush.length + + for (const message of messagesToPush) { + const messageKey = String(message.messageKey || '').trim() + if (!messageKey) continue const payload = await this.buildPayload(session, message) if (!payload) continue if (!this.shouldPushPayload(payload)) continue @@ -303,6 +421,21 @@ class MessagePushService { this.rememberMessageKey(messageKey) this.bumpSessionBaseline(session.username, message) } + + for (const message of newMessagesResult.messages) { + const messageKey = String(message.messageKey || '').trim() + if (messageKey) this.rememberSeenMessageKey(messageKey) + } + if (sessionId) this.seenPrimedSessions.add(sessionId) + + return { + fetched: true, + maxFetchedTimestamp, + incomingCandidateCount, + observedIncomingCount, + expectedIncomingCount, + retry: expectedIncomingCount > 0 && observedIncomingCount < expectedIncomingCount + } } private async buildPayload(session: ChatSession, message: Message): Promise { @@ -558,6 +691,17 @@ class MessagePushService { this.pruneRecentMessageKeys() } + private isSeenMessage(messageKey: string): boolean { + this.pruneSeenMessageKeys() + const timestamp = this.seenMessageKeys.get(messageKey) + return typeof timestamp === 'number' && Date.now() - timestamp < this.recentMessageTtlMs + } + + private rememberSeenMessageKey(messageKey: string): void { + this.seenMessageKeys.set(messageKey, Date.now()) + this.pruneSeenMessageKeys() + } + private pruneRecentMessageKeys(): void { const now = Date.now() for (const [key, timestamp] of this.recentMessageKeys.entries()) { @@ -567,6 +711,15 @@ class MessagePushService { } } + private pruneSeenMessageKeys(): void { + const now = Date.now() + for (const [key, timestamp] of this.seenMessageKeys.entries()) { + if (now - timestamp > this.recentMessageTtlMs) { + this.seenMessageKeys.delete(key) + } + } + } + } export const messagePushService = new MessagePushService()