修复sse推送丢消息 给推送新增了id [Bug]:SSE消息推送 丢消息

Fixes #832
This commit is contained in:
xuncha
2026-04-24 22:42:19 +08:00
parent b75de26178
commit d1741c931f
2 changed files with 227 additions and 15 deletions

View File

@@ -76,6 +76,12 @@ interface ApiExportedMedia {
relativePath: string
}
interface MessagePushReplayEvent {
id: number
body: string
createdAt: number
}
// ChatLab 消息类型映射
const ChatLabType = {
TEXT: 0,
@@ -107,8 +113,12 @@ class HttpService {
private running: boolean = false
private connections: Set<import('net').Socket> = new Set()
private messagePushClients: Set<http.ServerResponse> = new Set()
private messagePushReplayBuffer: MessagePushReplayEvent[] = []
private messagePushHeartbeatTimer: ReturnType<typeof setInterval> | null = null
private connectionMutex: boolean = false
private messagePushEventId = 0
private readonly messagePushReplayLimit = 1000
private readonly messagePushReplayTtlMs = 10 * 60 * 1000
constructor() {
this.configService = ConfigService.getInstance()
@@ -178,6 +188,7 @@ class HttpService {
} catch {}
}
this.messagePushClients.clear()
this.messagePushReplayBuffer = []
if (this.messagePushHeartbeatTimer) {
clearInterval(this.messagePushHeartbeatTimer)
this.messagePushHeartbeatTimer = null
@@ -232,9 +243,56 @@ class HttpService {
return `http://${this.host}:${this.port}/api/v1/push/messages`
}
private nextMessagePushEventId(): number {
this.messagePushEventId += 1
if (!Number.isSafeInteger(this.messagePushEventId) || this.messagePushEventId <= 0) {
this.messagePushEventId = 1
}
return this.messagePushEventId
}
private rememberMessagePushEvent(id: number, body: string): void {
this.pruneMessagePushReplayBuffer()
this.messagePushReplayBuffer.push({ id, body, createdAt: Date.now() })
if (this.messagePushReplayBuffer.length > this.messagePushReplayLimit) {
this.messagePushReplayBuffer.splice(0, this.messagePushReplayBuffer.length - this.messagePushReplayLimit)
}
}
private pruneMessagePushReplayBuffer(): void {
const cutoff = Date.now() - this.messagePushReplayTtlMs
while (this.messagePushReplayBuffer.length > 0 && this.messagePushReplayBuffer[0].createdAt < cutoff) {
this.messagePushReplayBuffer.shift()
}
}
private parseMessagePushLastEventId(req: http.IncomingMessage, url?: URL): number {
const queryValue = url?.searchParams.get('lastEventId') || url?.searchParams.get('last_event_id') || ''
const headerValue = Array.isArray(req.headers['last-event-id'])
? req.headers['last-event-id'][0]
: req.headers['last-event-id']
const parsed = Number.parseInt(String(queryValue || headerValue || '0').trim(), 10)
return Number.isFinite(parsed) && parsed > 0 ? parsed : 0
}
private replayMessagePushEvents(res: http.ServerResponse, lastEventId: number): void {
this.pruneMessagePushReplayBuffer()
const events = lastEventId > 0
? this.messagePushReplayBuffer.filter((event) => event.id > lastEventId)
: this.messagePushReplayBuffer
for (const event of events) {
if (res.writableEnded || res.destroyed) return
res.write(event.body)
}
}
broadcastMessagePush(payload: Record<string, unknown>): void {
if (!this.running || this.messagePushClients.size === 0) return
const eventBody = `event: message.new\ndata: ${JSON.stringify(payload)}\n\n`
if (!this.running) return
const eventId = this.nextMessagePushEventId()
const eventBody = `id: ${eventId}\nevent: message.new\ndata: ${JSON.stringify(payload)}\n\n`
this.rememberMessagePushEvent(eventId, eventBody)
if (this.messagePushClients.size === 0) return
for (const client of Array.from(this.messagePushClients)) {
try {
@@ -365,7 +423,7 @@ class HttpService {
if (pathname === '/health' || pathname === '/api/v1/health') {
this.sendJson(res, { status: 'ok' })
} else if (pathname === '/api/v1/push/messages') {
this.handleMessagePushStream(req, res)
this.handleMessagePushStream(req, res, url)
} else if (pathname === '/api/v1/messages') {
await this.handleMessages(url, res)
} else if (pathname === '/api/v1/sessions') {
@@ -440,7 +498,7 @@ class HttpService {
}, 25000)
}
private handleMessagePushStream(req: http.IncomingMessage, res: http.ServerResponse): void {
private handleMessagePushStream(req: http.IncomingMessage, res: http.ServerResponse, url: URL): void {
if (this.configService.get('messagePushEnabled') !== true) {
this.sendError(res, 403, 'Message push is disabled')
return
@@ -453,9 +511,10 @@ class HttpService {
'X-Accel-Buffering': 'no'
})
res.flushHeaders?.()
res.write(`event: ready\ndata: ${JSON.stringify({ success: true, stream: this.getMessagePushStreamUrl() })}\n\n`)
this.messagePushClients.add(res)
res.write(`event: ready\ndata: ${JSON.stringify({ success: true, stream: this.getMessagePushStreamUrl() })}\n\n`)
this.replayMessagePushEvents(res, this.parseMessagePushLastEventId(req, url))
const cleanup = () => {
this.messagePushClients.delete(res)