新增了一个消息推送

This commit is contained in:
xuncha
2026-03-17 23:29:21 +08:00
parent d0457a2782
commit e0b2f152b0
8 changed files with 621 additions and 15 deletions

View File

@@ -103,6 +103,8 @@ class HttpService {
private port: number = 5031
private running: boolean = false
private connections: Set<import('net').Socket> = new Set()
private messagePushClients: Set<http.ServerResponse> = new Set()
private messagePushHeartbeatTimer: ReturnType<typeof setInterval> | null = null
private connectionMutex: boolean = false
constructor() {
@@ -153,6 +155,7 @@ class HttpService {
this.server.listen(this.port, '127.0.0.1', () => {
this.running = true
this.startMessagePushHeartbeat()
console.log(`[HttpService] HTTP API server started on http://127.0.0.1:${this.port}`)
resolve({ success: true, port: this.port })
})
@@ -165,6 +168,16 @@ class HttpService {
async stop(): Promise<void> {
return new Promise((resolve) => {
if (this.server) {
for (const client of this.messagePushClients) {
try {
client.end()
} catch {}
}
this.messagePushClients.clear()
if (this.messagePushHeartbeatTimer) {
clearInterval(this.messagePushHeartbeatTimer)
this.messagePushHeartbeatTimer = null
}
// 使用互斥锁保护连接集合操作
this.connectionMutex = true
const socketsToClose = Array.from(this.connections)
@@ -211,6 +224,28 @@ class HttpService {
return this.getApiMediaExportPath()
}
getMessagePushStreamUrl(): string {
return `http://127.0.0.1:${this.port}/api/v1/push/messages`
}
broadcastMessagePush(payload: Record<string, unknown>): void {
if (!this.running || this.messagePushClients.size === 0) return
const eventBody = `event: message.new\ndata: ${JSON.stringify(payload)}\n\n`
for (const client of Array.from(this.messagePushClients)) {
try {
if (client.writableEnded || client.destroyed) {
this.messagePushClients.delete(client)
continue
}
client.write(eventBody)
} catch {
this.messagePushClients.delete(client)
try { client.end() } catch {}
}
}
}
/**
* 处理 HTTP 请求
*/
@@ -233,6 +268,8 @@ class HttpService {
// 路由处理
if (pathname === '/health' || pathname === '/api/v1/health') {
this.sendJson(res, { status: 'ok' })
} else if (pathname === '/api/v1/push/messages') {
this.handleMessagePushStream(req, res)
} else if (pathname === '/api/v1/messages') {
await this.handleMessages(url, res)
} else if (pathname === '/api/v1/sessions') {
@@ -252,6 +289,50 @@ class HttpService {
}
}
private startMessagePushHeartbeat(): void {
if (this.messagePushHeartbeatTimer) return
this.messagePushHeartbeatTimer = setInterval(() => {
for (const client of Array.from(this.messagePushClients)) {
try {
if (client.writableEnded || client.destroyed) {
this.messagePushClients.delete(client)
continue
}
client.write(': ping\n\n')
} catch {
this.messagePushClients.delete(client)
try { client.end() } catch {}
}
}
}, 25000)
}
private handleMessagePushStream(req: http.IncomingMessage, res: http.ServerResponse): void {
if (this.configService.get('messagePushEnabled') !== true) {
this.sendError(res, 403, 'Message push is disabled')
return
}
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no'
})
res.flushHeaders?.()
res.write(`event: ready\ndata: ${JSON.stringify({ success: true, stream: this.getMessagePushStreamUrl() })}\n\n`)
this.messagePushClients.add(res)
const cleanup = () => {
this.messagePushClients.delete(res)
}
req.on('close', cleanup)
res.on('close', cleanup)
res.on('error', cleanup)
}
private handleMediaRequest(pathname: string, res: http.ServerResponse): void {
const mediaBasePath = this.getApiMediaExportPath()
const relativePath = pathname.replace('/api/v1/media/', '')