// 消息状态同步管理器 - 微信小程序专用 // 处理消息的已读状态、多端同步、离线消息等 const wsManager = require('./websocket-manager-v2.js'); const apiClient = require('./api-client.js'); /** * 消息状态同步管理器 * 功能: * 1. 消息已读状态同步 * 2. 多端消息状态同步 * 3. 离线消息处理 * 4. 消息状态缓存 * 5. 网络状态处理 */ class MessageSyncManager { constructor() { this.isInitialized = false; // 消息状态缓存 this.messageStatusCache = new Map(); // 待同步的状态队列 this.pendingSyncQueue = []; // 同步配置 this.syncConfig = { // 批量同步的最大数量 maxBatchSize: 50, // 同步间隔(毫秒) syncInterval: 5000, // 重试配置 maxRetries: 3, retryDelay: 2000, // 缓存过期时间(毫秒) cacheExpireTime: 30 * 60 * 1000 // 30分钟 }; // 同步状态 this.syncStatus = { isOnline: false, lastSyncTime: 0, pendingCount: 0, failedCount: 0 }; // 定时器 this.syncTimer = null; this.retryTimer = null; this.init(); } // 初始化同步管理器 async init() { if (this.isInitialized) return; console.log('🔄 初始化消息状态同步管理器...'); try { // 加载缓存的消息状态 await this.loadMessageStatusCache(); // 加载待同步队列 await this.loadPendingSyncQueue(); // 注册WebSocket事件 this.registerWebSocketEvents(); // 注册网络状态监听 this.registerNetworkEvents(); // 启动定时同步 this.startSyncTimer(); this.isInitialized = true; console.log('✅ 消息状态同步管理器初始化完成'); } catch (error) { console.error('❌ 消息状态同步管理器初始化失败:', error); } } // 注册WebSocket事件 registerWebSocketEvents() { // WebSocket连接成功 wsManager.on('connected', () => { console.log('🔄 WebSocket连接成功,开始同步消息状态'); this.syncStatus.isOnline = true; this.processPendingSyncQueue(); }); // WebSocket连接断开 wsManager.on('disconnected', () => { console.log('🔄 WebSocket连接断开,切换到离线模式'); this.syncStatus.isOnline = false; }); // 接收消息状态更新 wsManager.on('message', (data) => { this.handleWebSocketMessage(data); }); } // 处理WebSocket消息 async handleWebSocketMessage(data) { try { const message = typeof data === 'string' ? JSON.parse(data) : data; switch (message.type) { case 'message_read': await this.handleMessageReadUpdate(message.data); break; case 'message_status_sync': await this.handleMessageStatusSync(message.data); break; case 'conversation_read': await this.handleConversationReadUpdate(message.data); break; } } catch (error) { console.error('❌ 处理WebSocket消息状态更新失败:', error); } } // 处理消息已读更新 async handleMessageReadUpdate(data) { console.log('👁️ 收到消息已读更新:', data); // 更新本地缓存 if (data.messageIds && Array.isArray(data.messageIds)) { for (const messageId of data.messageIds) { this.updateMessageStatus(messageId, 'read', data.readTime); } } // 触发页面更新事件 this.triggerEvent('message_read_updated', data); } // 处理会话已读更新 async handleConversationReadUpdate(data) { console.log('👁️ 收到会话已读更新:', data); // 更新会话中所有消息的状态 if (data.conversationId && data.lastReadMessageId) { await this.markConversationAsRead(data.conversationId, data.lastReadMessageId); } // 触发页面更新事件 this.triggerEvent('conversation_read_updated', data); } // 处理消息状态同步 async handleMessageStatusSync(data) { console.log('🔄 收到消息状态同步:', data); if (data.messageStatuses && Array.isArray(data.messageStatuses)) { for (const status of data.messageStatuses) { this.updateMessageStatus(status.messageId, status.status, status.timestamp); } } // 保存缓存 await this.saveMessageStatusCache(); } // 标记消息为已读 async markMessageAsRead(messageId, conversationId) { try { console.log('👁️ 标记消息为已读:', messageId); // 更新本地状态 this.updateMessageStatus(messageId, 'read', Date.now()); // 添加到同步队列 this.addToSyncQueue({ type: 'message_read', messageId: messageId, conversationId: conversationId, timestamp: Date.now() }); // 立即尝试同步(如果在线) if (this.syncStatus.isOnline) { await this.processPendingSyncQueue(); } return true; } catch (error) { console.error('❌ 标记消息已读失败:', error); return false; } } // 标记会话为已读 async markConversationAsRead(conversationId, lastReadMessageId) { try { console.log('👁️ 标记会话为已读:', conversationId); // 添加到同步队列 this.addToSyncQueue({ type: 'conversation_read', conversationId: conversationId, lastReadMessageId: lastReadMessageId, timestamp: Date.now() }); // 立即尝试同步(如果在线) if (this.syncStatus.isOnline) { await this.processPendingSyncQueue(); } return true; } catch (error) { console.error('❌ 标记会话已读失败:', error); return false; } } // 批量标记消息为已读 async markMessagesAsRead(messageIds, conversationId) { try { console.log('👁️ 批量标记消息为已读:', messageIds.length); const timestamp = Date.now(); // 更新本地状态 for (const messageId of messageIds) { this.updateMessageStatus(messageId, 'read', timestamp); } // 添加到同步队列 this.addToSyncQueue({ type: 'batch_message_read', messageIds: messageIds, conversationId: conversationId, timestamp: timestamp }); // 立即尝试同步(如果在线) if (this.syncStatus.isOnline) { await this.processPendingSyncQueue(); } return true; } catch (error) { console.error('❌ 批量标记消息已读失败:', error); return false; } } // 更新消息状态 updateMessageStatus(messageId, status, timestamp) { this.messageStatusCache.set(messageId, { status: status, timestamp: timestamp, synced: false }); } // 获取消息状态 getMessageStatus(messageId) { return this.messageStatusCache.get(messageId); } // 检查消息是否已读 isMessageRead(messageId) { const status = this.getMessageStatus(messageId); return status && status.status === 'read'; } // 添加到同步队列 addToSyncQueue(syncItem) { this.pendingSyncQueue.push({ ...syncItem, id: this.generateSyncId(), retries: 0, addedTime: Date.now() }); this.syncStatus.pendingCount = this.pendingSyncQueue.length; this.savePendingSyncQueue(); } // 处理待同步队列 async processPendingSyncQueue() { if (this.pendingSyncQueue.length === 0) { return; } console.log(`🔄 处理待同步队列,共 ${this.pendingSyncQueue.length} 项`); // 按类型分组批量处理 const groupedItems = this.groupSyncItemsByType(); for (const [type, items] of groupedItems) { try { await this.syncItemsByType(type, items); } catch (error) { console.error(`❌ 同步 ${type} 类型失败:`, error); this.handleSyncFailure(items, error); } } // 清理已同步的项目 this.cleanupSyncedItems(); // 更新同步状态 this.syncStatus.lastSyncTime = Date.now(); this.syncStatus.pendingCount = this.pendingSyncQueue.length; } // 按类型分组同步项目 groupSyncItemsByType() { const groups = new Map(); for (const item of this.pendingSyncQueue) { if (!groups.has(item.type)) { groups.set(item.type, []); } groups.get(item.type).push(item); } return groups; } // 按类型同步项目 async syncItemsByType(type, items) { switch (type) { case 'message_read': await this.syncMessageRead(items); break; case 'batch_message_read': await this.syncBatchMessageRead(items); break; case 'conversation_read': await this.syncConversationRead(items); break; default: console.warn('🔄 未知的同步类型:', type); } } // 同步单个消息已读 async syncMessageRead(items) { const messageIds = items.map(item => item.messageId); const response = await apiClient.request({ url: '/api/v1/messages/mark-read', method: 'POST', data: { messageIds: messageIds } }); if (response.success) { // 标记为已同步 items.forEach(item => { item.synced = true; // 更新缓存中的同步状态 const status = this.messageStatusCache.get(item.messageId); if (status) { status.synced = true; } }); console.log(`✅ 同步 ${items.length} 个消息已读状态成功`); } else { throw new Error(response.error || '同步失败'); } } // 同步批量消息已读 async syncBatchMessageRead(items) { for (const item of items) { const response = await apiClient.request({ url: '/api/v1/messages/batch-mark-read', method: 'POST', data: { messageIds: item.messageIds, conversationId: item.conversationId } }); if (response.success) { item.synced = true; // 更新缓存中的同步状态 item.messageIds.forEach(messageId => { const status = this.messageStatusCache.get(messageId); if (status) { status.synced = true; } }); } else { throw new Error(response.error || '批量同步失败'); } } console.log(`✅ 同步 ${items.length} 个批量消息已读状态成功`); } // 同步会话已读 async syncConversationRead(items) { for (const item of items) { const response = await apiClient.request({ url: '/api/v1/conversations/mark-read', method: 'POST', data: { conversationId: item.conversationId, lastReadMessageId: item.lastReadMessageId } }); if (response.success) { item.synced = true; } else { throw new Error(response.error || '会话同步失败'); } } console.log(`✅ 同步 ${items.length} 个会话已读状态成功`); } // 处理同步失败 handleSyncFailure(items, error) { for (const item of items) { item.retries++; if (item.retries >= this.syncConfig.maxRetries) { console.error(`❌ 同步项目 ${item.id} 达到最大重试次数,放弃同步`); item.failed = true; this.syncStatus.failedCount++; } } } // 清理已同步的项目 cleanupSyncedItems() { const beforeCount = this.pendingSyncQueue.length; this.pendingSyncQueue = this.pendingSyncQueue.filter(item => !item.synced && !item.failed); const afterCount = this.pendingSyncQueue.length; if (beforeCount !== afterCount) { console.log(`🧹 清理了 ${beforeCount - afterCount} 个已同步项目`); this.savePendingSyncQueue(); } } // 生成同步ID generateSyncId() { return `sync_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } // 启动定时同步 startSyncTimer() { if (this.syncTimer) { clearInterval(this.syncTimer); } this.syncTimer = setInterval(() => { if (this.syncStatus.isOnline && this.pendingSyncQueue.length > 0) { this.processPendingSyncQueue(); } }, this.syncConfig.syncInterval); } // 停止定时同步 stopSyncTimer() { if (this.syncTimer) { clearInterval(this.syncTimer); this.syncTimer = null; } } // 注册网络状态监听 registerNetworkEvents() { wx.onNetworkStatusChange((res) => { if (res.isConnected && !this.syncStatus.isOnline) { console.log('🌐 网络连接恢复,尝试同步消息状态'); this.syncStatus.isOnline = true; this.processPendingSyncQueue(); } else if (!res.isConnected) { console.log('🌐 网络连接断开,切换到离线模式'); this.syncStatus.isOnline = false; } }); } // 加载消息状态缓存 async loadMessageStatusCache() { try { const cache = wx.getStorageSync('messageStatusCache') || {}; this.messageStatusCache = new Map(Object.entries(cache)); // 清理过期缓存 this.cleanupExpiredCache(); } catch (error) { console.error('❌ 加载消息状态缓存失败:', error); } } // 保存消息状态缓存 async saveMessageStatusCache() { try { const cache = Object.fromEntries(this.messageStatusCache); wx.setStorageSync('messageStatusCache', cache); } catch (error) { console.error('❌ 保存消息状态缓存失败:', error); } } // 清理过期缓存 cleanupExpiredCache() { const now = Date.now(); const expireTime = this.syncConfig.cacheExpireTime; for (const [messageId, status] of this.messageStatusCache) { if (now - status.timestamp > expireTime) { this.messageStatusCache.delete(messageId); } } } // 加载待同步队列 async loadPendingSyncQueue() { try { this.pendingSyncQueue = wx.getStorageSync('pendingSyncQueue') || []; this.syncStatus.pendingCount = this.pendingSyncQueue.length; } catch (error) { console.error('❌ 加载待同步队列失败:', error); } } // 保存待同步队列 async savePendingSyncQueue() { try { wx.setStorageSync('pendingSyncQueue', this.pendingSyncQueue); } catch (error) { console.error('❌ 保存待同步队列失败:', error); } } // 事件处理器 eventHandlers = new Map(); // 注册事件监听器 on(event, handler) { if (!this.eventHandlers.has(event)) { this.eventHandlers.set(event, []); } this.eventHandlers.get(event).push(handler); } // 触发事件 triggerEvent(event, data) { const handlers = this.eventHandlers.get(event); if (handlers) { handlers.forEach(handler => { try { handler(data); } catch (error) { console.error(`❌ 事件处理器错误 [${event}]:`, error); } }); } } // 获取同步状态 getSyncStatus() { return { ...this.syncStatus, cacheSize: this.messageStatusCache.size, pendingItems: this.pendingSyncQueue.length }; } // 强制同步 async forceSync() { console.log('🔄 强制同步消息状态...'); await this.processPendingSyncQueue(); } // 清理所有数据 reset() { this.messageStatusCache.clear(); this.pendingSyncQueue = []; this.syncStatus.pendingCount = 0; this.syncStatus.failedCount = 0; this.saveMessageStatusCache(); this.savePendingSyncQueue(); } // 销毁管理器 destroy() { this.stopSyncTimer(); this.reset(); this.eventHandlers.clear(); this.isInitialized = false; } } // 创建全局实例 const messageSyncManager = new MessageSyncManager(); module.exports = messageSyncManager;