慕雪年少 2024-02-21 10:10 采纳率: 50%
浏览 2
已结题

websocket车辆定位sessionmap为空

Spring MVC 项目中使用 WebSocket 做车辆和人员定位:本地运行正常,上线后人员定位正常,车辆定位却无法工作。排查后发现,下面 for 循环中的 sessionMap(存储当前用户会话的集合)为空;但人员和车辆走的是同一个 for 循环,中间也没有鉴权,为什么处理车辆消息时 sessionMap 会为空?

 class SendMessageThread extends Thread {
        @Override
        public void run() {
            while (bStart) {
                if (msgQueue.size() > 0) {
                    try {
                    MessageWrapper msg = msgQueue.poll();
                    logger.info("\n------取出队列消息------"+msg.type.toString());
                    logger.info( "----------for-------"+msg.type.toString()+"----"+sessionMap.entrySet().size());
                        Collection<SessionContext> values = sessionMap.values();
                        logger.info("------------------values--------------");
                        logger.info( "---------"+values.isEmpty());
                        logger.info("------------------values--------------");
                       for(SessionContext item : values){
                           logger.info("------------for---------------------");
                           logger.info(item.getUserId());
                           if (item.getTick() > 0 && item.getWebSocketSession().isOpen()) {
                               logger.info("\n------发送------"+item.getUserId()+"---"+msg.type.toString());
                               sendMessage(msg, item);
                               logger.info("\n------发送成功------");
                           } else {
                               logger.info("---------------------------删除le---------------");
                               removeSessionContext(item.getWebSocketSession());
                               if (logger.isInfoEnabled()) {
                                   logger.info("websocket is removed:{}: tick={}, status={}",  item.getTick(), item.getWebSocketSession().isOpen());
                               }
                       }

整体代码如下:

public class WebsocketEndPoint extends TextWebSocketHandler implements IMessageConsumer, InitializingBean {
    public final static String TYPE_MESSAGE = "message"; //信息

    public final static String TYPE_VEHICLE_GPS = "vehicleGps";//车辆gps
    public final static String TYPE_ONLINE_VEHICLE = "onlineVehicle"; //车辆上线

    public final static String TYPE_USER_GPS = "userGps";//用户gps
    public final static String TYPE_ONLINE_USER = "onlineUser";//用户上线

    private final static int MAX_TICK = 60;//最大连接点
    private static Logger logger = LoggerFactory
            .getLogger(WebsocketEndPoint.class);
    @Autowired
    private TMessageService msgService;//消息服务

    @Autowired
    private TCarLocusService carLocusService;//车辆轨迹

    @Autowired
    private TUserLocusService userLocusService;//用户轨迹

    @Autowired
    private TMessageRecieveRepository messageReceiveRepository;//信息接受库

    private boolean bStart = false;

    private Object waitLock = new Object();

    private SendMessageThread workerThread = null;

    @Autowired
    private MappingJackson2HttpMessageConverter jsonHttpMessageConverter;//json消息转换为message

    private ObjectMapper jacksonObjectMapper;

    private ConcurrentHashMap<String, SessionContext> sessionMap = new ConcurrentHashMap<>();

    private ConcurrentLinkedQueue<MessageWrapper> msgQueue = new ConcurrentLinkedQueue<>();//消息队列

    private ConcurrentHashMap<String, OnlineUserInfo> onlineUserMap = new ConcurrentHashMap<>();//上线用户集合

    public ConcurrentHashMap<Integer, OnlineVehicleInfo> onlineVehicleMap = new ConcurrentHashMap<>();//上线车辆集合

    /**
     * Scheduled task that ages every registered session by one tick.
     * <p>
     * Each run lowers the tick of every {@link SessionContext} held in {@code sessionMap} by one
     * and closes the WebSocket of any session whose tick has reached zero or below while the
     * socket is still open. Entries are not removed from the map by this task.
     */
    private Runnable decreaseTick = new Runnable() {
        @Override
        public void run() {
            // Session count before the ticks are lowered.
            logger.info("_________84_______减少前的数量------------------"+sessionMap.values().size());
            for (SessionContext session : sessionMap.values()) {
                session.setTick(session.getTick() - 1);
                logger.info("__________88____减少tick----------------"+session.getUserId()+"======="+session.getTick());

                boolean timedOut = session.getTick() <= 0;
                if (!timedOut || !session.getWebSocketSession().isOpen()) {
                    continue;
                }

                // Close the channel of a session that has timed out.
                try {
                    logger.info("--------92---查看关闭的通道-----------"+session.getUserId());
                    session.getWebSocketSession().close();
                } catch (IOException e) {
                    logger.trace("", e);
                }
            }
            // Session count after the ticks were lowered.
            logger.info("_________84_______减少后的数量------------------"+sessionMap.values().size());
        }
    };


/**
 * Accepts a message for delivery.
 * <p>
 * A non-null payload is wrapped together with its type and attributes, appended to
 * {@code msgQueue}, and every thread waiting on {@code waitLock} is then notified.
 * A {@code null} payload is ignored. Any exception raised while doing so is logged and
 * not propagated.
 */

    @Override
    public void consume(String msgType, Object msg, Map<String, Object> att) {
        if (msg == null) {
            return;
        }
        try {
            msgQueue.add(new MessageWrapper(msgType, msg, att));

            // Diagnostic output: list the sessions that are registered at enqueue time.
            for (SessionContext session : sessionMap.values()) {
                logger.info("--------------116--"+msgType+session.getUserId()+"---发送前查看session是否有值----------");
            }

            synchronized (waitLock) {
                waitLock.notifyAll();
            }
        } catch (Exception ex) {
            logger.error("", ex);
        }
    }


    /**
     * Registers a newly opened WebSocket session.
     * <p>
     * The login user is resolved through {@code getLoginUser}; a session context carrying that
     * user's name and a tick of {@code MAX_TICK} is stored in {@code sessionMap} under the
     * session id. Any failure (including a missing login user) is logged and not propagated,
     * in which case no context is registered.
     *
     * @param wss the session that has just been established
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession wss) throws Exception {
        logger.debug("connect to the websocket success......");
        try {
            Principal principal = getLoginUser(wss);
            Assert.notNull(principal, "授权不能为空");

            String userId = principal.getName();
            SessionContext context = new SessionContext(wss, MAX_TICK);
            context.setUserId(userId);

            // Diagnostic output: the session id and the context about to be registered.
            logger.info("-------getsess-----176-----"+wss.getId());
            logger.info("-------getsess------177----"+wss.getId()+"------------查看添加值--------------");
            logger.info("-------context-----178-----"+context.getUserId()+context.getWebSocketSession());

            this.sessionMap.put(wss.getId(), context);
            logger.debug("connection ok: sid:{} userId:{}", wss.getId(), userId);
        } catch (Exception ex) {
            logger.error("", ex);
        }
    }

  

    private GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(PrecisionModel.FLOATING), 4326);

   
    /**
     * Handles a transport-level error on a session.
     * <p>
     * If the session is still registered in {@code sessionMap}, its context is removed and the
     * socket is closed when it is still open. Failures during this cleanup are logged and not
     * propagated. The triggering error itself is logged at trace level.
     *
     * @param wss    the session on which the error occurred
     * @param thrwbl the transport error
     */
    @Override
    public void handleTransportError(WebSocketSession wss, Throwable thrwbl) throws Exception {
        final String sessionId = wss.getId();
        try {
            boolean registered = this.sessionMap.containsKey(sessionId);
            if (registered) {
                logger.info("---------删除le267------267---------"+wss.getId());
                removeSessionContext(wss);

                boolean stillOpen = wss != null && wss.isOpen();
                if (stillOpen) {
                    logger.info("----------关闭socket会话------270---------"+wss.getId());
                    wss.close();
                }
            }
        } catch (Exception ex) {
            logger.error("", ex);
        }

        logger.debug("Error:websocket connection closed......, id={}", sessionId);
        logger.trace("", thrwbl);
    }

    /**
     * Cleans up after a WebSocket session has been closed by removing its session context.
     * Failures during removal are logged and not propagated.
     *
     * @param wss the session that was closed
     * @param cs  the close status (not used here)
     */
    @Override
    public void afterConnectionClosed(WebSocketSession wss, CloseStatus cs) throws Exception {
        logger.debug("websocket connection closed......");
        try {
            String sessionId = wss.getId();
            logger.info("----------socket会话关闭后删除wss---287------------"+sessionId);
            removeSessionContext(wss);
        } catch (Exception ex) {
            logger.error("", ex);
        }
    }

   
    /**
     * Removes the context registered for the given WebSocket session.
     * <p>
     * If no context is registered for the session id the call is a no-op. This happens
     * routinely: {@code handleTransportError} removes the context and then closes the socket,
     * after which {@code afterConnectionClosed} asks for the same removal again. Previously that
     * second call dereferenced a {@code null} context in a log statement and threw a
     * {@link NullPointerException}.
     * <p>
     * For a session flagged as uploading its locus, {@code userOffline} is invoked with the
     * principal's name; otherwise the entry is removed from {@code sessionMap} directly.
     *
     * @param wss the session whose context should be removed
     */
    public void removeSessionContext(WebSocketSession wss){
        SessionContext sessionContext = this.sessionMap.get(wss.getId());
        if (sessionContext == null) {
            // Already removed (or never registered): nothing to do.
            logger.debug("no session context registered for {}, nothing to remove", wss.getId());
            return;
        }

        logger.info("--------------------移除session-----303------"+sessionContext.getUserId());
        if (sessionContext.isUploadLocus()) {
            // NOTE(review): this branch does not remove the entry from sessionMap itself;
            // presumably userOffline(...) takes care of that - confirm against its implementation.
            logger.info("-----------用户下线-----305----------"+wss.getPrincipal().getName());
            userOffline(wss.getPrincipal().getName());
        } else {
            this.sessionMap.remove(wss.getId());
            logger.info("-------------移除了--309--------------"+wss.getId());
        }
    }

 

    /**
     * Sends one message to one session, if the session is eligible for it.
     * <p>
     * Eligibility is decided by {@code isValid}. The message is passed through
     * {@code checkData}, wrapped in a success {@code ActionResult}, serialized to JSON and sent
     * as a text frame. For {@code TYPE_MESSAGE} messages sent to a session with message delivery
     * enabled, the first data element is afterwards marked as received for the session's user.
     * Any exception is logged and not propagated.
     *
     * @param msg the message to send
     * @param sc  the target session context
     */
    private void sendMessage(MessageWrapper msg, SessionContext sc) {
        try {
            if (!isValid(sc, msg)) {
                return;
            }

            // Check whether the data needs to be cloned and filtered.
            MessageWrapper outgoing = checkData(sc, msg);
            String json = jacksonObjectMapper.writeValueAsString(ActionResult.Succeed(outgoing));
            TextMessage frame = new TextMessage(json);
            sc.getWebSocketSession().sendMessage(frame);

            // After sending, mark messages of the "message" type as received.
            boolean needsReceipt = msg.getType().equals(TYPE_MESSAGE) && sc.isSendMsg();
            if (needsReceipt) {
                Object[] payload = (Object[]) msg.getData();
                TMessage delivered = (TMessage) payload[0];
                msgService.markMessageRecieve(delivered.getMsgId(), sc.getUserId());
            }
        } catch (Exception ex) {
            logger.error("Failed to trans msg", ex);
        }
    }

    /**
     * Decides whether {@code msg} should be delivered to the session {@code sc}.
     * <ul>
     *   <li>{@code TYPE_MESSAGE}: delivered when the session has message delivery enabled and
     *       the session's user is one of the recipients of the first data element. As a side
     *       effect the user's total unread count is stored on {@code msg}.</li>
     *   <li>{@code TYPE_ONLINE_VEHICLE}, {@code TYPE_VEHICLE_GPS}, {@code TYPE_ONLINE_USER},
     *       {@code TYPE_USER_GPS}: delivered when the session has GPS delivery enabled.</li>
     *   <li>Anything else is rejected.</li>
     * </ul>
     *
     * @param sc  the candidate receiving session
     * @param msg the message about to be sent
     * @return {@code true} if the message should be sent to this session
     */
    private boolean isValid(SessionContext sc, MessageWrapper msg) {
        if (msg.getType().equals(TYPE_MESSAGE) && sc.isSendMsg() && isRecipient(sc, msg)) {
            // Attach the user's total number of unread messages before sending.
            msg.setUnreadMsgNo(sumUnreadCounts(sc.getUserId()));
            return true;
        }

        if (!sc.isSendGPS()) {
            return false;
        }
        return msg.getType().equals(TYPE_ONLINE_VEHICLE)
                || msg.getType().equals(TYPE_VEHICLE_GPS)
                || msg.getType().equals(TYPE_ONLINE_USER)
                || msg.getType().equals(TYPE_USER_GPS);
    }

    /** Tells whether the session's user is listed as a recipient of the message's first data element. */
    private boolean isRecipient(SessionContext sc, MessageWrapper msg) {
        Object[] datas = (Object[]) msg.getData();
        // Messages other than the initial load carry exactly one data element.
        if (datas == null || datas.length == 0) {
            return false;
        }
        TMessage hwImMsg = (TMessage) datas[0];
        List<TMessageRecieve> recipients = hwImMsg.getMessageRecieveList();
        String userId = sc.getUserId();
        if (recipients == null || userId == null) {
            return false;
        }
        for (TMessageRecieve recipient : recipients) {
            if (recipient != null && userId.equals(recipient.getRecieverUserId())) {
                return true;
            }
        }
        return false;
    }

    /** Sums the "count" column of the unread-message rows returned for the given user. */
    private int sumUnreadCounts(String userId) {
        List<Map<String, Object>> rows = msgService.selectNumOfUnreadMsg(userId);
        if (rows == null) {
            return 0;
        }
        int total = 0;
        for (Map<String, Object> row : rows) {
            Object count = row.get("count");
            // Accept any numeric type: a hard (Integer) cast fails if the query yields e.g. a Long.
            if (count instanceof Number) {
                total += ((Number) count).intValue();
            }
        }
        return total;
    }

    /**
     * Returns the message to send to the given session.
     * <p>
     * Currently a pass-through: {@code msg} is returned unchanged and {@code sc} is not consulted.
     * NOTE(review): presumably a placeholder for per-session cloning/filtering of the payload -
     * confirm before relying on it.
     *
     * @param sc  the target session context (unused)
     * @param msg the message about to be sent
     * @return the same {@code msg} instance
     */
    private MessageWrapper checkData(SessionContext sc, MessageWrapper msg) {
        return msg;
    }

    /**
     * Resets the tick of the given session's context back to {@code MAX_TICK}.
     * <p>
     * Does nothing when no context is registered for the session id. The diagnostic log line
     * is only written once the context is known to exist; it used to dereference the context
     * before the null check and threw a {@link NullPointerException} for unknown sessions.
     *
     * @param wss the session whose tick should be reset
     */
    private void resetTick(WebSocketSession wss) {
        SessionContext ctx = this.sessionMap.get(wss.getId());
        if (ctx == null) {
            logger.debug("no session context registered for {}, tick not reset", wss.getId());
            return;
        }
        logger.info(sessionMap.entrySet().size()+"--------------获取ctx的长度--id不为空设为最大链接-------439-----"+ctx.getUserId());
        ctx.setTick(MAX_TICK);
    }

   


    /**
     * Worker thread that drains {@code msgQueue} and pushes each message to every registered
     * session.
     * <p>
     * While {@code bStart} is set, the thread polls the queue; for each message it walks
     * {@code sessionMap}, sending to sessions that still have a positive tick and an open
     * socket and removing the others. When the queue is empty it blocks on {@code waitLock}
     * until {@code consume} signals new work.
     * <p>
     * Implementation notes:
     * <ul>
     *   <li>The emptiness check is repeated while holding {@code waitLock}. Because
     *       {@code consume} enqueues first and then notifies under the same lock, a message
     *       added between polling and waiting can no longer be missed (lost wake-up).</li>
     *   <li>A failure while handling one session is logged and does not prevent delivery to
     *       the remaining sessions.</li>
     *   <li>An interrupt restores the interrupt flag and ends the thread.</li>
     * </ul>
     *
     * @author zhanyan
     */
    class SendMessageThread extends Thread {
        @Override
        public void run() {
            while (bStart) {
                // poll() returns null on an empty queue, so no separate size() call is needed.
                MessageWrapper msg = msgQueue.poll();
                if (msg == null) {
                    if (!awaitMessage()) {
                        return;
                    }
                    continue;
                }
                try {
                    dispatch(msg);
                } catch (Exception e) {
                    logger.info("\n------发送失败------"+e.toString());
                    logger.trace("", e);
                }
            }
        }

        /**
         * Blocks until new work may be available.
         *
         * @return {@code false} if the thread was interrupted and should stop
         */
        private boolean awaitMessage() {
            synchronized (waitLock) {
                // Re-check under the lock: the producer may have enqueued after our poll().
                if (!msgQueue.isEmpty()) {
                    return true;
                }
                try {
                    // A spurious wake-up is harmless: the caller simply polls again.
                    waitLock.wait();
                } catch (InterruptedException e) {
                    logger.trace("", e);
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }

        /** Offers one message to every registered session, removing sessions that are no longer usable. */
        private void dispatch(MessageWrapper msg) {
            logger.info("\n------取出队列消息------"+msg.type.toString());
            logger.info( "----------for-------"+msg.type.toString()+"----"+sessionMap.entrySet().size());
            Collection<SessionContext> values = sessionMap.values();
            logger.info("------------------values--------------");
            logger.info( "---------"+values.isEmpty());
            logger.info("------------------values--------------");
            for (SessionContext item : values) {
                try {
                    deliver(msg, item);
                } catch (Exception e) {
                    // Keep going: one broken session must not block the others.
                    logger.info("\n------发送失败------"+e.toString());
                    logger.trace("", e);
                }
            }
        }

        /** Sends to a live session, or removes the session when its tick expired or its socket is closed. */
        private void deliver(MessageWrapper msg, SessionContext item) {
            logger.info("------------for---------------------");
            logger.info(item.getUserId());
            if (item.getTick() > 0 && item.getWebSocketSession().isOpen()) {
                logger.info("\n------发送------"+item.getUserId()+"---"+msg.type.toString());
                sendMessage(msg, item);
                logger.info("\n------发送成功------");
            } else {
                logger.info("---------------------------删除le---------------");
                removeSessionContext(item.getWebSocketSession());
                if (logger.isInfoEnabled()) {
                    // Three placeholders need three arguments; the session id was missing.
                    logger.info("websocket is removed:{}: tick={}, status={}",
                            item.getWebSocketSession().getId(), item.getTick(), item.getWebSocketSession().isOpen());
                }
            }
        }
    }

    /**
     * Returns the map of online users. The internal map instance itself is returned, not a copy.
     */
    public ConcurrentHashMap<String, OnlineUserInfo> getOnlineUserMap() {
        return onlineUserMap;
    }

    /**
     * Replaces the map of online users with the given instance.
     */
    public void setOnlineUserMap(ConcurrentHashMap<String, OnlineUserInfo> onlineUserMap) {
        this.onlineUserMap = onlineUserMap;
    }

    /**
     * Returns the map of online vehicles. The internal map instance itself is returned, not a copy.
     */
    public ConcurrentHashMap<Integer, OnlineVehicleInfo> getOnlineVehicleMap() {
        return onlineVehicleMap;
    }

    /**
     * Replaces the map of online vehicles with the given instance.
     */
    public void setOnlineVehicleMap(ConcurrentHashMap<Integer, OnlineVehicleInfo> onlineVehicleMap) {
        this.onlineVehicleMap = onlineVehicleMap;
    }
}

/**
 * Online state of a single vehicle, identified by its car id.
 * A freshly created instance is marked as online.
 */
class OnlineVehicleInfo {

    /** Identifier of the vehicle. */
    private Integer carId;

    /** Whether the vehicle is currently online; every new instance starts out online. */
    private boolean online = true;

    /**
     * Creates the info for the given vehicle, initially online.
     *
     * @param carId identifier of the vehicle
     */
    public OnlineVehicleInfo(Integer carId) {
        this.carId = carId;
    }

    /** @return the identifier of the vehicle */
    public Integer getCarId() {
        return this.carId;
    }

    /** @param carId the new identifier of the vehicle */
    public void setCarId(Integer carId) {
        this.carId = carId;
    }

    /** @return {@code true} while the vehicle is marked online */
    public boolean isOnline() {
        return this.online;
    }

    /** @param online the new online flag */
    public void setOnline(boolean online) {
        this.online = online;
    }
}

/**
 * Online state of a single user, identified by the user id.
 * A freshly created instance is marked as online.
 */
class OnlineUserInfo {

    /** Identifier of the user. */
    private String userId;

    /** Whether the user is currently online; every new instance starts out online. */
    private boolean online = true;

    /**
     * Creates the info for the given user, initially online.
     *
     * @param userId identifier of the user
     */
    public OnlineUserInfo(String userId) {
        this.userId = userId;
    }

    /** @return the identifier of the user */
    public String getUserId() {
        return this.userId;
    }

    /** @param userId the new identifier of the user */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** @return {@code true} while the user is marked online */
    public boolean isOnline() {
        return this.online;
    }

    /** @param online the new online flag */
    public void setOnline(boolean online) {
        this.online = online;
    }
}

/**
 * Per-connection state kept for a registered WebSocket session: the owning user id, the
 * session itself, a countdown tick, and three one-way feature flags.
 * <p>
 * Instances are shared between threads in this file (the tick-decreasing task, the message
 * sender thread and the WebSocket callbacks). The tick and the flags are therefore
 * {@code volatile}, so a value written by one thread is visible to the others; previously
 * only {@code setTick} was synchronized while {@code getTick} read the field without any
 * synchronization. Note that a "read, then set" sequence performed by a caller is still not
 * atomic.
 */
class SessionContext {
    private String userId;
    private WebSocketSession webSocketSession;
    /** Remaining ticks before the session is considered timed out. */
    private volatile int tick;
    /** Set once GPS-type messages should be delivered to this session. */
    private volatile boolean sendGPS = false;
    /** Set once message-type messages should be delivered to this session. */
    private volatile boolean sendMsg = false;
    /** Set once this session is flagged as uploading its locus. */
    private volatile boolean uploadLocus = false;

    /**
     * @param session the WebSocket session this context belongs to
     * @param tick    the initial tick value
     */
    public SessionContext(WebSocketSession session, int tick) {
        this.webSocketSession = session;
        this.tick = tick;
    }

    /** @return the WebSocket session this context belongs to */
    public WebSocketSession getWebSocketSession() {
        return webSocketSession;
    }

    /** @param webSocketSession the new WebSocket session */
    public void setWebSocketSession(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
    }

    /** @return the current tick value */
    public int getTick() {
        return tick;
    }

    /** @param tick the new tick value */
    public synchronized void setTick(int tick) {
        this.tick = tick;
    }


    /** @return the id of the user owning this session */
    public String getUserId() {
        return userId;
    }

    /** @param userId the id of the user owning this session */
    public void setUserId(String userId) {
        this.userId = userId;
    }


    /** Turns on delivery of GPS-type messages; there is no way to turn it off again. */
    public void enableGPS() {
        this.sendGPS = true;
    }

    /** Turns on delivery of message-type messages; there is no way to turn it off again. */
    public void enableMsg() {
        this.sendMsg = true;
    }

    /** Flags this session as uploading its locus; there is no way to clear the flag again. */
    public void enableUploadLocus() {
        this.uploadLocus = true;
    }


    /** @return {@code true} once {@link #enableGPS()} has been called */
    public boolean isSendGPS() {
        return sendGPS;
    }

    /** @return {@code true} once {@link #enableMsg()} has been called */
    public boolean isSendMsg() {
        return sendMsg;
    }

    /** @return {@code true} once {@link #enableUploadLocus()} has been called */
    public boolean isUploadLocus() {
        return uploadLocus;
    }

}

  • 写回答

2条回答 默认 最新

  • 慕雪年少 2024-02-29 15:05
    关注

    已经解决了。原因是 Tomcat 创建、读取 bean 文件时,同一个文件创建了两个 load(即被加载了两次),极低的概率还是被碰上了;重装 Tomcat 后解决。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 3月8日
  • 已采纳回答 2月29日
  • 创建了问题 2月21日

悬赏问题

  • ¥30 STM32 INMP441无法读取数据
  • ¥100 求汇川机器人IRCB300控制器和示教器同版本升级固件文件升级包
  • ¥15 用visualstudio2022创建vue项目后无法启动
  • ¥15 x趋于0时tanx-sinx极限可以拆开算吗
  • ¥500 把面具戴到人脸上,请大家贡献智慧
  • ¥15 任意一个散点图自己下载其js脚本文件并做成独立的案例页面,不要作在线的,要离线状态。
  • ¥15 各位 帮我看看如何写代码,打出来的图形要和如下图呈现的一样,急
  • ¥30 c#打开word开启修订并实时显示批注
  • ¥15 如何解决ldsc的这条报错/index error
  • ¥15 VS2022+WDK驱动开发环境