mirror of
https://github.com/hicccc77/WeFlow.git
synced 2026-03-24 23:06:51 +00:00
fix(stats): ensure accurate transfer red-packet and call counts in detail panels
This commit is contained in:
@@ -162,6 +162,7 @@ interface ExportSessionStatsOptions {
|
||||
includeRelations?: boolean
|
||||
forceRefresh?: boolean
|
||||
allowStaleCache?: boolean
|
||||
preferAccurateSpecialTypes?: boolean
|
||||
}
|
||||
|
||||
interface ExportSessionStatsCacheMeta {
|
||||
@@ -2365,9 +2366,59 @@ class ChatService {
|
||||
return this.extractXmlValue(content, 'type')
|
||||
}
|
||||
|
||||
private buildXmlTypeLikeExpr(columnName: string, xmlType: '2000' | '2001'): string {
|
||||
const colExpr = `LOWER(CAST(COALESCE(${this.quoteSqlIdentifier(columnName)}, '') AS TEXT))`
|
||||
return `${colExpr} LIKE '%<type>${xmlType}</type>%'`
|
||||
private async collectSpecialMessageCountsByCursorScan(sessionId: string): Promise<{
|
||||
transferMessages: number
|
||||
redPacketMessages: number
|
||||
callMessages: number
|
||||
}> {
|
||||
const counters = {
|
||||
transferMessages: 0,
|
||||
redPacketMessages: 0,
|
||||
callMessages: 0
|
||||
}
|
||||
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 500, false, 0, 0)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return counters
|
||||
}
|
||||
|
||||
const cursor = cursorResult.cursor
|
||||
try {
|
||||
while (true) {
|
||||
const batch = await wcdbService.fetchMessageBatch(cursor)
|
||||
if (!batch.success) break
|
||||
const rows = Array.isArray(batch.rows) ? batch.rows as Record<string, any>[] : []
|
||||
for (const row of rows) {
|
||||
const localType = this.getRowInt(row, ['local_type', 'localType', 'type', 'msg_type', 'msgType', 'WCDB_CT_local_type'], 1)
|
||||
if (localType === 50) {
|
||||
counters.callMessages += 1
|
||||
continue
|
||||
}
|
||||
if (localType === 8589934592049) {
|
||||
counters.transferMessages += 1
|
||||
continue
|
||||
}
|
||||
if (localType === 8594229559345) {
|
||||
counters.redPacketMessages += 1
|
||||
continue
|
||||
}
|
||||
if (localType !== 49) continue
|
||||
|
||||
const rawMessageContent = this.getRowField(row, ['message_content', 'messageContent', 'msg_content', 'msgContent', 'content', 'WCDB_CT_message_content'])
|
||||
const rawCompressContent = this.getRowField(row, ['compress_content', 'compressContent', 'compressed_content', 'compressedContent', 'WCDB_CT_compress_content'])
|
||||
const content = this.decodeMessageContent(rawMessageContent, rawCompressContent)
|
||||
const xmlType = this.extractType49XmlTypeForStats(content)
|
||||
if (xmlType === '2000') counters.transferMessages += 1
|
||||
if (xmlType === '2001') counters.redPacketMessages += 1
|
||||
}
|
||||
|
||||
if (!batch.hasMore || rows.length === 0) break
|
||||
}
|
||||
} finally {
|
||||
await wcdbService.closeMessageCursor(cursor)
|
||||
}
|
||||
|
||||
return counters
|
||||
}
|
||||
|
||||
private async collectSessionExportStatsByCursorScan(
|
||||
@@ -2474,7 +2525,8 @@ class ChatService {
|
||||
|
||||
private async collectSessionExportStats(
|
||||
sessionId: string,
|
||||
selfIdentitySet: Set<string>
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
): Promise<ExportSessionStats> {
|
||||
const stats: ExportSessionStats = {
|
||||
totalMessages: 0,
|
||||
@@ -2511,18 +2563,6 @@ class ChatService {
|
||||
const timeCol = this.pickFirstColumn(columnSet, ['create_time', 'createtime', 'msg_create_time', 'time'])
|
||||
const senderCol = this.pickFirstColumn(columnSet, ['sender_username', 'senderusername', 'sender'])
|
||||
const isSendCol = this.pickFirstColumn(columnSet, ['computed_is_send', 'computedissend', 'is_send', 'issend'])
|
||||
const messageContentCol = this.pickFirstColumn(columnSet, ['message_content', 'messagecontent', 'msg_content', 'msgcontent', 'content'])
|
||||
const compressContentCol = this.pickFirstColumn(columnSet, ['compress_content', 'compresscontent', 'compressed_content', 'compressedcontent'])
|
||||
|
||||
const transferXmlConditions: string[] = []
|
||||
if (messageContentCol) transferXmlConditions.push(this.buildXmlTypeLikeExpr(messageContentCol, '2000'))
|
||||
if (compressContentCol) transferXmlConditions.push(this.buildXmlTypeLikeExpr(compressContentCol, '2000'))
|
||||
const transferXmlCond = transferXmlConditions.length > 0 ? `(${transferXmlConditions.join(' OR ')})` : '0'
|
||||
|
||||
const redPacketXmlConditions: string[] = []
|
||||
if (messageContentCol) redPacketXmlConditions.push(this.buildXmlTypeLikeExpr(messageContentCol, '2001'))
|
||||
if (compressContentCol) redPacketXmlConditions.push(this.buildXmlTypeLikeExpr(compressContentCol, '2001'))
|
||||
const redPacketXmlCond = redPacketXmlConditions.length > 0 ? `(${redPacketXmlConditions.join(' OR ')})` : '0'
|
||||
|
||||
const selectParts: string[] = [
|
||||
'COUNT(*) AS total_messages',
|
||||
@@ -2531,8 +2571,8 @@ class ChatService {
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 43 THEN 1 ELSE 0 END) AS video_messages` : '0 AS video_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 47 THEN 1 ELSE 0 END) AS emoji_messages` : '0 AS emoji_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 50 THEN 1 ELSE 0 END) AS call_messages` : '0 AS call_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 8589934592049 THEN 1 WHEN ${this.quoteSqlIdentifier(typeCol)} = 49 AND ${transferXmlCond} THEN 1 ELSE 0 END) AS transfer_messages` : '0 AS transfer_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 8594229559345 THEN 1 WHEN ${this.quoteSqlIdentifier(typeCol)} = 49 AND ${redPacketXmlCond} THEN 1 ELSE 0 END) AS red_packet_messages` : '0 AS red_packet_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 8589934592049 THEN 1 ELSE 0 END) AS transfer_messages` : '0 AS transfer_messages',
|
||||
typeCol ? `SUM(CASE WHEN ${this.quoteSqlIdentifier(typeCol)} = 8594229559345 THEN 1 ELSE 0 END) AS red_packet_messages` : '0 AS red_packet_messages',
|
||||
timeCol ? `MIN(${this.quoteSqlIdentifier(timeCol)}) AS first_timestamp` : 'NULL AS first_timestamp',
|
||||
timeCol ? `MAX(${this.quoteSqlIdentifier(timeCol)}) AS last_timestamp` : 'NULL AS last_timestamp'
|
||||
]
|
||||
@@ -2628,6 +2668,17 @@ class ChatService {
|
||||
return this.collectSessionExportStatsByCursorScan(sessionId, selfIdentitySet)
|
||||
}
|
||||
|
||||
if (preferAccurateSpecialTypes) {
|
||||
try {
|
||||
const preciseCounters = await this.collectSpecialMessageCountsByCursorScan(sessionId)
|
||||
stats.transferMessages = preciseCounters.transferMessages
|
||||
stats.redPacketMessages = preciseCounters.redPacketMessages
|
||||
stats.callMessages = preciseCounters.callMessages
|
||||
} catch {
|
||||
// 保留聚合统计结果作为兜底
|
||||
}
|
||||
}
|
||||
|
||||
if (isGroup) {
|
||||
stats.groupActiveSpeakers = senderIdentities.size
|
||||
if (Number.isFinite(stats.groupMyMessages)) {
|
||||
@@ -2728,9 +2779,10 @@ class ChatService {
|
||||
private async computeSessionExportStats(
|
||||
sessionId: string,
|
||||
selfIdentitySet: Set<string>,
|
||||
includeRelations: boolean
|
||||
includeRelations: boolean,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
): Promise<ExportSessionStats> {
|
||||
const stats = await this.collectSessionExportStats(sessionId, selfIdentitySet)
|
||||
const stats = await this.collectSessionExportStats(sessionId, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
const isGroup = sessionId.endsWith('@chatroom')
|
||||
|
||||
if (isGroup) {
|
||||
@@ -2768,7 +2820,8 @@ class ChatService {
|
||||
private async computeSessionExportStatsBatch(
|
||||
sessionIds: string[],
|
||||
includeRelations: boolean,
|
||||
selfIdentitySet: Set<string>
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
): Promise<Record<string, ExportSessionStats>> {
|
||||
const normalizedSessionIds = Array.from(
|
||||
new Set(
|
||||
@@ -2824,7 +2877,7 @@ class ChatService {
|
||||
|
||||
await this.forEachWithConcurrency(normalizedSessionIds, 3, async (sessionId) => {
|
||||
try {
|
||||
const stats = await this.collectSessionExportStats(sessionId, selfIdentitySet)
|
||||
const stats = await this.collectSessionExportStats(sessionId, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
if (sessionId.endsWith('@chatroom')) {
|
||||
stats.groupMemberCount = typeof memberCountMap[sessionId] === 'number'
|
||||
? Math.max(0, Math.floor(memberCountMap[sessionId]))
|
||||
@@ -2851,8 +2904,13 @@ class ChatService {
|
||||
private async getOrComputeSessionExportStats(
|
||||
sessionId: string,
|
||||
includeRelations: boolean,
|
||||
selfIdentitySet: Set<string>
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
): Promise<ExportSessionStats> {
|
||||
if (preferAccurateSpecialTypes) {
|
||||
return this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, true)
|
||||
}
|
||||
|
||||
const scopedKey = this.buildScopedSessionStatsKey(sessionId)
|
||||
|
||||
if (!includeRelations) {
|
||||
@@ -2866,7 +2924,7 @@ class ChatService {
|
||||
}
|
||||
|
||||
const targetMap = includeRelations ? this.sessionStatsPendingFull : this.sessionStatsPendingBasic
|
||||
const pending = this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations)
|
||||
const pending = this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, false)
|
||||
targetMap.set(scopedKey, pending)
|
||||
try {
|
||||
return await pending
|
||||
@@ -5275,6 +5333,7 @@ class ChatService {
|
||||
const includeRelations = options.includeRelations ?? true
|
||||
const forceRefresh = options.forceRefresh === true
|
||||
const allowStaleCache = options.allowStaleCache === true
|
||||
const preferAccurateSpecialTypes = options.preferAccurateSpecialTypes === true
|
||||
|
||||
const normalizedSessionIds = Array.from(
|
||||
new Set(
|
||||
@@ -5298,7 +5357,7 @@ class ChatService {
|
||||
? this.getGroupMyMessageCountHintEntry(sessionId)
|
||||
: null
|
||||
const cachedResult = this.getSessionStatsCacheEntry(sessionId)
|
||||
if (!forceRefresh) {
|
||||
if (!forceRefresh && !preferAccurateSpecialTypes) {
|
||||
if (cachedResult && this.supportsRequestedRelation(cachedResult.entry, includeRelations)) {
|
||||
const stale = now - cachedResult.entry.updatedAt > this.sessionStatsCacheTtlMs
|
||||
if (!stale || allowStaleCache) {
|
||||
@@ -5334,7 +5393,7 @@ class ChatService {
|
||||
if (pendingSessionIds.length === 1) {
|
||||
const sessionId = pendingSessionIds[0]
|
||||
try {
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet)
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
resultMap[sessionId] = stats
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
@@ -5352,7 +5411,8 @@ class ChatService {
|
||||
const batchedStatsMap = await this.computeSessionExportStatsBatch(
|
||||
pendingSessionIds,
|
||||
includeRelations,
|
||||
selfIdentitySet
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes
|
||||
)
|
||||
for (const sessionId of pendingSessionIds) {
|
||||
const stats = batchedStatsMap[sessionId]
|
||||
@@ -5375,7 +5435,7 @@ class ChatService {
|
||||
if (!usedBatchedCompute) {
|
||||
await this.forEachWithConcurrency(pendingSessionIds, 3, async (sessionId) => {
|
||||
try {
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet)
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
resultMap[sessionId] = stats
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
|
||||
Reference in New Issue
Block a user