From 032aad6539039751242d4a2991156fc2da8376a3 Mon Sep 17 00:00:00 2001 From: xuncha <1658671838@qq.com> Date: Sat, 25 Apr 2026 00:57:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=92=A4=E5=9B=9E=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/HTTP-API.md | 15 +- electron/services/httpService.ts | 8 +- electron/services/messagePushService.ts | 786 +++++++++++++++++++++++- src/pages/SettingsPage.tsx | 6 +- 4 files changed, 778 insertions(+), 37 deletions(-) diff --git a/docs/HTTP-API.md b/docs/HTTP-API.md index 0bec407..c11c66a 100644 --- a/docs/HTTP-API.md +++ b/docs/HTTP-API.md @@ -74,14 +74,14 @@ GET /api/v1/push/messages - 需要先在设置页开启 `HTTP API 服务` - 同时需要开启 `主动推送` - 响应类型为 `text/event-stream` -- 新消息事件名固定为 `message.new` -- 建议接收端按 `messageKey` 去重 +- 事件名包含 `message.new` 和 `message.revoke` +- 建议接收端按 `event + rawid` 去重 ### 事件字段 - `event` - `sessionId` -- `messageKey` +- `rawid` - `avatarUrl` - `sourceName` - `groupName`(仅群聊) @@ -98,7 +98,14 @@ curl -N "http://127.0.0.1:5031/api/v1/push/messages?access_token=YOUR_TOKEN ```text event: message.new -data: {"event":"message.new","sessionId":"xxx@chatroom","messageKey":"server:123456:1760000123:1760000123000:321:wxid_member:1","avatarUrl":"https://example.com/group.jpg","sourceName":"李四","groupName":"项目群","content":"[图片]","timestamp":1760000123} +data: {"event":"message.new","sessionId":"xxx@chatroom","sessionType":"group","rawid":"1234567890123456789","avatarUrl":"https://example.com/group.jpg","sourceName":"李四","groupName":"项目群","content":"[图片]","timestamp":1760000123} +``` + +撤回事件示例: + +```text +event: message.revoke +data: {"event":"message.revoke","sessionId":"wxid_xxx","sessionType":"other","rawid":"1234567890123456789","avatarUrl":"https://example.com/avatar.jpg","sourceName":"张三","content":"对方撤回了一条消息(rawid:1234567890123456789) 内容为“你好”","timestamp":1760000180} ``` --- diff --git a/electron/services/httpService.ts b/electron/services/httpService.ts index 24e8825..863ed80 100644 --- a/electron/services/httpService.ts +++ b/electron/services/httpService.ts @@ -290,7 +290,8 @@ class HttpService { broadcastMessagePush(payload: Record): void { if (!this.running) return const eventId = this.nextMessagePushEventId() - const eventBody = `id: ${eventId}\nevent: message.new\ndata: ${JSON.stringify(payload)}\n\n` + const eventName = this.getMessagePushEventName(payload) + const eventBody = `id: ${eventId}\nevent: ${eventName}\ndata: ${JSON.stringify(payload)}\n\n` this.rememberMessagePushEvent(eventId, eventBody) if (this.messagePushClients.size === 0) return @@ -308,6 +309,11 @@ class HttpService { } } + private getMessagePushEventName(payload: Record): string { + const eventName = String(payload?.event || '').trim() + return /^[a-z0-9._-]+$/i.test(eventName) ? eventName : 'message.new' + } + async autoStart(): Promise { const enabled = this.configService.get('httpApiEnabled') if (enabled) { diff --git a/electron/services/messagePushService.ts b/electron/services/messagePushService.ts index 8b4fd71..cacdcf4 100644 --- a/electron/services/messagePushService.ts +++ b/electron/services/messagePushService.ts @@ -21,11 +21,17 @@ interface PushSessionResult { retry: boolean } +interface PushSessionOptions { + scanRecentRevokes?: boolean +} + +type MessagePushEventName = 'message.new' | 'message.revoke' + interface MessagePushPayload { - event: 'message.new' + event: MessagePushEventName sessionId: string sessionType: 'private' | 'group' | 'official' | 'other' - messageKey: string + rawid: string avatarUrl?: string sourceName: string groupName?: string @@ -47,6 +53,7 @@ class MessagePushService { private readonly sessionBaseline = new Map() private readonly recentMessageKeys = new Map() private readonly seenMessageKeys = new Map() + private readonly recentlyRevokedOriginalTokens = new Map() private readonly seenPrimedSessions = new Set() private readonly groupNicknameCache = new Map; updatedAt: number }>() private readonly pushAvatarCacheDir: string @@ -55,12 +62,17 @@ class MessagePushService { private readonly lookbackSeconds = 2 private readonly recentMessageTtlMs = 10 * 60 * 1000 private readonly groupNicknameCacheTtlMs = 5 * 60 * 1000 + private readonly messageTableRescanDelayMs = 500 + private readonly recentRevokeScanSeconds = 150 + private readonly directRevokeScanLimit = 20 private debounceTimer: ReturnType | null = null + private messageTableRescanTimer: ReturnType | null = null private processing = false private rerunRequested = false private started = false private baselineReady = false private messageTableScanRequested = false + private readonly pendingMessageTableNames = new Set() constructor() { this.configService = ConfigService.getInstance() @@ -92,13 +104,23 @@ class MessagePushService { } const tableName = String(payload?.table || '').trim() + const messageTableNames = this.collectMessageTableNamesFromPayload(payload) if (this.isSessionTableChange(tableName)) { this.scheduleSync() return } - if (!tableName || this.isMessageTableChange(tableName)) { - this.scheduleSync({ scanMessageBackedSessions: true }) + if (!tableName && messageTableNames.length === 0) { + this.scheduleSync() + return + } + + if (this.isMessageTableChange(tableName) || messageTableNames.length > 0) { + this.scheduleSync({ + scanMessageBackedSessions: true, + messageTableNames + }) + this.scheduleMessageTableRescan(messageTableNames) } } @@ -124,14 +146,20 @@ class MessagePushService { this.sessionBaseline.clear() this.recentMessageKeys.clear() this.seenMessageKeys.clear() + this.recentlyRevokedOriginalTokens.clear() this.seenPrimedSessions.clear() this.groupNicknameCache.clear() this.baselineReady = false this.messageTableScanRequested = false + this.pendingMessageTableNames.clear() if (this.debounceTimer) { clearTimeout(this.debounceTimer) this.debounceTimer = null } + if (this.messageTableRescanTimer) { + clearTimeout(this.messageTableRescanTimer) + this.messageTableRescanTimer = null + } } private async refreshConfiguration(reason: string): Promise { @@ -158,10 +186,14 @@ class MessagePushService { this.baselineReady = true } - private scheduleSync(options: { scanMessageBackedSessions?: boolean } = {}): void { + private scheduleSync(options: { scanMessageBackedSessions?: boolean; messageTableNames?: string[] } = {}): void { if (options.scanMessageBackedSessions) { this.messageTableScanRequested = true } + for (const tableName of options.messageTableNames || []) { + const normalized = String(tableName || '').trim() + if (normalized) this.pendingMessageTableNames.add(normalized) + } if (this.debounceTimer) { clearTimeout(this.debounceTimer) @@ -173,6 +205,22 @@ class MessagePushService { }, this.debounceMs) } + private scheduleMessageTableRescan(messageTableNames: string[]): void { + if (this.messageTableRescanTimer) { + clearTimeout(this.messageTableRescanTimer) + } + + const tableNames = [...messageTableNames] + this.messageTableRescanTimer = setTimeout(() => { + this.messageTableRescanTimer = null + if (!this.started || !this.isPushEnabled()) return + this.scheduleSync({ + scanMessageBackedSessions: true, + messageTableNames: tableNames + }) + }, this.messageTableRescanDelayMs) + } + private async flushPendingChanges(): Promise { if (this.processing) { this.rerunRequested = true @@ -184,6 +232,8 @@ class MessagePushService { if (!this.isPushEnabled()) return const scanMessageBackedSessions = this.messageTableScanRequested this.messageTableScanRequested = false + const pendingMessageTableNames = Array.from(this.pendingMessageTableNames) + this.pendingMessageTableNames.clear() const connectResult = await chatService.connect() if (!connectResult.success) { @@ -204,9 +254,14 @@ class MessagePushService { } const previousBaseline = new Map(this.sessionBaseline) + const messageTableTargetSessionIds = this.resolveMessageTableTargetSessionIds(sessions, pendingMessageTableNames) const candidates = sessions.filter((session) => { + const sessionId = String(session.username || '').trim() const previous = previousBaseline.get(session.username) + if (sessionId && messageTableTargetSessionIds.has(sessionId)) { + return true + } if (this.shouldInspectSession(previous, session)) { return true } @@ -216,9 +271,14 @@ class MessagePushService { for (const session of candidates) { const sessionId = String(session.username || '').trim() if (sessionId) candidateIds.add(sessionId) + const previous = previousBaseline.get(session.username) || this.sessionBaseline.get(session.username) + const scanRecentRevokes = this.hasUnreadCountDecreased(previous, session) || + (this.hasUnreadCountChanged(previous, session) && this.isRevokeSessionSummary(session)) || + (Boolean(sessionId) && messageTableTargetSessionIds.has(sessionId)) const result = await this.pushSessionMessages( session, - previousBaseline.get(session.username) || this.sessionBaseline.get(session.username) + previous, + { scanRecentRevokes } ) this.updateInspectedBaseline(session, previousBaseline.get(session.username), result) if (result.retry) { @@ -301,11 +361,6 @@ class MessagePushService { return false } - const summary = String(session.summary || '').trim() - if (Number(session.lastMsgType || 0) === 10002 || summary.includes('撤回了一条消息')) { - return false - } - const lastTimestamp = Number(session.lastTimestamp || 0) const unreadCount = Number(session.unreadCount || 0) @@ -313,7 +368,21 @@ class MessagePushService { return unreadCount > 0 && lastTimestamp > 0 } - return lastTimestamp > previous.lastTimestamp || unreadCount > previous.unreadCount + if (this.isRevokeSessionSummary(session) && lastTimestamp >= previous.lastTimestamp) { + return true + } + + return lastTimestamp > previous.lastTimestamp || unreadCount !== previous.unreadCount + } + + private hasUnreadCountChanged(previous: SessionBaseline | undefined, session: ChatSession): boolean { + if (!previous) return false + return Number(session.unreadCount || 0) !== Number(previous.unreadCount || 0) + } + + private hasUnreadCountDecreased(previous: SessionBaseline | undefined, session: ChatSession): boolean { + if (!previous) return false + return Number(session.unreadCount || 0) < Number(previous.unreadCount || 0) } private shouldScanMessageBackedSession(previous: SessionBaseline | undefined, session: ChatSession): boolean { @@ -322,20 +391,19 @@ class MessagePushService { return false } - const summary = String(session.summary || '').trim() - if (Number(session.lastMsgType || 0) === 10002 || summary.includes('撤回了一条消息')) { - return false - } - const sessionType = this.getSessionType(sessionId, session) - if (sessionType === 'private') { + if (sessionType === 'private' && !this.isRevokeSessionSummary(session)) { return false } return Boolean(previous) || Number(session.lastTimestamp || 0) > 0 } - private async pushSessionMessages(session: ChatSession, previous: SessionBaseline | undefined): Promise { + private async pushSessionMessages( + session: ChatSession, + previous: SessionBaseline | undefined, + options: PushSessionOptions = {} + ): Promise { const previousTimestamp = Math.max(0, Number(previous?.lastTimestamp || 0)) const previousUnreadCount = Math.max(0, Number(previous?.unreadCount || 0)) const currentUnreadCount = Math.max(0, Number(session.unreadCount || 0)) @@ -346,7 +414,10 @@ class MessagePushService { ? Math.max(0, previousTimestamp - this.lookbackSeconds) : 0 const newMessagesResult = await chatService.getNewMessages(session.username, since, 1000) - if (!newMessagesResult.success || !newMessagesResult.messages || newMessagesResult.messages.length === 0) { + const fetchedMessages = newMessagesResult.success && Array.isArray(newMessagesResult.messages) + ? newMessagesResult.messages + : [] + if (fetchedMessages.length === 0 && !options.scanRecentRevokes) { return { fetched: false, maxFetchedTimestamp: previousTimestamp, @@ -358,7 +429,7 @@ class MessagePushService { } const sessionId = String(session.username || '').trim() - const maxFetchedTimestamp = newMessagesResult.messages.reduce((max, message) => { + const maxFetchedTimestamp = fetchedMessages.reduce((max, message) => { const createTime = Number(message.createTime || 0) return Number.isFinite(createTime) && createTime > max ? createTime : max }, previousTimestamp) @@ -367,12 +438,13 @@ class MessagePushService { const candidateMessages: Message[] = [] let observedIncomingCount = 0 - for (const message of newMessagesResult.messages) { + for (const message of fetchedMessages) { const messageKey = String(message.messageKey || '').trim() if (!messageKey) continue const createTime = Number(message.createTime || 0) const seen = this.isSeenMessage(messageKey) const recent = this.isRecentMessage(messageKey) + const revokeMessage = this.isRevokeSystemMessage(message) if (message.isSend !== 1) { if (!previous || createTime > previousTimestamp || (seenPrimed && createTime === previousTimestamp)) { @@ -381,16 +453,27 @@ class MessagePushService { } if (previous && !seenPrimed && createTime < previousTimestamp) { + if (revokeMessage && !recent) { + candidateMessages.push(message) + continue + } this.rememberSeenMessageKey(messageKey) continue } if (seen || recent) { + if (seen && !recent && revokeMessage) { + candidateMessages.push(message) + } continue } if (message.isSend === 1) continue if (previous && !seenPrimed && createTime === previousTimestamp) { - sameTimestampIncoming.push(message) + if (revokeMessage) { + candidateMessages.push(message) + } else { + sameTimestampIncoming.push(message) + } continue } @@ -408,12 +491,24 @@ class MessagePushService { ? sameTimestampIncoming.slice(-sameTimestampAllowance) : [] const messagesToPush = [...selectedSameTimestamp, ...candidateMessages] + const suppressedNormalMessageKeys = this.collectSuppressedNormalMessageKeys(messagesToPush, fetchedMessages) const incomingCandidateCount = messagesToPush.length for (const message of messagesToPush) { const messageKey = String(message.messageKey || '').trim() if (!messageKey) continue - const payload = await this.buildPayload(session, message) + if (!this.isRevokeSystemMessage(message) && suppressedNormalMessageKeys.has(messageKey)) { + this.rememberMessageKey(messageKey) + continue + } + if (!this.isRevokeSystemMessage(message) && this.isRecentlyRevokedOriginal(session.username, message)) { + this.rememberMessageKey(messageKey) + this.rememberSeenMessageKey(messageKey) + continue + } + const payload = this.isRevokeSystemMessage(message) + ? await this.buildRevokePayload(session, message, fetchedMessages) + : await this.buildPayload(session, message) if (!payload) continue if (!this.shouldPushPayload(payload)) continue @@ -422,22 +517,194 @@ class MessagePushService { this.bumpSessionBaseline(session.username, message) } - for (const message of newMessagesResult.messages) { + for (const message of fetchedMessages) { const messageKey = String(message.messageKey || '').trim() if (messageKey) this.rememberSeenMessageKey(messageKey) } if (sessionId) this.seenPrimedSessions.add(sessionId) + const recentRevokeResult = options.scanRecentRevokes + ? await this.pushRecentRevokeMessages(session, previous, fetchedMessages) + : { pushedCount: 0, maxPushedTimestamp: 0 } + return { fetched: true, - maxFetchedTimestamp, - incomingCandidateCount, + maxFetchedTimestamp: Math.max(maxFetchedTimestamp, recentRevokeResult.maxPushedTimestamp), + incomingCandidateCount: incomingCandidateCount + recentRevokeResult.pushedCount, observedIncomingCount, expectedIncomingCount, retry: expectedIncomingCount > 0 && observedIncomingCount < expectedIncomingCount } } + private async pushRecentRevokeMessages( + session: ChatSession, + previous: SessionBaseline | undefined, + contextMessages: Message[] + ): Promise<{ pushedCount: number; maxPushedTimestamp: number }> { + const sessionId = String(session.username || '').trim() + if (!sessionId) return { pushedCount: 0, maxPushedTimestamp: 0 } + + const since = this.getRecentRevokeScanSince(session, previous) + const revokeMessages = await this.getRecentRevokeMessagesFromTables(sessionId, since) + if (revokeMessages.length === 0) { + return { pushedCount: 0, maxPushedTimestamp: 0 } + } + + const mergedMessages = this.mergeMessagesForRevokeLookup(contextMessages, revokeMessages) + let pushedCount = 0 + let maxPushedTimestamp = 0 + + for (const message of revokeMessages) { + const messageKey = String(message.messageKey || '').trim() + if (!messageKey || !this.isRevokeSystemMessage(message)) continue + if (this.isRecentMessage(messageKey)) continue + + const payload = await this.buildRevokePayload(session, message, mergedMessages) + if (!payload) continue + if (!this.shouldPushPayload(payload)) continue + + httpService.broadcastMessagePush(payload) + this.rememberMessageKey(messageKey) + this.rememberSeenMessageKey(messageKey) + this.bumpSessionBaseline(sessionId, message) + pushedCount += 1 + + const createTime = Number(message.createTime || 0) + if (Number.isFinite(createTime) && createTime > maxPushedTimestamp) { + maxPushedTimestamp = createTime + } + } + + return { pushedCount, maxPushedTimestamp } + } + + private getRecentRevokeScanSince(session: ChatSession, previous: SessionBaseline | undefined): number { + const nowSeconds = Math.floor(Date.now() / 1000) + const anchor = Math.max( + nowSeconds, + Number(session.lastTimestamp || 0), + Number(previous?.lastTimestamp || 0) + ) + return Math.max(0, anchor - this.recentRevokeScanSeconds) + } + + private async getRecentRevokeMessagesFromTables(sessionId: string, since: number): Promise { + const tables = await this.getCandidateMessageTables(sessionId, since) + if (tables.length === 0) return [] + + const messages: Message[] = [] + const sinceSeconds = this.toSafeSqlInteger(since) + for (const table of tables) { + const sql = [ + `SELECT *, '${this.escapeSqlString(table.dbPath)}' AS _db_path, '${this.escapeSqlString(table.tableName)}' AS table_name`, + `FROM ${this.quoteSqlIdentifier(table.tableName)}`, + `WHERE create_time >= ${sinceSeconds}`, + `AND (local_type IN (10000, 10002) OR message_content LIKE '%撤回%' OR message_content LIKE '%revokemsg%' OR message_content LIKE '%[])) + } + + return messages + .filter((message) => this.isRevokeSystemMessage(message)) + .sort((left, right) => this.compareMessagePosition(left, right)) + } + + private async getRecentRevokeContextMessages(sessionId: string, since: number): Promise { + const tables = await this.getCandidateMessageTables(sessionId, since) + if (tables.length === 0) return [] + + const messages: Message[] = [] + const sinceSeconds = this.toSafeSqlInteger(since) + for (const table of tables) { + const sql = [ + `SELECT *, '${this.escapeSqlString(table.dbPath)}' AS _db_path, '${this.escapeSqlString(table.tableName)}' AS table_name`, + `FROM ${this.quoteSqlIdentifier(table.tableName)}`, + `WHERE create_time >= ${sinceSeconds}`, + `ORDER BY create_time ASC, sort_seq ASC, local_id ASC`, + `LIMIT ${this.directRevokeScanLimit * 4}` + ].join(' ') + const result = await wcdbService.execQuery('message', table.dbPath, sql) + if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue + messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record[])) + } + + return messages.sort((left, right) => this.compareMessagePosition(left, right)) + } + + private async findMessageByServerIdDirect( + sessionId: string, + revokeMessage: Message, + serverId: string + ): Promise { + const normalizedServerId = this.normalizeMessageIdToken(serverId) + if (!normalizedServerId) return undefined + + const source = this.parseMessageKeySource(revokeMessage.messageKey) + const tables = source + ? [source] + : await this.getCandidateMessageTables(sessionId, Math.max(0, Number(revokeMessage.createTime || 0) - 5 * 60)) + const revokeLocalId = Number(revokeMessage.localId || 0) + + for (const table of tables) { + const serverPredicate = this.buildServerIdPredicate('server_id', normalizedServerId) + const localFilter = Number.isFinite(revokeLocalId) && revokeLocalId > 0 + ? `AND local_id <> ${this.toSafeSqlInteger(revokeLocalId)}` + : '' + const sql = [ + `SELECT *, '${this.escapeSqlString(table.dbPath)}' AS _db_path, '${this.escapeSqlString(table.tableName)}' AS table_name`, + `FROM ${this.quoteSqlIdentifier(table.tableName)}`, + `WHERE ${serverPredicate}`, + localFilter, + `AND local_type NOT IN (10000, 10002)`, + `ORDER BY local_id ASC`, + `LIMIT 1` + ].filter(Boolean).join(' ') + const result = await wcdbService.execQuery('message', table.dbPath, sql) + if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue + const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record[]) + if (message && !this.isRevokeSystemMessage(message)) return message + } + + return undefined + } + + private async getCandidateMessageTables( + sessionId: string, + since: number + ): Promise> { + const result = await wcdbService.getMessageTableStats(sessionId) + if (!result.success || !Array.isArray(result.tables)) return [] + + const sinceSeconds = Math.max(0, Number(since || 0)) + return result.tables + .map((table) => ({ + dbPath: String(table?.db_path || table?.dbPath || '').trim(), + tableName: String(table?.table_name || table?.tableName || '').trim(), + lastTime: Number(table?.last_time || table?.lastTime || 0) + })) + .filter((table) => table.dbPath && table.tableName && (!sinceSeconds || table.lastTime >= sinceSeconds)) + .sort((left, right) => right.lastTime - left.lastTime) + } + + private mergeMessagesForRevokeLookup(primary: Message[], secondary: Message[]): Message[] { + const merged: Message[] = [] + const keys = new Set() + for (const message of [...primary, ...secondary]) { + const key = String(message.messageKey || '').trim() + if (key) { + if (keys.has(key)) continue + keys.add(key) + } + merged.push(message) + } + return merged + } + private async buildPayload(session: ChatSession, message: Message): Promise { const sessionId = String(session.username || '').trim() const messageKey = String(message.messageKey || '').trim() @@ -446,6 +713,7 @@ class MessagePushService { const isGroup = sessionId.endsWith('@chatroom') const sessionType = this.getSessionType(sessionId, session) const content = this.getMessageDisplayContent(message) + const rawid = this.getMessageRawId(message) const createTime = Number(message.createTime || 0) @@ -458,7 +726,7 @@ class MessagePushService { event: 'message.new', sessionId, sessionType, - messageKey, + rawid, avatarUrl, groupName, sourceName, @@ -473,7 +741,7 @@ class MessagePushService { event: 'message.new', sessionId, sessionType, - messageKey, + rawid, avatarUrl, sourceName: session.displayName || contactInfo?.displayName || sessionId, content, @@ -481,6 +749,376 @@ class MessagePushService { } } + private isRevokeSystemMessage(message: Message): boolean { + const localType = Number(message.localType || 0) + const content = `${message.rawContent || ''}\n${message.parsedContent || ''}` + if (content.includes('revokemsg') || content.includes(' { + const fromFetched = this.findRevokedOriginalInMessages(fetchedMessages, revokeMessage, revokedMessageId) + if (fromFetched) return fromFetched + + const createTime = Number(revokeMessage.createTime || 0) + if (!Number.isFinite(createTime) || createTime <= 0) return undefined + + if (revokedMessageId) { + const directMessage = await this.findMessageByServerIdDirect(sessionId, revokeMessage, revokedMessageId) + if (directMessage) return directMessage + } + + const lookupMessages = await this.getRecentRevokeContextMessages(sessionId, Math.max(0, createTime - 5 * 60)) + if (lookupMessages.length === 0) return undefined + return this.findRevokedOriginalInMessages(lookupMessages, revokeMessage, revokedMessageId) + } + + private collectSuppressedNormalMessageKeys(messagesToPush: Message[], fetchedMessages: Message[]): Set { + const suppressed = new Set() + const pushKeySet = new Set(messagesToPush.map((message) => String(message.messageKey || '').trim()).filter(Boolean)) + for (const message of messagesToPush) { + if (!this.isRevokeSystemMessage(message)) continue + const originalMessage = this.findRevokedOriginalInMessages(fetchedMessages, message, this.extractRevokedMessageId(message)) + const originalKey = String(originalMessage?.messageKey || '').trim() + if (originalKey && pushKeySet.has(originalKey)) { + suppressed.add(originalKey) + } + } + return suppressed + } + + private findRevokedOriginalInMessages( + messages: Message[], + revokeMessage: Message, + revokedMessageId?: string + ): Message | undefined { + if (revokedMessageId) { + const byPlatformId = this.findMessageByPlatformId(messages, revokedMessageId, revokeMessage) + if (byPlatformId) return byPlatformId + } + return this.findNearestMessageBeforeRevoke(messages, revokeMessage) + } + + private findMessageByPlatformId(messages: Message[], revokedMessageId: string, revokeMessage: Message): Message | undefined { + const normalizedTarget = this.normalizeMessageIdToken(revokedMessageId) + if (!normalizedTarget) return undefined + + for (const message of messages) { + if (message.messageKey === revokeMessage.messageKey) continue + if (this.isRevokeSystemMessage(message)) continue + if (this.getMessageIdTokens(message).has(normalizedTarget)) { + return message + } + } + return undefined + } + + private findNearestMessageBeforeRevoke(messages: Message[], revokeMessage: Message): Message | undefined { + const revokeCreateTime = Number(revokeMessage.createTime || 0) + const revokeSortSeq = Number(revokeMessage.sortSeq || 0) + const revokeLocalId = Number(revokeMessage.localId || 0) + + let best: Message | undefined + for (const message of messages) { + if (message.messageKey === revokeMessage.messageKey) continue + if (message.isSend === 1) continue + if (this.isRevokeSystemMessage(message)) continue + + const createTime = Number(message.createTime || 0) + const sortSeq = Number(message.sortSeq || 0) + const localId = Number(message.localId || 0) + if (revokeCreateTime > 0 && createTime > revokeCreateTime) continue + if (revokeCreateTime > 0 && createTime === revokeCreateTime) { + if (revokeSortSeq > 0 && sortSeq > revokeSortSeq) continue + if (revokeSortSeq <= 0 && revokeLocalId > 0 && localId > revokeLocalId) continue + } + + if (!best || this.compareMessagePosition(message, best) > 0) { + best = message + } + } + return best + } + + private compareMessagePosition(left: Message, right: Message): number { + const leftCreateTime = Number(left.createTime || 0) + const rightCreateTime = Number(right.createTime || 0) + if (leftCreateTime !== rightCreateTime) return leftCreateTime - rightCreateTime + + const leftSortSeq = Number(left.sortSeq || 0) + const rightSortSeq = Number(right.sortSeq || 0) + if (leftSortSeq !== rightSortSeq) return leftSortSeq - rightSortSeq + + const leftLocalId = Number(left.localId || 0) + const rightLocalId = Number(right.localId || 0) + if (leftLocalId !== rightLocalId) return leftLocalId - rightLocalId + + return String(left.messageKey || '').localeCompare(String(right.messageKey || '')) + } + + private getMessageIdTokens(message: Message): Set { + const tokens = new Set() + const add = (value: unknown) => { + const normalized = this.normalizeMessageIdToken(value) + if (normalized) tokens.add(normalized) + } + add(message.serverIdRaw) + add(message.serverId) + add(message.localId) + const content = String(message.rawContent || '') + add(this.extractXmlValue(content, 'newmsgid')) + add(this.extractXmlValue(content, 'msgid')) + add(this.extractXmlValue(content, 'oldmsgid')) + add(this.extractXmlValue(content, 'svrid')) + return tokens + } + + private extractRevokedMessageId(message: Message): string | undefined { + const content = String(message.rawContent || message.parsedContent || '') + const candidates = [ + this.extractXmlValue(content, 'newmsgid'), + this.extractXmlValue(content, 'msgid'), + this.extractXmlValue(content, 'oldmsgid'), + this.extractXmlValue(content, 'svrid'), + message.serverIdRaw, + message.serverId + ] + for (const candidate of candidates) { + const normalized = this.normalizeMessageIdToken(candidate) + if (normalized) return normalized + } + return undefined + } + + private extractRevokerUsername(message: Message): string | undefined { + const content = String(message.rawContent || '') + const candidates = [ + this.extractXmlValue(content, 'fromusername'), + this.extractXmlValue(content, 'session'), + message.senderUsername + ] + for (const candidate of candidates) { + const normalized = String(candidate || '').trim() + if (normalized) return normalized + } + return undefined + } + + private getRevokeFallbackContent(message: Message): string | null { + const content = String(message.rawContent || message.parsedContent || '') + const replacemsg = this.extractXmlValue(content, 'replacemsg') + if (replacemsg && !replacemsg.includes('撤回了一条消息')) return replacemsg + return null + } + + private extractXmlValue(xml: string, tagName: string): string { + const decoded = this.decodeBasicXmlEntities(String(xml || '')) + const regex = new RegExp(`<${tagName}>([\\s\\S]*?)<\\/${tagName}>`, 'i') + const match = regex.exec(decoded) + if (!match) return '' + return match[1].replace(//g, '').trim() + } + + private decodeBasicXmlEntities(value: string): string { + return value + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/"/g, '"') + .replace(/'/g, "'") + .replace(/&/g, '&') + } + + private normalizeMessageIdToken(value: unknown): string { + const raw = String(value ?? '').trim() + if (!raw) return '' + const numeric = /^-?\d+$/.test(raw) ? raw.replace(/^-/, '').replace(/^0+(?=\d)/, '') : raw + return numeric === '0' ? '' : numeric + } + + private parseMessageKeySource(messageKey?: string): { dbPath: string; tableName: string } | null { + const raw = String(messageKey || '').trim() + if (!raw) return null + + const parts = raw.split(':') + if (parts.length < 3) return null + parts.pop() + const tableName = String(parts.pop() || '').trim() + const encodedDbPath = parts.join(':') + if (!tableName || !encodedDbPath) return null + + try { + const dbPath = decodeURIComponent(encodedDbPath) + return dbPath ? { dbPath, tableName } : null + } catch { + return { dbPath: encodedDbPath, tableName } + } + } + + private quoteSqlIdentifier(identifier: string): string { + return `"${String(identifier || '').replace(/"/g, '""')}"` + } + + private escapeSqlString(value: string): string { + return String(value || '').replace(/'/g, "''") + } + + private toSafeSqlInteger(value: unknown): number { + const numeric = Number(value) + if (!Number.isFinite(numeric)) return 0 + return Math.max(0, Math.floor(numeric)) + } + + private buildServerIdPredicate(columnName: string, serverId: string): string { + const column = this.quoteSqlIdentifier(columnName) + const escaped = this.escapeSqlString(serverId) + if (/^\d+$/.test(serverId)) { + return `(${column} = ${serverId} OR CAST(${column} AS TEXT) = '${escaped}')` + } + return `CAST(${column} AS TEXT) = '${escaped}'` + } + + private async buildRevokePayload( + session: ChatSession, + message: Message, + fetchedMessages: Message[] + ): Promise { + const sessionId = String(session.username || '').trim() + const messageKey = String(message.messageKey || '').trim() + if (!sessionId || !messageKey) return null + if (this.isSelfRevokeMessage(message)) return null + + const revokedMessageId = this.extractRevokedMessageId(message) + const originalMessage = await this.findRevokedOriginalMessage(sessionId, message, fetchedMessages, revokedMessageId) + const rawid = this.getDisplayRawId(originalMessage, revokedMessageId, message) + const originalContent = originalMessage + ? this.getMessageDisplayContent(originalMessage) + : this.getRevokeFallbackContent(message) + const safeContent = String(originalContent || '未知内容').trim() || '未知内容' + const content = `对方撤回了一条消息(rawid:${rawid}) 内容为“${safeContent}”` + this.rememberRecentlyRevokedOriginalTokens(sessionId, originalMessage, revokedMessageId, message) + const isGroup = sessionId.endsWith('@chatroom') + const sessionType = this.getSessionType(sessionId, session) + const createTime = Number(message.createTime || 0) + + if (isGroup) { + const groupInfo = await chatService.getContactAvatar(sessionId) + const groupName = session.displayName || groupInfo?.displayName || sessionId + const revokerUsername = this.extractRevokerUsername(message) + const sourceMessage = revokerUsername ? { ...message, senderUsername: revokerUsername } : message + const sourceName = await this.resolveGroupSourceName(sessionId, sourceMessage, session) + const avatarUrl = await this.normalizePushAvatarUrl(session.avatarUrl || groupInfo?.avatarUrl) + return { + event: 'message.revoke', + sessionId, + sessionType, + rawid, + avatarUrl, + groupName, + sourceName, + content, + timestamp: createTime + } + } + + const contactInfo = await chatService.getContactAvatar(sessionId) + const avatarUrl = await this.normalizePushAvatarUrl(session.avatarUrl || contactInfo?.avatarUrl) + return { + event: 'message.revoke', + sessionId, + sessionType, + rawid, + avatarUrl, + sourceName: session.displayName || contactInfo?.displayName || sessionId, + content, + timestamp: createTime + } + } + + private getMessageRawId(message: Message): string { + return String(message.serverIdRaw || '').trim() + } + + private getDisplayRawId(originalMessage?: Message, revokedMessageId?: string, revokeMessage?: Message): string { + const candidates = originalMessage + ? [originalMessage.serverIdRaw, revokedMessageId] + : [revokedMessageId, revokeMessage?.serverIdRaw] + for (const candidate of candidates) { + const normalized = this.normalizeMessageIdToken(candidate) + if (normalized) return normalized + } + return '未知' + } + + private rememberRecentlyRevokedOriginalTokens( + sessionId: string, + originalMessage?: Message, + revokedMessageId?: string, + revokeMessage?: Message + ): void { + const keyPrefix = String(sessionId || '').trim() + if (!keyPrefix) return + + this.pruneRecentlyRevokedOriginalTokens() + const tokens = new Set() + const add = (value: unknown) => { + const normalized = this.normalizeMessageIdToken(value) + if (normalized) tokens.add(normalized) + } + + if (originalMessage) { + add(originalMessage.serverIdRaw) + add(originalMessage.serverId) + } + add(revokedMessageId) + add(revokeMessage?.serverIdRaw) + add(revokeMessage?.serverId) + + const now = Date.now() + for (const token of tokens) { + this.recentlyRevokedOriginalTokens.set(`${keyPrefix}\u0000${token}`, now) + } + } + + private isRecentlyRevokedOriginal(sessionId: string, message: Message): boolean { + const keyPrefix = String(sessionId || '').trim() + if (!keyPrefix) return false + + this.pruneRecentlyRevokedOriginalTokens() + for (const token of this.getMessageIdTokens(message)) { + if (this.recentlyRevokedOriginalTokens.has(`${keyPrefix}\u0000${token}`)) { + return true + } + } + return false + } + + private pruneRecentlyRevokedOriginalTokens(): void { + const now = Date.now() + for (const [key, timestamp] of this.recentlyRevokedOriginalTokens.entries()) { + if (now - timestamp > this.recentMessageTtlMs) { + this.recentlyRevokedOriginalTokens.delete(key) + } + } + } + private async normalizePushAvatarUrl(avatarUrl?: string): Promise { const normalized = String(avatarUrl || '').trim() if (!normalized) return undefined @@ -566,6 +1204,39 @@ class MessagePushService { return new Set(value.map((item) => String(item || '').trim()).filter(Boolean)) } + private collectMessageTableNamesFromPayload(payload: Record | null): string[] { + const tableNames = new Set() + const visit = (value: unknown, keyHint = '') => { + if (value === null || value === undefined) return + if (typeof value === 'string') { + const trimmed = value.trim() + if (!trimmed) return + const key = keyHint.toLowerCase() + if (key.includes('table') && this.isMessageTableChange(trimmed)) { + tableNames.add(trimmed) + return + } + for (const match of trimmed.matchAll(/\b(?:msg|message)_[a-z0-9_]+/gi)) { + const tableName = String(match[0] || '').trim() + if (tableName && this.isMessageTableChange(tableName)) tableNames.add(tableName) + } + return + } + if (Array.isArray(value)) { + for (const item of value) visit(item, keyHint) + return + } + if (typeof value !== 'object') return + + for (const [key, nested] of Object.entries(value as Record)) { + visit(nested, key) + } + } + + visit(payload) + return Array.from(tableNames) + } + private isSessionTableChange(tableName: string): boolean { return String(tableName || '').trim().toLowerCase() === 'session' } @@ -580,6 +1251,63 @@ class MessagePushService { normalized.includes('message') } + private resolveMessageTableTargetSessionIds(sessions: ChatSession[], tableNames: string[]): Set { + const targets = new Set() + if (!Array.isArray(tableNames) || tableNames.length === 0) return targets + + const fullHashLookup = new Map() + const shortHashLookup = new Map() + for (const session of sessions) { + const sessionId = String(session.username || '').trim() + if (!sessionId) continue + const fullHash = createHash('md5').update(sessionId).digest('hex').toLowerCase() + fullHashLookup.set(fullHash, sessionId) + const shortHash = fullHash.slice(0, 16) + const existing = shortHashLookup.get(shortHash) + if (existing === undefined) { + shortHashLookup.set(shortHash, sessionId) + } else if (existing !== sessionId) { + shortHashLookup.set(shortHash, null) + } + } + + for (const tableName of tableNames) { + const matched = this.matchSessionIdByMessageTableName(tableName, fullHashLookup, shortHashLookup) + if (matched) targets.add(matched) + } + return targets + } + + private matchSessionIdByMessageTableName( + tableName: string, + fullHashLookup: Map, + shortHashLookup: Map + ): string | null { + const normalized = String(tableName || '').trim().toLowerCase() + if (!normalized) return null + + const suffix = normalized.startsWith('msg_') ? normalized.slice(4) : '' + if (suffix) { + const directFull = fullHashLookup.get(suffix) + if (directFull) return directFull + + if (suffix.length >= 16) { + const directShort = shortHashLookup.get(suffix.slice(0, 16)) + if (typeof directShort === 'string') return directShort + } + } + + const hashMatch = /[a-f0-9]{32}|[a-f0-9]{16}/i.exec(normalized) + if (!hashMatch?.[0]) return null + const hash = hashMatch[0].toLowerCase() + if (hash.length >= 32) { + const full = fullHashLookup.get(hash) + if (full) return full + } + const short = shortHashLookup.get(hash.slice(0, 16)) + return typeof short === 'string' ? short : null + } + private bumpSessionBaseline(sessionId: string, message: Message): void { const key = String(sessionId || '').trim() if (!key) return diff --git a/src/pages/SettingsPage.tsx b/src/pages/SettingsPage.tsx index 80a21e6..9727fef 100644 --- a/src/pages/SettingsPage.tsx +++ b/src/pages/SettingsPage.tsx @@ -4120,16 +4120,16 @@ function SettingsPage({ onClose }: SettingsPageProps = {}) {
- SSE 事件名为 `message.new`;私聊推送 `avatarUrl/sourceName/content/timestamp`,群聊额外附带 `groupName`,其中 `timestamp` 为秒级 Unix 时间戳 + SSE 事件名包含 `message.new` 和 `message.revoke`;私聊推送 `rawid/avatarUrl/sourceName/content/timestamp`,群聊额外附带 `groupName`,其中 `timestamp` 为秒级 Unix 时间戳
GET {`http://${httpApiHost}:${httpApiPort}/api/v1/push/messages`}
-

通过 SSE 长连接接收消息事件,建议接收端按 `messageKey` 去重。

+

通过 SSE 长连接接收消息事件,建议接收端按 `event + rawid` 去重。

- {['event', 'sessionId', 'sessionType', 'messageKey', 'avatarUrl', 'sourceName', 'groupName?', 'content', 'timestamp'].map((param) => ( + {['event', 'sessionId', 'sessionType', 'rawid', 'avatarUrl', 'sourceName', 'groupName?', 'content', 'timestamp'].map((param) => ( {param}