cyber_bjy 2026-01-17 00:57 采纳率: 50%
浏览 2

Stomp的订阅功能问题

Stomp和websocket的订阅广播功能到底是怎么实现的,广播消息倒还是能收到,点对点的信息几乎是完全收不到了,有时候莫名其妙能收到一下。前端使用的是vue。代码被ai改的有点混乱了。订阅部分的代码如下:

 const subscribeToTopics = () => {

      if (!stompClient || !stompClient.connected) {
        console.error('❌ STOMP客户端未连接,无法订阅')
        return
      }

      console.log('📡 开始订阅主题,文档ID:', docId.value,props.documentId,stompClient);
      if (!stompClient.subscribe) {
        console.error('stompClient.subscribe 方法不存在')
        return
      }
      // 订阅初始化队列
      stompClient.subscribe('/user/queue/init', (message) => {
        console.log('📡 111111:', docId.value);
        try {
          const data = JSON.parse(message.body)
          console.log('收到初始化数据:', data)
          handleInitData(data)
        } catch (error) {
          console.error('解析初始化数据失败:', error)
        }
      })

      // 订阅操作队列
      stompClient.subscribe(`/topic/document/${docId.value}/operations`, (message) => {

        try {
          const data = JSON.parse(message.body)
          console.log('收到操作广播:', data)
          handleOperationBroadcast(data)
        } catch (error) {
          console.error('解析操作广播失败:', error)
        }
      })

      // 订阅光标队列
      stompClient.subscribe(`/topic/document/${docId.value}/cursors`, (message) => {

        try {
          const data = JSON.parse(message.body)
          console.log('收到光标更新:', data)
          handleCursorUpdate(data)
        } catch (error) {
          console.error('解析光标更新失败:', error)
        }
      })

      // 订阅事件队列
      stompClient.subscribe(`/topic/document/${docId.value}/events`, (message) => {

        try {
          const data = JSON.parse(message.body)
          console.log('收到事件:', data)
          handleEvent(data)
        } catch (error) {
          console.error('解析事件失败:', error)
        }
      })

      // 订阅ACK队列
      stompClient.subscribe('/user/queue/ack', (message) => {
        console.log('📡 22222:', docId.value);
        try {
          const data = JSON.parse(message.body)
          console.log('收到操作确认:', data)
          handleOperationAck(data)
        } catch (error) {
          console.error('解析ACK失败:', error)
        }
      })

      // 订阅错误队列
      stompClient.subscribe('/user/queue/errors', (message) => {
        console.log('📡 33333:', docId.value);
        try {
          const data = JSON.parse(message.body)
          console.error('收到错误:', data)
          handleError(data)
        } catch (error) {
          console.error('解析错误失败:', error)
        }
      })

      // 订阅心跳队列
      stompClient.subscribe('/user/queue/heartbeat', (message) => {
        console.log('📡 44444:', docId.value);
        try {
          const data = JSON.parse(message.body)
          console.log('收到心跳响应:', data)
        } catch (error) {
          console.error('解析心跳失败:', error)
        }
      })

      // 订阅保存队列
      stompClient.subscribe('/user/queue/save', (message) => {
        console.log('📡 55555:', docId.value);
        try {
          const data = JSON.parse(message.body)
          console.log('收到保存确认:', data)
          handleSaveComplete(data)
        } catch (error) {
          console.error('解析保存确认失败:', error)
        }
      })

      console.log('📡 已订阅所有主题')
    }

后端代码如下:

