技术选型与实现
控制器20070001这个就下图片中的设备,需要在页面中点击“下载报警器档案协议”向控制器发送对应的数据,这个控制器是要保持socket长连接的,实现的话,是只需要我写服务端,还是服务端与客户端都写。本人tcp小bai一个
控制器20070001这个就下图片中的设备,需要在页面中点击“下载报警器档案协议”向控制器发送对应的数据,这个控制器是要保持socket长连接的,实现的话,是只需要我写服务端,还是服务端与客户端都写。本人tcp小bai一个
【相关推荐】
socker服务端代码:
1、开启socket服务
/**
* 启动服务
*/
public void start() {
Thread socketServiceThread = new Thread(() -> {
ServerSocket serverSocket = null;
try {
//serverSocket = new ServerSocket(8000);
serverSocket = new ServerSocket(port);
log.info("服务端 socket 在[{}]启动正常", port);
//记录已开启的线程,便于管理
while (true) {
Socket newSocket = serverSocket.accept();
//设定输入流读取阻塞超时时间(60秒收不到客户端消息判定断线;与客户端心跳检测结合一起使用的)
newSocket.setSoTimeout(60000); //注意:读取时间等待超时时间,必须比心跳检测消息发送时间大;否则就不断在中断连接的循环之中
if (clientSocket != null && !clientSocket.isClosed()) {
//如果已有一个连接上客户端且没有关闭,则丢弃新连进来的
OutputStream outputStream = newSocket.getOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
//开启这个接收线程,纯属关闭资源用
ServerRecvThread otherRecvThread = new ServerRecvThread(newSocket);
new Thread(otherRecvThread).start();
//发送个无用的消息,告知具体情况
msgDataVo.setType(SocketMsgTypeEnum.SERVER_NOT_ALLOW.getType());
msgDataVo.setBody("from server: one is connected and other is not allowed at present");
SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
log.warn("one is connected and new is not allowed at present");
//继续监听
continue;
}
//1、关闭已开启的线程
this.closeOpenedThreads();
//2、重建新的socket服务
clientSocket = newSocket;
ServerRecvThread newRecvThread = new ServerRecvThread(clientSocket);
threadMap.put(clientSocket, newRecvThread);
new Thread(newRecvThread).start();
ServerSendThread newServerSendThread = new ServerSendThread(clientSocket);
sendThread = newServerSendThread;
new Thread(newServerSendThread).start();
}
} catch (IOException e) {
log.error("socket服务端发生异常");
e.printStackTrace();
//释放资源
//关闭已开启的线程
this.closeOpenedThreads();
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
});
socketServiceThread.setName("socket server main thread");
socketServiceThread.start();
}
/**
*关闭已开启的线程
*/
private void closeOpenedThreads() {
if (clientSocket != null) {
log.info("删除旧的无效连接及其接收、发送线程");
ServerRecvThread oldRecvThread = threadMap.remove(clientSocket);
oldRecvThread.setStop(true);
sendThread.setStop(true);
SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
//发送个无用的消息,唤醒线程(可能处于阻塞),以便结束旧的发送线程
msgDataVo.setType(SocketMsgTypeEnum.HEART_BEAT.getType());
msgDataVo.setBody("from server: null message");
sendThread.addMsgToQueue(msgDataVo);
log.info("旧的无效连接及其接收、发送线程已回收");
}
}
2、接收线程
@Override
public void run() {
//线程终止条件: 设置标志位为 true or socket 已关闭
InputStream inputStream = null;
DataInputStream dataInputStream = null;
try {
inputStream = socket.getInputStream();
dataInputStream = new DataInputStream(inputStream);
while (!isStop && !socket.isClosed()) {
SocketMsgDataVo msgDataVo = SocketUtil.readMsgData(dataInputStream);
if (msgDataVo.getType() == SocketMsgTypeEnum.HEART_BEAT.getType()) {
//客户端心跳监测不用处理
log.info("收到客户端心跳消息");
}
}
} catch (IOException e) {
log.error("服务端接收消息发生异常");
e.printStackTrace();
} finally {
log.info("服务端旧接收线程已摧毁");
StreamUtil.closeInputStream(dataInputStream);
StreamUtil.closeInputStream(inputStream);
SocketUtil.closeSocket(socket);
}
}
3、阻塞发送线程
//阻塞安全队列,设置队列容量,否则为无限大
private final BlockingQueue<SocketMsgDataVo> msgQueue = new LinkedBlockingQueue<>(100);
private Socket socket;
private volatile boolean isStop = false;
public ServerSendThread(Socket socket) {
this.socket = socket;
}
public void addMsgToQueue(SocketMsgDataVo msgDataVo) {
try {
//队列已满,阻塞直到未满放进元素(这里一直阻塞不太行,建议使用offer()来设置入队列超时时间)
msgQueue.put(msgDataVo);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() {
OutputStream outputStream = null;
DataOutputStream dataOutputStream = null;
try {
outputStream = socket.getOutputStream();
dataOutputStream = new DataOutputStream(outputStream);
while (!this.isStop && !socket.isClosed()) {
//队列为空阻塞,直到队列不为空,再取出
SocketMsgDataVo msgDataVo = null;
try {
msgDataVo = msgQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (msgDataVo != null && msgDataVo.getBody() != null) { //正文内容不能为空,否则不发
SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
log.info("服务端消息已发送!");
}
}
} catch (Exception e) {
log.error("服务端消息发送异常");
e.printStackTrace();
} finally {
log.info("服务端旧消息发送线程已摧毁");
//释放资源
msgQueue.clear();
StreamUtil.closeOutputStream(dataOutputStream);
StreamUtil.closeOutputStream(outputStream);
SocketUtil.closeSocket(socket);
}
}
socker客户端代码:
1、开启客户端socket服务
/**
* 启动服务
*/
public void start(){
Thread socketServiceThread = new Thread(() -> {
while (true) {
try {
//尝试重新建立连接
//socket = SocketUtil.createClientSocket("127.0.0.1", 9999);
socket = SocketUtil.createClientSocket(host, port);
log.info("客户端 socket 在[{}]连接正常", port);
ClientRecvThread recvThread = new ClientRecvThread(socket, mongoUpdateRealTimeInterfaceDataUrl);
new Thread(recvThread).start();
ClientHeartBeatThread heartBeatThread = new ClientHeartBeatThread(socket);
new Thread(heartBeatThread).start();
//1、连接成功后,心跳异常检测、随后重连
//可以不用轮询,避免减少线程频繁切换,花费更多的cpu资源:先wait()阻塞线程,使用心跳线程finally操作中notify()唤醒重建连接
while (!heartBeatThread.isStop()) {
//进行空循环, 掉线休眠,防止损耗过大, 随即重连
try {
Thread.sleep(ClientSocketService.THREAD_SLEEP_MILLS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
//recvThread.setStop(true);
//旧的、接收线程、心跳线程摧毁,准备重建连接、接收线程、心跳线程
} catch (IOException e) {
log.error("socket客户端进行连接发生异常");
e.printStackTrace();
//2、第一次启动时连接异常发生,休眠, 重建连接
try {
Thread.sleep(ClientSocketService.THREAD_SLEEP_MILLS);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
});
socketServiceThread.setName("socket client main thread");
socketServiceThread.start();
}
2、心跳线程
@Override
public void run() {
OutputStream outputStream = null;
DataOutputStream dataOutputStream = null;
try {
outputStream = socket.getOutputStream();
dataOutputStream = new DataOutputStream(outputStream);
//客户端心跳检测
while (!this.isStop && !socket.isClosed()) {
SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
msgDataVo.setType(SocketMsgTypeEnum.HEART_BEAT.getType());
msgDataVo.setBody("from client:Is connect ok ?");
if (msgDataVo != null && msgDataVo.getBody() != null) { //正文内容不能为空,否则不发)
SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
}
try {
Thread.sleep(ClientHeartBeatThread.CHECK_MILLS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
log.error("客户端心跳消息发送异常");
e.printStackTrace();
} finally {
this.isStop = true;
log.info("客户端旧心跳线程已摧毁");
StreamUtil.closeOutputStream(dataOutputStream);
StreamUtil.closeOutputStream(outputStream);
SocketUtil.closeSocket(socket);
//这里可以使用对象锁:notify() 通知唤醒等待线程重建socket连接; 避免socket服务轮询等待重建连接操作
}
}
3、接收线程
@Override
public void run() {
//线程终止条件: 设置标志位为 true or socket 已关闭
InputStream inputStream = null;
DataInputStream dataInputStream = null;
try {
inputStream = socket.getInputStream();
dataInputStream = new DataInputStream(inputStream);
while (!isStop && !socket.isClosed()) {
SocketMsgDataVo msgDataVo = SocketUtil.readMsgData(dataInputStream);
log.info("客户端收到消息:{}",msgDataVo.toString());
//相对耗时,需要开线程来处理消息,否则影响后续消息接收处理速率
//根据消息类型,进行不同的业务逻辑处理即可
//..............此处不显示具体项目操作逻辑...................
if (msgDataVo.getType() == "某某类型") {
} else {
//其它消息类型不处理
}
}
} catch (IOException e) {
log.error("客户端接收消息发生异常");
e.printStackTrace();
} finally {
this.isStop = true;
log.info("客户端旧接收线程已摧毁");
StreamUtil.closeInputStream(dataInputStream);
StreamUtil.closeInputStream(inputStream);
SocketUtil.closeSocket(socket);
/*if (socket.isClosed()) {
System.out.println("socket.isClosed");
}*/
}
消息发送采用定长方式,避免粘包、读取错乱等情况,工具类封装方法如下:
public static void writeMsgData(DataOutputStream dataOutputStream, SocketMsgDataVo msgDataVo) throws IOException {
byte[] data = msgDataVo.getBody().getBytes();
int len = data.length + SocketUtil.BLANK_SPACE_COUNT;
dataOutputStream.writeByte(msgDataVo.getType());
dataOutputStream.writeInt(len);
dataOutputStream.write(data);
dataOutputStream.flush();
}
public static SocketMsgDataVo readMsgData(DataInputStream dataInputStream) throws IOException {
byte type = dataInputStream.readByte();
int len = dataInputStream.readInt();
byte[] data = new byte[len - SocketUtil.BLANK_SPACE_COUNT];
dataInputStream.readFully(data);
String str = new String(data);
System.out.println("获取的数据类型为:" + type);
System.out.println("获取的数据长度为:" + len);
System.out.println("获取的数据内容为:" + str);
SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
msgDataVo.setType(type);
msgDataVo.setBody(str);
return msgDataVo;
}