mirror of
https://github.com/hicccc77/WeFlow.git
synced 2026-04-22 15:09:04 +00:00
优化底层游标索引性能;优化HTTPAPI索引逻辑;优化导出图片的索引写入逻辑
This commit is contained in:
@@ -198,6 +198,8 @@ interface ExportSessionStatsOptions {
|
||||
allowStaleCache?: boolean
|
||||
preferAccurateSpecialTypes?: boolean
|
||||
cacheOnly?: boolean
|
||||
beginTimestamp?: number
|
||||
endTimestamp?: number
|
||||
}
|
||||
|
||||
interface ExportSessionStatsCacheMeta {
|
||||
@@ -2178,28 +2180,31 @@ class ChatService {
|
||||
return { success: false, error: connectResult.error || '数据库未连接' }
|
||||
}
|
||||
|
||||
const batchSize = Math.max(1, limit)
|
||||
const cursorResult = await wcdbService.openMessageCursor(sessionId, batchSize, false, 0, 0)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return { success: false, error: cursorResult.error || '打开消息游标失败' }
|
||||
// 聊天页首屏优先走稳定路径:直接拉取固定窗口并做本地确定性排序,
|
||||
// 避免游标首批在极端数据分布下出现不稳定边界。
|
||||
const pageLimit = Math.max(1, Math.floor(limit || this.messageBatchDefault))
|
||||
const probeLimit = Math.min(500, pageLimit + 1)
|
||||
const result = await wcdbService.getMessages(sessionId, probeLimit, 0)
|
||||
if (!result.success || !Array.isArray(result.messages)) {
|
||||
return { success: false, error: result.error || '获取最新消息失败' }
|
||||
}
|
||||
|
||||
try {
|
||||
const collected = await this.collectVisibleMessagesFromCursor(sessionId, cursorResult.cursor, limit)
|
||||
if (!collected.success) {
|
||||
return { success: false, error: collected.error || '获取消息失败' }
|
||||
}
|
||||
console.log(
|
||||
`[ChatService] getLatestMessages session=${sessionId} rawRowsConsumed=${collected.rawRowsConsumed || 0} visibleMessagesReturned=${collected.messages?.length || 0} filteredOut=${collected.filteredOut || 0} nextOffset=${collected.rawRowsConsumed || 0} hasMore=${collected.hasMore === true}`
|
||||
)
|
||||
return {
|
||||
success: true,
|
||||
messages: collected.messages,
|
||||
hasMore: collected.hasMore,
|
||||
nextOffset: collected.rawRowsConsumed || 0
|
||||
}
|
||||
} finally {
|
||||
await wcdbService.closeMessageCursor(cursorResult.cursor)
|
||||
const rawRows = result.messages as Record<string, any>[]
|
||||
const hasMore = rawRows.length > pageLimit
|
||||
const selectedRows = hasMore ? rawRows.slice(0, pageLimit) : rawRows
|
||||
const mapped = this.mapRowsToMessages(selectedRows)
|
||||
const visible = mapped.filter((msg) => this.isMessageVisibleForSession(sessionId, msg))
|
||||
const normalized = this.normalizeMessageOrder(visible)
|
||||
await this.repairEmojiMessages(normalized)
|
||||
|
||||
console.log(
|
||||
`[ChatService] getLatestMessages(stable) session=${sessionId} rawRows=${rawRows.length} visibleMessagesReturned=${normalized.length} nextOffset=${selectedRows.length} hasMore=${hasMore}`
|
||||
)
|
||||
return {
|
||||
success: true,
|
||||
messages: normalized,
|
||||
hasMore,
|
||||
nextOffset: selectedRows.length
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('ChatService: 获取最新消息失败:', e)
|
||||
@@ -2241,16 +2246,59 @@ class ChatService {
|
||||
}
|
||||
}
|
||||
|
||||
private compareMessagesByTimeline(a: Message, b: Message): number {
|
||||
const aSortSeq = Math.max(0, Number(a.sortSeq || 0))
|
||||
const bSortSeq = Math.max(0, Number(b.sortSeq || 0))
|
||||
const aCreateTime = Math.max(0, Number(a.createTime || 0))
|
||||
const bCreateTime = Math.max(0, Number(b.createTime || 0))
|
||||
const aLocalId = Math.max(0, Number(a.localId || 0))
|
||||
const bLocalId = Math.max(0, Number(b.localId || 0))
|
||||
const aServerId = Math.max(0, Number(a.serverId || 0))
|
||||
const bServerId = Math.max(0, Number(b.serverId || 0))
|
||||
|
||||
// 与 C++ 侧归并规则一致:当两侧都有 sortSeq 时优先 sortSeq,否则先看 createTime。
|
||||
if (aSortSeq > 0 && bSortSeq > 0 && aSortSeq !== bSortSeq) {
|
||||
return aSortSeq - bSortSeq
|
||||
}
|
||||
if (aCreateTime !== bCreateTime) {
|
||||
return aCreateTime - bCreateTime
|
||||
}
|
||||
if (aSortSeq !== bSortSeq) {
|
||||
return aSortSeq - bSortSeq
|
||||
}
|
||||
if (aLocalId !== bLocalId) {
|
||||
return aLocalId - bLocalId
|
||||
}
|
||||
if (aServerId !== bServerId) {
|
||||
return aServerId - bServerId
|
||||
}
|
||||
|
||||
const aKey = String(a.messageKey || '')
|
||||
const bKey = String(b.messageKey || '')
|
||||
if (aKey < bKey) return -1
|
||||
if (aKey > bKey) return 1
|
||||
return 0
|
||||
}
|
||||
|
||||
private normalizeMessageOrder(messages: Message[]): Message[] {
|
||||
if (messages.length < 2) return messages
|
||||
const first = messages[0]
|
||||
const last = messages[messages.length - 1]
|
||||
const firstKey = first.sortSeq || first.createTime || first.localId || 0
|
||||
const lastKey = last.sortSeq || last.createTime || last.localId || 0
|
||||
if (firstKey > lastKey) {
|
||||
return [...messages].reverse()
|
||||
|
||||
const withIndex = messages.map((msg, index) => ({ msg, index }))
|
||||
withIndex.sort((left, right) => {
|
||||
const diff = this.compareMessagesByTimeline(left.msg, right.msg)
|
||||
if (diff !== 0) return diff
|
||||
return left.index - right.index
|
||||
})
|
||||
|
||||
let changed = false
|
||||
for (let index = 0; index < withIndex.length; index += 1) {
|
||||
if (withIndex[index].msg !== messages[index]) {
|
||||
changed = true
|
||||
break
|
||||
}
|
||||
}
|
||||
return messages
|
||||
if (!changed) return messages
|
||||
return withIndex.map((entry) => entry.msg)
|
||||
}
|
||||
|
||||
private encodeMessageKeySegment(value: unknown): string {
|
||||
@@ -2436,6 +2484,95 @@ class ChatService {
|
||||
return Number.isFinite(parsed) ? parsed : fallback
|
||||
}
|
||||
|
||||
private parseCompactDateTimeDigitsToSeconds(raw: string): number {
|
||||
const text = String(raw || '').trim()
|
||||
if (!/^\d{8}(?:\d{4}(?:\d{2})?)?$/.test(text)) return 0
|
||||
|
||||
const year = Number.parseInt(text.slice(0, 4), 10)
|
||||
const month = Number.parseInt(text.slice(4, 6), 10)
|
||||
const day = Number.parseInt(text.slice(6, 8), 10)
|
||||
const hour = text.length >= 12 ? Number.parseInt(text.slice(8, 10), 10) : 0
|
||||
const minute = text.length >= 12 ? Number.parseInt(text.slice(10, 12), 10) : 0
|
||||
const second = text.length >= 14 ? Number.parseInt(text.slice(12, 14), 10) : 0
|
||||
|
||||
if (!Number.isFinite(year) || year < 1990 || year > 2200) return 0
|
||||
if (!Number.isFinite(month) || month < 1 || month > 12) return 0
|
||||
if (!Number.isFinite(day) || day < 1 || day > 31) return 0
|
||||
if (!Number.isFinite(hour) || hour < 0 || hour > 23) return 0
|
||||
if (!Number.isFinite(minute) || minute < 0 || minute > 59) return 0
|
||||
if (!Number.isFinite(second) || second < 0 || second > 59) return 0
|
||||
|
||||
const dt = new Date(year, month - 1, day, hour, minute, second)
|
||||
if (
|
||||
dt.getFullYear() !== year ||
|
||||
dt.getMonth() !== month - 1 ||
|
||||
dt.getDate() !== day ||
|
||||
dt.getHours() !== hour ||
|
||||
dt.getMinutes() !== minute ||
|
||||
dt.getSeconds() !== second
|
||||
) {
|
||||
return 0
|
||||
}
|
||||
const ts = Math.floor(dt.getTime() / 1000)
|
||||
return Number.isFinite(ts) && ts > 0 ? ts : 0
|
||||
}
|
||||
|
||||
private parseDateTimeTextToSeconds(raw: unknown): number {
|
||||
const text = String(raw ?? '').trim()
|
||||
if (!text) return 0
|
||||
|
||||
const compactDigits = this.parseCompactDateTimeDigitsToSeconds(text)
|
||||
if (compactDigits > 0) return compactDigits
|
||||
|
||||
if (/[zZ]|[+-]\d{2}:?\d{2}$/.test(text)) {
|
||||
const parsed = Date.parse(text)
|
||||
const seconds = Math.floor(parsed / 1000)
|
||||
if (Number.isFinite(seconds) && seconds > 0) return seconds
|
||||
}
|
||||
|
||||
const normalized = text.replace('T', ' ').replace(/\.\d+$/, '').replace(/\//g, '-')
|
||||
const match = normalized.match(/^(\d{4})-(\d{1,2})-(\d{1,2})(?:\s+(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?)?$/)
|
||||
if (!match) return 0
|
||||
|
||||
const year = Number.parseInt(match[1], 10)
|
||||
const month = Number.parseInt(match[2], 10)
|
||||
const day = Number.parseInt(match[3], 10)
|
||||
const hour = Number.parseInt(match[4] || '0', 10)
|
||||
const minute = Number.parseInt(match[5] || '0', 10)
|
||||
const second = Number.parseInt(match[6] || '0', 10)
|
||||
if (!Number.isFinite(year) || !Number.isFinite(month) || !Number.isFinite(day)) return 0
|
||||
const dt = new Date(year, month - 1, day, hour, minute, second)
|
||||
const ts = Math.floor(dt.getTime() / 1000)
|
||||
return Number.isFinite(ts) && ts > 0 ? ts : 0
|
||||
}
|
||||
|
||||
private normalizeTimestampLikeToSeconds(raw: unknown): number {
|
||||
if (raw === undefined || raw === null || raw === '') return 0
|
||||
const text = String(raw ?? '').trim()
|
||||
if (!text) return 0
|
||||
|
||||
const compactDigits = this.parseCompactDateTimeDigitsToSeconds(text)
|
||||
if (compactDigits > 0) return compactDigits
|
||||
|
||||
const parsed = this.coerceRowNumber(raw)
|
||||
if (Number.isFinite(parsed) && parsed > 0) {
|
||||
let normalized = Math.floor(parsed)
|
||||
while (normalized > 10000000000) {
|
||||
normalized = Math.floor(normalized / 1000)
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
return this.parseDateTimeTextToSeconds(text)
|
||||
}
|
||||
|
||||
private getRowTimestampSeconds(row: Record<string, any>, keys: string[], fallback = 0): number {
|
||||
const raw = this.getRowField(row, keys)
|
||||
if (raw === undefined || raw === null || raw === '') return fallback
|
||||
const parsed = this.normalizeTimestampLikeToSeconds(raw)
|
||||
return parsed > 0 ? parsed : fallback
|
||||
}
|
||||
|
||||
private hasAnyContactExtendedFieldKey(row: Record<string, any>): boolean {
|
||||
for (const key of Object.keys(row || {})) {
|
||||
if (this.contactExtendedFieldCandidateSet.has(String(key || '').toLowerCase())) {
|
||||
@@ -3066,13 +3203,13 @@ class ChatService {
|
||||
if (typeof raw === 'number') return raw
|
||||
if (typeof raw === 'bigint') return Number(raw)
|
||||
if (Buffer.isBuffer(raw)) {
|
||||
return parseInt(raw.toString('utf-8'), 10)
|
||||
return this.coerceRowNumber(raw.toString('utf-8'))
|
||||
}
|
||||
if (raw instanceof Uint8Array) {
|
||||
return parseInt(Buffer.from(raw).toString('utf-8'), 10)
|
||||
return this.coerceRowNumber(Buffer.from(raw).toString('utf-8'))
|
||||
}
|
||||
if (Array.isArray(raw)) {
|
||||
return parseInt(Buffer.from(raw).toString('utf-8'), 10)
|
||||
return this.coerceRowNumber(Buffer.from(raw).toString('utf-8'))
|
||||
}
|
||||
if (typeof raw === 'object') {
|
||||
if ('value' in raw) return this.coerceRowNumber(raw.value)
|
||||
@@ -3088,13 +3225,21 @@ class ChatService {
|
||||
}
|
||||
const text = raw.toString ? String(raw) : ''
|
||||
if (text && text !== '[object Object]') {
|
||||
const parsed = parseInt(text, 10)
|
||||
return Number.isFinite(parsed) ? parsed : NaN
|
||||
return this.coerceRowNumber(text)
|
||||
}
|
||||
return NaN
|
||||
}
|
||||
const parsed = parseInt(String(raw), 10)
|
||||
return Number.isFinite(parsed) ? parsed : NaN
|
||||
const text = String(raw).trim()
|
||||
if (!text) return NaN
|
||||
if (/^[+-]?\d+$/.test(text)) {
|
||||
const parsed = Number(text)
|
||||
return Number.isFinite(parsed) ? parsed : NaN
|
||||
}
|
||||
if (/^[+-]?\d+\.\d+$/.test(text)) {
|
||||
const parsed = Number(text)
|
||||
return Number.isFinite(parsed) ? parsed : NaN
|
||||
}
|
||||
return NaN
|
||||
}
|
||||
|
||||
private buildIdentityKeys(raw: string): string[] {
|
||||
@@ -3656,7 +3801,11 @@ class ChatService {
|
||||
return this.extractXmlValue(content, 'type')
|
||||
}
|
||||
|
||||
private async collectSpecialMessageCountsByCursorScan(sessionId: string): Promise<{
|
||||
private async collectSpecialMessageCountsByCursorScan(
|
||||
sessionId: string,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<{
|
||||
transferMessages: number
|
||||
redPacketMessages: number
|
||||
callMessages: number
|
||||
@@ -3667,7 +3816,7 @@ class ChatService {
|
||||
callMessages: 0
|
||||
}
|
||||
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 500, false, 0, 0)
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 500, false, beginTimestamp, endTimestamp)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return counters
|
||||
}
|
||||
@@ -3713,7 +3862,9 @@ class ChatService {
|
||||
|
||||
private async collectSessionExportStatsByCursorScan(
|
||||
sessionId: string,
|
||||
selfIdentitySet: Set<string>
|
||||
selfIdentitySet: Set<string>,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<ExportSessionStats> {
|
||||
const stats: ExportSessionStats = {
|
||||
totalMessages: 0,
|
||||
@@ -3731,7 +3882,7 @@ class ChatService {
|
||||
}
|
||||
|
||||
const senderIdentities = new Set<string>()
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 500, false, 0, 0)
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(sessionId, 500, false, beginTimestamp, endTimestamp)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return stats
|
||||
}
|
||||
@@ -3806,7 +3957,7 @@ class ChatService {
|
||||
|
||||
if (sessionId.endsWith('@chatroom')) {
|
||||
stats.groupActiveSpeakers = senderIdentities.size
|
||||
if (Number.isFinite(stats.groupMyMessages)) {
|
||||
if ((beginTimestamp <= 0 && endTimestamp <= 0) && Number.isFinite(stats.groupMyMessages)) {
|
||||
this.setGroupMyMessageCountHintEntry(sessionId, stats.groupMyMessages as number)
|
||||
}
|
||||
}
|
||||
@@ -3816,7 +3967,9 @@ class ChatService {
|
||||
private async collectSessionExportStats(
|
||||
sessionId: string,
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
preferAccurateSpecialTypes: boolean = false,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<ExportSessionStats> {
|
||||
const stats: ExportSessionStats = {
|
||||
totalMessages: 0,
|
||||
@@ -3834,9 +3987,9 @@ class ChatService {
|
||||
stats.groupActiveSpeakers = 0
|
||||
}
|
||||
|
||||
const nativeResult = await wcdbService.getSessionMessageTypeStats(sessionId, 0, 0)
|
||||
const nativeResult = await wcdbService.getSessionMessageTypeStats(sessionId, beginTimestamp, endTimestamp)
|
||||
if (!nativeResult.success || !nativeResult.data) {
|
||||
return this.collectSessionExportStatsByCursorScan(sessionId, selfIdentitySet)
|
||||
return this.collectSessionExportStatsByCursorScan(sessionId, selfIdentitySet, beginTimestamp, endTimestamp)
|
||||
}
|
||||
|
||||
const data = nativeResult.data as Record<string, any>
|
||||
@@ -3856,7 +4009,7 @@ class ChatService {
|
||||
|
||||
if (preferAccurateSpecialTypes) {
|
||||
try {
|
||||
const preciseCounters = await this.collectSpecialMessageCountsByCursorScan(sessionId)
|
||||
const preciseCounters = await this.collectSpecialMessageCountsByCursorScan(sessionId, beginTimestamp, endTimestamp)
|
||||
stats.transferMessages = preciseCounters.transferMessages
|
||||
stats.redPacketMessages = preciseCounters.redPacketMessages
|
||||
stats.callMessages = preciseCounters.callMessages
|
||||
@@ -3868,14 +4021,19 @@ class ChatService {
|
||||
if (isGroup) {
|
||||
stats.groupMyMessages = Math.max(0, Math.floor(Number(data.group_my_messages || 0)))
|
||||
stats.groupActiveSpeakers = Math.max(0, Math.floor(Number(data.group_sender_count || 0)))
|
||||
if (Number.isFinite(stats.groupMyMessages)) {
|
||||
if ((beginTimestamp <= 0 && endTimestamp <= 0) && Number.isFinite(stats.groupMyMessages)) {
|
||||
this.setGroupMyMessageCountHintEntry(sessionId, stats.groupMyMessages as number)
|
||||
}
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
private toExportSessionStatsFromNativeTypeRow(sessionId: string, row: Record<string, any>): ExportSessionStats {
|
||||
private toExportSessionStatsFromNativeTypeRow(
|
||||
sessionId: string,
|
||||
row: Record<string, any>,
|
||||
options?: { updateGroupHint?: boolean }
|
||||
): ExportSessionStats {
|
||||
const updateGroupHint = options?.updateGroupHint !== false
|
||||
const stats: ExportSessionStats = {
|
||||
totalMessages: Math.max(0, Math.floor(Number(row?.total_messages || 0))),
|
||||
voiceMessages: Math.max(0, Math.floor(Number(row?.voice_messages || 0))),
|
||||
@@ -3895,7 +4053,7 @@ class ChatService {
|
||||
if (sessionId.endsWith('@chatroom')) {
|
||||
stats.groupMyMessages = Math.max(0, Math.floor(Number(row?.group_my_messages || 0)))
|
||||
stats.groupActiveSpeakers = Math.max(0, Math.floor(Number(row?.group_sender_count || 0)))
|
||||
if (Number.isFinite(stats.groupMyMessages)) {
|
||||
if (updateGroupHint && Number.isFinite(stats.groupMyMessages)) {
|
||||
this.setGroupMyMessageCountHintEntry(sessionId, stats.groupMyMessages as number)
|
||||
}
|
||||
}
|
||||
@@ -4025,9 +4183,17 @@ class ChatService {
|
||||
sessionId: string,
|
||||
selfIdentitySet: Set<string>,
|
||||
includeRelations: boolean,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
preferAccurateSpecialTypes: boolean = false,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<ExportSessionStats> {
|
||||
const stats = await this.collectSessionExportStats(sessionId, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
const stats = await this.collectSessionExportStats(
|
||||
sessionId,
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes,
|
||||
beginTimestamp,
|
||||
endTimestamp
|
||||
)
|
||||
const isGroup = sessionId.endsWith('@chatroom')
|
||||
|
||||
if (isGroup) {
|
||||
@@ -4066,7 +4232,9 @@ class ChatService {
|
||||
sessionIds: string[],
|
||||
includeRelations: boolean,
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
preferAccurateSpecialTypes: boolean = false,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<Record<string, ExportSessionStats>> {
|
||||
const normalizedSessionIds = Array.from(
|
||||
new Set(
|
||||
@@ -4127,8 +4295,8 @@ class ChatService {
|
||||
try {
|
||||
const quickMode = !includeRelations && normalizedSessionIds.length > 1
|
||||
const nativeBatch = await wcdbService.getSessionMessageTypeStatsBatch(normalizedSessionIds, {
|
||||
beginTimestamp: 0,
|
||||
endTimestamp: 0,
|
||||
beginTimestamp,
|
||||
endTimestamp,
|
||||
quickMode,
|
||||
includeGroupSenderCount: true
|
||||
})
|
||||
@@ -4136,7 +4304,9 @@ class ChatService {
|
||||
for (const sessionId of normalizedSessionIds) {
|
||||
const row = nativeBatch.data?.[sessionId] as Record<string, any> | undefined
|
||||
if (!row || typeof row !== 'object') continue
|
||||
nativeBatchStats[sessionId] = this.toExportSessionStatsFromNativeTypeRow(sessionId, row)
|
||||
nativeBatchStats[sessionId] = this.toExportSessionStatsFromNativeTypeRow(sessionId, row, {
|
||||
updateGroupHint: beginTimestamp <= 0 && endTimestamp <= 0
|
||||
})
|
||||
}
|
||||
hasNativeBatchStats = Object.keys(nativeBatchStats).length > 0
|
||||
} else {
|
||||
@@ -4151,7 +4321,13 @@ class ChatService {
|
||||
try {
|
||||
const stats = hasNativeBatchStats && nativeBatchStats[sessionId]
|
||||
? { ...nativeBatchStats[sessionId] }
|
||||
: await this.collectSessionExportStats(sessionId, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
: await this.collectSessionExportStats(
|
||||
sessionId,
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes,
|
||||
beginTimestamp,
|
||||
endTimestamp
|
||||
)
|
||||
if (sessionId.endsWith('@chatroom')) {
|
||||
if (shouldLoadGroupMemberCount) {
|
||||
stats.groupMemberCount = typeof memberCountMap[sessionId] === 'number'
|
||||
@@ -4181,10 +4357,12 @@ class ChatService {
|
||||
sessionId: string,
|
||||
includeRelations: boolean,
|
||||
selfIdentitySet: Set<string>,
|
||||
preferAccurateSpecialTypes: boolean = false
|
||||
preferAccurateSpecialTypes: boolean = false,
|
||||
beginTimestamp: number = 0,
|
||||
endTimestamp: number = 0
|
||||
): Promise<ExportSessionStats> {
|
||||
if (preferAccurateSpecialTypes) {
|
||||
return this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, true)
|
||||
return this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, true, beginTimestamp, endTimestamp)
|
||||
}
|
||||
|
||||
const scopedKey = this.buildScopedSessionStatsKey(sessionId)
|
||||
@@ -4199,8 +4377,13 @@ class ChatService {
|
||||
if (pendingFull) return pendingFull
|
||||
}
|
||||
|
||||
const shouldUsePendingPool = beginTimestamp <= 0 && endTimestamp <= 0
|
||||
if (!shouldUsePendingPool) {
|
||||
return this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, false, beginTimestamp, endTimestamp)
|
||||
}
|
||||
|
||||
const targetMap = includeRelations ? this.sessionStatsPendingFull : this.sessionStatsPendingBasic
|
||||
const pending = this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, false)
|
||||
const pending = this.computeSessionExportStats(sessionId, selfIdentitySet, includeRelations, false, beginTimestamp, endTimestamp)
|
||||
targetMap.set(scopedKey, pending)
|
||||
try {
|
||||
return await pending
|
||||
@@ -4216,6 +4399,55 @@ class ChatService {
|
||||
return this.mapRowsToMessages(rows)
|
||||
}
|
||||
|
||||
mapRowsToMessagesLiteForApi(rows: Record<string, any>[]): Message[] {
|
||||
const myWxid = String(this.configService.get('myWxid') || '').trim()
|
||||
const messages: Message[] = []
|
||||
for (const row of rows) {
|
||||
const sourceInfo = this.getMessageSourceInfo(row)
|
||||
const localType = this.getRowInt(row, ['local_type'], 1)
|
||||
const createTime = this.getRowTimestampSeconds(row, ['create_time', 'createTime', 'msg_time', 'msgTime', 'time'], 0)
|
||||
const sortSeq = this.getRowInt(row, ['sort_seq'], createTime > 0 ? createTime * 1000 : 0)
|
||||
const localId = this.getRowInt(row, ['local_id'], 0)
|
||||
const serverId = this.getRowInt(row, ['server_id'], 0)
|
||||
const content = this.decodeMessageContent(row.message_content, row.compress_content)
|
||||
|
||||
const isSendRaw = row.computed_is_send ?? row.is_send
|
||||
const parsedRawIsSend = isSendRaw === null || isSendRaw === undefined
|
||||
? null
|
||||
: parseInt(String(isSendRaw), 10)
|
||||
const normalizedIsSend = typeof parsedRawIsSend === 'number' && Number.isFinite(parsedRawIsSend)
|
||||
? parsedRawIsSend
|
||||
: null
|
||||
const senderFromRow = String(row.sender_username || '').trim() || this.extractSenderUsernameFromContent(content) || null
|
||||
const { isSend } = this.resolveMessageIsSend(normalizedIsSend, senderFromRow)
|
||||
const senderUsername = senderFromRow || (isSend === 1 && myWxid ? myWxid : null)
|
||||
|
||||
messages.push({
|
||||
messageKey: this.buildMessageKey({
|
||||
localId,
|
||||
serverId,
|
||||
createTime,
|
||||
sortSeq,
|
||||
senderUsername,
|
||||
localType,
|
||||
...sourceInfo
|
||||
}),
|
||||
localId,
|
||||
serverId,
|
||||
localType,
|
||||
createTime,
|
||||
sortSeq,
|
||||
isSend,
|
||||
senderUsername,
|
||||
parsedContent: '',
|
||||
rawContent: content,
|
||||
content,
|
||||
_db_path: sourceInfo.dbPath
|
||||
})
|
||||
}
|
||||
return messages
|
||||
}
|
||||
|
||||
private mapRowsToMessages(rows: Record<string, any>[]): Message[] {
|
||||
const myWxid = this.configService.get('myWxid')
|
||||
|
||||
@@ -4233,7 +4465,7 @@ class ChatService {
|
||||
|| this.extractSenderUsernameFromContent(content)
|
||||
|| null
|
||||
const { isSend } = this.resolveMessageIsSend(parsedRawIsSend, senderUsername)
|
||||
const createTime = this.getRowInt(row, ['create_time'], 0)
|
||||
const createTime = this.getRowTimestampSeconds(row, ['create_time', 'createTime', 'msg_time', 'msgTime', 'time'], 0)
|
||||
|
||||
if (senderUsername && !myWxid) {
|
||||
// [DEBUG] Issue #34: 未配置 myWxid,无法判断是否发送
|
||||
@@ -7042,6 +7274,9 @@ class ChatService {
|
||||
const allowStaleCache = options.allowStaleCache === true
|
||||
const preferAccurateSpecialTypes = options.preferAccurateSpecialTypes === true
|
||||
const cacheOnly = options.cacheOnly === true
|
||||
const beginTimestamp = this.normalizeTimestampSeconds(Number(options.beginTimestamp || 0))
|
||||
const endTimestamp = this.normalizeTimestampSeconds(Number(options.endTimestamp || 0))
|
||||
const useRangeFilter = beginTimestamp > 0 || endTimestamp > 0
|
||||
|
||||
const normalizedSessionIds = Array.from(
|
||||
new Set(
|
||||
@@ -7065,7 +7300,7 @@ class ChatService {
|
||||
? this.getGroupMyMessageCountHintEntry(sessionId)
|
||||
: null
|
||||
const cachedResult = this.getSessionStatsCacheEntry(sessionId)
|
||||
const canUseCache = cacheOnly || (!forceRefresh && !preferAccurateSpecialTypes)
|
||||
const canUseCache = !useRangeFilter && (cacheOnly || (!forceRefresh && !preferAccurateSpecialTypes))
|
||||
if (canUseCache && cachedResult && this.supportsRequestedRelation(cachedResult.entry, includeRelations)) {
|
||||
const stale = now - cachedResult.entry.updatedAt > this.sessionStatsCacheTtlMs
|
||||
if (!stale || allowStaleCache || cacheOnly) {
|
||||
@@ -7103,31 +7338,16 @@ class ChatService {
|
||||
if (pendingSessionIds.length === 1) {
|
||||
const sessionId = pendingSessionIds[0]
|
||||
try {
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
resultMap[sessionId] = stats
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
updatedAt,
|
||||
stale: false,
|
||||
includeRelations,
|
||||
source: 'fresh'
|
||||
}
|
||||
usedBatchedCompute = true
|
||||
} catch {
|
||||
usedBatchedCompute = false
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
const batchedStatsMap = await this.computeSessionExportStatsBatch(
|
||||
pendingSessionIds,
|
||||
const stats = await this.getOrComputeSessionExportStats(
|
||||
sessionId,
|
||||
includeRelations,
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes
|
||||
preferAccurateSpecialTypes,
|
||||
beginTimestamp,
|
||||
endTimestamp
|
||||
)
|
||||
for (const sessionId of pendingSessionIds) {
|
||||
const stats = batchedStatsMap[sessionId]
|
||||
if (!stats) continue
|
||||
resultMap[sessionId] = stats
|
||||
resultMap[sessionId] = stats
|
||||
if (!useRangeFilter) {
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
updatedAt,
|
||||
@@ -7140,19 +7360,56 @@ class ChatService {
|
||||
} catch {
|
||||
usedBatchedCompute = false
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
const batchedStatsMap = await this.computeSessionExportStatsBatch(
|
||||
pendingSessionIds,
|
||||
includeRelations,
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes,
|
||||
beginTimestamp,
|
||||
endTimestamp
|
||||
)
|
||||
for (const sessionId of pendingSessionIds) {
|
||||
const stats = batchedStatsMap[sessionId]
|
||||
if (!stats) continue
|
||||
resultMap[sessionId] = stats
|
||||
if (!useRangeFilter) {
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
updatedAt,
|
||||
stale: false,
|
||||
includeRelations,
|
||||
source: 'fresh'
|
||||
}
|
||||
}
|
||||
}
|
||||
usedBatchedCompute = true
|
||||
} catch {
|
||||
usedBatchedCompute = false
|
||||
}
|
||||
}
|
||||
|
||||
if (!usedBatchedCompute) {
|
||||
await this.forEachWithConcurrency(pendingSessionIds, 3, async (sessionId) => {
|
||||
try {
|
||||
const stats = await this.getOrComputeSessionExportStats(sessionId, includeRelations, selfIdentitySet, preferAccurateSpecialTypes)
|
||||
resultMap[sessionId] = stats
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
updatedAt,
|
||||
stale: false,
|
||||
const stats = await this.getOrComputeSessionExportStats(
|
||||
sessionId,
|
||||
includeRelations,
|
||||
source: 'fresh'
|
||||
selfIdentitySet,
|
||||
preferAccurateSpecialTypes,
|
||||
beginTimestamp,
|
||||
endTimestamp
|
||||
)
|
||||
resultMap[sessionId] = stats
|
||||
if (!useRangeFilter) {
|
||||
const updatedAt = this.setSessionStatsCacheEntry(sessionId, stats, includeRelations)
|
||||
cacheMeta[sessionId] = {
|
||||
updatedAt,
|
||||
stale: false,
|
||||
includeRelations,
|
||||
source: 'fresh'
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
resultMap[sessionId] = this.buildEmptyExportSessionStats(sessionId, includeRelations)
|
||||
@@ -8892,7 +9149,11 @@ class ChatService {
|
||||
private normalizeTimestampSeconds(value: number): number {
|
||||
const numeric = Number(value || 0)
|
||||
if (!Number.isFinite(numeric) || numeric <= 0) return 0
|
||||
return numeric > 1e12 ? Math.floor(numeric / 1000) : Math.floor(numeric)
|
||||
let normalized = Math.floor(numeric)
|
||||
while (normalized > 10000000000) {
|
||||
normalized = Math.floor(normalized / 1000)
|
||||
}
|
||||
return normalized
|
||||
}
|
||||
|
||||
private toSafeInt(value: unknown, fallback = 0): number {
|
||||
@@ -10532,8 +10793,8 @@ class ChatService {
|
||||
const serverIdRaw = this.normalizeUnsignedIntegerToken(row.server_id)
|
||||
const serverId = this.getRowInt(row, ['server_id'], 0)
|
||||
const localType = this.getRowInt(row, ['local_type'], 0)
|
||||
const createTime = this.getRowInt(row, ['create_time'], 0)
|
||||
const sortSeq = this.getRowInt(row, ['sort_seq'], createTime)
|
||||
const createTime = this.getRowTimestampSeconds(row, ['create_time', 'createTime', 'msg_time', 'msgTime', 'time'], 0)
|
||||
const sortSeq = this.getRowInt(row, ['sort_seq'], createTime > 0 ? createTime * 1000 : 0)
|
||||
const rawIsSend = row.computed_is_send ?? row.is_send
|
||||
const senderUsername = await this.resolveSenderUsernameForMessageRow(row, rawContent)
|
||||
const sendState = this.resolveMessageIsSend(rawIsSend === null ? null : parseInt(rawIsSend, 10), senderUsername)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -516,27 +516,29 @@ class HttpService {
|
||||
limit: number,
|
||||
startTime: number,
|
||||
endTime: number,
|
||||
ascending: boolean
|
||||
ascending: boolean,
|
||||
useLiteMapping: boolean = true
|
||||
): Promise<{ success: boolean; messages?: Message[]; hasMore?: boolean; error?: string }> {
|
||||
try {
|
||||
// 使用固定 batch 大小(与 limit 相同或最多 500)来减少循环次数
|
||||
const batchSize = Math.min(limit, 500)
|
||||
// 深分页时放大 batch,避免 offset 很大时出现大量小批次循环。
|
||||
const batchSize = Math.min(2000, Math.max(500, limit))
|
||||
const beginTimestamp = startTime > 10000000000 ? Math.floor(startTime / 1000) : startTime
|
||||
const endTimestamp = endTime > 10000000000 ? Math.floor(endTime / 1000) : endTime
|
||||
|
||||
const cursorResult = await wcdbService.openMessageCursor(talker, batchSize, ascending, beginTimestamp, endTimestamp)
|
||||
const cursorResult = await wcdbService.openMessageCursorLite(talker, batchSize, ascending, beginTimestamp, endTimestamp)
|
||||
if (!cursorResult.success || !cursorResult.cursor) {
|
||||
return { success: false, error: cursorResult.error || '打开消息游标失败' }
|
||||
}
|
||||
|
||||
const cursor = cursorResult.cursor
|
||||
try {
|
||||
const allRows: Record<string, any>[] = []
|
||||
const collectedRows: Record<string, any>[] = []
|
||||
let hasMore = true
|
||||
let skipped = 0
|
||||
let reachedLimit = false
|
||||
|
||||
// 循环获取消息,处理 offset 跳过 + limit 累积
|
||||
while (allRows.length < limit && hasMore) {
|
||||
while (collectedRows.length < limit && hasMore) {
|
||||
const batch = await wcdbService.fetchMessageBatch(cursor)
|
||||
if (!batch.success || !batch.rows || batch.rows.length === 0) {
|
||||
hasMore = false
|
||||
@@ -557,12 +559,20 @@ class HttpService {
|
||||
skipped = offset
|
||||
}
|
||||
|
||||
allRows.push(...rows)
|
||||
const remainingCapacity = limit - collectedRows.length
|
||||
if (rows.length > remainingCapacity) {
|
||||
collectedRows.push(...rows.slice(0, remainingCapacity))
|
||||
reachedLimit = true
|
||||
break
|
||||
}
|
||||
|
||||
collectedRows.push(...rows)
|
||||
}
|
||||
|
||||
const trimmedRows = allRows.slice(0, limit)
|
||||
const finalHasMore = hasMore || allRows.length > limit
|
||||
const messages = chatService.mapRowsToMessagesForApi(trimmedRows)
|
||||
const finalHasMore = hasMore || reachedLimit
|
||||
const messages = useLiteMapping
|
||||
? chatService.mapRowsToMessagesLiteForApi(collectedRows)
|
||||
: chatService.mapRowsToMessagesForApi(collectedRows)
|
||||
await this.backfillMissingSenderUsernames(talker, messages)
|
||||
return { success: true, messages, hasMore: finalHasMore }
|
||||
} finally {
|
||||
@@ -590,32 +600,70 @@ class HttpService {
|
||||
if (targets.length === 0) return
|
||||
|
||||
const myWxid = (this.configService.get('myWxid') || '').trim()
|
||||
for (const msg of targets) {
|
||||
const localId = Number(msg.localId || 0)
|
||||
if (Number.isFinite(localId) && localId > 0) {
|
||||
try {
|
||||
const detail = await wcdbService.getMessageById(talker, localId)
|
||||
if (detail.success && detail.message) {
|
||||
const hydrated = chatService.mapRowsToMessagesForApi([detail.message])[0]
|
||||
if (hydrated?.senderUsername) {
|
||||
msg.senderUsername = hydrated.senderUsername
|
||||
}
|
||||
if ((msg.isSend === null || msg.isSend === undefined) && hydrated?.isSend !== undefined) {
|
||||
msg.isSend = hydrated.isSend
|
||||
}
|
||||
if (!msg.rawContent && hydrated?.rawContent) {
|
||||
msg.rawContent = hydrated.rawContent
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn('[HttpService] backfill sender failed:', error)
|
||||
const MAX_DETAIL_BACKFILL = 120
|
||||
if (targets.length > MAX_DETAIL_BACKFILL) {
|
||||
for (const msg of targets) {
|
||||
if (!msg.senderUsername && msg.isSend === 1 && myWxid) {
|
||||
msg.senderUsername = myWxid
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if (!msg.senderUsername && msg.isSend === 1 && myWxid) {
|
||||
msg.senderUsername = myWxid
|
||||
const queue = [...targets]
|
||||
const workerCount = Math.max(1, Math.min(6, queue.length))
|
||||
const state = {
|
||||
attempted: 0,
|
||||
hydrated: 0,
|
||||
consecutiveMiss: 0
|
||||
}
|
||||
const MAX_DETAIL_LOOKUPS = 80
|
||||
const MAX_CONSECUTIVE_MISS = 36
|
||||
const runWorker = async (): Promise<void> => {
|
||||
while (queue.length > 0) {
|
||||
if (state.attempted >= MAX_DETAIL_LOOKUPS) break
|
||||
if (state.consecutiveMiss >= MAX_CONSECUTIVE_MISS && state.hydrated <= 0) break
|
||||
const msg = queue.shift()
|
||||
if (!msg) break
|
||||
|
||||
const localId = Number(msg.localId || 0)
|
||||
if (Number.isFinite(localId) && localId > 0) {
|
||||
state.attempted += 1
|
||||
try {
|
||||
const detail = await wcdbService.getMessageById(talker, localId)
|
||||
if (detail.success && detail.message) {
|
||||
const hydrated = chatService.mapRowsToMessagesForApi([detail.message])[0]
|
||||
if (hydrated?.senderUsername) {
|
||||
msg.senderUsername = hydrated.senderUsername
|
||||
}
|
||||
if ((msg.isSend === null || msg.isSend === undefined) && hydrated?.isSend !== undefined) {
|
||||
msg.isSend = hydrated.isSend
|
||||
}
|
||||
if (!msg.rawContent && hydrated?.rawContent) {
|
||||
msg.rawContent = hydrated.rawContent
|
||||
}
|
||||
if (msg.senderUsername) {
|
||||
state.hydrated += 1
|
||||
state.consecutiveMiss = 0
|
||||
} else {
|
||||
state.consecutiveMiss += 1
|
||||
}
|
||||
} else {
|
||||
state.consecutiveMiss += 1
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn('[HttpService] backfill sender failed:', error)
|
||||
state.consecutiveMiss += 1
|
||||
}
|
||||
}
|
||||
|
||||
if (!msg.senderUsername && msg.isSend === 1 && myWxid) {
|
||||
msg.senderUsername = myWxid
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.all(Array.from({ length: workerCount }, () => runWorker()))
|
||||
}
|
||||
|
||||
private parseBooleanParam(url: URL, keys: string[], defaultValue: boolean = false): boolean {
|
||||
@@ -663,7 +711,7 @@ class HttpService {
|
||||
const talker = (url.searchParams.get('talker') || '').trim()
|
||||
const limit = this.parseIntParam(url.searchParams.get('limit'), 100, 1, 10000)
|
||||
const offset = this.parseIntParam(url.searchParams.get('offset'), 0, 0, Number.MAX_SAFE_INTEGER)
|
||||
const keyword = (url.searchParams.get('keyword') || '').trim().toLowerCase()
|
||||
const keyword = (url.searchParams.get('keyword') || '').trim()
|
||||
const startParam = url.searchParams.get('start')
|
||||
const endParam = url.searchParams.get('end')
|
||||
const chatlab = this.parseBooleanParam(url, ['chatlab'], false)
|
||||
@@ -683,26 +731,41 @@ class HttpService {
|
||||
|
||||
const startTime = this.parseTimeParam(startParam)
|
||||
const endTime = this.parseTimeParam(endParam, true)
|
||||
const queryOffset = keyword ? 0 : offset
|
||||
const queryLimit = keyword ? 10000 : limit
|
||||
|
||||
const result = await this.fetchMessagesBatch(talker, queryOffset, queryLimit, startTime, endTime, false)
|
||||
if (!result.success || !result.messages) {
|
||||
this.sendError(res, 500, result.error || 'Failed to get messages')
|
||||
return
|
||||
}
|
||||
|
||||
let messages = result.messages
|
||||
let hasMore = result.hasMore === true
|
||||
let messages: Message[] = []
|
||||
let hasMore = false
|
||||
|
||||
if (keyword) {
|
||||
const filtered = messages.filter((msg) => {
|
||||
const content = (msg.parsedContent || msg.rawContent || '').toLowerCase()
|
||||
return content.includes(keyword)
|
||||
})
|
||||
const endIndex = offset + limit
|
||||
hasMore = filtered.length > endIndex
|
||||
messages = filtered.slice(offset, endIndex)
|
||||
const searchLimit = Math.max(1, limit) + 1
|
||||
const searchResult = await chatService.searchMessages(
|
||||
keyword,
|
||||
talker,
|
||||
searchLimit,
|
||||
offset,
|
||||
startTime,
|
||||
endTime
|
||||
)
|
||||
if (!searchResult.success || !searchResult.messages) {
|
||||
this.sendError(res, 500, searchResult.error || 'Failed to search messages')
|
||||
return
|
||||
}
|
||||
hasMore = searchResult.messages.length > limit
|
||||
messages = hasMore ? searchResult.messages.slice(0, limit) : searchResult.messages
|
||||
} else {
|
||||
const result = await this.fetchMessagesBatch(
|
||||
talker,
|
||||
offset,
|
||||
limit,
|
||||
startTime,
|
||||
endTime,
|
||||
false,
|
||||
!mediaOptions.enabled
|
||||
)
|
||||
if (!result.success || !result.messages) {
|
||||
this.sendError(res, 500, result.error || 'Failed to get messages')
|
||||
return
|
||||
}
|
||||
messages = result.messages
|
||||
hasMore = result.hasMore === true
|
||||
}
|
||||
|
||||
const mediaMap = mediaOptions.enabled
|
||||
@@ -812,7 +875,7 @@ class HttpService {
|
||||
const endTime = endParam ? this.parseTimeParam(endParam, true) : 0
|
||||
|
||||
try {
|
||||
const result = await this.fetchMessagesBatch(sessionId, offset, limit, startTime, endTime, true)
|
||||
const result = await this.fetchMessagesBatch(sessionId, offset, limit, startTime, endTime, true, true)
|
||||
if (!result.success || !result.messages) {
|
||||
this.sendError(res, 500, result.error || 'Failed to get messages')
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user