@MessageMapping("/document/{documentId}/join")
    public void handleJoinDocument(
            @DestinationVariable Integer documentId,
            @Payload Map<String, Object> payload,
            StompHeaderAccessor accessor, Principal principal) {
        
        try {
            String sessionId = accessor.getSessionId();
            String token = (String) payload.get("token");
            String username = (String) payload.get("username");
            Integer userId = (Integer) payload.get("userId");
            String connectUserId = principal.getName();

            // 验证token(如果提供)
            if (token != null) {
                try {
                    String tokenUsername = jwtUtils.getUnameByToken(token);
                    Integer tokenUserId = jwtUtils.getUidByToken(token);

                    if (tokenUsername != null) {
                        username = tokenUsername;
                        userId = tokenUserId;
                    }
                } catch (Exception e) {
                    System.out.println("Token验证失败,使用payload中的用户信息");
                }
            }


            if (username == null) username = "匿名用户";
            if (userId == null) userId = 0;

            // 注册用户会话
            documentStateManager.registerUserSession(sessionId, userId, username);
            documentStateManager.addUserToDocument(Integer.valueOf(documentId), sessionId);

            // 获取文档当前状态
            com.example.fi.entity.document doc = documentService.getDocumentById(Integer.valueOf(documentId));
            String content = doc != null ? doc.getContent() : "";
            Long version = doc != null ? doc.getVersion().longValue() : 0L;

            // 初始化文档状态
            DocumentStateManager.DocumentState state =
                    documentStateManager.getDocumentState(Integer.valueOf(documentId), content);

            // 获取在线用户列表
            List<Map<String, Object>> onlineUsers = getOnlineUsers(Integer.valueOf(documentId));

            // 发送初始化数据给用户
            Map<String, Object> initResponse = new HashMap<>();
            initResponse.put("type", "INIT");
            initResponse.put("documentId", documentId);
            initResponse.put("content", state.getContent());
            initResponse.put("version", state.getVersion());
            initResponse.put("title", doc != null ? doc.getTitle() : "未命名文档");
            initResponse.put("success", true);
            initResponse.put("sessionId", sessionId);
            initResponse.put("onlineUsers", onlineUsers);
            initResponse.put("timestamp", System.currentTimeMillis());

            System.out.println("📤【后端】准备发送初始化数据,目的地: /user/" + sessionId + "/queue/init");
            System.out.println("📤【后端】初始化数据内容: " + initResponse);

            if (connectUserId != null) {
                // 这里发送给 connectUserId,Spring 内部会自动找到对应的 Session 发送出去
                messagingTemplate.convertAndSendToUser(connectUserId, "/queue/init", initResponse);
            } else {
                System.err.println("❌ Principal 为空,无法发送点对点消息");
            }

            // 广播用户加入事件
            Map<String, Object> joinEvent = new HashMap<>();
            joinEvent.put("type", "USER_JOINED");
            joinEvent.put("documentId", documentId);
            joinEvent.put("userId", userId);
            joinEvent.put("username", username);
            joinEvent.put("sessionId", sessionId);
            joinEvent.put("timestamp", System.currentTimeMillis());
            joinEvent.put("onlineUsers", onlineUsers);

            messagingTemplate.convertAndSend("/topic/document/" + documentId + "/events", (Object) joinEvent);

            System.out.println("👤 用户加入文档 - 文档ID: " + documentId +
                    ", 用户: " + username + " (" + userId + ")" +
                    ", 在线用户数: " + onlineUsers.size());

        } catch (Exception e) {
            System.err.println("处理加入文档请求失败: " + e.getMessage());
            e.printStackTrace();

            // 发送错误响应
            Map<String, Object> errorResponse = new HashMap<>();
            errorResponse.put("type", "ERROR");
            errorResponse.put("message", "加入文档失败: " + e.getMessage());
            errorResponse.put("documentId", documentId);

            messagingTemplate.convertAndSendToUser(principal.getName(), "/queue/errors", errorResponse);
        }
    }

    /**
     * 处理文档操作
     */
    @MessageMapping("/document/{documentId}/operation")
    public void handleDocumentOperation(
            @DestinationVariable Integer documentId,
            @Payload operation operation,
            StompHeaderAccessor accessor,Principal principal) {

        String sessionId = accessor.getSessionId();
        String connectUserId = principal.getName();

        try {
            System.out.println("📝 收到文档操作 - 文档ID: " + documentId +
                    ", 类型: " + operation.getType() +
                    ", 会话: " + sessionId);

            // 验证操作
            if (!validateOperation(operation, sessionId)) {
                sendErrorResponse(sessionId, "操作验证失败", documentId);
                return;
            }

            // 设置操作元数据
            operation.setDocumentId(documentId);
            operation.setSessionId(sessionId);
            operation.setTimestamp(LocalDateTime.now());

            // 获取用户会话信息
            DocumentStateManager.UserSession session =
                    documentStateManager.getUserSession(sessionId);
            if (session != null) {
                operation.setUserId(session.getUserId());
                operation.setUsername(session.getUsername());
            }

            // 应用操作到文档状态
            operation appliedOperation = documentStateManager.applyOperation(documentId, operation);

            // 保存到数据库(异步)
            saveDocumentToDatabase(documentId,connectUserId);

            // 广播应用后的操作
            Map<String, Object> broadcastMessage = new HashMap<>();
            broadcastMessage.put("type", "OPERATION_APPLIED");
            broadcastMessage.put("operation", appliedOperation);
            broadcastMessage.put("documentId", documentId);
            broadcastMessage.put("serverVersion", appliedOperation.getVersion());
            broadcastMessage.put("timestamp", System.currentTimeMillis());

            messagingTemplate.convertAndSend("/topic/document/" + documentId + "/operations", (Object) broadcastMessage);

            // 发送确认给发送者
            Map<String, Object> ackMessage = new HashMap<>();
            ackMessage.put("type", "OPERATION_ACK");
            ackMessage.put("operationId", operation.getId());
            ackMessage.put("version", appliedOperation.getVersion());
            ackMessage.put("timestamp", System.currentTimeMillis());

            messagingTemplate.convertAndSendToUser(connectUserId, "/queue/ack", ackMessage);

            System.out.println("✅ 操作处理完成 - 操作ID: " + operation.getId() +
                    ", 版本: " + appliedOperation.getVersion());

        } catch (Exception e) {
            System.err.println("处理文档操作失败: " + e.getMessage());
            e.printStackTrace();

            sendErrorResponse(sessionId, "处理操作失败: " + e.getMessage(), documentId);
        }
    }

    /**
     * 处理光标更新
     */
    @MessageMapping("/document/{documentId}/cursor")
    public void handleCursorUpdate(
            @DestinationVariable Integer documentId,
            @Payload Map<String, Object> cursorData,
            StompHeaderAccessor accessor) {

        String sessionId = accessor.getSessionId();


        try {
            // 获取用户会话信息
            DocumentStateManager.UserSession session =
                    documentStateManager.getUserSession(sessionId);

            if (session == null) {
                return;
            }

            // 添加用户信息
            cursorData.put("userId", session.getUserId());
            cursorData.put("username", session.getUsername());
            cursorData.put("sessionId", sessionId);
            cursorData.put("type", "CURSOR_UPDATE");
            cursorData.put("timestamp", System.currentTimeMillis());

            // 广播光标位置(排除发送者自己)
            messagingTemplate.convertAndSend("/topic/document/" + documentId + "/cursors", (Object) cursorData);

        } catch (Exception e) {
            System.err.println("处理光标更新失败: " + e.getMessage());
        }
    }

    /**
     * 处理心跳
     */
    @MessageMapping("/document/{documentId}/heartbeat")
    public void handleHeartbeat(
            @DestinationVariable Integer documentId,
            @Payload Map<String, Object> heartbeat,
            StompHeaderAccessor accessor,Principal principal) {

        String sessionId = accessor.getSessionId();
        String connectUserId = principal.getName();

        Map<String, Object> response = new HashMap<>();
        response.put("type", "HEARTBEAT_RESPONSE");
        response.put("sessionId", sessionId);
        response.put("timestamp", System.currentTimeMillis());
        response.put("serverTime", System.currentTimeMillis());


        messagingTemplate.convertAndSendToUser(connectUserId, "/queue/heartbeat", response);
    }

    /**
     * 处理文档保存请求
     */
    @MessageMapping("/document/{documentId}/save")
    public void handleSaveDocument(
            @DestinationVariable Integer documentId,
            @Payload Map<String, Object> saveRequest,
            StompHeaderAccessor accessor,Principal principal) {

        String sessionId = accessor.getSessionId();
        String connectUserId = principal.getName();

        try {
            // 获取文档当前状态
            DocumentStateManager.DocumentState state =
                    documentStateManager.getCurrentState(documentId);

            if (state == null) {
                sendErrorResponse(connectUserId, "文档状态不存在", documentId);
                return;
            }

            // 保存到数据库
            documentService.save(documentId, state.getContent());

            // 发送保存确认
            Map<String, Object> saveResponse = new HashMap<>();
            saveResponse.put("type", "SAVE_COMPLETE");
            saveResponse.put("documentId", documentId);
            saveResponse.put("version", state.getVersion());
            saveResponse.put("timestamp", System.currentTimeMillis());

            messagingTemplate.convertAndSendToUser(connectUserId, "/queue/save", saveResponse);

            System.out.println("💾 文档保存完成 - 文档ID: " + documentId);

        } catch (Exception e) {
            System.err.println("保存文档失败: " + e.getMessage());
            sendErrorResponse(sessionId, "保存失败: " + e.getMessage(), documentId);
        }
    }

    /**
     * 用户离开文档
     */
    @MessageMapping("/document/{documentId}/leave")
    public void handleLeaveDocument(
            @DestinationVariable Integer documentId,
            @Payload Map<String, Object> leaveData,
            StompHeaderAccessor accessor) {

        String sessionId = accessor.getSessionId();


        try {
            // 获取用户会话信息
            DocumentStateManager.UserSession session =
                    documentStateManager.getUserSession(sessionId);

            if (session == null) {
                return;
            }


            // 广播用户离开事件
            Map<String, Object> leaveEvent = new HashMap<>();
            leaveEvent.put("type", "USER_LEFT");
            leaveEvent.put("documentId", documentId);
            leaveEvent.put("userId", session.getUserId());
            leaveEvent.put("username", session.getUsername());
            leaveEvent.put("sessionId", sessionId);
            leaveEvent.put("timestamp", System.currentTimeMillis());

            messagingTemplate.convertAndSend("/topic/document/" + documentId + "/events", (Object) leaveEvent);

            documentStateManager.removeUserFromDocument(documentId, sessionId);
            // 清理用户会话
            documentStateManager.removeUserSession(sessionId);

            System.out.println("👋 用户离开文档 - 文档ID: " + documentId +
                    ", 用户: " + session.getUsername());

        } catch (Exception e) {
            System.err.println("处理用户离开失败: " + e.getMessage());
        }
    }

    // 辅助方法

    private boolean validateOperation(operation operation, String sessionId) {
        if (operation == null) return false;
        if (operation.getType() == null) return false;
        if (operation.getBaseVersion() == null) return false;

        // 验证用户会话
        DocumentStateManager.UserSession session =
                documentStateManager.getUserSession(sessionId);

        if (session == null) {
            System.err.println("用户会话不存在: " + sessionId);
            return false;
        }

        return true;
    }

    private void sendErrorResponse(String principalName, String message, Integer documentId) {
        Map<String, Object> errorResponse = new HashMap<>();
        errorResponse.put("type", "ERROR");
        errorResponse.put("message", message);
        errorResponse.put("documentId", documentId);
        errorResponse.put("timestamp", System.currentTimeMillis());

        messagingTemplate.convertAndSendToUser(principalName, "/queue/errors", errorResponse);
    }
 private void saveDocumentToDatabase(Integer documentId,String principalName) {
        // 异步保存文档
        new Thread(() -> {
            try {
                DocumentStateManager.DocumentState state = documentStateManager.getCurrentState(documentId);
                if (state != null) {
                    documentService.save(documentId, state.getContent());
                    System.out.println("📁 异步保存文档成功 - ID: " + documentId);

                    // 保存成功,发送成功回调
                    Map<String, Object> saveResponse = new HashMap<>();
                    saveResponse.put("type", "SAVE_COMPLETE");
                    saveResponse.put("documentId", documentId);
                    saveResponse.put("version", state.getVersion());
                    saveResponse.put("timestamp", System.currentTimeMillis());
                    messagingTemplate.convertAndSendToUser(principalName, "/queue/save", saveResponse);
                }
            } catch (Exception e) {
                System.err.println("异步保存文档失败: " + e.getMessage());
                // ✅ 核心修复:保存失败也发送回调,前端能收到错误提示,不会卡住
                Map<String, Object> errorResponse = new HashMap<>();
                errorResponse.put("type", "ERROR");
                errorResponse.put("message", "保存失败: " + e.getMessage());
                errorResponse.put("documentId", documentId);
                errorResponse.put("timestamp", System.currentTimeMillis());
                messagingTemplate.convertAndSendToUser(principalName, "/queue/save", errorResponse);
            }
        }).start();
    }

  • 写回答

