华玥作者 2024-07-25 16:43 采纳率: 22.2%
浏览 38

springboot使用websocket建立连接问题

网上百度建立websocket连接如下代码

package com.example.demo.system.service.impl;


import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.example.demo.system.service.IWebSocketServer;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @ClassName: 开启WebSocket支持
 * @Description:
 * @Author: jwb
 * @Date: 2023/2/21 16:48
 */
@ServerEndpoint("/dev-api/websocket/{userId}")
@Component
public class WebSocketServer {

    static Log log = LogFactory.get(WebSocketServer.class);
    /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
    private static int onlineCount = 0;
    /**concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。*/
    private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
    private Session session;
    /**接收userId*/
    private String userId="";

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId=userId;
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            webSocketMap.put(userId,this);
            //加入set中
        }else{
            webSocketMap.put(userId,this);
            //加入set中
            addOnlineCount();
            //在线数加1
        }

        log.info("用户连接:"+userId+",当前在线人数为:" + getOnlineCount());

        try {
            sendMessage("连接成功");
        } catch (IOException e) {
            log.error("用户:"+userId+",网络异常!!!!!!");
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        if(webSocketMap.containsKey(userId)){
            webSocketMap.remove(userId);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:"+userId+",当前在线人数为:" + getOnlineCount());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("用户消息:"+userId+",报文:"+message);
        //可以群发消息
        //消息保存到数据库、redis
        if(StringUtils.isNotBlank(message)){
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);

            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:"+this.userId+",原因:"+error.getMessage());
        error.printStackTrace();
    }

    /**
     * 实现服务器主动推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }


    /**
     * 实现服务器主动推送
     */
    public static void sendAllMessage(String message) throws IOException {
        ConcurrentHashMap.KeySetView<String, WebSocketServer> userIds = webSocketMap.keySet();
        for (String userId : userIds) {
            WebSocketServer webSocketServer = webSocketMap.get(userId);
            webSocketServer.session.getBasicRemote().sendText(message);
            System.out.println("webSocket实现服务器主动推送成功userIds===="+userIds);
        }
    }

    /**
     * 发送自定义消息
     * */
    public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
        log.info("发送消息到:"+userId+",报文:"+message);
        if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
            webSocketMap.get(userId).sendMessage(message);
        }else{
            log.error("用户"+userId+",不在线!");
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}

建立连接报错

img

  • 写回答

