springcloud 微服务 A,B
B服务上有websocket,前端通过websocket连接后端,并定时发送心跳
A通过feign请求B接口,接口中没有具体的业务
问题:如果前端的websocket开启多个【四五个】的时候,一会儿A服务请求B服务就会出现请求超时的情况。
RetryableException: Read timed out executing POST http://*******/******] with root cause
java.net.SocketTimeoutException: Read timed out
如果关闭websocket,则不会有问题。
不知道为什么websocket会导致 B服务接收不到feign的请求。
已配置A服务的feign的超时时间,不管配置多少都有这个问题。
package com.fmjee.service.stso.config.websocket;
import com.alibaba.fastjson.JSON;
import com.fmjee.service.common.utils.BaseMap;
import com.fmjee.service.stso.config.redis.RedisClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @Author scott
* @Date 2019/11/29 9:41
* @Description: 此注解相当于设置访问URL
*/
@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}") //此注解相当于设置访问URL
public class WebSocket {
private Session session;
private String userId;
private static final String REDIS_TOPIC_NAME = "socketHandler";
@Resource
private RedisClient redisClient;
/**
* 缓存 webSocket连接到单机服务class中(整体方案支持集群)
*/
private static final CopyOnWriteArraySet<WebSocket> webSockets = new CopyOnWriteArraySet<>();
private static final Map<String, Session> sessionPool = new HashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId") String userId) {
try {
this.session = session;
this.userId = userId;
webSockets.add(this);
sessionPool.put(userId, session);
log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
} catch (Exception e) {
log.error("onOpen异常{}", JSON.toJSON(e));
}
}
@OnClose
public void onClose() {
try {
webSockets.remove(this);
sessionPool.remove(this.userId);
log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
} catch (Exception e) {
log.error("onClose异常{}", JSON.toJSON(e));
}
}
/**
* 服务端推送消息
*
* @param userId
* @param message
*/
void pushMessage(String userId, String message) {
Session sessionTemp = sessionPool.get(userId);
if (sessionTemp != null && sessionTemp.isOpen()) {
try {
log.info("【websocket消息】 单点消息:" + message);
sessionTemp.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 服务器端推送消息
*/
void pushMessage(String message) {
try {
for (WebSocket webSocket : webSockets) {
log.info("userId={}", webSocket.userId);
synchronized (session) {
session.getBasicRemote().sendText(message);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 服务器端监听客户端的推送消息
*/
@OnMessage
public void onMessage(String message) {
log.info("【websocket消息】收到客户端消息:" + message);
for (WebSocket webSocket : webSockets) {
log.info("userId={}", webSocket.userId);
webSocket.pushMessage(message);
}
}
/**
* 后台发送消息到redis
*
* @param message
*/
public void sendMessage(String message) {
log.info("【websocket消息】广播消息:" + message);
BaseMap baseMap = new BaseMap();
baseMap.put("userId", "");
baseMap.put("message", message);
redisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息
*
* @param userId
* @param message
*/
public void sendMessage(String userId, String message) {
BaseMap baseMap = new BaseMap();
baseMap.put("userId", userId);
baseMap.put("message", message);
redisClient.sendMessage(REDIS_TOPIC_NAME, baseMap);
}
/**
* 此为单点消息(多人)
*
* @param userIds
* @param message
*/
public void sendMessage(String[] userIds, String message) {
for (String userid : userIds) {
sendMessage(userid, message);
}
}
}