2条回答 默认 最新

  • 檀越@新空间 2026-01-17 00:58
    关注

    上午好☀️☀️☀️️
    本答案参考通义千问

    你提到的 STOMP 和 WebSocket 的订阅功能问题,特别是点对点消息收不到,而广播消息能收到,这说明你的 STOMP 订阅逻辑或后端实现可能存在问题。我们来详细分析并提供解决方案。


    🚨 问题分析

    1. STOMP 订阅类型

    在 STOMP 中,有两种主要的订阅方式:

    • Broadcast(广播):使用 /topic/... 路径,所有订阅者都能接收到消息。
    • Point-to-Point(点对点):使用 /queue/... 路径,只有订阅该队列的客户端才能接收到消息。

    你在前端代码中看到的是 /user/queue/.../topic/document/... 等路径,这说明你正在尝试通过 STOMP 实现 广播和点对点 的通信。

    但根据你描述的情况:

    • 广播消息(如 /topic/document/...)可以正常接收
    • 点对点消息(如 /user/queue/...)几乎收不到,偶尔能收到一次

    这表明你的 点对点消息没有被正确发送或订阅


    ✅ 解决方案

    🔧 一、确认 STOMP 客户端是否已连接

    if (!stompClient || !stompClient.connected) {
        console.error('❌ STOMP客户端未连接,无法订阅');
        return;
    }
    

    ✅ 重点:确保 stompClient 已成功连接到服务器。


    🔧 二、检查订阅路径是否匹配

    你订阅了如下路径:

    stompClient.subscribe('/user/queue/init', ...);
    stompClient.subscribe('/user/queue/ack', ...);
    stompClient.subscribe('/user/queue/errors', ...);
    stompClient.subscribe('/user/queue/heartbeat', ...);
    stompClient.subscribe('/user/queue/save', ...);
    

    这些路径是 点对点队列,意味着只有特定用户会收到这些消息。

    ❗️ 问题可能出在这里:

    • 后端是否真的将消息发送到了正确的 /user/queue/... 路径?
    • 前端是否在订阅时使用了正确的 session ID 或 user ID?

    ✅ 重点:STOMP 的 /user/queue/... 是基于当前用户的,必须确保订阅路径与后端发送路径一致。


    🔧 三、检查后端是否正确发送消息

    在你的后端代码中,你有一个方法:

    @MessageMapping("/document/{documentId}/join")
    public void handleJoinDocument(...)
    

    但你没有展示完整的后端代码。我们假设你使用的是 Spring WebSocket + STOMP

    ✅ 正确的点对点消息发送方式(示例):

    // 发送点对点消息给指定用户
    String destination = "/user/" + userId + "/queue/ack";
    stompMessagingTemplate.convertAndSend(destination, message);
    

    ✅ 重点:确保后端使用了正确的路径格式,例如 /user/{userId}/queue/...,并且使用了 StompMessagingTemplateSimpMessagingTemplate 发送消息。


    🔧 四、检查前端是否重复订阅

    如果你在每次调用 subscribeToTopics() 时都重新订阅,可能会导致多个监听器被注册,从而造成混乱。

    ✅ 建议做法:

    • 在组件挂载时只订阅一次。
    • 如果需要动态更新订阅,应先取消之前的订阅。

    修改建议(Vue 中):

    // 在 created() 或 mounted() 中初始化订阅
    mounted() {
      this.connectToWebSocket();
      this.subscribeToTopics();
    }
    
    // 取消所有订阅
    unsubscribeAll() {
      stompClient.unsubscribe('/user/queue/init');
      stompClient.unsubscribe(`/topic/document/${docId.value}/operations`);
      // 其他路径...
    }
    
    // 重新订阅
    subscribeToTopics() {
      this.unsubscribeAll();
      // 重新订阅...
    }
    

    ✅ 重点:避免重复订阅,防止消息丢失或重复处理。


    🔧 五、调试建议

    1. 使用浏览器开发者工具查看 WebSocket 消息

    • 打开 Chrome 开发者工具 → Network → WebSockets → 查看实际接收到的消息。
    • 检查是否有 /user/queue/... 的消息被发送。

    2. 添加日志输出

    在前端订阅后,打印出订阅路径:

    console.log('订阅路径:', '/user/queue/init');
    

    在后端,打印出发送的路径:

    System.out.println("发送到路径: " + destination);
    

    🧠 总结

    | 问题 | 原因 | 解决方案 | |------|------|----------| | 点对点消息收不到 | 订阅路径不匹配或后端未正确发送 | 确保前后端路径一致,使用 convertAndSend 发送 | | 广播消息正常 | 广播路径正确 | 无需修改 | | 重复订阅 | 多次调用订阅函数 | 使用 unsubscribe 清除旧订阅 |


    📌 最终建议代码(修改后的前端订阅部分)

    const subscribeToTopics = () => {
      if (!stompClient || !stompClient.connected) {
        console.error('❌ STOMP客户端未连接,无法订阅');
        return;
      }
    
      // 避免重复订阅
      unsubscribeAll();
    
      console.log('📡 开始订阅主题,文档ID:', docId.value);
    
      // 订阅初始化队列
      stompClient.subscribe('/user/queue/init', (message) => {
        try {
          const data = JSON.parse(message.body);
          handleInitData(data);
        } catch (error) {
          console.error('解析初始化数据失败:', error);
        }
      });
    
      // 订阅操作队列
      stompClient.subscribe(`/topic/document/${docId.value}/operations`, (message) => {
        try {
          const data = JSON.parse(message.body);
          handleOperationBroadcast(data);
        } catch (error) {
          console.error('解析操作广播失败:', error);
        }
      });
    
      // 其他订阅类似...
    
      console.log('📡 已订阅所有主题');
    };
    
    function unsubscribeAll() {
      stompClient.unsubscribe('/user/queue/init');
      stompClient.unsubscribe(`/topic/document/${docId.value}/operations`);
      // 其他路径...
    }
    

    📦 附加说明(后端示例)

    如果你使用的是 Spring Boot,确保后端有如下配置:

    @Configuration
    @EnableWebSocketMessageBroker
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/ws").withSockJS();
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.setApplicationDestinationPrefixes("/app");
            registry.enableSimpleBroker("/topic", "/queue");
            registry.setUserDestinationPrefix("/user/");
        }
    }
    

    ✅ 重点:确保 /user/queue/... 被正确配置为点对点消息的前缀。


    如果你能提供更完整的后端代码(比如 handleJoinDocument 的后续逻辑),我可以进一步帮你定位具体问题。希望以上内容对你有帮助!

    评论

报告相同问题?

问题事件

  • 创建了问题 1月17日