STOMP 和 WebSocket 的订阅/广播功能到底是怎么实现的?广播消息倒是还能收到,但点对点消息几乎完全收不到,只是偶尔莫名其妙能收到一次。前端使用的是 Vue。代码被 AI 改得有点混乱了,订阅部分的代码如下:
/**
 * Registers every STOMP subscription the editor needs on the current
 * connection: three per-document broadcast topics and five per-user queues.
 *
 * Each incoming frame body is parsed as JSON, logged, and handed to the
 * matching handler; a parse or handler failure is logged and does not
 * affect the other subscriptions. Does nothing (apart from logging an
 * error) when the client is missing, not connected, or has no `subscribe`.
 */
const subscribeToTopics = () => {
  const isConnected = Boolean(stompClient && stompClient.connected)
  if (!isConnected) {
    console.error('❌ STOMP客户端未连接,无法订阅')
    return
  }

  console.log('📡 开始订阅主题,文档ID:', docId.value,props.documentId,stompClient);

  if (!stompClient.subscribe) {
    console.error('stompClient.subscribe 方法不存在')
    return
  }

  // Shared subscribe/parse/log/dispatch routine used by every entry below.
  const listen = ({ destination, debugTag, receivedLabel, failedLabel, level = 'log', onData }) => {
    stompClient.subscribe(destination, (message) => {
      // Per-user queues print a numbered trace marker before parsing.
      if (debugTag) {
        console.log(debugTag, docId.value);
      }
      try {
        const data = JSON.parse(message.body)
        console[level](receivedLabel, data)
        if (onData) {
          onData(data)
        }
      } catch (error) {
        console.error(failedLabel, error)
      }
    })
  }

  // Handlers are wrapped in arrows so they are looked up when a message
  // arrives, not when the subscription is created.
  const subscriptions = [
    {
      // Initialisation queue (per user)
      destination: '/user/queue/init',
      debugTag: '📡 111111:',
      receivedLabel: '收到初始化数据:',
      failedLabel: '解析初始化数据失败:',
      onData: (data) => handleInitData(data)
    },
    {
      // Operation broadcasts for this document
      destination: `/topic/document/${docId.value}/operations`,
      receivedLabel: '收到操作广播:',
      failedLabel: '解析操作广播失败:',
      onData: (data) => handleOperationBroadcast(data)
    },
    {
      // Cursor broadcasts for this document
      destination: `/topic/document/${docId.value}/cursors`,
      receivedLabel: '收到光标更新:',
      failedLabel: '解析光标更新失败:',
      onData: (data) => handleCursorUpdate(data)
    },
    {
      // Event broadcasts for this document
      destination: `/topic/document/${docId.value}/events`,
      receivedLabel: '收到事件:',
      failedLabel: '解析事件失败:',
      onData: (data) => handleEvent(data)
    },
    {
      // Operation acknowledgements (per user)
      destination: '/user/queue/ack',
      debugTag: '📡 22222:',
      receivedLabel: '收到操作确认:',
      failedLabel: '解析ACK失败:',
      onData: (data) => handleOperationAck(data)
    },
    {
      // Error queue (per user); the payload is logged at error level
      destination: '/user/queue/errors',
      debugTag: '📡 33333:',
      receivedLabel: '收到错误:',
      failedLabel: '解析错误失败:',
      level: 'error',
      onData: (data) => handleError(data)
    },
    {
      // Heartbeat replies (per user); logged only, no handler
      destination: '/user/queue/heartbeat',
      debugTag: '📡 44444:',
      receivedLabel: '收到心跳响应:',
      failedLabel: '解析心跳失败:'
    },
    {
      // Save confirmations (per user)
      destination: '/user/queue/save',
      debugTag: '📡 55555:',
      receivedLabel: '收到保存确认:',
      failedLabel: '解析保存确认失败:',
      onData: (data) => handleSaveComplete(data)
    }
  ]

  for (const subscription of subscriptions) {
    listen(subscription)
  }

  console.log('📡 已订阅所有主题')
}
后端代码如下:
/**
 * Handles a client's request to join a document.
 *
 * <p>Registers the WebSocket session with the document state manager, sends
 * the current document state to the joining user on {@code /queue/init}, and
 * broadcasts a {@code USER_JOINED} event on the document's events topic.
 *
 * <p>Point-to-point replies are addressed by {@code principal.getName()}, so
 * they can only be delivered when the session has a {@link Principal}. When
 * it has none, the init reply is skipped with an error log while the join
 * itself and the broadcast still happen.
 *
 * @param documentId id of the document taken from the destination
 * @param payload    join request; may carry {@code token}, {@code username}, {@code userId}
 * @param accessor   STOMP headers, used to read the session id
 * @param principal  authenticated user of the session; may be {@code null}
 */
