Spring MVC 项目中使用 WebSocket 推送车辆和人员定位:本地运行正常,上线后人员定位正常,但车辆定位无法推送。排查后发现,下面这个 for 循环中的 sessionMap(保存当前已连接用户会话的集合)为空。但人员和车辆消息走的都是同一个 for 循环,中间也没有鉴权逻辑,为什么处理车辆消息时 sessionMap 会为空?
class SendMessageThread extends Thread {
@Override
public void run() {
while (bStart) {
if (msgQueue.size() > 0) {
try {
MessageWrapper msg = msgQueue.poll();
logger.info("\n------取出队列消息------"+msg.type.toString());
logger.info( "----------for-------"+msg.type.toString()+"----"+sessionMap.entrySet().size());
Collection<SessionContext> values = sessionMap.values();
logger.info("------------------values--------------");
logger.info( "---------"+values.isEmpty());
logger.info("------------------values--------------");
for(SessionContext item : values){
logger.info("------------for---------------------");
logger.info(item.getUserId());
if (item.getTick() > 0 && item.getWebSocketSession().isOpen()) {
logger.info("\n------发送------"+item.getUserId()+"---"+msg.type.toString());
sendMessage(msg, item);
logger.info("\n------发送成功------");
} else {
logger.info("---------------------------删除le---------------");
removeSessionContext(item.getWebSocketSession());
if (logger.isInfoEnabled()) {
logger.info("websocket is removed:{}: tick={}, status={}", item.getTick(), item.getWebSocketSession().isOpen());
}
}
整体代码如下:
public class WebsocketEndPoint extends TextWebSocketHandler implements IMessageConsumer, InitializingBean {
// Message type identifiers; isValid() and sendMessage() compare MessageWrapper.getType() against them.
public final static String TYPE_MESSAGE = "message"; // message
public final static String TYPE_VEHICLE_GPS = "vehicleGps";// vehicle GPS
public final static String TYPE_ONLINE_VEHICLE = "onlineVehicle"; // vehicle came online
public final static String TYPE_USER_GPS = "userGps";// user GPS
public final static String TYPE_ONLINE_USER = "onlineUser";// user came online
// Tick value a session starts with (afterConnectionEstablished) and is reset to (resetTick);
// decreaseTick lowers it by one per run and closes the socket once it reaches zero.
private final static int MAX_TICK = 60;
private static Logger logger = LoggerFactory
.getLogger(WebsocketEndPoint.class);
@Autowired
private TMessageService msgService;// message service
@Autowired
private TCarLocusService carLocusService;// vehicle track service
@Autowired
private TUserLocusService userLocusService;// user track service
@Autowired
private TMessageRecieveRepository messageReceiveRepository;// message-receive repository
// Loop condition of SendMessageThread.run().
// NOTE(review): it is written outside the visible code and is not volatile — confirm the
// worker thread is guaranteed to observe a later "false".
private boolean bStart = false;
// Monitor on which consume() calls notifyAll() and SendMessageThread waits.
private Object waitLock = new Object();
// Worker thread instance; it is created and started outside the visible code.
private SendMessageThread workerThread = null;
@Autowired
private MappingJackson2HttpMessageConverter jsonHttpMessageConverter;// JSON HTTP message converter
// Used by sendMessage() to serialize outgoing payloads.
// NOTE(review): assigned outside the visible code (presumably taken from jsonHttpMessageConverter) — confirm it is never null when the worker runs.
private ObjectMapper jacksonObjectMapper;
// Registered sessions, keyed by WebSocketSession.getId().
// NOTE(review): this is per-instance state; if more than one WebsocketEndPoint instance exists,
// a consume() call on one instance will not see sessions registered on another — TODO confirm the bean is created exactly once.
private ConcurrentHashMap<String, SessionContext> sessionMap = new ConcurrentHashMap<>();
private ConcurrentLinkedQueue<MessageWrapper> msgQueue = new ConcurrentLinkedQueue<>();// message queue filled by consume(), drained by SendMessageThread
private ConcurrentHashMap<String, OnlineUserInfo> onlineUserMap = new ConcurrentHashMap<>();// online users
public ConcurrentHashMap<Integer, OnlineVehicleInfo> onlineVehicleMap = new ConcurrentHashMap<>();// online vehicles
/**
 * Periodic task body: lowers the tick of every registered session by one and
 * closes the WebSocket of each session whose tick is now zero or below while
 * the socket is still open. Entries are not removed from sessionMap here.
 * How this Runnable is scheduled is not visible in this part of the file.
 */
private Runnable decreaseTick = new Runnable() {
    @Override
    public void run() {
        logger.info("_________84_______减少前的数量------------------" + sessionMap.values().size());
        for (SessionContext session : sessionMap.values()) {
            session.setTick(session.getTick() - 1);
            logger.info("__________88____减少tick----------------" + session.getUserId() + "=======" + session.getTick());

            // Only timed-out sessions whose socket is still open need closing.
            if (session.getTick() > 0 || !session.getWebSocketSession().isOpen()) {
                continue;
            }
            try {
                logger.info("--------92---查看关闭的通道-----------" + session.getUserId());
                session.getWebSocketSession().close();
            } catch (IOException closeFailure) {
                logger.trace("", closeFailure);
            }
        }
        logger.info("_________84_______减少后的数量------------------" + sessionMap.values().size());
    }
};
/**
 * Accepts a message for broadcasting: wraps it, appends it to msgQueue and
 * wakes any thread waiting on waitLock. A null msg is ignored. Any exception
 * is logged and not propagated.
 *
 * @param msgType message type identifier stored in the wrapper
 * @param msg     payload; nothing happens when it is null
 * @param att     additional attributes stored in the wrapper
 */
@Override
public void consume(String msgType, Object msg, Map<String, Object> att) {
    if (msg == null) {
        return;
    }
    try {
        msgQueue.add(new MessageWrapper(msgType, msg, att));

        // Diagnostic output: one line per session registered at enqueue time.
        for (SessionContext session : sessionMap.values()) {
            logger.info("--------------116--" + msgType + session.getUserId() + "---发送前查看session是否有值----------");
        }

        synchronized (waitLock) {
            waitLock.notifyAll();
        }
    } catch (Exception failure) {
        logger.error("", failure);
    }
}
/**
 * Registers a newly opened WebSocket session in sessionMap under its session
 * id, with a fresh SessionContext carrying the login user's name and MAX_TICK.
 *
 * A connection without a login user is rejected: it is closed with
 * CloseStatus.POLICY_VIOLATION. Previously the failed assertion was only
 * logged, leaving the socket open but unregistered, so it never received
 * messages and was never timed out by decreaseTick.
 *
 * @param wss the session that was just opened
 * @throws Exception declared by the overridden method; failures inside are logged, not rethrown
 */
@Override
public void afterConnectionEstablished(WebSocketSession wss) throws Exception {
    logger.debug("connect to the websocket success......");
    try {
        Principal loginUser = getLoginUser(wss);
        if (loginUser == null) {
            logger.error("rejecting websocket session without login user, sid={}", wss.getId());
            wss.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        String userId = loginUser.getName();
        SessionContext sessContext = new SessionContext(wss, MAX_TICK);
        sessContext.setUserId(userId);
        logger.info("-------getsess-----176-----"+wss.getId());
        logger.info("-------getsess------177----"+wss.getId()+"------------查看添加值--------------");
        logger.info("-------context-----178-----"+sessContext.getUserId()+sessContext.getWebSocketSession());
        this.sessionMap.put(wss.getId(), sessContext);
        logger.debug("connection ok: sid:{} userId:{}", wss.getId(), userId);
    } catch (Exception ex) {
        logger.error("", ex);
    }
}
// Geometry factory built with a FLOATING precision model and 4326 as its second constructor argument
// (presumably the SRID, i.e. WGS 84 — confirm against the GeometryFactory API in use).
// NOTE(review): not referenced by any code visible in this part of the file.
private GeometryFactory geometryFactory = new GeometryFactory(new PrecisionModel(PrecisionModel.FLOATING), 4326);
/**
 * Transport-error callback. When the session is still registered in
 * sessionMap it is passed to removeSessionContext and, if still open, closed.
 * Failures during that cleanup are logged and not rethrown; the transport
 * error itself is logged at trace level.
 *
 * @param wss    the session on which the error occurred
 * @param thrwbl the transport error
 * @throws Exception declared by the overridden method
 */
@Override
public void handleTransportError(WebSocketSession wss, Throwable thrwbl) throws Exception {
    final String sessionId = wss.getId();
    try {
        boolean registered = this.sessionMap.containsKey(sessionId);
        if (registered) {
            logger.info("---------删除le267------267---------" + wss.getId());
            removeSessionContext(wss);
            // wss has already been dereferenced above, so only its open state matters here.
            if (wss.isOpen()) {
                logger.info("----------关闭socket会话------270---------" + wss.getId());
                wss.close();
            }
        }
    } catch (Exception cleanupFailure) {
        logger.error("", cleanupFailure);
    }
    logger.debug("Error:websocket connection closed......, id={}", sessionId);
    logger.trace("", thrwbl);
}
/**
 * Close callback: hands the closed session to removeSessionContext.
 * Any exception raised while doing so is logged and not rethrown.
 *
 * @param wss the session that was closed
 * @param cs  the close status (not used here)
 * @throws Exception declared by the overridden method
 */
@Override
public void afterConnectionClosed(WebSocketSession wss, CloseStatus cs) throws Exception {
    logger.debug("websocket connection closed......");
    try {
        String closedId = wss.getId();
        logger.info("----------socket会话关闭后删除wss---287------------" + closedId);
        removeSessionContext(wss);
    } catch (Exception removalFailure) {
        logger.error("", removalFailure);
    }
}
/**
 * Unregisters a session from sessionMap. When the session was uploading its
 * track (isUploadLocus), the user is reported offline via userOffline first.
 *
 * Safe to call more than once for the same session: when no context is
 * registered the call is a no-op. Previously that case threw a
 * NullPointerException because the context was dereferenced before its null
 * check. The entry is now removed in every case; previously a
 * track-uploading session was never removed by this method.
 *
 * @param wss the session to unregister
 */
public void removeSessionContext(WebSocketSession wss) {
    String sessionId = wss.getId();
    SessionContext sessionContext = this.sessionMap.get(sessionId);
    if (sessionContext == null) {
        // Already removed, e.g. transport-error cleanup followed by the close callback.
        logger.debug("removeSessionContext: no context registered for sid={}", sessionId);
        return;
    }
    logger.info("--------------------移除session-----303------" + sessionContext.getUserId());
    try {
        if (sessionContext.isUploadLocus()) {
            // The principal may be unavailable on a closed session; fall back to
            // the user id recorded when the connection was established.
            Principal principal = wss.getPrincipal();
            String userName = principal != null ? principal.getName() : sessionContext.getUserId();
            logger.info("-----------用户下线-----305----------" + userName);
            userOffline(userName);
        }
    } finally {
        // Remove even if the offline notification failed, so a dead session cannot linger.
        this.sessionMap.remove(sessionId);
        logger.info("-------------移除了--309--------------" + sessionId);
    }
}
/**
 * Sends one message to one session if isValid accepts the pair: the message
 * is passed through checkData, wrapped with ActionResult.Succeed, serialized
 * to JSON and written to the session's WebSocket. For TYPE_MESSAGE on a
 * session with isSendMsg enabled, the first data element is then marked as
 * received for the session's user. Any exception is logged and swallowed.
 *
 * @param msg the message to deliver
 * @param sc  the target session
 */
private void sendMessage(MessageWrapper msg, SessionContext sc) {
    try {
        if (!isValid(sc, msg)) {
            return;
        }

        // Hook for per-session cloning/filtering of the payload.
        MessageWrapper outgoing = checkData(sc, msg);
        String json = jacksonObjectMapper.writeValueAsString(ActionResult.Succeed(outgoing));
        sc.getWebSocketSession().sendMessage(new TextMessage(json));

        // After a successful send, flag the message as received by this user.
        boolean markReceived = msg.getType().equals(TYPE_MESSAGE) && sc.isSendMsg();
        if (markReceived) {
            Object[] payload = (Object[]) msg.getData();
            TMessage delivered = (TMessage) payload[0];
            String recipient = sc.getUserId();
            msgService.markMessageRecieve(delivered.getMsgId(), recipient);
        }
    } catch (Exception failure) {
        logger.error("Failed to trans msg", failure);
    }
}
/**
 * Decides whether msg should be delivered to the session sc.
 *
 * TYPE_MESSAGE is accepted only for sessions with isSendMsg enabled whose
 * user id appears in the receive list of the first data element; in that
 * case the user's total unread count is stored on msg via setUnreadMsgNo
 * before returning. The four GPS/online types are accepted for sessions
 * with isSendGPS enabled. Everything else is rejected.
 *
 * @param sc  the candidate session
 * @param msg the message being dispatched (its unread count may be updated)
 * @return true when the message should be sent to this session
 */
private boolean isValid(SessionContext sc, MessageWrapper msg) {
    if (msg.getType().equals(TYPE_MESSAGE) && sc.isSendMsg()) {
        Object[] datas = (Object[]) msg.getData();
        // A non-initial message carries exactly one data element.
        if (datas != null && datas.length > 0) {
            TMessage hwImMsg = (TMessage) datas[0];
            List<TMessageRecieve> messageRecieveList = hwImMsg.getMessageRecieveList();
            if (messageRecieveList != null) {
                for (TMessageRecieve messageRecieve : messageRecieveList) {
                    if (messageRecieve.getRecieverUserId().equals(sc.getUserId())) {
                        msg.setUnreadMsgNo(sumUnreadCount(sc.getUserId()));
                        return true;
                    }
                }
            }
        }
    }
    boolean locationType = msg.getType().equals(TYPE_ONLINE_VEHICLE)
            || msg.getType().equals(TYPE_VEHICLE_GPS)
            || msg.getType().equals(TYPE_ONLINE_USER)
            || msg.getType().equals(TYPE_USER_GPS);
    return locationType && sc.isSendGPS();
}

/**
 * Sums the "count" column over the rows returned by selectNumOfUnreadMsg.
 * The column is read as a Number because the concrete boxed type depends
 * on the database driver (Integer, Long, BigDecimal, ...); a hard cast to
 * Integer fails for the latter two. Rows without a numeric count are skipped.
 */
private int sumUnreadCount(String userId) {
    List<Map<String, Object>> rows = msgService.selectNumOfUnreadMsg(userId);
    if (rows == null) {
        return 0;
    }
    int total = 0;
    for (Map<String, Object> row : rows) {
        Object count = row.get("count");
        if (count instanceof Number) {
            total += ((Number) count).intValue();
        } else if (count != null) {
            logger.warn("unexpected unread count value: {}", count);
        }
    }
    return total;
}
/**
 * Hook applied by sendMessage to a message right before it is serialized;
 * the call site describes it as the place to clone and filter data per session.
 * Currently a pass-through: it returns the same msg instance and does not use sc.
 *
 * @param sc  the target session (unused at present)
 * @param msg the message about to be sent
 * @return msg, unchanged
 */
private MessageWrapper checkData(SessionContext sc, MessageWrapper msg) {
return msg;
}
/**
 * Resets the tick of the given session back to MAX_TICK.
 * When the session is not registered in sessionMap nothing is changed;
 * previously that case threw a NullPointerException because the context
 * was dereferenced for logging before the null check.
 *
 * @param wss the session whose tick should be reset
 */
private void resetTick(WebSocketSession wss) {
    SessionContext ctx = this.sessionMap.get(wss.getId());
    if (ctx == null) {
        logger.warn("resetTick: no session context registered for sid={}", wss.getId());
        return;
    }
    logger.info(sessionMap.entrySet().size()+"--------------获取ctx的长度--id不为空设为最大链接-------439-----"+ctx.getUserId());
    ctx.setTick(MAX_TICK);
}
/**
* 循环发送消息
*
* @author zhanyan
*/
class SendMessageThread extends Thread {
@Override
public void run() {
while (bStart) {
if (msgQueue.size() > 0) {
try {
MessageWrapper msg = msgQueue.poll();
logger.info("\n------取出队列消息------"+msg.type.toString());
logger.info( "----------for-------"+msg.type.toString()+"----"+sessionMap.entrySet().size());
Collection<SessionContext> values = sessionMap.values();
logger.info("------------------values--------------");
logger.info( "---------"+values.isEmpty());
logger.info("------------------values--------------");
for(SessionContext item : values){
logger.info("------------for---------------------");
logger.info(item.getUserId());
if (item.getTick() > 0 && item.getWebSocketSession().isOpen()) {
logger.info("\n------发送------"+item.getUserId()+"---"+msg.type.toString());
sendMessage(msg, item);
logger.info("\n------发送成功------");
} else {
logger.info("---------------------------删除le---------------");
removeSessionContext(item.getWebSocketSession());
if (logger.isInfoEnabled()) {
logger.info("websocket is removed:{}: tick={}, status={}", item.getTick(), item.getWebSocketSession().isOpen());
}
}
}
} catch (Exception e) {
logger.info("\n------发送失败------"+e.toString());
logger.trace("", e);
}
} else {
try {
synchronized (waitLock) {
waitLock.wait();
}
} catch (InterruptedException e) {
logger.trace("", e);
}
}
}
}
}
/**
 * Returns the online-user map held by this handler.
 * The internal instance itself is returned, not a copy.
 */
public ConcurrentHashMap<String, OnlineUserInfo> getOnlineUserMap() {
return onlineUserMap;
}
/**
 * Replaces the online-user map held by this handler with the given instance.
 * The argument is stored as-is (no copy, no null check).
 */
public void setOnlineUserMap(ConcurrentHashMap<String, OnlineUserInfo> onlineUserMap) {
this.onlineUserMap = onlineUserMap;
}
/**
 * Returns the online-vehicle map held by this handler.
 * The internal instance itself is returned, not a copy.
 */
public ConcurrentHashMap<Integer, OnlineVehicleInfo> getOnlineVehicleMap() {
return onlineVehicleMap;
}
/**
 * Replaces the online-vehicle map held by this handler with the given instance.
 * The argument is stored as-is (no copy, no null check).
 */
public void setOnlineVehicleMap(ConcurrentHashMap<Integer, OnlineVehicleInfo> onlineVehicleMap) {
this.onlineVehicleMap = onlineVehicleMap;
}
}
/**
 * Mutable holder pairing a vehicle's car id with an online flag.
 * A newly created instance is flagged as online.
 */
class OnlineVehicleInfo {

    /** Online flag; every new instance starts out as online. */
    private boolean online = true;

    /** Car id this entry refers to; may be null if the caller passes null. */
    private Integer carId;

    /**
     * Creates an entry for the given car id, flagged as online.
     *
     * @param carId the car id to store
     */
    public OnlineVehicleInfo(Integer carId) {
        setCarId(carId);
    }

    /** @return true while the entry is flagged as online */
    public boolean isOnline() {
        return this.online;
    }

    /** @param online the new value of the online flag */
    public void setOnline(boolean online) {
        this.online = online;
    }

    /** @return the stored car id */
    public Integer getCarId() {
        return this.carId;
    }

    /** @param carId the car id to store */
    public final void setCarId(Integer carId) {
        this.carId = carId;
    }
}
/**
 * Mutable holder pairing a user id with an online flag.
 * A newly created instance is flagged as online.
 */
class OnlineUserInfo {

    /** Online flag; every new instance starts out as online. */
    private boolean online = true;

    /** User id this entry refers to; may be null if the caller passes null. */
    private String userId;

    /**
     * Creates an entry for the given user id, flagged as online.
     *
     * @param userId the user id to store
     */
    public OnlineUserInfo(String userId) {
        setUserId(userId);
    }

    /** @return true while the entry is flagged as online */
    public boolean isOnline() {
        return this.online;
    }

    /** @param online the new value of the online flag */
    public void setOnline(boolean online) {
        this.online = online;
    }

    /** @return the stored user id */
    public String getUserId() {
        return this.userId;
    }

    /** @param userId the user id to store */
    public final void setUserId(String userId) {
        this.userId = userId;
    }
}
/**
 * Per-connection state kept for one WebSocket session: the owning user id,
 * the session itself, a countdown tick and three feature flags.
 *
 * The tick and the flags are touched from several threads in this file
 * (the decreaseTick task writes the tick, SendMessageThread reads the tick
 * and the flags), so they are volatile. Previously only setTick was
 * synchronized while getTick and the flag getters were not, which gave
 * readers no visibility guarantee. Note that "setTick(getTick() - 1)" is
 * still a non-atomic read-modify-write.
 */
class SessionContext {
    private String userId;
    private WebSocketSession webSocketSession;
    /** Remaining ticks before the session counts as timed out. */
    private volatile int tick;
    /** Whether GPS/online messages are delivered to this session. */
    private volatile boolean sendGPS = false;
    /** Whether TYPE_MESSAGE messages are delivered to this session. */
    private volatile boolean sendMsg = false;
    /** Whether this session uploads its track; checked when the session is removed. */
    private volatile boolean uploadLocus = false;

    /**
     * @param session the WebSocket session this context belongs to
     * @param tick    the initial tick value
     */
    public SessionContext(WebSocketSession session, int tick) {
        this.webSocketSession = session;
        this.tick = tick;
    }

    public WebSocketSession getWebSocketSession() {
        return webSocketSession;
    }

    public void setWebSocketSession(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
    }

    /** @return the current tick value as last written by any thread */
    public int getTick() {
        return tick;
    }

    /** @param tick the new tick value */
    public void setTick(int tick) {
        this.tick = tick;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    /** Turns on delivery of GPS/online messages; there is no method to turn it off again. */
    public void enableGPS() {
        this.sendGPS = true;
    }

    /** Turns on delivery of TYPE_MESSAGE messages; there is no method to turn it off again. */
    public void enableMsg() {
        this.sendMsg = true;
    }

    /** Marks the session as uploading its track; there is no method to clear the mark. */
    public void enableUploadLocus() {
        this.uploadLocus = true;
    }

    public boolean isSendGPS() {
        return sendGPS;
    }

    public boolean isSendMsg() {
        return sendMsg;
    }

    public boolean isUploadLocus() {
        return uploadLocus;
    }
}