Merge pull request #1042 from xunchahaha/api_dev

尝试优化了api性能
This commit is contained in:
xuncha
2026-05-31 11:23:46 +08:00
committed by GitHub
9 changed files with 1093 additions and 59 deletions

View File

@@ -0,0 +1,24 @@
import { parentPort } from 'worker_threads'
import { mapRowsToMessagesLite } from './services/apiMessageMapping'
/**
* apiMessageWorker —— HTTP API 消息映射的 CPU worker。
*
* 只做一件事把一批原始数据库行message_content / compress_content 等)解码并映射成
* Message[]hex/zstd 解压、字符清洗、key 构造)。这是纯 CPU 工作、无原生依赖、无数据库访问,
* 因此可以安全地脱离主进程,在多个 worker 上并行运行(见 apiMessageMapperPool.ts
* 既不阻塞 WeFlow 本体,又能按核数提速大批量消息的获取。
*
* 协议:收到 { id, rows, myWxid } -> 回 { id, result: Message[] } 或 { id, error: string }。
*/
if (parentPort) {
parentPort.on('message', (msg: { id: number; rows: Record<string, any>[]; myWxid: string }) => {
const { id, rows, myWxid } = msg
try {
const result = mapRowsToMessagesLite(Array.isArray(rows) ? rows : [], String(myWxid || ''))
parentPort!.postMessage({ id, result })
} catch (e) {
parentPort!.postMessage({ id, error: String(e) })
}
})
}

View File

@@ -0,0 +1,151 @@
import { Worker } from 'worker_threads'
import { join } from 'path'
import { existsSync } from 'fs'
import type { Message } from './chatService'
/**
* ApiMessageMapperPool —— HTTP API 消息映射的 worker 线程池。
*
* 把大批量「原始行 -> Message[]」的 CPU 解码/映射工作拆分到多个 worker 上并行执行
* (见 apiMessageWorker.ts使主进程在获取大量消息时既不卡顿工作不在主线程
* 又能按核数提速。worker 是纯 JS无原生依赖任何崩溃/异常都会让 mapRows 抛错,
* 由调用方回退到主线程映射,保证「最坏只是变慢、不会出错」。
*/
interface PendingTask {
resolve: (value: Message[]) => void
reject: (error: Error) => void
}
const MIN_CHUNK_ROWS = 200
export class ApiMessageMapperPool {
private workers: Worker[] = []
private pending = new Map<number, PendingTask>()
private nextId = 1
private readonly poolSize: number
private started = false
private disposed = false
constructor(poolSize: number) {
this.poolSize = Math.max(1, Math.floor(poolSize) || 1)
}
private resolveWorkerPath(): string {
const isDev = process.env.NODE_ENV === 'development'
const devPath = join(__dirname, '../dist-electron/apiMessageWorker.js')
const prodPath = join(__dirname, 'apiMessageWorker.js')
if (isDev && existsSync(devPath)) return devPath
return prodPath
}
/** 拉起线程池幂等。worker 全部退出后会自动允许下次重新拉起。 */
ensureStarted(): void {
if (this.disposed) throw new Error('ApiMessageMapperPool 已释放')
if (this.started && this.workers.length >= this.poolSize) return
const workerPath = this.resolveWorkerPath()
while (this.workers.length < this.poolSize) {
this.spawnWorker(workerPath)
}
this.started = true
}
/** 预热:提前拉起 worker使首个大请求无需等待 worker 启动。 */
warmup(): void {
try {
this.ensureStarted()
} catch {
// 预热失败不致命,真正使用时再尝试,失败则主线程回退
}
}
private spawnWorker(workerPath: string): void {
const worker = new Worker(workerPath)
worker.on('message', (msg: { id: number; result?: Message[]; error?: string }) => {
const task = this.pending.get(msg.id)
if (!task) return
this.pending.delete(msg.id)
if (msg.error) task.reject(new Error(msg.error))
else task.resolve(Array.isArray(msg.result) ? msg.result : [])
})
worker.on('error', (err) => {
this.failAll(err instanceof Error ? err : new Error(String(err)))
this.removeWorker(worker)
})
worker.on('exit', (code) => {
if (code !== 0 && !this.disposed) {
this.failAll(new Error(`apiMessageWorker 异常退出 (code=${code})`))
}
this.removeWorker(worker)
})
this.workers.push(worker)
}
private removeWorker(worker: Worker): void {
const idx = this.workers.indexOf(worker)
if (idx >= 0) this.workers.splice(idx, 1)
if (this.workers.length === 0) {
// 线程池已空:下次 ensureStarted 时重新拉起(崩溃自愈)
this.started = false
}
}
private failAll(err: Error): void {
for (const [, task] of this.pending) {
task.reject(err)
}
this.pending.clear()
}
private dispatch(worker: Worker, rows: Record<string, any>[], myWxid: string): Promise<Message[]> {
return new Promise<Message[]>((resolve, reject) => {
const id = this.nextId++
this.pending.set(id, { resolve, reject })
try {
worker.postMessage({ id, rows, myWxid })
} catch (e) {
this.pending.delete(id)
reject(e instanceof Error ? e : new Error(String(e)))
}
})
}
/**
* 并行映射:把 rows 切成若干连续分片分发到各 worker按原始顺序拼接结果。
* 任一分片失败 -> 整体 reject调用方回退主线程
*/
async mapRows(rows: Record<string, any>[], myWxid: string): Promise<Message[]> {
if (!Array.isArray(rows) || rows.length === 0) return []
this.ensureStarted()
const workerCount = this.workers.length
if (workerCount === 0) throw new Error('无可用的 apiMessageWorker')
const chunkCount = Math.min(workerCount, Math.max(1, Math.ceil(rows.length / MIN_CHUNK_ROWS)))
const chunkSize = Math.ceil(rows.length / chunkCount)
const tasks: Promise<Message[]>[] = []
for (let i = 0; i < chunkCount; i++) {
const start = i * chunkSize
const slice = rows.slice(start, start + chunkSize)
if (slice.length === 0) break
const worker = this.workers[i % this.workers.length]
tasks.push(this.dispatch(worker, slice, myWxid))
}
const results = await Promise.all(tasks)
const out: Message[] = []
for (const chunk of results) {
for (let i = 0; i < chunk.length; i++) out.push(chunk[i])
}
return out
}
async dispose(): Promise<void> {
this.disposed = true
this.failAll(new Error('ApiMessageMapperPool 已释放'))
const workers = this.workers.slice()
this.workers = []
this.started = false
await Promise.all(workers.map((w) => w.terminate().catch(() => undefined)))
}
}

