mirror of
https://github.com/hicccc77/WeFlow.git
synced 2026-03-25 07:16:51 +00:00
fix(export): prevent card stats poll overlap with frontend/backend singleflight
This commit is contained in:
@@ -276,6 +276,12 @@ class ChatService {
|
||||
private exportContentStatsRefreshPromise: Promise<void> | null = null
|
||||
private exportContentStatsRefreshQueued = false
|
||||
private exportContentStatsRefreshForceQueued = false
|
||||
private exportContentSessionCountsInFlight: {
|
||||
promise: Promise<{ success: boolean; data?: ExportContentSessionCounts; error?: string }>
|
||||
forceRefresh: boolean
|
||||
traceId: string
|
||||
startedAt: number
|
||||
} | null = null
|
||||
private exportContentStatsDirtySessionIds = new Set<string>()
|
||||
private exportContentScopeSessionIdsCache: { ids: string[]; updatedAt: number } | null = null
|
||||
private readonly exportContentScopeSessionIdsCacheTtlMs = 60 * 1000
|
||||
@@ -2124,170 +2130,214 @@ class ChatService {
|
||||
traceId?: string
|
||||
}): Promise<{ success: boolean; data?: ExportContentSessionCounts; error?: string }> {
|
||||
const traceId = this.normalizeExportDiagTraceId(options?.traceId)
|
||||
const forceRefresh = options?.forceRefresh === true
|
||||
const triggerRefresh = options?.triggerRefresh !== false
|
||||
const stepStartedAt = this.startExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-get-export-content-session-counts',
|
||||
stepName: '获取导出卡片统计',
|
||||
message: '开始计算导出卡片统计',
|
||||
data: {
|
||||
triggerRefresh: options?.triggerRefresh !== false,
|
||||
forceRefresh: options?.forceRefresh === true
|
||||
triggerRefresh,
|
||||
forceRefresh
|
||||
}
|
||||
})
|
||||
let stepSuccess = false
|
||||
let stepError = ''
|
||||
let stepResult: ExportContentSessionCounts | undefined
|
||||
let activePromise: Promise<{ success: boolean; data?: ExportContentSessionCounts; error?: string }> | null = null
|
||||
let createdInFlight = false
|
||||
|
||||
try {
|
||||
const connectResult = await this.ensureConnected()
|
||||
if (!connectResult.success) {
|
||||
stepError = connectResult.error || '数据库未连接'
|
||||
return { success: false, error: connectResult.error || '数据库未连接' }
|
||||
}
|
||||
this.refreshSessionMessageCountCacheScope()
|
||||
|
||||
const forceRefresh = options?.forceRefresh === true
|
||||
const triggerRefresh = options?.triggerRefresh !== false
|
||||
const sessionIds = await this.listExportContentScopeSessionIds(forceRefresh, traceId)
|
||||
const sessionIdSet = new Set(sessionIds)
|
||||
|
||||
for (const sessionId of Array.from(this.exportContentStatsMemory.keys())) {
|
||||
if (!sessionIdSet.has(sessionId)) {
|
||||
this.exportContentStatsMemory.delete(sessionId)
|
||||
this.exportContentStatsDirtySessionIds.delete(sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
const missingTextCountSessionIds: string[] = []
|
||||
let textSessions = 0
|
||||
let voiceSessions = 0
|
||||
let imageSessions = 0
|
||||
let videoSessions = 0
|
||||
let emojiSessions = 0
|
||||
const pendingMediaSessionSet = new Set<string>()
|
||||
|
||||
for (const sessionId of sessionIds) {
|
||||
const entry = this.exportContentStatsMemory.get(sessionId)
|
||||
if (entry) {
|
||||
if (entry.hasAny) {
|
||||
textSessions += 1
|
||||
} else if (forceRefresh || this.isExportContentEntryDirty(sessionId)) {
|
||||
missingTextCountSessionIds.push(sessionId)
|
||||
}
|
||||
} else {
|
||||
missingTextCountSessionIds.push(sessionId)
|
||||
}
|
||||
|
||||
const hasMediaSnapshot = Boolean(entry && entry.mediaReady)
|
||||
if (hasMediaSnapshot) {
|
||||
if (entry!.hasVoice) voiceSessions += 1
|
||||
if (entry!.hasImage) imageSessions += 1
|
||||
if (entry!.hasVideo) videoSessions += 1
|
||||
if (entry!.hasEmoji) emojiSessions += 1
|
||||
} else {
|
||||
pendingMediaSessionSet.add(sessionId)
|
||||
}
|
||||
|
||||
if (this.isExportContentEntryDirty(sessionId) && hasMediaSnapshot) {
|
||||
pendingMediaSessionSet.add(sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
if (missingTextCountSessionIds.length > 0) {
|
||||
const textCountStepStartedAt = this.startExportDiagStep({
|
||||
if (this.exportContentSessionCountsInFlight) {
|
||||
this.logExportDiag({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
message: '开始补全文本会话计数',
|
||||
data: { missingSessions: missingTextCountSessionIds.length }
|
||||
source: 'backend',
|
||||
level: 'info',
|
||||
message: '复用进行中的导出卡片统计任务',
|
||||
stepId: 'backend-get-export-content-session-counts',
|
||||
stepName: '获取导出卡片统计',
|
||||
status: 'running',
|
||||
data: {
|
||||
inFlightTraceId: this.exportContentSessionCountsInFlight.traceId,
|
||||
inFlightForceRefresh: this.exportContentSessionCountsInFlight.forceRefresh,
|
||||
requestedForceRefresh: forceRefresh,
|
||||
inFlightElapsedMs: Date.now() - this.exportContentSessionCountsInFlight.startedAt
|
||||
}
|
||||
})
|
||||
const textCountStallTimer = setTimeout(() => {
|
||||
this.logExportDiag({
|
||||
traceId,
|
||||
source: 'backend',
|
||||
level: 'warn',
|
||||
message: '补全文本会话计数耗时较长',
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
status: 'running',
|
||||
data: {
|
||||
elapsedMs: Date.now() - textCountStepStartedAt,
|
||||
missingSessions: missingTextCountSessionIds.length
|
||||
}
|
||||
})
|
||||
}, 3000)
|
||||
const textCountResult = await this.getSessionMessageCounts(missingTextCountSessionIds, {
|
||||
preferHintCache: false,
|
||||
bypassSessionCache: true,
|
||||
traceId
|
||||
})
|
||||
clearTimeout(textCountStallTimer)
|
||||
if (textCountResult.success && textCountResult.counts) {
|
||||
const now = Date.now()
|
||||
for (const sessionId of missingTextCountSessionIds) {
|
||||
const count = textCountResult.counts[sessionId]
|
||||
const hasAny = Number.isFinite(count) && Number(count) > 0
|
||||
const prevEntry = this.exportContentStatsMemory.get(sessionId) || this.createDefaultExportContentEntry()
|
||||
const nextEntry: ExportContentSessionStatsEntry = {
|
||||
...prevEntry,
|
||||
hasAny,
|
||||
updatedAt: prevEntry.updatedAt || now
|
||||
}
|
||||
this.exportContentStatsMemory.set(sessionId, nextEntry)
|
||||
if (hasAny) {
|
||||
textSessions += 1
|
||||
activePromise = this.exportContentSessionCountsInFlight.promise
|
||||
} else {
|
||||
const createdPromise = (async () => {
|
||||
const connectResult = await this.ensureConnected()
|
||||
if (!connectResult.success) {
|
||||
return { success: false, error: connectResult.error || '数据库未连接' }
|
||||
}
|
||||
this.refreshSessionMessageCountCacheScope()
|
||||
|
||||
const sessionIds = await this.listExportContentScopeSessionIds(forceRefresh, traceId)
|
||||
const sessionIdSet = new Set(sessionIds)
|
||||
|
||||
for (const sessionId of Array.from(this.exportContentStatsMemory.keys())) {
|
||||
if (!sessionIdSet.has(sessionId)) {
|
||||
this.exportContentStatsMemory.delete(sessionId)
|
||||
this.exportContentStatsDirtySessionIds.delete(sessionId)
|
||||
}
|
||||
}
|
||||
this.persistExportContentStatsScope(sessionIdSet)
|
||||
this.endExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
startedAt: textCountStepStartedAt,
|
||||
|
||||
const missingTextCountSessionIds: string[] = []
|
||||
let textSessions = 0
|
||||
let voiceSessions = 0
|
||||
let imageSessions = 0
|
||||
let videoSessions = 0
|
||||
let emojiSessions = 0
|
||||
const pendingMediaSessionSet = new Set<string>()
|
||||
|
||||
for (const sessionId of sessionIds) {
|
||||
const entry = this.exportContentStatsMemory.get(sessionId)
|
||||
if (entry) {
|
||||
if (entry.hasAny) {
|
||||
textSessions += 1
|
||||
} else if (forceRefresh || this.isExportContentEntryDirty(sessionId)) {
|
||||
missingTextCountSessionIds.push(sessionId)
|
||||
}
|
||||
} else {
|
||||
missingTextCountSessionIds.push(sessionId)
|
||||
}
|
||||
|
||||
const hasMediaSnapshot = Boolean(entry && entry.mediaReady)
|
||||
if (hasMediaSnapshot) {
|
||||
if (entry.hasVoice) voiceSessions += 1
|
||||
if (entry.hasImage) imageSessions += 1
|
||||
if (entry.hasVideo) videoSessions += 1
|
||||
if (entry.hasEmoji) emojiSessions += 1
|
||||
} else {
|
||||
pendingMediaSessionSet.add(sessionId)
|
||||
}
|
||||
|
||||
if (this.isExportContentEntryDirty(sessionId) && hasMediaSnapshot) {
|
||||
pendingMediaSessionSet.add(sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
if (missingTextCountSessionIds.length > 0) {
|
||||
const textCountStepStartedAt = this.startExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
message: '开始补全文本会话计数',
|
||||
data: { missingSessions: missingTextCountSessionIds.length }
|
||||
})
|
||||
const textCountStallTimer = setTimeout(() => {
|
||||
this.logExportDiag({
|
||||
traceId,
|
||||
source: 'backend',
|
||||
level: 'warn',
|
||||
message: '补全文本会话计数耗时较长',
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
status: 'running',
|
||||
data: {
|
||||
elapsedMs: Date.now() - textCountStepStartedAt,
|
||||
missingSessions: missingTextCountSessionIds.length
|
||||
}
|
||||
})
|
||||
}, 3000)
|
||||
const textCountResult = await this.getSessionMessageCounts(missingTextCountSessionIds, {
|
||||
preferHintCache: false,
|
||||
bypassSessionCache: true,
|
||||
traceId
|
||||
})
|
||||
clearTimeout(textCountStallTimer)
|
||||
if (textCountResult.success && textCountResult.counts) {
|
||||
const now = Date.now()
|
||||
for (const sessionId of missingTextCountSessionIds) {
|
||||
const count = textCountResult.counts[sessionId]
|
||||
const hasAny = Number.isFinite(count) && Number(count) > 0
|
||||
const prevEntry = this.exportContentStatsMemory.get(sessionId) || this.createDefaultExportContentEntry()
|
||||
const nextEntry: ExportContentSessionStatsEntry = {
|
||||
...prevEntry,
|
||||
hasAny,
|
||||
updatedAt: prevEntry.updatedAt || now
|
||||
}
|
||||
this.exportContentStatsMemory.set(sessionId, nextEntry)
|
||||
if (hasAny) {
|
||||
textSessions += 1
|
||||
}
|
||||
}
|
||||
this.persistExportContentStatsScope(sessionIdSet)
|
||||
this.endExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
startedAt: textCountStepStartedAt,
|
||||
success: true,
|
||||
message: '文本会话计数补全完成',
|
||||
data: { updatedSessions: missingTextCountSessionIds.length }
|
||||
})
|
||||
} else {
|
||||
this.endExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
startedAt: textCountStepStartedAt,
|
||||
success: false,
|
||||
message: '文本会话计数补全失败',
|
||||
data: { error: textCountResult.error || '未知错误' }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (forceRefresh && triggerRefresh) {
|
||||
void this.startExportContentStatsRefresh(true, traceId)
|
||||
} else if (triggerRefresh && (pendingMediaSessionSet.size > 0 || this.exportContentStatsDirtySessionIds.size > 0)) {
|
||||
void this.startExportContentStatsRefresh(false, traceId)
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: '文本会话计数补全完成',
|
||||
data: { updatedSessions: missingTextCountSessionIds.length }
|
||||
})
|
||||
} else {
|
||||
this.endExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-fill-text-counts',
|
||||
stepName: '补全文本会话计数',
|
||||
startedAt: textCountStepStartedAt,
|
||||
success: false,
|
||||
message: '文本会话计数补全失败',
|
||||
data: { error: textCountResult.error || '未知错误' }
|
||||
})
|
||||
data: {
|
||||
totalSessions: sessionIds.length,
|
||||
textSessions,
|
||||
voiceSessions,
|
||||
imageSessions,
|
||||
videoSessions,
|
||||
emojiSessions,
|
||||
pendingMediaSessions: pendingMediaSessionSet.size,
|
||||
updatedAt: this.exportContentStatsScopeUpdatedAt,
|
||||
refreshing: this.exportContentStatsRefreshPromise !== null
|
||||
}
|
||||
}
|
||||
})()
|
||||
activePromise = createdPromise
|
||||
this.exportContentSessionCountsInFlight = {
|
||||
promise: createdPromise,
|
||||
forceRefresh,
|
||||
traceId,
|
||||
startedAt: Date.now()
|
||||
}
|
||||
createdInFlight = true
|
||||
}
|
||||
|
||||
if (forceRefresh && triggerRefresh) {
|
||||
void this.startExportContentStatsRefresh(true, traceId)
|
||||
} else if (triggerRefresh && (pendingMediaSessionSet.size > 0 || this.exportContentStatsDirtySessionIds.size > 0)) {
|
||||
void this.startExportContentStatsRefresh(false, traceId)
|
||||
if (!activePromise) {
|
||||
stepError = '统计任务未初始化'
|
||||
return { success: false, error: stepError }
|
||||
}
|
||||
|
||||
stepResult = {
|
||||
totalSessions: sessionIds.length,
|
||||
textSessions,
|
||||
voiceSessions,
|
||||
imageSessions,
|
||||
videoSessions,
|
||||
emojiSessions,
|
||||
pendingMediaSessions: pendingMediaSessionSet.size,
|
||||
updatedAt: this.exportContentStatsScopeUpdatedAt,
|
||||
refreshing: this.exportContentStatsRefreshPromise !== null
|
||||
}
|
||||
stepSuccess = true
|
||||
return {
|
||||
success: true,
|
||||
data: stepResult
|
||||
const result = await activePromise
|
||||
stepSuccess = result.success
|
||||
if (result.success && result.data) {
|
||||
stepResult = result.data
|
||||
} else {
|
||||
stepError = result.error || '未知错误'
|
||||
}
|
||||
return result
|
||||
} catch (e) {
|
||||
console.error('ChatService: 获取导出内容会话统计失败:', e)
|
||||
stepError = String(e)
|
||||
return { success: false, error: String(e) }
|
||||
} finally {
|
||||
if (createdInFlight && activePromise && this.exportContentSessionCountsInFlight?.promise === activePromise) {
|
||||
this.exportContentSessionCountsInFlight = null
|
||||
}
|
||||
this.endExportDiagStep({
|
||||
traceId,
|
||||
stepId: 'backend-get-export-content-session-counts',
|
||||
|
||||
Reference in New Issue
Block a user