luofangfu 2021-11-18 11:15 采纳率: 0%
浏览 48

使用jeromq的rep/req多线程高频1M以上数据发送运行时间一周以上出现收不到服务端回复

问题1:使用jeromq的rep/req,一个服务(发送)同时向多个服务(接收)高频发送数据。为了提升各效率在我们把各个服务的zmq.socket都保存起来,分别使用线程发送数据。在一个接收方挂掉或者断开后。重连次数过多后,Jeromq就会出现emfile的问题,原因经过我们排查是由于socket在断开后再此发送时,重新建联,当建联数量超过1024个数量后(this.emptySlots.isEmpty()),再次建联就会报这个错。
我们现在的解决方案有两个
方案一:在创建是获取这个异常,然后在catch里面重新创建上下文,这里在关闭原上下文时(context.term())会一直卡死在这。

try {
    socket = context.socket(ZMQ.REQ);
}catch (IllegalStateException e){//this.emptySlots.isEmpty()
    context.term();
    if(context.isClosed()||context==null)
        context = ZMQ.context(1);
}

方案二:在断开后发送数据失败时,将原来的连接杀死;测试环境下没有发现问题,但是在生产环境下长时间运行依然会出现emfile报错。
socket.disconnect(addr);
socket.close();
socket.setLinger(0);

问题2:同样使用jeromq的rep/req,一个服务(发送)同时向多个服务(接收)高频发送数据。为了提升各效率在我们把各个服务的zmq.socket都保存起来,分别使用线程发送数据。在连续发送7天以上的数据后出现一个发送端向多个接收端高频发送消息时,出现发送给其中几个服务端出现收不到回复,重启接收端程序能够正常运行成功,运行7天以上后又会重复出现该情况。

TransferSocket socket = (TransferSocket)ConnectionSocket.SOCKETS.get(ip);
      try
      {
        ZMsg recv = socket.send(zFrames);
        if (recv == null) {
          logger.error("传输失败,未收到" + ip + "回复");
          destory(ip);
          statusCode = 1;
        } else {
          logger.info("向[" + ip + "]发送数据成功");
          try {
            errorMessage = new String(base64.decode(recv.toArray()[4].toString()), "UTF-8");
            logger.info("收到[" + ip + "]的响应:" + errorMessage);
          } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
          }
          statusCode = 0;
          recv.destroy();
        }
      } catch (Exception e) {
        statusCode = 1;
        errorMessage = "传输异常";
        e.printStackTrace();
      }

可能出现原因分析:

  1. 短暂的出现收不到回复,销毁ZMQ.Socket时,发送端Socket连接未断开,导致无论怎么新建socket都收不到回复。
  2. 出现一次收不到回复后,销毁了ZMQ.Socket,销毁后新建ZQM.Socket 未连接成功,所以出现收不到回复后一直不能恢复状态。
    未找到合理的解决方案。
  • 写回答

1条回答 默认 最新

  • qq_27910291 2021-11-19 17:13
    关注

    Mark

    评论

报告相同问题?

问题事件

  • 创建了问题 11月18日

悬赏问题

  • ¥20 完全没有学习过GAN,看了CSDN的一篇文章,里面有代码但是完全不知道如何操作
  • ¥15 使用ue5插件narrative时如何切换关卡也保存叙事任务记录
  • ¥20 软件测试决策法疑问求解答
  • ¥15 win11 23H2删除推荐的项目,支持注册表等
  • ¥15 matlab 用yalmip搭建模型,cplex求解,线性化处理的方法
  • ¥15 qt6.6.3 基于百度云的语音识别 不会改
  • ¥15 关于#目标检测#的问题:大概就是类似后台自动检测某下架商品的库存,在他监测到该商品上架并且可以购买的瞬间点击立即购买下单
  • ¥15 神经网络怎么把隐含层变量融合到损失函数中?
  • ¥15 lingo18勾选global solver求解使用的算法
  • ¥15 全部备份安卓app数据包括密码,可以复制到另一手机上运行