放逐N_K 2023-12-27 11:11 采纳率: 0%
浏览 6
已结题

java socket的实现

技术选型与实现

控制器20070001这个就下图片中的设备,需要在页面中点击“下载报警器档案协议”向控制器发送对应的数据,这个控制器是要保持socket长连接的,实现的话,是只需要我写服务端,还是服务端与客户端都写。本人tcp小bai一个

img

img

  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-12-27 14:03
    关注

    【相关推荐】



    • 帮你找了个相似的问题, 你可以看下: https://ask.csdn.net/questions/737065
    • 我还给你找了一篇非常好的博客,你可以看看是否有帮助,链接:[Socket编程] 封装TCP实现一个socket类,实现客户端与服务端间的tcp通信
    • 除此之外, 这篇博客: java socket网络编程之一对一长连接通讯(包括心跳检测、掉线重连)中的 实现 部分也许能够解决你的问题, 你可以仔细阅读以下内容或跳转源博客中阅读:

      socker服务端代码:

      1、开启socket服务

       /**
           * 启动服务
           */
          public void start() {
              Thread socketServiceThread = new Thread(() -> {
                  ServerSocket serverSocket = null;
                  try {
                      //serverSocket = new ServerSocket(8000);
                      serverSocket = new ServerSocket(port);
                      log.info("服务端 socket 在[{}]启动正常", port);
                      //记录已开启的线程,便于管理
                      while (true) {
                          Socket newSocket = serverSocket.accept();
                          //设定输入流读取阻塞超时时间(60秒收不到客户端消息判定断线;与客户端心跳检测结合一起使用的)
                          newSocket.setSoTimeout(60000);  //注意:读取时间等待超时时间,必须比心跳检测消息发送时间大;否则就不断在中断连接的循环之中
                          if (clientSocket != null && !clientSocket.isClosed()) {
                              //如果已有一个连接上客户端且没有关闭,则丢弃新连进来的
                              OutputStream outputStream = newSocket.getOutputStream();
                              DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
                              SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
                              //开启这个接收线程,纯属关闭资源用
                              ServerRecvThread otherRecvThread = new ServerRecvThread(newSocket);
                              new Thread(otherRecvThread).start();
                              //发送个无用的消息,告知具体情况
                              msgDataVo.setType(SocketMsgTypeEnum.SERVER_NOT_ALLOW.getType());
                              msgDataVo.setBody("from server: one is connected and other is not allowed at present");
                              SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
                              log.warn("one is connected and new is not allowed at present");
                              //继续监听
                              continue;
                          }
                          //1、关闭已开启的线程
                          this.closeOpenedThreads();
                          //2、重建新的socket服务
                          clientSocket = newSocket;
                          ServerRecvThread newRecvThread = new ServerRecvThread(clientSocket);
                          threadMap.put(clientSocket, newRecvThread);
                          new Thread(newRecvThread).start();
                          ServerSendThread newServerSendThread = new ServerSendThread(clientSocket);
                          sendThread = newServerSendThread;
                          new Thread(newServerSendThread).start();
                      }
                  } catch (IOException e) {
                      log.error("socket服务端发生异常");
                      e.printStackTrace();
                      //释放资源
                      //关闭已开启的线程
                      this.closeOpenedThreads();
                      if (serverSocket != null && !serverSocket.isClosed()) {
                          try {
                              serverSocket.close();
                          } catch (IOException e1) {
                              e1.printStackTrace();
                          }
                      }
                  }
              });
              socketServiceThread.setName("socket server main thread");
              socketServiceThread.start();
          }
      
          /**
           *关闭已开启的线程
           */
          private void closeOpenedThreads() {
              if (clientSocket != null) {
                  log.info("删除旧的无效连接及其接收、发送线程");
                  ServerRecvThread oldRecvThread = threadMap.remove(clientSocket);
                  oldRecvThread.setStop(true);
                  sendThread.setStop(true);
                  SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
                  //发送个无用的消息,唤醒线程(可能处于阻塞),以便结束旧的发送线程
                  msgDataVo.setType(SocketMsgTypeEnum.HEART_BEAT.getType());
                  msgDataVo.setBody("from server: null message");
                  sendThread.addMsgToQueue(msgDataVo);
                  log.info("旧的无效连接及其接收、发送线程已回收");
              }
          }

      2、接收线程

       @Override
          public void run() {
              //线程终止条件: 设置标志位为 true or socket 已关闭
              InputStream inputStream = null;
              DataInputStream dataInputStream = null;
              try {
                  inputStream = socket.getInputStream();
                  dataInputStream = new DataInputStream(inputStream);
                  while (!isStop && !socket.isClosed()) {
                      SocketMsgDataVo msgDataVo = SocketUtil.readMsgData(dataInputStream);
                      if (msgDataVo.getType() == SocketMsgTypeEnum.HEART_BEAT.getType()) {
                          //客户端心跳监测不用处理
                          log.info("收到客户端心跳消息");
                      }
                  }
              } catch (IOException e) {
                  log.error("服务端接收消息发生异常");
                  e.printStackTrace();
              } finally {
                  log.info("服务端旧接收线程已摧毁");
                  StreamUtil.closeInputStream(dataInputStream);
                  StreamUtil.closeInputStream(inputStream);
                  SocketUtil.closeSocket(socket);
              }
      
          }

      3、阻塞发送线程

       //阻塞安全队列,设置队列容量,否则为无限大
          private final BlockingQueue<SocketMsgDataVo> msgQueue = new LinkedBlockingQueue<>(100);
      
          private Socket socket;
      
          private volatile boolean isStop = false;
      
          public ServerSendThread(Socket socket) {
              this.socket = socket;
          }
      
          public void addMsgToQueue(SocketMsgDataVo msgDataVo) {
              try {
                  //队列已满,阻塞直到未满放进元素(这里一直阻塞不太行,建议使用offer()来设置入队列超时时间)
                  msgQueue.put(msgDataVo);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
      
          @Override
          public void run() {
              OutputStream outputStream = null;
              DataOutputStream dataOutputStream = null;
              try {
                  outputStream = socket.getOutputStream();
                  dataOutputStream = new DataOutputStream(outputStream);
                  while (!this.isStop && !socket.isClosed()) {
                      //队列为空阻塞,直到队列不为空,再取出
                      SocketMsgDataVo msgDataVo = null;
                      try {
                          msgDataVo = msgQueue.take();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      if (msgDataVo != null && msgDataVo.getBody() != null) { //正文内容不能为空,否则不发
                          SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
                          log.info("服务端消息已发送!");
                      }
                  }
              } catch (Exception e) {
                  log.error("服务端消息发送异常");
                  e.printStackTrace();
              } finally {
                  log.info("服务端旧消息发送线程已摧毁");
                  //释放资源
                  msgQueue.clear();
                  StreamUtil.closeOutputStream(dataOutputStream);
                  StreamUtil.closeOutputStream(outputStream);
                  SocketUtil.closeSocket(socket);
              }
          }

      socker客户端代码:

      1、开启客户端socket服务

       /**
           * 启动服务
           */
          public void start(){
              Thread socketServiceThread = new Thread(() -> {
                  while (true) {
                      try {
                          //尝试重新建立连接
                          //socket = SocketUtil.createClientSocket("127.0.0.1", 9999);
                          socket = SocketUtil.createClientSocket(host, port);
                          log.info("客户端 socket 在[{}]连接正常", port);
                          ClientRecvThread recvThread = new ClientRecvThread(socket, mongoUpdateRealTimeInterfaceDataUrl);
                          new Thread(recvThread).start();
                          ClientHeartBeatThread heartBeatThread = new ClientHeartBeatThread(socket);
                          new Thread(heartBeatThread).start();
                          //1、连接成功后,心跳异常检测、随后重连
                          //可以不用轮询,避免减少线程频繁切换,花费更多的cpu资源:先wait()阻塞线程,使用心跳线程finally操作中notify()唤醒重建连接
                          while (!heartBeatThread.isStop()) {
                              //进行空循环, 掉线休眠,防止损耗过大, 随即重连
                              try {
                                  Thread.sleep(ClientSocketService.THREAD_SLEEP_MILLS);
                              } catch (InterruptedException e1) {
                                  e1.printStackTrace();
                              }
                          }
                          //recvThread.setStop(true);
                          //旧的、接收线程、心跳线程摧毁,准备重建连接、接收线程、心跳线程
                      } catch (IOException e) {
                          log.error("socket客户端进行连接发生异常");
                          e.printStackTrace();
                          //2、第一次启动时连接异常发生,休眠, 重建连接
                          try {
                              Thread.sleep(ClientSocketService.THREAD_SLEEP_MILLS);
                          } catch (InterruptedException e1) {
                              e1.printStackTrace();
                          }
                      }
                  }
              });
              socketServiceThread.setName("socket client main thread");
              socketServiceThread.start();
          }

      2、心跳线程

      @Override
          public void run() {
              OutputStream outputStream = null;
              DataOutputStream dataOutputStream = null;
              try {
                  outputStream = socket.getOutputStream();
                  dataOutputStream = new DataOutputStream(outputStream);
                  //客户端心跳检测
                  while (!this.isStop && !socket.isClosed()) {
                      SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
                      msgDataVo.setType(SocketMsgTypeEnum.HEART_BEAT.getType());
                      msgDataVo.setBody("from client:Is connect ok ?");
                      if (msgDataVo != null && msgDataVo.getBody() != null) { //正文内容不能为空,否则不发)
                          SocketUtil.writeMsgData(dataOutputStream, msgDataVo);
                      }
                      try {
                          Thread.sleep(ClientHeartBeatThread.CHECK_MILLS);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              } catch (Exception e) {
                  log.error("客户端心跳消息发送异常");
                  e.printStackTrace();
              } finally {
                  this.isStop = true;
                  log.info("客户端旧心跳线程已摧毁");
                  StreamUtil.closeOutputStream(dataOutputStream);
                  StreamUtil.closeOutputStream(outputStream);
                  SocketUtil.closeSocket(socket);
      
                  //这里可以使用对象锁:notify() 通知唤醒等待线程重建socket连接; 避免socket服务轮询等待重建连接操作
              }
          }

      3、接收线程

      @Override
          public void run() {
              //线程终止条件: 设置标志位为 true or socket 已关闭
              InputStream inputStream = null;
              DataInputStream dataInputStream = null;
              try {
                  inputStream = socket.getInputStream();
                  dataInputStream = new DataInputStream(inputStream);
                  while (!isStop && !socket.isClosed()) {
                      SocketMsgDataVo msgDataVo = SocketUtil.readMsgData(dataInputStream);
                      log.info("客户端收到消息:{}",msgDataVo.toString());
                      //相对耗时,需要开线程来处理消息,否则影响后续消息接收处理速率
                      //根据消息类型,进行不同的业务逻辑处理即可
                      //..............此处不显示具体项目操作逻辑...................
                      if (msgDataVo.getType() == "某某类型") {
                          
                      } else {
                          //其它消息类型不处理
                      }
                  }
              } catch (IOException e) {
                  log.error("客户端接收消息发生异常");
                  e.printStackTrace();
              } finally {
                  this.isStop = true;
                  log.info("客户端旧接收线程已摧毁");
                  StreamUtil.closeInputStream(dataInputStream);
                  StreamUtil.closeInputStream(inputStream);
                  SocketUtil.closeSocket(socket);
                  /*if (socket.isClosed()) {
                      System.out.println("socket.isClosed");
                  }*/
              }

      消息发送采用定长方式,避免粘包、读取错乱等情况,工具类封装方法如下:

      public static void writeMsgData(DataOutputStream dataOutputStream, SocketMsgDataVo msgDataVo) throws IOException {
              byte[] data = msgDataVo.getBody().getBytes();
              int len = data.length + SocketUtil.BLANK_SPACE_COUNT;
              dataOutputStream.writeByte(msgDataVo.getType());
              dataOutputStream.writeInt(len);
              dataOutputStream.write(data);
              dataOutputStream.flush();
          }
      
          public static SocketMsgDataVo readMsgData(DataInputStream dataInputStream) throws IOException {
              byte type = dataInputStream.readByte();
              int len = dataInputStream.readInt();
              byte[] data = new byte[len - SocketUtil.BLANK_SPACE_COUNT];
              dataInputStream.readFully(data);
              String str = new String(data);
              System.out.println("获取的数据类型为:" + type);
              System.out.println("获取的数据长度为:" + len);
              System.out.println("获取的数据内容为:" + str);
              SocketMsgDataVo msgDataVo = new SocketMsgDataVo();
              msgDataVo.setType(type);
              msgDataVo.setBody(str);
              return msgDataVo;
          }

    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 1月9日
  • 创建了问题 12月27日

悬赏问题

  • ¥15 DS18B20内部ADC模数转换器
  • ¥15 做个有关计算的小程序
  • ¥15 MPI读取tif文件无法正常给各进程分配路径
  • ¥15 如何用MATLAB实现以下三个公式(有相互嵌套)
  • ¥30 关于#算法#的问题:运用EViews第九版本进行一系列计量经济学的时间数列数据回归分析预测问题 求各位帮我解答一下
  • ¥15 setInterval 页面闪烁,怎么解决
  • ¥15 如何让企业微信机器人实现消息汇总整合
  • ¥50 关于#ui#的问题:做yolov8的ui界面出现的问题
  • ¥15 如何用Python爬取各高校教师公开的教育和工作经历
  • ¥15 TLE9879QXA40 电机驱动