feat(export): fast accurate content session counts on cards

This commit is contained in:
tisonhuang
2026-03-02 19:07:17 +08:00
parent f0f70def8c
commit b6878aefd6
7 changed files with 721 additions and 5 deletions

View File

@@ -13,6 +13,11 @@ import { wcdbService } from './wcdbService'
import { MessageCacheService } from './messageCacheService'
import { ContactCacheService, ContactCacheEntry } from './contactCacheService'
import { SessionStatsCacheService, SessionStatsCacheEntry, SessionStatsCacheStats } from './sessionStatsCacheService'
import {
ExportContentScopeStatsEntry,
ExportContentSessionStatsEntry,
ExportContentStatsCacheService
} from './exportContentStatsCacheService'
import { voiceTranscribeService } from './voiceTranscribeService'
import { LRUCache } from '../utils/LRUCache.js'
@@ -166,6 +171,18 @@ interface ExportSessionStatsCacheMeta {
source: 'memory' | 'disk' | 'fresh'
}
interface ExportContentSessionCounts {
totalSessions: number
textSessions: number
voiceSessions: number
imageSessions: number
videoSessions: number
emojiSessions: number
pendingMediaSessions: number
updatedAt: number
refreshing: boolean
}
interface ExportTabCounts {
private: number
group: number
@@ -209,6 +226,7 @@ class ChatService {
private readonly contactCacheService: ContactCacheService
private readonly messageCacheService: MessageCacheService
private readonly sessionStatsCacheService: SessionStatsCacheService
private readonly exportContentStatsCacheService: ExportContentStatsCacheService
private voiceWavCache: LRUCache<string, Buffer>
private voiceTranscriptCache: LRUCache<string, string>
private voiceTranscriptPending = new Map<string, Promise<{ success: boolean; transcript?: string; error?: string }>>()
@@ -247,6 +265,15 @@ class ChatService {
private allGroupSessionIdsCache: { ids: string[]; updatedAt: number } | null = null
private readonly sessionStatsCacheTtlMs = 10 * 60 * 1000
private readonly allGroupSessionIdsCacheTtlMs = 5 * 60 * 1000
private exportContentStatsScope = ''
private exportContentStatsMemory = new Map<string, ExportContentSessionStatsEntry>()
private exportContentStatsScopeUpdatedAt = 0
private exportContentStatsRefreshPromise: Promise<void> | null = null
private exportContentStatsRefreshQueued = false
private exportContentStatsRefreshForceQueued = false
private exportContentStatsDirtySessionIds = new Set<string>()
private exportContentScopeSessionIdsCache: { ids: string[]; updatedAt: number } | null = null
private readonly exportContentScopeSessionIdsCacheTtlMs = 60 * 1000
constructor() {
this.configService = new ConfigService()
@@ -255,6 +282,7 @@ class ChatService {
this.avatarCache = new Map(Object.entries(persisted))
this.messageCacheService = new MessageCacheService(this.configService.getCacheBasePath())
this.sessionStatsCacheService = new SessionStatsCacheService(this.configService.getCacheBasePath())
this.exportContentStatsCacheService = new ExportContentStatsCacheService(this.configService.getCacheBasePath())
// 初始化LRU缓存限制大小防止内存泄漏
this.voiceWavCache = new LRUCache(this.voiceWavCacheMaxEntries)
this.voiceTranscriptCache = new LRUCache(1000) // 最多缓存1000条转写记录
@@ -325,6 +353,8 @@ class ChatService {
// 预热 listMediaDbs 缓存(后台异步执行,不阻塞连接)
this.warmupMediaDbsCache()
// 预热导出内容会话统计缓存(后台异步,不阻塞连接)
void this.startExportContentStatsRefresh(false)
return { success: true }
} catch (e) {
@@ -393,6 +423,10 @@ class ChatService {
console.error('ChatService: 关闭数据库失败:', e)
}
this.connected = false
this.exportContentStatsRefreshPromise = null
this.exportContentStatsRefreshQueued = false
this.exportContentStatsRefreshForceQueued = false
this.exportContentScopeSessionIdsCache = null
}
/**
@@ -1584,6 +1618,7 @@ class ChatService {
const scope = `${dbPath}::${myWxid}`
if (scope === this.sessionMessageCountCacheScope) {
this.refreshSessionStatsCacheScope(scope)
this.refreshExportContentStatsScope(scope)
return
}
this.sessionMessageCountCacheScope = scope
@@ -1593,6 +1628,311 @@ class ChatService {
this.sessionDetailExtraCache.clear()
this.sessionStatusCache.clear()
this.refreshSessionStatsCacheScope(scope)
this.refreshExportContentStatsScope(scope)
}
private refreshExportContentStatsScope(scope: string): void {
if (scope === this.exportContentStatsScope) return
this.exportContentStatsScope = scope
this.exportContentStatsMemory.clear()
this.exportContentStatsDirtySessionIds.clear()
this.exportContentScopeSessionIdsCache = null
const scopeEntry = this.exportContentStatsCacheService.getScope(scope)
if (scopeEntry) {
this.exportContentStatsScopeUpdatedAt = scopeEntry.updatedAt
for (const [sessionId, entry] of Object.entries(scopeEntry.sessions)) {
this.exportContentStatsMemory.set(sessionId, { ...entry })
}
} else {
this.exportContentStatsScopeUpdatedAt = 0
}
}
private persistExportContentStatsScope(validSessionIds?: Set<string>): void {
if (!this.exportContentStatsScope) return
const sessions: Record<string, ExportContentSessionStatsEntry> = {}
for (const [sessionId, entry] of this.exportContentStatsMemory.entries()) {
if (validSessionIds && !validSessionIds.has(sessionId)) continue
sessions[sessionId] = { ...entry }
}
const updatedAt = this.exportContentStatsScopeUpdatedAt || Date.now()
const scopeEntry: ExportContentScopeStatsEntry = {
updatedAt,
sessions
}
this.exportContentStatsCacheService.setScope(this.exportContentStatsScope, scopeEntry)
}
private async listExportContentScopeSessionIds(force = false): Promise<string[]> {
const now = Date.now()
if (
!force &&
this.exportContentScopeSessionIdsCache &&
now - this.exportContentScopeSessionIdsCache.updatedAt <= this.exportContentScopeSessionIdsCacheTtlMs
) {
return this.exportContentScopeSessionIdsCache.ids
}
const sessionsResult = await this.getSessions()
if (!sessionsResult.success || !sessionsResult.sessions) {
return []
}
const ids = Array.from(
new Set(
sessionsResult.sessions
.map((session) => String(session.username || '').trim())
.filter(Boolean)
.filter((sessionId) => sessionId.endsWith('@chatroom') || !sessionId.startsWith('gh_'))
)
)
this.exportContentScopeSessionIdsCache = {
ids,
updatedAt: now
}
return ids
}
private createDefaultExportContentEntry(): ExportContentSessionStatsEntry {
return {
updatedAt: 0,
hasAny: false,
hasVoice: false,
hasImage: false,
hasVideo: false,
hasEmoji: false,
mediaReady: false
}
}
private isExportContentEntryDirty(sessionId: string): boolean {
return this.exportContentStatsDirtySessionIds.has(sessionId)
}
private async collectExportContentEntry(sessionId: string): Promise<ExportContentSessionStatsEntry> {
const entry = this.createDefaultExportContentEntry()
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 400, false, 0, 0)
if (!cursorResult.success || !cursorResult.cursor) {
return {
...entry,
updatedAt: Date.now(),
mediaReady: true
}
}
const cursor = cursorResult.cursor
try {
let done = false
while (!done) {
const batch = await wcdbService.fetchMessageBatch(cursor)
if (!batch.success) {
break
}
const rows = Array.isArray(batch.rows) ? batch.rows as Record<string, any>[] : []
for (const row of rows) {
entry.hasAny = true
const localType = this.getRowInt(
row,
['local_type', 'localType', 'type', 'msg_type', 'msgType', 'WCDB_CT_local_type'],
1
)
if (localType === 34) entry.hasVoice = true
if (localType === 3) entry.hasImage = true
if (localType === 43) entry.hasVideo = true
if (localType === 47) entry.hasEmoji = true
if (entry.hasVoice && entry.hasImage && entry.hasVideo && entry.hasEmoji) {
done = true
break
}
}
if (!batch.hasMore || rows.length === 0) {
break
}
}
} finally {
await wcdbService.closeMessageCursor(cursor)
}
entry.mediaReady = true
entry.updatedAt = Date.now()
return entry
}
private async startExportContentStatsRefresh(force = false): Promise<void> {
if (this.exportContentStatsRefreshPromise) {
this.exportContentStatsRefreshQueued = true
this.exportContentStatsRefreshForceQueued = this.exportContentStatsRefreshForceQueued || force
return this.exportContentStatsRefreshPromise
}
const task = (async () => {
const sessionIds = await this.listExportContentScopeSessionIds(force)
const sessionIdSet = new Set(sessionIds)
const targets: string[] = []
for (const sessionId of sessionIds) {
const cached = this.exportContentStatsMemory.get(sessionId)
if (force || this.exportContentStatsDirtySessionIds.has(sessionId) || !cached || !cached.mediaReady) {
targets.push(sessionId)
}
}
if (targets.length > 0) {
await this.forEachWithConcurrency(targets, 3, async (sessionId) => {
const nextEntry = await this.collectExportContentEntry(sessionId)
this.exportContentStatsMemory.set(sessionId, nextEntry)
this.exportContentStatsDirtySessionIds.delete(sessionId)
})
}
for (const sessionId of Array.from(this.exportContentStatsMemory.keys())) {
if (!sessionIdSet.has(sessionId)) {
this.exportContentStatsMemory.delete(sessionId)
this.exportContentStatsDirtySessionIds.delete(sessionId)
}
}
this.exportContentStatsScopeUpdatedAt = Date.now()
this.persistExportContentStatsScope(sessionIdSet)
})()
this.exportContentStatsRefreshPromise = task
try {
await task
} finally {
this.exportContentStatsRefreshPromise = null
if (this.exportContentStatsRefreshQueued) {
const rerunForce = this.exportContentStatsRefreshForceQueued
this.exportContentStatsRefreshQueued = false
this.exportContentStatsRefreshForceQueued = false
void this.startExportContentStatsRefresh(rerunForce)
}
}
}
async getExportContentSessionCounts(options?: {
triggerRefresh?: boolean
forceRefresh?: boolean
}): Promise<{ success: boolean; data?: ExportContentSessionCounts; error?: string }> {
try {
const connectResult = await this.ensureConnected()
if (!connectResult.success) {
return { success: false, error: connectResult.error || '数据库未连接' }
}
this.refreshSessionMessageCountCacheScope()
const forceRefresh = options?.forceRefresh === true
const triggerRefresh = options?.triggerRefresh !== false
const sessionIds = await this.listExportContentScopeSessionIds(forceRefresh)
const sessionIdSet = new Set(sessionIds)
for (const sessionId of Array.from(this.exportContentStatsMemory.keys())) {
if (!sessionIdSet.has(sessionId)) {
this.exportContentStatsMemory.delete(sessionId)
this.exportContentStatsDirtySessionIds.delete(sessionId)
}
}
const missingTextCountSessionIds: string[] = []
let textSessions = 0
let voiceSessions = 0
let imageSessions = 0
let videoSessions = 0
let emojiSessions = 0
const pendingMediaSessionSet = new Set<string>()
for (const sessionId of sessionIds) {
const entry = this.exportContentStatsMemory.get(sessionId)
if (entry) {
if (entry.hasAny) {
textSessions += 1
} else if (this.isExportContentEntryDirty(sessionId)) {
missingTextCountSessionIds.push(sessionId)
}
} else {
missingTextCountSessionIds.push(sessionId)
}
const hasMediaSnapshot = Boolean(entry && entry.mediaReady)
if (hasMediaSnapshot) {
if (entry!.hasVoice) voiceSessions += 1
if (entry!.hasImage) imageSessions += 1
if (entry!.hasVideo) videoSessions += 1
if (entry!.hasEmoji) emojiSessions += 1
} else {
pendingMediaSessionSet.add(sessionId)
}
if (this.isExportContentEntryDirty(sessionId) && hasMediaSnapshot) {
pendingMediaSessionSet.add(sessionId)
}
}
if (missingTextCountSessionIds.length > 0) {
const textCountResult = await this.getSessionMessageCounts(missingTextCountSessionIds)
if (textCountResult.success && textCountResult.counts) {
const now = Date.now()
for (const sessionId of missingTextCountSessionIds) {
const count = textCountResult.counts[sessionId]
const hasAny = Number.isFinite(count) && Number(count) > 0
const prevEntry = this.exportContentStatsMemory.get(sessionId) || this.createDefaultExportContentEntry()
const nextEntry: ExportContentSessionStatsEntry = {
...prevEntry,
hasAny,
updatedAt: prevEntry.updatedAt || now
}
this.exportContentStatsMemory.set(sessionId, nextEntry)
if (hasAny) {
textSessions += 1
}
}
this.persistExportContentStatsScope(sessionIdSet)
}
}
if (forceRefresh && triggerRefresh) {
void this.startExportContentStatsRefresh(true)
} else if (triggerRefresh && (pendingMediaSessionSet.size > 0 || this.exportContentStatsDirtySessionIds.size > 0)) {
void this.startExportContentStatsRefresh(false)
}
return {
success: true,
data: {
totalSessions: sessionIds.length,
textSessions,
voiceSessions,
imageSessions,
videoSessions,
emojiSessions,
pendingMediaSessions: pendingMediaSessionSet.size,
updatedAt: this.exportContentStatsScopeUpdatedAt,
refreshing: this.exportContentStatsRefreshPromise !== null
}
}
} catch (e) {
console.error('ChatService: 获取导出内容会话统计失败:', e)
return { success: false, error: String(e) }
}
}
async refreshExportContentSessionCounts(options?: { forceRefresh?: boolean }): Promise<{ success: boolean; error?: string }> {
try {
const connectResult = await this.ensureConnected()
if (!connectResult.success) {
return { success: false, error: connectResult.error || '数据库未连接' }
}
this.refreshSessionMessageCountCacheScope()
await this.startExportContentStatsRefresh(options?.forceRefresh === true)
return { success: true }
} catch (e) {
console.error('ChatService: 刷新导出内容会话统计失败:', e)
return { success: false, error: String(e) }
}
}
private refreshSessionStatsCacheScope(scope: string): void {
@@ -1741,6 +2081,8 @@ class ChatService {
if (ids.size > 0) {
ids.forEach((sessionId) => this.deleteSessionStatsCacheEntry(sessionId))
this.exportContentScopeSessionIdsCache = null
ids.forEach((sessionId) => this.exportContentStatsDirtySessionIds.add(sessionId))
if (Array.from(ids).some((id) => id.includes('@chatroom'))) {
this.allGroupSessionIdsCache = null
}
@@ -1756,6 +2098,10 @@ class ChatService {
normalizedType.includes('contact')
) {
this.clearSessionStatsCacheForScope()
this.exportContentScopeSessionIdsCache = null
for (const sessionId of this.exportContentStatsMemory.keys()) {
this.exportContentStatsDirtySessionIds.add(sessionId)
}
}
}
@@ -3918,6 +4264,11 @@ class ChatService {
this.sessionStatsPendingFull.clear()
this.allGroupSessionIdsCache = null
this.sessionStatsCacheService.clearAll()
this.exportContentStatsMemory.clear()
this.exportContentStatsDirtySessionIds.clear()
this.exportContentScopeSessionIdsCache = null
this.exportContentStatsScopeUpdatedAt = 0
this.exportContentStatsCacheService.clearAll()
}
for (const state of this.hardlinkCache.values()) {