@MessageMapping("/document/{documentId}/join")
public void handleJoinDocument(
        @DestinationVariable Integer documentId,
        @Payload Map<String, Object> payload,
        StompHeaderAccessor accessor, Principal principal) {
    // Resolve the user name null-safely and outside the try block so that the
    // error path below can use it without dereferencing a null principal.
    String connectUserId = principal != null ? principal.getName() : null;
    try {
        String sessionId = accessor.getSessionId();
        String token = (String) payload.get("token");
        String username = (String) payload.get("username");
        Integer userId = (Integer) payload.get("userId");

        // Validate the token (if provided); token identity overrides the payload.
        // NOTE(review): when validation fails the payload-supplied identity is
        // still trusted, so a client can claim any username/userId — confirm
        // this is acceptable.
        if (token != null) {
            try {
                String tokenUsername = jwtUtils.getUnameByToken(token);
                Integer tokenUserId = jwtUtils.getUidByToken(token);
                if (tokenUsername != null) {
                    username = tokenUsername;
                    userId = tokenUserId;
                }
            } catch (Exception e) {
                System.out.println("Token验证失败,使用payload中的用户信息");
            }
        }
        if (username == null) username = "匿名用户";
        if (userId == null) userId = 0;

        // Register the user session and attach it to the document.
        documentStateManager.registerUserSession(sessionId, userId, username);
        documentStateManager.addUserToDocument(documentId, sessionId);

        // Load the persisted document; a missing document starts out empty.
        com.example.fi.entity.document doc = documentService.getDocumentById(documentId);
        String content = doc != null ? doc.getContent() : "";

        // Fetch the in-memory document state, passing the persisted content.
        DocumentStateManager.DocumentState state =
                documentStateManager.getDocumentState(documentId, content);

        // Collect the online users of this document.
        List<Map<String, Object>> onlineUsers = getOnlineUsers(documentId);

        // Build the initialisation payload for the joining user.
        Map<String, Object> initResponse = new HashMap<>();
        initResponse.put("type", "INIT");
        initResponse.put("documentId", documentId);
        initResponse.put("content", state.getContent());
        initResponse.put("version", state.getVersion());
        initResponse.put("title", doc != null ? doc.getTitle() : "未命名文档");
        initResponse.put("success", true);
        initResponse.put("sessionId", sessionId);
        initResponse.put("onlineUsers", onlineUsers);
        initResponse.put("timestamp", System.currentTimeMillis());

        // Log the user name the message is really addressed to (not the
        // session id), so the log matches the actual user destination.
        System.out.println("📤【后端】准备发送初始化数据,目的地: /user/" + connectUserId + "/queue/init");
        System.out.println("📤【后端】初始化数据内容: " + initResponse);
        if (connectUserId != null) {
            messagingTemplate.convertAndSendToUser(connectUserId, "/queue/init", initResponse);
        } else {
            System.err.println("❌ Principal 为空,无法发送点对点消息");
        }

        // Broadcast the join event to everyone subscribed to the document.
        Map<String, Object> joinEvent = new HashMap<>();
        joinEvent.put("type", "USER_JOINED");
        joinEvent.put("documentId", documentId);
        joinEvent.put("userId", userId);
        joinEvent.put("username", username);
        joinEvent.put("sessionId", sessionId);
        joinEvent.put("timestamp", System.currentTimeMillis());
        joinEvent.put("onlineUsers", onlineUsers);
        messagingTemplate.convertAndSend("/topic/document/" + documentId + "/events", (Object) joinEvent);

        System.out.println("👤 用户加入文档 - 文档ID: " + documentId +
                ", 用户: " + username + " (" + userId + ")" +
                ", 在线用户数: " + onlineUsers.size());
    } catch (Exception e) {
        System.err.println("处理加入文档请求失败: " + e.getMessage());
        e.printStackTrace();

        // Send an error reply, but only when there is a user to address it to;
        // otherwise a second exception would escape from this handler.
        if (connectUserId != null) {
            Map<String, Object> errorResponse = new HashMap<>();
            errorResponse.put("type", "ERROR");
            errorResponse.put("message", "加入文档失败: " + e.getMessage());
            errorResponse.put("documentId", documentId);
            messagingTemplate.convertAndSendToUser(connectUserId, "/queue/errors", errorResponse);
        } else {
            System.err.println("❌ Principal 为空,无法发送点对点消息");
        }
    }
}
/**
 * Handles an edit operation sent by a client.
 *
 * <p>Validates the operation, stamps it with document/session/user metadata,
 * applies it through the document state manager, triggers an asynchronous
 * save, broadcasts the applied operation on the document's operations topic
 * and sends an acknowledgement to the sender on {@code /queue/ack}.
 *
 * <p>All point-to-point replies, including error replies, are addressed by
 * the principal name: {@code sendErrorResponse} forwards its first argument
 * to {@code convertAndSendToUser}, so passing the session id there would
 * address a user name that does not match the session's principal.
 *
 * @param documentId id of the document taken from the destination
 * @param operation  the operation submitted by the client
 * @param accessor   STOMP headers, used to read the session id
 * @param principal  authenticated user of the session
 */