View File

@@ -0,0 +1,585 @@
import { basename, extname } from 'path'
import * as fzstd from 'fzstd'
import type { Message } from './chatService'
/**
* apiMessageMapping —— HTTP API 非媒体消息的「行 -> Message」纯函数映射管线。
*
* 这是 chatService.mapRowsToMessagesLiteForApi 及其完整调用闭包(约 23 个纯辅助函数)的
* 忠实拷贝,去掉了对 `this` / configService / wcdbService / 原生层的依赖,唯一外部输入是
* 调用方传入的 `myWxid`。目的是让这套 CPU 密集hex/zstd 解压、字符清洗、key 构造)的解码
* 映射可以脱离主进程,运行在 worker 线程(见 apiMessageWorker.ts / apiMessageMapperPool.ts
* 从而既不卡住 WeFlow 本体,又能多线程并行提速。
*
* 重要:这里的逻辑必须与 chatService 中对应的私有方法保持一致。两边都是「逐行独立、无跨行状态」
* 的纯映射,因此分片/并行处理与一次性处理输出完全一致。若将来修改了 chatService 的任何解码/
* 映射细节,请同步修改本文件(反之亦然)。
*/
function encodeMessageKeySegment(value: unknown): string {
const normalized = String(value ?? '').trim()
return encodeURIComponent(normalized)
}
function getMessageSourceInfo(row: Record<string, any>): { dbName?: string; tableName?: string; dbPath?: string } {
const dbPath = String(row._db_path || row.db_path || '').trim()
const explicitDbName = String(row.db_name || '').trim()
const tableName = String(row.table_name || '').trim()
const dbName = explicitDbName || (dbPath ? basename(dbPath, extname(dbPath)) : '')
return {
dbName: dbName || undefined,
tableName: tableName || undefined,
dbPath: dbPath || undefined
}
}
function buildMessageKey(input: {
localId: number
serverId: number
createTime: number
sortSeq: number
senderUsername?: string | null
localType: number
dbName?: string
tableName?: string
dbPath?: string
}): string {
const localId = Number.isFinite(input.localId) ? Math.max(0, Math.floor(input.localId)) : 0
const serverId = Number.isFinite(input.serverId) ? Math.max(0, Math.floor(input.serverId)) : 0
const createTime = Number.isFinite(input.createTime) ? Math.max(0, Math.floor(input.createTime)) : 0
const sortSeq = Number.isFinite(input.sortSeq) ? Math.max(0, Math.floor(input.sortSeq)) : 0
const localType = Number.isFinite(input.localType) ? Math.floor(input.localType) : 0
const senderUsername = encodeMessageKeySegment(input.senderUsername || '')
const dbPath = String(input.dbPath || '').trim()
const dbName = String(input.dbName || '').trim() || (input.dbPath ? basename(input.dbPath, extname(input.dbPath)) : '')
const tableName = String(input.tableName || '').trim()
const sourceScope = dbPath || dbName
if (localId > 0 && sourceScope && tableName) {
return `${encodeMessageKeySegment(sourceScope)}:${encodeMessageKeySegment(tableName)}:${localId}`
}
if (localId > 0 && sourceScope) {
// 当底层未返回 table_name 时,避免使用 db:_:localId会误并同库不同表的消息
return `local:${encodeMessageKeySegment(sourceScope)}:${localId}:${createTime}:${sortSeq}:${senderUsername}:${localType}`
}
if (serverId > 0) {
const scopedServer = sourceScope ? `${encodeMessageKeySegment(sourceScope)}:${serverId}` : String(serverId)
return `server:${scopedServer}:${createTime}:${sortSeq}:${localId}:${senderUsername}:${localType}`
}
return `fallback:${encodeMessageKeySegment(sourceScope)}:${createTime}:${sortSeq}:${localId}:${senderUsername}:${localType}`
}
function getRowField(row: Record<string, any>, keys: string[]): any {
for (const key of keys) {
if (row[key] !== undefined && row[key] !== null) return row[key]
}
const lowerMap = new Map<string, string>()
for (const actual of Object.keys(row)) {
lowerMap.set(actual.toLowerCase(), actual)
}
for (const key of keys) {
const actual = lowerMap.get(key.toLowerCase())
if (actual && row[actual] !== undefined && row[actual] !== null) {
return row[actual]
}
}
return undefined
}
function coerceRowNumber(raw: any): number {
if (raw === undefined || raw === null) return NaN
if (typeof raw === 'number') return raw
if (typeof raw === 'bigint') return Number(raw)
if (Buffer.isBuffer(raw)) {
return coerceRowNumber(raw.toString('utf-8'))
}
if (raw instanceof Uint8Array) {
return coerceRowNumber(Buffer.from(raw).toString('utf-8'))
}
if (Array.isArray(raw)) {
return coerceRowNumber(Buffer.from(raw).toString('utf-8'))
}
if (typeof raw === 'object') {
if ('value' in raw) return coerceRowNumber(raw.value)
if ('intValue' in raw) return coerceRowNumber(raw.intValue)
if ('low' in raw && 'high' in raw) {
try {
const low = BigInt(raw.low >>> 0)
const high = BigInt(raw.high >>> 0)
return Number((high << 32n) + low)
} catch {
return NaN
}
}
const text = raw.toString ? String(raw) : ''
if (text && text !== '[object Object]') {
return coerceRowNumber(text)
}
return NaN
}
const text = String(raw).trim()
if (!text) return NaN
if (/^[+-]?\d+$/.test(text)) {
const parsed = Number(text)
return Number.isFinite(parsed) ? parsed : NaN
}
if (/^[+-]?\d+\.\d+$/.test(text)) {
const parsed = Number(text)
return Number.isFinite(parsed) ? parsed : NaN
}
return NaN
}
function getRowInt(row: Record<string, any>, keys: string[], fallback = 0): number {
const raw = getRowField(row, keys)
if (raw === undefined || raw === null || raw === '') return fallback
const parsed = coerceRowNumber(raw)
return Number.isFinite(parsed) ? parsed : fallback
}
function parseCompactDateTimeDigitsToSeconds(raw: string): number {
const text = String(raw || '').trim()
if (!/^\d{8}(?:\d{4}(?:\d{2})?)?$/.test(text)) return 0
const year = Number.parseInt(text.slice(0, 4), 10)
const month = Number.parseInt(text.slice(4, 6), 10)
const day = Number.parseInt(text.slice(6, 8), 10)
const hour = text.length >= 12 ? Number.parseInt(text.slice(8, 10), 10) : 0
const minute = text.length >= 12 ? Number.parseInt(text.slice(10, 12), 10) : 0
const second = text.length >= 14 ? Number.parseInt(text.slice(12, 14), 10) : 0
if (!Number.isFinite(year) || year < 1990 || year > 2200) return 0
if (!Number.isFinite(month) || month < 1 || month > 12) return 0
if (!Number.isFinite(day) || day < 1 || day > 31) return 0
if (!Number.isFinite(hour) || hour < 0 || hour > 23) return 0
if (!Number.isFinite(minute) || minute < 0 || minute > 59) return 0
if (!Number.isFinite(second) || second < 0 || second > 59) return 0
const dt = new Date(year, month - 1, day, hour, minute, second)
if (
dt.getFullYear() !== year ||
dt.getMonth() !== month - 1 ||
dt.getDate() !== day ||
dt.getHours() !== hour ||
dt.getMinutes() !== minute ||
dt.getSeconds() !== second
) {
return 0
}
const ts = Math.floor(dt.getTime() / 1000)
return Number.isFinite(ts) && ts > 0 ? ts : 0
}
function parseDateTimeTextToSeconds(raw: unknown): number {
const text = String(raw ?? '').trim()
if (!text) return 0
const compactDigits = parseCompactDateTimeDigitsToSeconds(text)
if (compactDigits > 0) return compactDigits
if (/[zZ]|[+-]\d{2}:?\d{2}$/.test(text)) {
const parsed = Date.parse(text)
const seconds = Math.floor(parsed / 1000)
if (Number.isFinite(seconds) && seconds > 0) return seconds
}
const normalized = text.replace('T', ' ').replace(/\.\d+$/, '').replace(/\//g, '-')
const match = normalized.match(/^(\d{4})-(\d{1,2})-(\d{1,2})(?:\s+(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?)?$/)
if (!match) return 0
const year = Number.parseInt(match[1], 10)
const month = Number.parseInt(match[2], 10)
const day = Number.parseInt(match[3], 10)
const hour = Number.parseInt(match[4] || '0', 10)
const minute = Number.parseInt(match[5] || '0', 10)
const second = Number.parseInt(match[6] || '0', 10)
if (!Number.isFinite(year) || !Number.isFinite(month) || !Number.isFinite(day)) return 0
const dt = new Date(year, month - 1, day, hour, minute, second)
const ts = Math.floor(dt.getTime() / 1000)
return Number.isFinite(ts) && ts > 0 ? ts : 0
}
function normalizeTimestampLikeToSeconds(raw: unknown): number {
if (raw === undefined || raw === null || raw === '') return 0
const text = String(raw ?? '').trim()
if (!text) return 0
const compactDigits = parseCompactDateTimeDigitsToSeconds(text)
if (compactDigits > 0) return compactDigits
const parsed = coerceRowNumber(raw)
if (Number.isFinite(parsed) && parsed > 0) {
let normalized = Math.floor(parsed)
while (normalized > 10000000000) {
normalized = Math.floor(normalized / 1000)
}
return normalized
}
return parseDateTimeTextToSeconds(text)
}
function getRowTimestampSeconds(row: Record<string, any>, keys: string[], fallback = 0): number {
const raw = getRowField(row, keys)
if (raw === undefined || raw === null || raw === '') return fallback
const parsed = normalizeTimestampLikeToSeconds(raw)
return parsed > 0 ? parsed : fallback
}
function normalizeUnsignedIntegerToken(raw: any): string | undefined {
if (raw === undefined || raw === null || raw === '') return undefined
if (typeof raw === 'bigint') {
return raw >= 0n ? raw.toString() : '0'
}
if (typeof raw === 'number') {
if (!Number.isFinite(raw)) return undefined
return String(Math.max(0, Math.floor(raw)))
}
if (Buffer.isBuffer(raw)) {
return normalizeUnsignedIntegerToken(raw.toString('utf-8').trim())
}
if (raw instanceof Uint8Array) {
return normalizeUnsignedIntegerToken(Buffer.from(raw).toString('utf-8').trim())
}
if (Array.isArray(raw)) {
return normalizeUnsignedIntegerToken(Buffer.from(raw).toString('utf-8').trim())
}
if (typeof raw === 'object') {
if ('value' in raw) return normalizeUnsignedIntegerToken(raw.value)
if ('intValue' in raw) return normalizeUnsignedIntegerToken(raw.intValue)
if ('low' in raw && 'high' in raw) {
try {
const low = BigInt(raw.low >>> 0)
const high = BigInt(raw.high >>> 0)
const value = (high << 32n) + low
return value >= 0n ? value.toString() : '0'
} catch {
return undefined
}
}
const text = raw.toString ? String(raw).trim() : ''
if (text && text !== '[object Object]') {
return normalizeUnsignedIntegerToken(text)
}
return undefined
}
const text = String(raw).trim()
if (!text) return undefined
if (/^\d+$/.test(text)) {
return text.replace(/^0+(?=\d)/, '') || '0'
}
if (/^[+-]?\d+$/.test(text)) {
try {
const value = BigInt(text)
return value >= 0n ? value.toString() : '0'
} catch {
return undefined
}
}
const parsed = Number(text)
if (Number.isFinite(parsed)) {
return String(Math.max(0, Math.floor(parsed)))
}
return undefined
}
function cleanAccountDirName(dirName: string): string {
const trimmed = dirName.trim()
if (!trimmed) return trimmed
if (trimmed.toLowerCase().startsWith('wxid_')) {
const match = trimmed.match(/^(wxid_[^_]+)/i)
if (match) return match[1]
return trimmed
}
const suffixMatch = trimmed.match(/^(.+)_([a-zA-Z0-9]{4})$/)
const cleaned = suffixMatch ? suffixMatch[1] : trimmed
return cleaned
}
function buildIdentityKeys(raw: string): string[] {
const value = String(raw || '').trim()
if (!value) return []
const lowerRaw = value.toLowerCase()
const cleaned = cleanAccountDirName(value).toLowerCase()
if (cleaned && cleaned !== lowerRaw) {
return [cleaned, lowerRaw]
}
return [lowerRaw]
}
/**
* 判断消息是否由「我」发出。与 chatService.resolveMessageIsSend 等价,但 myWxid 由参数传入
* chatService 版本从 configService.getMyWxidCleaned() 读取)。
*/
function resolveMessageIsSend(
rawIsSend: number | null,
senderUsername: string | null | undefined,
myWxidRaw: string
): { isSend: number | null; selfMatched: boolean; correctedBySelfIdentity: boolean } {
const normalizedRawIsSend = Number.isFinite(rawIsSend as number) ? rawIsSend : null
const senderKeys = buildIdentityKeys(String(senderUsername || ''))
if (senderKeys.length === 0) {
return {
isSend: normalizedRawIsSend,
selfMatched: false,
correctedBySelfIdentity: false
}
}
const myWxid = String(myWxidRaw || '').trim()
const selfKeys = buildIdentityKeys(myWxid)
if (selfKeys.length === 0) {
return {
isSend: normalizedRawIsSend,
selfMatched: false,
correctedBySelfIdentity: false
}
}
const selfMatched = senderKeys.some(senderKey =>
selfKeys.some(selfKey =>
senderKey === selfKey ||
senderKey.startsWith(selfKey + '_') ||
selfKey.startsWith(senderKey + '_')
)
)
if (selfMatched && normalizedRawIsSend !== 1) {
return {
isSend: 1,
selfMatched: true,
correctedBySelfIdentity: true
}
}
if (normalizedRawIsSend === null) {
return {
isSend: selfMatched ? 1 : 0,
selfMatched,
correctedBySelfIdentity: false
}
}
return {
isSend: normalizedRawIsSend,
selfMatched,
correctedBySelfIdentity: false
}
}
function decodeHtmlEntities(content: string): string {
return content
.replace(/&amp;/g, '&')
.replace(/&lt;/g, '<')
.replace(/&gt;/g, '>')
.replace(/&quot;/g, '"')
.replace(/&apos;/g, "'")
}
function cleanUtf16(input: string): string {
if (!input) return input
try {
const cleaned = input.replace(/[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F-\x9F]/g, '')
const codeUnits = cleaned.split('').map((c) => c.charCodeAt(0))
const validUnits: number[] = []
for (let i = 0; i < codeUnits.length; i += 1) {
const unit = codeUnits[i]
if (unit >= 0xd800 && unit <= 0xdbff) {
if (i + 1 < codeUnits.length) {
const nextUnit = codeUnits[i + 1]
if (nextUnit >= 0xdc00 && nextUnit <= 0xdfff) {
validUnits.push(unit, nextUnit)
i += 1
continue
}
}
continue
}
if (unit >= 0xdc00 && unit <= 0xdfff) {
continue
}
validUnits.push(unit)
}
return String.fromCharCode(...validUnits)
} catch {
return input.replace(/[^ -~一-鿿 -〿]/g, '')
}
}
function extractSenderUsernameFromContent(content: string): string | null {
if (!content) return null
const normalized = cleanUtf16(decodeHtmlEntities(String(content)))
const match = /^\s*([a-zA-Z0-9_@-]{4,}):(?!\/\/)\s*(?:\r?\n|<br\s*\/?>)/i.exec(normalized)
if (!match?.[1]) return null
const candidate = match[1].trim()
return candidate || null
}
function compactEncodedPayload(raw: string): string {
return String(raw || '').replace(/\s+/g, '').trim()
}
function looksLikeHex(s: string): boolean {
const compact = compactEncodedPayload(s)
if (compact.length % 2 !== 0) return false
return /^[0-9a-fA-F]+$/.test(compact)
}
function looksLikeBase64(s: string): boolean {
const compact = compactEncodedPayload(s)
if (compact.length % 4 !== 0) return false
return /^[A-Za-z0-9+/=]+$/.test(compact)
}
function decodeBinaryContent(data: Buffer, fallbackValue?: string): string {
if (data.length === 0) return ''
try {
// 检查是否是 zstd 压缩数据 (magic number: 0xFD2FB528)
if (data.length >= 4) {
const magicLE = data.readUInt32LE(0)
const magicBE = data.readUInt32BE(0)
if (magicLE === 0xFD2FB528 || magicBE === 0xFD2FB528) {
// zstd 压缩,需要解压
try {
const decompressed = fzstd.decompress(data)
return Buffer.from(decompressed).toString('utf-8')
} catch (e) {
console.error('zstd 解压失败:', e)
}
}
}
// 尝试直接 UTF-8 解码
const decoded = data.toString('utf-8')
// 检查是否有太多替换字符
const replacementCount = (decoded.match(/<2F>/g) || []).length
if (replacementCount < decoded.length * 0.2) {
return decoded.replace(/<2F>/g, '')
}
// 如果提供了 fallbackValue且解码结果看起来像二进制垃圾则返回 fallbackValue
if (fallbackValue && replacementCount > 0) {
return fallbackValue
}
// 尝试 latin1 解码
return data.toString('latin1')
} catch {
return fallbackValue || ''
}
}
function decodeMaybeCompressed(raw: any): string {
if (!raw) return ''
// 如果是 Buffer/Uint8Array
if (Buffer.isBuffer(raw) || raw instanceof Uint8Array) {
return decodeBinaryContent(Buffer.from(raw), String(raw))
}
// 如果是字符串
if (typeof raw === 'string') {
if (raw.length === 0) return ''
const compactRaw = compactEncodedPayload(raw)
// 检查是否是 hex 编码
if (compactRaw.length > 16 && looksLikeHex(compactRaw)) {
const bytes = Buffer.from(compactRaw, 'hex')
if (bytes.length > 0) {
return decodeBinaryContent(bytes, raw)
}
}
// 检查是否是 base64 编码
if (compactRaw.length > 16 && looksLikeBase64(compactRaw)) {
try {
const bytes = Buffer.from(compactRaw, 'base64')
return decodeBinaryContent(bytes, raw)
} catch { }
}
// 普通字符串
return raw
}
return ''
}
function decodeMessageContent(messageContent: any, compressContent: any): string {
// 优先使用 compress_content
let content = decodeMaybeCompressed(compressContent)
if (!content || content.length === 0) {
content = decodeMaybeCompressed(messageContent)
}
return content
}
/**
* 行 -> Message[] 的轻量映射(非媒体)。等价于 chatService.mapRowsToMessagesLiteForApi
* 但是纯函数myWxid 由参数传入),可在 worker 线程运行。逐行独立、无跨行状态。
*/
export function mapRowsToMessagesLite(rows: Record<string, any>[], myWxidRaw: string): Message[] {
const myWxid = String(myWxidRaw || '').trim()
const messages: Message[] = []
for (const row of rows) {
const sourceInfo = getMessageSourceInfo(row)
const localType = getRowInt(row, ['local_type'], 1)
const createTime = getRowTimestampSeconds(row, ['create_time', 'createTime', 'msg_time', 'msgTime', 'time'], 0)
const sortSeq = getRowInt(row, ['sort_seq'], createTime > 0 ? createTime * 1000 : 0)
const localId = getRowInt(row, ['local_id'], 0)
const serverIdRaw = normalizeUnsignedIntegerToken(row.server_id)
const serverId = getRowInt(row, ['server_id'], 0)
const content = decodeMessageContent(row.message_content, row.compress_content)
const isSendRaw = row.computed_is_send ?? row.is_send
const parsedRawIsSend = isSendRaw === null || isSendRaw === undefined
? null
: parseInt(String(isSendRaw), 10)
const normalizedIsSend = typeof parsedRawIsSend === 'number' && Number.isFinite(parsedRawIsSend)
? parsedRawIsSend
: null
const senderFromRow = String(row.sender_username || '').trim() || extractSenderUsernameFromContent(content) || null
const { isSend } = resolveMessageIsSend(normalizedIsSend, senderFromRow, myWxid)
const senderUsername = senderFromRow || (isSend === 1 && myWxid ? myWxid : null)
messages.push({
messageKey: buildMessageKey({
localId,
serverId,
createTime,
sortSeq,
senderUsername,
localType,
...sourceInfo
}),
localId,
serverId,
serverIdRaw,
localType,
createTime,
sortSeq,
isSend,
senderUsername,
parsedContent: '',
rawContent: content,
content,
_db_path: sourceInfo.dbPath
} as Message)
}
return messages
}

View File

@@ -4926,7 +4926,7 @@ class ChatService {
/**
* HTTP API 复用消息解析逻辑,确保和应用内展示一致。
*/
mapRowsToMessagesForApi(rows: Record<string, any>[], sessionId: string): Message[] {
mapRowsToMessagesForApi(rows: Record<string, any>[], sessionId: string = ''): Message[] {
return this.mapRowsToMessages(rows, sessionId)
}

View File

@@ -14,6 +14,9 @@ import { videoService } from './videoService'
import { imageDecryptService } from './imageDecryptService'
import { groupAnalyticsService } from './groupAnalyticsService'
import { snsService } from './snsService'
import * as os from 'os'
import { ApiMessageMapperPool } from './apiMessageMapperPool'
import { mapRowsToMessagesLite } from './apiMessageMapping'
// ChatLab 格式定义
interface ChatLabHeader {
@@ -126,6 +129,14 @@ class HttpService {
private port: number = 5031
private host: string = '127.0.0.1'
private running: boolean = false
private dbWarmed: boolean = false
/**
* API 消息映射线程池:把大批量「行 -> Message」的解码/映射放到 worker 并行执行,
* 既不阻塞本体主进程又能按核数提速。懒创建stop() 时释放;映射失败回退主线程。
*/
private apiMapperPool: ApiMessageMapperPool | null = null
private static readonly API_PARALLEL_MAP_THRESHOLD = 300
private connections: Set<import('net').Socket> = new Set()
private messagePushClients: Set<http.ServerResponse> = new Set()
private messagePushReplayBuffer: MessagePushReplayEvent[] = []
@@ -184,6 +195,11 @@ class HttpService {
this.server.listen(this.port, this.host, () => {
this.running = true
// 主动预热数据库连接与消息库索引HTTP 服务可能经 http:start 独立启动
// (未走 GUI 的 connect/warmup避免首批请求因原生消息库缓存为空而整页丢消息。
void this.ensureDbReady().catch((e) => console.warn('[HttpService] warmup on start failed:', e))
// 预热映射线程池,使首个大请求无需等待 worker 启动
try { this.getApiMapperPool().warmup() } catch {}
this.startMessagePushHeartbeat()
console.log(`[HttpService] HTTP API server started on http://${this.host}:${this.port}`)
resolve({ success: true, port: this.port })
@@ -195,6 +211,11 @@ class HttpService {
* 停止 HTTP 服务
*/
async stop(): Promise<void> {
if (this.apiMapperPool) {
const pool = this.apiMapperPool
this.apiMapperPool = null
void pool.dispose().catch(() => undefined)
}
return new Promise((resolve) => {
if (this.server) {
for (const client of this.messagePushClients) {
@@ -596,37 +617,64 @@ class HttpService {
}
/**
* 批量获取消息(循环游标直到满足 limit
* 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标
* 确保账号数据库已连接并完成一次消息库索引预热。
* HTTP 读消息不像 GUI 那样自带 ensureConnected/warmup当应用冷启动、或 HTTP 服务经
* http:start 独立启动时原生消息库缓存可能尚未建立lite 游标会返回 -3
* (WCDB_STATUS_NO_MESSAGE_DB) 而整页丢消息(见 upstream issue #926 / #1029
* connect() 在已连接时会短路秒返warmup 只在成功前重试,成功后本进程不再重复。
*/
private async fetchMessagesBatch(
private async ensureDbReady(): Promise<{ success: boolean; error?: string }> {
const connected = await chatService.connect()
if (!connected.success) {
return { success: false, error: connected.error || '数据库未连接' }
}
if (!this.dbWarmed) {
try {
const warm = await chatService.warmupMessageDbSnapshot()
if (warm.success) this.dbWarmed = true
} catch (e) {
console.warn('[HttpService] warmupMessageDbSnapshot failed:', e)
}
}
return { success: true }
}
/**
* 批量获取消息(循环游标直到满足 limit
* 绕过 chatService 的单 batch 限制,直接操作 wcdbService 游标。
*
* 健壮性(修复"前几次请求丢消息"
* - 先经 ensureDbReady() 确保账号库已连接并预热,消除冷启动/空闲后缓存为空导致的 -3。
* - lite 游标打开失败时回退到 GUI 同款 full openMessageCursor自带 -3 forceReopen 自愈)。
* - lite 打开成功但 offset=0 首页为空且无更多时,疑似 lite 冷缓存误判heapSize=0
* 再用 full 游标复核一次full 也为空才认定确实没有数据。既保留 lite 性能又不丢数据。
*/
private async collectRawRows(
talker: string,
offset: number,
limit: number,
startTime: number,
endTime: number,
ascending: boolean,
useLiteMapping: boolean = true
): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> {
ascending: boolean
): Promise<{ success: boolean; rows?: Record<string, any>[]; hasMore?: boolean; error?: string }> {
const ready = await this.ensureDbReady()
if (!ready.success) {
return { success: false, error: ready.error || '数据库未连接' }
}
try {
// 深分页时放大 batch避免 offset 很大时出现大量小批次循环。
const batchSize = Math.min(2000, Math.max(500, limit))
const beginTimestamp = startTime > 10000000000 ? Math.floor(startTime / 1000) : startTime
const endTimestamp = endTime > 10000000000 ? Math.floor(endTime / 1000) : endTime
const cursorResult = await wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp)
if (!cursorResult.success || !cursorResult.cursor) {
return { success: false, error: cursorResult.error || '打开消息游标失败' }
}
const cursor = cursorResult.cursor
try {
// 在单个游标上循环累积:处理 offset 跳过 + limit 累积
const collectFromCursor = async (cursor: number): Promise<{ rows: Record<string, any>[]; hasMore: boolean }> => {
const collectedRows: Record<string, any>[] = []
let hasMore = true
let skipped = 0
let reachedLimit = false
// 循环获取消息,处理 offset 跳过 + limit 累积
while (collectedRows.length < limit && hasMore) {
const batch = await wcdbService.fetchMessageBatch(cursor)
if (!batch.success || !batch.rows || batch.rows.length === 0) {
@@ -658,21 +706,141 @@ class HttpService {
collectedRows.push(...rows)
}
const finalHasMore = hasMore || reachedLimit
const messages = useLiteMapping
? chatService.mapRowsToMessagesLiteForApi(collectedRows)
: chatService.mapRowsToMessagesForApi(collectedRows)
await this.backfillMissingSenderUsernames(talker, messages)
return { success: true, messages, hasMore: finalHasMore }
} finally {
await wcdbService.closeMessageCursor(cursor)
return { rows: collectedRows, hasMore: hasMore || reachedLimit }
}
// 打开游标 -> 累积 -> 关闭;打开失败返回 null错误经 lastCursorError 透出)
let lastCursorError = '打开消息游标失败'
const runWithCursor = async (
open: () => Promise<{ success: boolean; cursor?: number; error?: string }>
): Promise<{ rows: Record<string, any>[]; hasMore: boolean } | null> => {
const cursorResult = await open()
if (!cursorResult.success || !cursorResult.cursor) {
lastCursorError = cursorResult.error || lastCursorError
return null
}
const cursor = cursorResult.cursor
try {
return await collectFromCursor(cursor)
} finally {
await wcdbService.closeMessageCursor(cursor)
}
}
const openLite = () => wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp)
const openFull = () => wcdbService.openMessageCursor(talker, batchSize, ascending, beginTimestamp, endTimestamp)
// 1) lite 游标快路径
let collected = await runWithCursor(openLite)
if (!collected) {
// 2) lite 打开失败 -> 回退 full 游标
collected = await runWithCursor(openFull)
if (!collected) {
return { success: false, error: lastCursorError }
}
} else if (collected.rows.length === 0 && offset === 0 && !collected.hasMore) {
// 3) lite 首页空且无更多 -> 用 full 复核一次(防 lite 冷缓存误判)
const recheck = await runWithCursor(openFull)
if (recheck && recheck.rows.length > 0) {
collected = recheck
}
}
return { success: true, rows: collected.rows, hasMore: collected.hasMore }
} catch (e) {
console.error('[HttpService] fetchMessagesBatch error:', e)
console.error('[HttpService] collectRawRows error:', e)
return { success: false, error: String(e) }
}
}
/**
* 批量获取并映射消息(主线程映射版)。供媒体导出路径与 ChatLab Pull 等沿用,行为不变。
* 非媒体 JSON 路径改用并行映射 fetchApiMessagesParallel见下
*/
private async fetchMessagesBatch(
talker: string,
offset: number,
limit: number,
startTime: number,
endTime: number,
ascending: boolean,
useLiteMapping: boolean = true
): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> {
const collected = await this.collectRawRows(talker, offset, limit, startTime, endTime, ascending)
if (!collected.success || !collected.rows) {
return { success: false, error: collected.error }
}
const messages = await this.mapRowsToMessagesYielding(collected.rows, useLiteMapping, talker)
await this.backfillMissingSenderUsernames(talker, messages)
return { success: true, messages, hasMore: collected.hasMore === true }
}
/**
* 非媒体 JSON 路径:取原始行后用线程池并行映射(不卡本体、按核数提速)。
* 大批量走线程池;小批量或线程池异常时回退主线程模块映射(分片让出,输出与并行路径一致)。
*/
private async fetchApiMessagesParallel(
talker: string,
offset: number,
limit: number,
startTime: number,
endTime: number,
ascending: boolean = false
): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> {
const collected = await this.collectRawRows(talker, offset, limit, startTime, endTime, ascending)
if (!collected.success || !collected.rows) {
return { success: false, error: collected.error }
}
const rows = collected.rows
const myWxid = String(this.configService.getMyWxidCleaned() || '')
let messages: Message[]
if (rows.length >= HttpService.API_PARALLEL_MAP_THRESHOLD) {
try {
messages = await this.getApiMapperPool().mapRows(rows, myWxid)
} catch (e) {
console.warn('[HttpService] 并行映射失败,回退主线程:', e)
messages = await this.mapRowsLiteOnMainYielding(rows, myWxid)
}
} else {
messages = await this.mapRowsLiteOnMainYielding(rows, myWxid)
}
await this.backfillMissingSenderUsernames(talker, messages)
return { success: true, messages, hasMore: collected.hasMore === true }
}
private getApiMapperPool(): ApiMessageMapperPool {
if (!this.apiMapperPool) {
let cores = 4
try { cores = os.cpus().length || 4 } catch {}
const size = Math.min(4, Math.max(2, cores - 2))
this.apiMapperPool = new ApiMessageMapperPool(size)
}
return this.apiMapperPool
}
/**
* 主线程分片映射(模块版,与 worker 同源):按时间片让出事件循环,避免卡顿。
* 作为线程池的回退路径与小批量路径,输出与并行路径完全一致。
*/
private async mapRowsLiteOnMainYielding(rows: Record<string, any>[], myWxid: string): Promise<Message[]> {
const out: Message[] = []
const STEP = 16
let sliceStart = Date.now()
for (let i = 0; i < rows.length; i += STEP) {
const slice = rows.slice(i, i + STEP)
const mapped = mapRowsToMessagesLite(slice, myWxid)
for (let k = 0; k < mapped.length; k++) out.push(mapped[k])
if (i + STEP < rows.length && Date.now() - sliceStart >= 24) {
await this.yieldToEventLoop()
sliceStart = Date.now()
}
}
return out
}
/**
* Query param helpers.
*/
@@ -682,6 +850,63 @@ class HttpService {
return Math.min(Math.max(parsed, min), max)
}
/**
* 让出主进程事件循环一拍。
* 大批量消息的解码/映射decodeMessageContent 的 hex/zlib 解压、toApiMessage 的 XML 解析)
* 是 CPU 密集且同步的——一次性处理会长时间独占主进程,阻塞所有依赖主进程 IPC 的 GUI 交互,
* 表现为「获取消息时本体卡住」。用 setImmediate 让出drain 的是宏任务而非仅微任务),
* 使主进程能在分片之间处理 GUI 的 IPC 与 Worker 回调。
*/
private yieldToEventLoop(): Promise<void> {
return new Promise((resolve) => setImmediate(resolve))
}
/**
* 分片映射数据库行 -> Message[],按时间片(~24ms让出事件循环避免阻塞主进程。
* 两个底层 mapper 都是逐行独立映射、无跨行状态,故分片输出与一次性调用完全一致。
*/
private async mapRowsToMessagesYielding(
rows: Record<string, any>[],
useLiteMapping: boolean,
sessionId: string
): Promise<Message[]> {
const out: Message[] = []
const STEP = 16
let sliceStart = Date.now()
for (let i = 0; i < rows.length; i += STEP) {
const slice = rows.slice(i, i + STEP)
const mapped = useLiteMapping
? chatService.mapRowsToMessagesLiteForApi(slice)
: chatService.mapRowsToMessagesForApi(slice, sessionId)
for (let k = 0; k < mapped.length; k++) out.push(mapped[k])
if (i + STEP < rows.length && Date.now() - sliceStart >= 24) {
await this.yieldToEventLoop()
sliceStart = Date.now()
}
}
return out
}
/**
* 分片把 Message[] 转为 API JSON 行,按时间片让出事件循环,避免阻塞主进程。
*/
private async toApiMessagesYielding(
messages: Message[],
mediaMap: Map<number, ApiExportedMedia>
): Promise<Record<string, any>[]> {
const out: Record<string, any>[] = []
let sliceStart = Date.now()
for (let i = 0; i < messages.length; i++) {
const msg = messages[i]
out.push(this.toApiMessage(msg, mediaMap.get(msg.localId)))
if ((i & 15) === 15 && Date.now() - sliceStart >= 24) {
await this.yieldToEventLoop()
sliceStart = Date.now()
}
}
return out
}
private async backfillMissingSenderUsernames(talker: string, messages: Message[]): Promise<void> {
if (!talker.endsWith('@chatroom')) return
@@ -721,7 +946,7 @@ class HttpService {
try {
const detail = await wcdbService.getMessageById(talker, localId)
if (detail.success && detail.message) {
const hydrated = chatService.mapRowsToMessagesForApi([detail.message])[0]
const hydrated = chatService.mapRowsToMessagesForApi([detail.message], talker)[0]
if (hydrated?.senderUsername) {
msg.senderUsername = hydrated.senderUsername
}
@@ -835,6 +1060,11 @@ class HttpService {
let hasMore = false
if (keyword) {
const ready = await this.ensureDbReady()
if (!ready.success) {
this.sendError(res, 500, ready.error || 'Failed to get messages')
return
}
const searchLimit = Math.max(1, limit) + 1
const searchResult = await chatService.searchMessages(
keyword,
@@ -850,6 +1080,16 @@ class HttpService {
}
hasMore = searchResult.messages.length > limit
messages = hasMore ? searchResult.messages.slice(0, limit) : searchResult.messages
} else if (!mediaOptions.enabled) {
// 非媒体路径json 与 chatlab 共用):取原始行后线程池并行映射,不卡本体、按核数提速。
// 两种格式底层都用 lite 映射,输出与改前一致;随后再各自走 toApiMessage / convertToChatLab。
const result = await this.fetchApiMessagesParallel(talker, offset, limit, startTime, endTime)
if (!result.success || !result.messages) {
this.sendError(res, 500, result.error || 'Failed to get messages')
return
}
messages = result.messages
hasMore = result.hasMore === true
} else {
const result = await this.fetchMessagesBatch(
talker,
@@ -888,7 +1128,7 @@ class HttpService {
return
}
const apiMessages = messages.map((msg) => this.toApiMessage(msg, mediaMap.get(msg.localId)))
const apiMessages = await this.toApiMessagesYielding(messages, mediaMap)
this.sendJson(res, {
success: true,
talker,
@@ -976,7 +1216,8 @@ class HttpService {
const endTime = endParam ? this.parseTimeParam(endParam, true) : 0
try {
const result = await this.fetchMessagesBatch(sessionId, offset, limit, startTime, endTime, true, true)
// ChatLab Pull 始终非媒体、lite 映射走线程池并行映射limit 可达 5000提速更明显
const result = await this.fetchApiMessagesParallel(sessionId, offset, limit, startTime, endTime, true)
if (!result.success || !result.messages) {
this.sendError(res, 500, result.error || 'Failed to get messages')
return
@@ -1892,16 +2133,24 @@ class HttpService {
}
}
// 构建成员列表
// 构建成员列表(分片让出,避免大批量时阻塞主进程/卡本体)
const memberMap = new Map<string, ChatLabMember>()
for (const msg of messages) {
const senderInfo = this.resolveChatLabSenderInfo(msg, talkerId, talkerName, myWxid, isGroup, senderNames, groupNicknamesMap)
if (!memberMap.has(senderInfo.sender)) {
memberMap.set(senderInfo.sender, {
platformId: senderInfo.sender,
accountName: senderInfo.accountName,
groupNickname: senderInfo.groupNickname
})
{
let sliceStart = Date.now()
for (let i = 0; i < messages.length; i++) {
const msg = messages[i]
const senderInfo = this.resolveChatLabSenderInfo(msg, talkerId, talkerName, myWxid, isGroup, senderNames, groupNicknamesMap)
if (!memberMap.has(senderInfo.sender)) {
memberMap.set(senderInfo.sender, {
platformId: senderInfo.sender,
accountName: senderInfo.accountName,
groupNickname: senderInfo.groupNickname
})
}
if ((i & 15) === 15 && Date.now() - sliceStart >= 24) {
await this.yieldToEventLoop()
sliceStart = Date.now()
}
}
}
@@ -1926,26 +2175,36 @@ class HttpService {
}
}
// 转换消息
const chatLabMessages: ChatLabMessage[] = messages.map(msg => {
const senderInfo = this.resolveChatLabSenderInfo(msg, talkerId, talkerName, myWxid, isGroup, senderNames, groupNicknamesMap)
const quoteInfo = this.extractApiQuoteInfo(msg)
// 转换消息(分片让出,避免大批量时阻塞主进程/卡本体)
const chatLabMessages: ChatLabMessage[] = []
{
let sliceStart = Date.now()
for (let i = 0; i < messages.length; i++) {
const msg = messages[i]
const senderInfo = this.resolveChatLabSenderInfo(msg, talkerId, talkerName, myWxid, isGroup, senderNames, groupNicknamesMap)
const quoteInfo = this.extractApiQuoteInfo(msg)
const chatLabMessage: ChatLabMessage = {
sender: senderInfo.sender,
accountName: senderInfo.accountName,
groupNickname: senderInfo.groupNickname,
timestamp: msg.createTime,
type: this.mapMessageType(msg.localType, msg),
content: this.getMessageContent(msg, quoteInfo),
platformMessageId: this.getMessageServerId(msg) || undefined,
mediaPath: mediaMap.get(msg.localId) ? `http://${this.host}:${this.port}/api/v1/media/${mediaMap.get(msg.localId)!.relativePath}` : undefined
const chatLabMessage: ChatLabMessage = {
sender: senderInfo.sender,
accountName: senderInfo.accountName,
groupNickname: senderInfo.groupNickname,
timestamp: msg.createTime,
type: this.mapMessageType(msg.localType, msg),
content: this.getMessageContent(msg, quoteInfo),
platformMessageId: this.getMessageServerId(msg) || undefined,
mediaPath: mediaMap.get(msg.localId) ? `http://${this.host}:${this.port}/api/v1/media/${mediaMap.get(msg.localId)!.relativePath}` : undefined
}
if (quoteInfo?.replyToMessageId) {
chatLabMessage.replyToMessageId = quoteInfo.replyToMessageId
}
chatLabMessages.push(chatLabMessage)
if ((i & 15) === 15 && Date.now() - sliceStart >= 24) {
await this.yieldToEventLoop()
sliceStart = Date.now()
}
}
if (quoteInfo?.replyToMessageId) {
chatLabMessage.replyToMessageId = quoteInfo.replyToMessageId
}
return chatLabMessage
})
}
return {
chatlab: {

View File

@@ -606,7 +606,7 @@ class MessagePushService {
].join(' ')
const result = await wcdbService.execQuery('message', table.dbPath, sql)
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[]))
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId))
}
return messages
@@ -630,7 +630,7 @@ class MessagePushService {
].join(' ')
const result = await wcdbService.execQuery('message', table.dbPath, sql)
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[]))
messages.push(...chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId))
}
return messages.sort((left, right) => this.compareMessagePosition(left, right))
@@ -666,7 +666,7 @@ class MessagePushService {
].filter(Boolean).join(' ')
const result = await wcdbService.execQuery('message', table.dbPath, sql)
if (!result.success || !Array.isArray(result.rows) || result.rows.length === 0) continue
const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[])
const [message] = chatService.mapRowsToMessagesForApi(result.rows as Record<string, any>[], sessionId)
if (message && !this.isRevokeSystemMessage(message)) return message
}

2
package-lock.json generated
View File

@@ -1,6 +1,6 @@
{
"name": "weflow",
"version": "4.3.0",
"version": "4.6.0",
"lockfileVersion": 3,
"requires": true,
"packages": {

View File

@@ -1,6 +1,6 @@
{
"name": "weflow",
"version": "4.3.0",
"version": "4.6.0",
"description": "WeFlow",
"main": "dist-electron/main.js",
"author": {

View File

@@ -232,6 +232,21 @@ export default defineConfig({
}
}
},
{
entry: 'electron/apiMessageWorker.ts',
onstart: handleElectronOnStart,
vite: {
build: {
outDir: 'dist-electron',
rollupOptions: {
output: {
entryFileNames: 'apiMessageWorker.js',
inlineDynamicImports: true
}
}
}
}
},
{
entry: 'electron/preload.ts',
onstart: handleElectronOnStart,