5条回答 默认 最新

  • 阿里嘎多学长 2024-07-25 16:43
    关注

    以下内容由AIGC及阿里嘎多学长共同生成、有用望采纳:


    问题分析与解决方案

    从你提供的代码和错误日志来看,主要问题在于WebSocket连接时的异常处理和用户ID的初始化。以下是详细的分析和解决方案:

    1. EOFException 异常处理

    java.io.EOFException 通常表示客户端已经关闭连接。在你的代码中,onError 方法需要处理这种异常,以确保服务器能够正确地处理客户端断开连接的情况。

    2. 用户ID 初始化问题

    从错误日志来看,userId 可能在某些情况下没有被正确初始化。这需要在 onOpen 方法中确保 userId 被正确设置。

    修改后的代码

    以下是修改后的代码示例,包括对 userId 的初始化和异常处理的改进:

    package com.example.demo.system.service.impl;
    
    import cn.hutool.log.Log;
    import cn.hutool.log.LogFactory;
    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.JSONObject;
    import com.baomidou.mybatisplus.core.toolkit.StringUtils;
    import com.example.demo.system.service.IWebSocketServer;
    import org.springframework.stereotype.Component;
    
    import javax.websocket.*;
    import javax.websocket.server.PathParam;
    import javax.websocket.server.ServerEndpoint;
    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * @ClassName: 开启WebSocket支持
     * @Description:
     * @Author: jwb
     * @Date: 2023/2/21 16:48
     */
    @ServerEndpoint("/dev-api/websocket/{userId}")
    @Component
    public class WebSocketServer {
    
        static Log log = LogFactory.get(WebSocketServer.class);
        /**静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。*/
        private static int onlineCount = 0;
        /**concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。*/
        private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
        /**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
        private Session session;
        /**接收userId*/
        private String userId = "";
    
        /**
         * 连接建立成功调用的方法
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("userId") String userId) {
            this.session = session;
            this.userId = userId;
            if (webSocketMap.containsKey(userId)) {
                webSocketMap.remove(userId);
                webSocketMap.put(userId, this);
                // 加入set中
            } else {
                webSocketMap.put(userId, this);
                // 加入set中
                addOnlineCount();
                // 在线数加1
            }
    
            log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
    
            try {
                sendMessage("连接成功");
            } catch (IOException e) {
                log.error("用户:" + userId + ",网络异常!!!!!!");
            }
        }
    
        /**
         * 连接关闭调用的方法
         */
        @OnClose
        public void onClose() {
            if (webSocketMap.containsKey(userId)) {
                webSocketMap.remove(userId);
                // 从set中删除
                subOnlineCount();
            }
            log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
        }
    
        /**
         * 收到客户端消息后调用的方法
         *
         * @param message 客户端发送过来的消息
         */
        @OnMessage
        public void onMessage(String message, Session session) {
            log.info("用户消息:" + userId + ",报文:" + message);
            // 可以群发消息
            // 消息保存到数据库、redis
            if (StringUtils.isNotBlank(message)) {
                try {
                    // 解析发送的报文
                    JSONObject jsonObject = JSON.parseObject(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 处理错误
         *
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            if (error instanceof java.io.EOFException) {
                log.error("用户:" + (this.userId != null ? this.userId : "未知") + ",连接已断开");
            } else {
                log.error("用户错误:" + (this.userId != null ? this.userId : "未知") + ",原因:" + error.getMessage());
                error.printStackTrace();
            }
            if (webSocketMap.containsKey(userId)) {
                webSocketMap.remove(userId);
                subOnlineCount();
            }
        }
    
        /**
         * 实现服务器主动推送
         */
        public void sendMessage(String message) throws IOException {
            this.session.getBasicRemote().sendText(message);
        }
    
        /**
         * 实现服务器主动推送
         */
        public static void sendAllMessage(String message) throws IOException {
            ConcurrentHashMap.KeySetView<String, WebSocketServer> userIds = webSocketMap.keySet();
            for (String userId : userIds) {
                WebSocketServer webSocketServer = webSocketMap.get(userId);
                webSocketServer.session.getBasicRemote().sendText(message);
                System.out.println("webSocket实现服务器主动推送成功userIds====" + userIds);
            }
        }
    
        /**
         * 发送自定义消息
         */
        public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
            log.info("发送消息到:" + userId + ",报文:" + message);
            if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
                webSocketMap.get(userId).sendMessage(message);
            } else {
                log.error("用户" + userId + ",不在线!");
            }
        }
    
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        public static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        public static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }
    

    示例输入和输出

    示例输入

    • 用户通过WebSocket客户端连接到服务器,发送消息。

    示例输出

    • 服务器接收到连接请求,调用onOpen方法,记录用户ID和在线人数。
    • 服务器接收到客户端消息,调用onMessage方法,记录消息内容。
    • 服务器主动推送消息,调用sendAllMessage方法,向所有在线用户发送消息。

    运行环境

    • Spring Boot:2.x版本
    • Java:1.8或更高版本
    • WebSocket API:JSR 356

    操作步骤

    1. 启动Spring Boot应用:运行Spring Boot应用,确保WebSocket服务端启动。
    2. 连接WebSocket客户端:使用WebSocket客户端连接到服务器的/dev-api/websocket/{userId}路径。
    3. 发送和接收消息:客户端发送消息,服务器接收并处理消息,同时服务器可以主动推送消息到客户端。

    通过以上修改和示例,用户应该能够解决WebSocket连接中遇到的问题,并实现基本的通信功能。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 7月25日