@MessageMapping("/document/{documentId}/operation")
public void handleDocumentOperation(
        @DestinationVariable Integer documentId,
        @Payload operation operation,
        StompHeaderAccessor accessor,Principal principal) {
    String sessionId = accessor.getSessionId();
    String connectUserId = principal.getName();
    try {
        System.out.println("📝 收到文档操作 - 文档ID: " + documentId +
                ", 类型: " + operation.getType() +
                ", 会话: " + sessionId);

        // Validate the operation (the session lookup is keyed by session id,
        // the error reply is addressed by user name).
        if (!validateOperation(operation, sessionId)) {
            sendErrorResponse(connectUserId, "操作验证失败", documentId);
            return;
        }

        // Stamp operation metadata.
        operation.setDocumentId(documentId);
        operation.setSessionId(sessionId);
        operation.setTimestamp(LocalDateTime.now());

        // Copy user identity from the registered session, when there is one.
        DocumentStateManager.UserSession session =
                documentStateManager.getUserSession(sessionId);
        if (session != null) {
            operation.setUserId(session.getUserId());
            operation.setUsername(session.getUsername());
        }

        // Apply the operation to the document state.
        operation appliedOperation = documentStateManager.applyOperation(documentId, operation);

        // Persist to the database (asynchronously).
        saveDocumentToDatabase(documentId,connectUserId);

        // Broadcast the applied operation.
        Map<String, Object> broadcastMessage = new HashMap<>();
        broadcastMessage.put("type", "OPERATION_APPLIED");
        broadcastMessage.put("operation", appliedOperation);
        broadcastMessage.put("documentId", documentId);
        broadcastMessage.put("serverVersion", appliedOperation.getVersion());
        broadcastMessage.put("timestamp", System.currentTimeMillis());
        messagingTemplate.convertAndSend("/topic/document/" + documentId + "/operations", (Object) broadcastMessage);

        // Acknowledge to the sender.
        Map<String, Object> ackMessage = new HashMap<>();
        ackMessage.put("type", "OPERATION_ACK");
        ackMessage.put("operationId", operation.getId());
        ackMessage.put("version", appliedOperation.getVersion());
        ackMessage.put("timestamp", System.currentTimeMillis());
        messagingTemplate.convertAndSendToUser(connectUserId, "/queue/ack", ackMessage);

        System.out.println("✅ 操作处理完成 - 操作ID: " + operation.getId() +
                ", 版本: " + appliedOperation.getVersion());
    } catch (Exception e) {
        System.err.println("处理文档操作失败: " + e.getMessage());
        e.printStackTrace();
        sendErrorResponse(connectUserId, "处理操作失败: " + e.getMessage(), documentId);
    }
}
/**
 * Handles a cursor position update from a client.
 *
 * <p>Enriches the incoming payload with the sender's identity and publishes
 * it on the document's cursors topic. No sender filtering is done here: the
 * message goes to the topic as a whole, and the payload carries the sender's
 * {@code sessionId}. Updates from sessions that are not registered are
 * silently ignored.
 *
 * @param documentId id of the document taken from the destination
 * @param cursorData cursor payload from the client; mutated in place
 * @param accessor   STOMP headers, used to read the session id
 */
