miniprogramme/utils/message-sync-manager.js
2025-09-12 16:08:17 +08:00

614 lines
16 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// 消息状态同步管理器 - 微信小程序专用
// 处理消息的已读状态、多端同步、离线消息等
const wsManager = require('./websocket-manager-v2.js');
const apiClient = require('./api-client.js');
/**
* 消息状态同步管理器
* 功能:
* 1. 消息已读状态同步
* 2. 多端消息状态同步
* 3. 离线消息处理
* 4. 消息状态缓存
* 5. 网络状态处理
*/
class MessageSyncManager {
constructor() {
this.isInitialized = false;
// 消息状态缓存
this.messageStatusCache = new Map();
// 待同步的状态队列
this.pendingSyncQueue = [];
// 同步配置
this.syncConfig = {
// 批量同步的最大数量
maxBatchSize: 50,
// 同步间隔(毫秒)
syncInterval: 5000,
// 重试配置
maxRetries: 3,
retryDelay: 2000,
// 缓存过期时间(毫秒)
cacheExpireTime: 30 * 60 * 1000 // 30分钟
};
// 同步状态
this.syncStatus = {
isOnline: false,
lastSyncTime: 0,
pendingCount: 0,
failedCount: 0
};
// 定时器
this.syncTimer = null;
this.retryTimer = null;
this.init();
}
// 初始化同步管理器
async init() {
if (this.isInitialized) return;
console.log('🔄 初始化消息状态同步管理器...');
try {
// 加载缓存的消息状态
await this.loadMessageStatusCache();
// 加载待同步队列
await this.loadPendingSyncQueue();
// 注册WebSocket事件
this.registerWebSocketEvents();
// 注册网络状态监听
this.registerNetworkEvents();
// 启动定时同步
this.startSyncTimer();
this.isInitialized = true;
console.log('✅ 消息状态同步管理器初始化完成');
} catch (error) {
console.error('❌ 消息状态同步管理器初始化失败:', error);
}
}
// 注册WebSocket事件
registerWebSocketEvents() {
// WebSocket连接成功
wsManager.on('connected', () => {
console.log('🔄 WebSocket连接成功开始同步消息状态');
this.syncStatus.isOnline = true;
this.processPendingSyncQueue();
});
// WebSocket连接断开
wsManager.on('disconnected', () => {
console.log('🔄 WebSocket连接断开切换到离线模式');
this.syncStatus.isOnline = false;
});
// 接收消息状态更新
wsManager.on('message', (data) => {
this.handleWebSocketMessage(data);
});
}
// 处理WebSocket消息
async handleWebSocketMessage(data) {
try {
const message = typeof data === 'string' ? JSON.parse(data) : data;
switch (message.type) {
case 'message_read':
await this.handleMessageReadUpdate(message.data);
break;
case 'message_status_sync':
await this.handleMessageStatusSync(message.data);
break;
case 'conversation_read':
await this.handleConversationReadUpdate(message.data);
break;
}
} catch (error) {
console.error('❌ 处理WebSocket消息状态更新失败:', error);
}
}
// 处理消息已读更新
async handleMessageReadUpdate(data) {
console.log('👁️ 收到消息已读更新:', data);
// 更新本地缓存
if (data.messageIds && Array.isArray(data.messageIds)) {
for (const messageId of data.messageIds) {
this.updateMessageStatus(messageId, 'read', data.readTime);
}
}
// 触发页面更新事件
this.triggerEvent('message_read_updated', data);
}
// 处理会话已读更新
async handleConversationReadUpdate(data) {
console.log('👁️ 收到会话已读更新:', data);
// 更新会话中所有消息的状态
if (data.conversationId && data.lastReadMessageId) {
await this.markConversationAsRead(data.conversationId, data.lastReadMessageId);
}
// 触发页面更新事件
this.triggerEvent('conversation_read_updated', data);
}
// 处理消息状态同步
async handleMessageStatusSync(data) {
console.log('🔄 收到消息状态同步:', data);
if (data.messageStatuses && Array.isArray(data.messageStatuses)) {
for (const status of data.messageStatuses) {
this.updateMessageStatus(status.messageId, status.status, status.timestamp);
}
}
// 保存缓存
await this.saveMessageStatusCache();
}
// 标记消息为已读
async markMessageAsRead(messageId, conversationId) {
try {
console.log('👁️ 标记消息为已读:', messageId);
// 更新本地状态
this.updateMessageStatus(messageId, 'read', Date.now());
// 添加到同步队列
this.addToSyncQueue({
type: 'message_read',
messageId: messageId,
conversationId: conversationId,
timestamp: Date.now()
});
// 立即尝试同步(如果在线)
if (this.syncStatus.isOnline) {
await this.processPendingSyncQueue();
}
return true;
} catch (error) {
console.error('❌ 标记消息已读失败:', error);
return false;
}
}
// 标记会话为已读
async markConversationAsRead(conversationId, lastReadMessageId) {
try {
console.log('👁️ 标记会话为已读:', conversationId);
// 添加到同步队列
this.addToSyncQueue({
type: 'conversation_read',
conversationId: conversationId,
lastReadMessageId: lastReadMessageId,
timestamp: Date.now()
});
// 立即尝试同步(如果在线)
if (this.syncStatus.isOnline) {
await this.processPendingSyncQueue();
}
return true;
} catch (error) {
console.error('❌ 标记会话已读失败:', error);
return false;
}
}
// 批量标记消息为已读
async markMessagesAsRead(messageIds, conversationId) {
try {
console.log('👁️ 批量标记消息为已读:', messageIds.length);
const timestamp = Date.now();
// 更新本地状态
for (const messageId of messageIds) {
this.updateMessageStatus(messageId, 'read', timestamp);
}
// 添加到同步队列
this.addToSyncQueue({
type: 'batch_message_read',
messageIds: messageIds,
conversationId: conversationId,
timestamp: timestamp
});
// 立即尝试同步(如果在线)
if (this.syncStatus.isOnline) {
await this.processPendingSyncQueue();
}
return true;
} catch (error) {
console.error('❌ 批量标记消息已读失败:', error);
return false;
}
}
// 更新消息状态
updateMessageStatus(messageId, status, timestamp) {
this.messageStatusCache.set(messageId, {
status: status,
timestamp: timestamp,
synced: false
});
}
// 获取消息状态
getMessageStatus(messageId) {
return this.messageStatusCache.get(messageId);
}
// 检查消息是否已读
isMessageRead(messageId) {
const status = this.getMessageStatus(messageId);
return status && status.status === 'read';
}
// 添加到同步队列
addToSyncQueue(syncItem) {
this.pendingSyncQueue.push({
...syncItem,
id: this.generateSyncId(),
retries: 0,
addedTime: Date.now()
});
this.syncStatus.pendingCount = this.pendingSyncQueue.length;
this.savePendingSyncQueue();
}
// 处理待同步队列
async processPendingSyncQueue() {
if (this.pendingSyncQueue.length === 0) {
return;
}
console.log(`🔄 处理待同步队列,共 ${this.pendingSyncQueue.length}`);
// 按类型分组批量处理
const groupedItems = this.groupSyncItemsByType();
for (const [type, items] of groupedItems) {
try {
await this.syncItemsByType(type, items);
} catch (error) {
console.error(`❌ 同步 ${type} 类型失败:`, error);
this.handleSyncFailure(items, error);
}
}
// 清理已同步的项目
this.cleanupSyncedItems();
// 更新同步状态
this.syncStatus.lastSyncTime = Date.now();
this.syncStatus.pendingCount = this.pendingSyncQueue.length;
}
// 按类型分组同步项目
groupSyncItemsByType() {
const groups = new Map();
for (const item of this.pendingSyncQueue) {
if (!groups.has(item.type)) {
groups.set(item.type, []);
}
groups.get(item.type).push(item);
}
return groups;
}
// 按类型同步项目
async syncItemsByType(type, items) {
switch (type) {
case 'message_read':
await this.syncMessageRead(items);
break;
case 'batch_message_read':
await this.syncBatchMessageRead(items);
break;
case 'conversation_read':
await this.syncConversationRead(items);
break;
default:
console.warn('🔄 未知的同步类型:', type);
}
}
// 同步单个消息已读
async syncMessageRead(items) {
const messageIds = items.map(item => item.messageId);
const response = await apiClient.request({
url: '/api/v1/messages/mark-read',
method: 'POST',
data: {
messageIds: messageIds
}
});
if (response.success) {
// 标记为已同步
items.forEach(item => {
item.synced = true;
// 更新缓存中的同步状态
const status = this.messageStatusCache.get(item.messageId);
if (status) {
status.synced = true;
}
});
console.log(`✅ 同步 ${items.length} 个消息已读状态成功`);
} else {
throw new Error(response.error || '同步失败');
}
}
// 同步批量消息已读
async syncBatchMessageRead(items) {
for (const item of items) {
const response = await apiClient.request({
url: '/api/v1/messages/batch-mark-read',
method: 'POST',
data: {
messageIds: item.messageIds,
conversationId: item.conversationId
}
});
if (response.success) {
item.synced = true;
// 更新缓存中的同步状态
item.messageIds.forEach(messageId => {
const status = this.messageStatusCache.get(messageId);
if (status) {
status.synced = true;
}
});
} else {
throw new Error(response.error || '批量同步失败');
}
}
console.log(`✅ 同步 ${items.length} 个批量消息已读状态成功`);
}
// 同步会话已读
async syncConversationRead(items) {
for (const item of items) {
const response = await apiClient.request({
url: '/api/v1/conversations/mark-read',
method: 'POST',
data: {
conversationId: item.conversationId,
lastReadMessageId: item.lastReadMessageId
}
});
if (response.success) {
item.synced = true;
} else {
throw new Error(response.error || '会话同步失败');
}
}
console.log(`✅ 同步 ${items.length} 个会话已读状态成功`);
}
// 处理同步失败
handleSyncFailure(items, error) {
for (const item of items) {
item.retries++;
if (item.retries >= this.syncConfig.maxRetries) {
console.error(`❌ 同步项目 ${item.id} 达到最大重试次数,放弃同步`);
item.failed = true;
this.syncStatus.failedCount++;
}
}
}
// 清理已同步的项目
cleanupSyncedItems() {
const beforeCount = this.pendingSyncQueue.length;
this.pendingSyncQueue = this.pendingSyncQueue.filter(item => !item.synced && !item.failed);
const afterCount = this.pendingSyncQueue.length;
if (beforeCount !== afterCount) {
console.log(`🧹 清理了 ${beforeCount - afterCount} 个已同步项目`);
this.savePendingSyncQueue();
}
}
// 生成同步ID
generateSyncId() {
return `sync_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// 启动定时同步
startSyncTimer() {
if (this.syncTimer) {
clearInterval(this.syncTimer);
}
this.syncTimer = setInterval(() => {
if (this.syncStatus.isOnline && this.pendingSyncQueue.length > 0) {
this.processPendingSyncQueue();
}
}, this.syncConfig.syncInterval);
}
// 停止定时同步
stopSyncTimer() {
if (this.syncTimer) {
clearInterval(this.syncTimer);
this.syncTimer = null;
}
}
// 注册网络状态监听
registerNetworkEvents() {
wx.onNetworkStatusChange((res) => {
if (res.isConnected && !this.syncStatus.isOnline) {
console.log('🌐 网络连接恢复,尝试同步消息状态');
this.syncStatus.isOnline = true;
this.processPendingSyncQueue();
} else if (!res.isConnected) {
console.log('🌐 网络连接断开,切换到离线模式');
this.syncStatus.isOnline = false;
}
});
}
// 加载消息状态缓存
async loadMessageStatusCache() {
try {
const cache = wx.getStorageSync('messageStatusCache') || {};
this.messageStatusCache = new Map(Object.entries(cache));
// 清理过期缓存
this.cleanupExpiredCache();
} catch (error) {
console.error('❌ 加载消息状态缓存失败:', error);
}
}
// 保存消息状态缓存
async saveMessageStatusCache() {
try {
const cache = Object.fromEntries(this.messageStatusCache);
wx.setStorageSync('messageStatusCache', cache);
} catch (error) {
console.error('❌ 保存消息状态缓存失败:', error);
}
}
// 清理过期缓存
cleanupExpiredCache() {
const now = Date.now();
const expireTime = this.syncConfig.cacheExpireTime;
for (const [messageId, status] of this.messageStatusCache) {
if (now - status.timestamp > expireTime) {
this.messageStatusCache.delete(messageId);
}
}
}
// 加载待同步队列
async loadPendingSyncQueue() {
try {
this.pendingSyncQueue = wx.getStorageSync('pendingSyncQueue') || [];
this.syncStatus.pendingCount = this.pendingSyncQueue.length;
} catch (error) {
console.error('❌ 加载待同步队列失败:', error);
}
}
// 保存待同步队列
async savePendingSyncQueue() {
try {
wx.setStorageSync('pendingSyncQueue', this.pendingSyncQueue);
} catch (error) {
console.error('❌ 保存待同步队列失败:', error);
}
}
// 事件处理器
eventHandlers = new Map();
// 注册事件监听器
on(event, handler) {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event).push(handler);
}
// 触发事件
triggerEvent(event, data) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
handlers.forEach(handler => {
try {
handler(data);
} catch (error) {
console.error(`❌ 事件处理器错误 [${event}]:`, error);
}
});
}
}
// 获取同步状态
getSyncStatus() {
return {
...this.syncStatus,
cacheSize: this.messageStatusCache.size,
pendingItems: this.pendingSyncQueue.length
};
}
// 强制同步
async forceSync() {
console.log('🔄 强制同步消息状态...');
await this.processPendingSyncQueue();
}
// 清理所有数据
reset() {
this.messageStatusCache.clear();
this.pendingSyncQueue = [];
this.syncStatus.pendingCount = 0;
this.syncStatus.failedCount = 0;
this.saveMessageStatusCache();
this.savePendingSyncQueue();
}
// 销毁管理器
destroy() {
this.stopSyncTimer();
this.reset();
this.eventHandlers.clear();
this.isInitialized = false;
}
}
// 创建全局实例
const messageSyncManager = new MessageSyncManager();
module.exports = messageSyncManager;