@MessageMapping("/document/{documentId}/cursor")
public void handleCursorUpdate(
        @DestinationVariable Integer documentId,
        @Payload Map<String, Object> cursorData,
        StompHeaderAccessor accessor) {
    String sessionId = accessor.getSessionId();
    try {
        DocumentStateManager.UserSession sender =
                documentStateManager.getUserSession(sessionId);
        if (sender != null) {
            // Attach sender identity and message metadata to the payload.
            cursorData.put("userId", sender.getUserId());
            cursorData.put("username", sender.getUsername());
            cursorData.put("sessionId", sessionId);
            cursorData.put("type", "CURSOR_UPDATE");
            cursorData.put("timestamp", System.currentTimeMillis());

            String cursorsTopic = "/topic/document/" + documentId + "/cursors";
            messagingTemplate.convertAndSend(cursorsTopic, (Object) cursorData);
        }
    } catch (Exception e) {
        System.err.println("处理光标更新失败: " + e.getMessage());
    }
}
/**
 * Handles a heartbeat from a client by replying on the sender's
 * {@code /queue/heartbeat} with the session id and the server time.
 *
 * @param documentId id of the document taken from the destination (not used)
 * @param heartbeat  heartbeat payload from the client (not used)
 * @param accessor   STOMP headers, used to read the session id
 * @param principal  authenticated user of the session; the reply is addressed by its name
 */
@MessageMapping("/document/{documentId}/heartbeat")
public void handleHeartbeat(
        @DestinationVariable Integer documentId,
        @Payload Map<String, Object> heartbeat,
        StompHeaderAccessor accessor,Principal principal) {
    String currentSession = accessor.getSessionId();
    String recipient = principal.getName();

    Map<String, Object> pong = new HashMap<>();
    pong.put("type", "HEARTBEAT_RESPONSE");
    pong.put("sessionId", currentSession);
    pong.put("timestamp", System.currentTimeMillis());
    pong.put("serverTime", System.currentTimeMillis());

    messagingTemplate.convertAndSendToUser(recipient, "/queue/heartbeat", pong);
}
/**
 * Handles an explicit save request from a client.
 *
 * <p>Writes the document's current in-memory content to the database and
 * confirms to the requester on {@code /queue/save}. Both the "state missing"
 * and the failure replies are addressed by the principal name, the same
 * identifier used for the success reply.
 *
 * @param documentId  id of the document taken from the destination
 * @param saveRequest save payload from the client (not used)
 * @param accessor    STOMP headers (not used)
 * @param principal   authenticated user of the session; replies are addressed by its name
 */
@MessageMapping("/document/{documentId}/save")
public void handleSaveDocument(
        @DestinationVariable Integer documentId,
        @Payload Map<String, Object> saveRequest,
        StompHeaderAccessor accessor,Principal principal) {
    String connectUserId = principal.getName();
    try {
        // Fetch the document's current state.
        DocumentStateManager.DocumentState state =
                documentStateManager.getCurrentState(documentId);
        if (state == null) {
            sendErrorResponse(connectUserId, "文档状态不存在", documentId);
            return;
        }

        // Persist to the database.
        documentService.save(documentId, state.getContent());

        // Confirm the save to the requester.
        Map<String, Object> saveResponse = new HashMap<>();
        saveResponse.put("type", "SAVE_COMPLETE");
        saveResponse.put("documentId", documentId);
        saveResponse.put("version", state.getVersion());
        saveResponse.put("timestamp", System.currentTimeMillis());
        messagingTemplate.convertAndSendToUser(connectUserId, "/queue/save", saveResponse);

        System.out.println("💾 文档保存完成 - 文档ID: " + documentId);
    } catch (Exception e) {
        System.err.println("保存文档失败: " + e.getMessage());
        // Address the error by user name, not session id, so it reaches the
        // same destination as every other reply from this handler.
        sendErrorResponse(connectUserId, "保存失败: " + e.getMessage(), documentId);
    }
}
/**
 * Handles a client leaving a document.
 *
 * <p>Broadcasts a {@code USER_LEFT} event on the document's events topic,
 * then detaches the session from the document and drops the session record.
 * Requests from sessions that are not registered are silently ignored.
 *
 * @param documentId id of the document taken from the destination
 * @param leaveData  leave payload from the client (not used)
 * @param accessor   STOMP headers, used to read the session id
 */
@MessageMapping("/document/{documentId}/leave")
public void handleLeaveDocument(
        @DestinationVariable Integer documentId,
        @Payload Map<String, Object> leaveData,
        StompHeaderAccessor accessor) {
    String sessionId = accessor.getSessionId();
    try {
        DocumentStateManager.UserSession departing =
                documentStateManager.getUserSession(sessionId);
        if (departing != null) {
            // Announce the departure before the session data is removed.
            Map<String, Object> event = new HashMap<>();
            event.put("type", "USER_LEFT");
            event.put("documentId", documentId);
            event.put("userId", departing.getUserId());
            event.put("username", departing.getUsername());
            event.put("sessionId", sessionId);
            event.put("timestamp", System.currentTimeMillis());

            String eventsTopic = "/topic/document/" + documentId + "/events";
            messagingTemplate.convertAndSend(eventsTopic, (Object) event);

            // Detach from the document, then discard the session record.
            documentStateManager.removeUserFromDocument(documentId, sessionId);
            documentStateManager.removeUserSession(sessionId);

            System.out.println("👋 用户离开文档 - 文档ID: " + documentId +
                    ", 用户: " + departing.getUsername());
        }
    } catch (Exception e) {
        System.err.println("处理用户离开失败: " + e.getMessage());
    }
}
// Helper methods

/**
 * Checks that an operation is usable and comes from a registered session.
 *
 * @param operation the operation to check; may be {@code null}
 * @param sessionId WebSocket session id of the sender
 * @return {@code true} when the operation is non-null, has a type and a base
 *         version, and a user session exists for {@code sessionId}
 */
private boolean validateOperation(operation operation, String sessionId) {
    boolean wellFormed = operation != null
            && operation.getType() != null
            && operation.getBaseVersion() != null;
    if (!wellFormed) {
        return false;
    }

    // The sender must have a registered session.
    if (documentStateManager.getUserSession(sessionId) == null) {
        System.err.println("用户会话不存在: " + sessionId);
        return false;
    }
    return true;
}
/**
 * Sends an {@code ERROR} message to one user's {@code /queue/errors}.
 *
 * @param principalName user name the message is addressed to; passed
 *                      unchanged to {@code convertAndSendToUser}
 * @param message       error text placed in the payload
 * @param documentId    id of the document the error relates to
 */
private void sendErrorResponse(String principalName, String message, Integer documentId) {
    Map<String, Object> errorPayload = new HashMap<>();
    errorPayload.put("type", "ERROR");
    errorPayload.put("message", message);
    errorPayload.put("documentId", documentId);
    errorPayload.put("timestamp", System.currentTimeMillis());

    messagingTemplate.convertAndSendToUser(principalName, "/queue/errors", errorPayload);
}
/**
 * Persists the document's current in-memory content on a new background
 * thread and reports the outcome to one user on {@code /queue/save}.
 *
 * <p>On success a {@code SAVE_COMPLETE} message is sent; on failure an
 * {@code ERROR} message is sent to the same queue. When the document has no
 * current state, nothing is saved and nothing is sent.
 *
 * @param documentId    id of the document to persist
 * @param principalName user name the outcome message is addressed to
 */
private void saveDocumentToDatabase(Integer documentId,String principalName) {
    Runnable saveTask = () -> {
        try {
            DocumentStateManager.DocumentState current = documentStateManager.getCurrentState(documentId);
            if (current == null) {
                return;
            }

            documentService.save(documentId, current.getContent());
            System.out.println("📁 异步保存文档成功 - ID: " + documentId);

            // Report success to the user.
            Map<String, Object> completed = new HashMap<>();
            completed.put("type", "SAVE_COMPLETE");
            completed.put("documentId", documentId);
            completed.put("version", current.getVersion());
            completed.put("timestamp", System.currentTimeMillis());
            messagingTemplate.convertAndSendToUser(principalName, "/queue/save", completed);
        } catch (Exception e) {
            System.err.println("异步保存文档失败: " + e.getMessage());

            // Report the failure on the save queue as well, so the outcome
            // of the save always arrives on /queue/save.
            Map<String, Object> failed = new HashMap<>();
            failed.put("type", "ERROR");
            failed.put("message", "保存失败: " + e.getMessage());
            failed.put("documentId", documentId);
            failed.put("timestamp", System.currentTimeMillis());
            messagingTemplate.convertAndSendToUser(principalName, "/queue/save", failed);
        }
    };

    // One new thread per call, started immediately.
    new Thread(saveTask).start();
}