stevenjin 2024-05-14 14:55 采纳率: 97.3%
浏览 11
已结题

mqtt.net自身断开没有跳到UseDisconnectedHandler中

1.最近Mqtt客户端无数据推送,过了数小时才恢复。经查,是没有接收到MQTT数据的原因,在UseApplicationMessageReceivedHandler里没有日志输出。
2.查看服务器日志,有打印客户端断开日志;但mqtt客户端并没有在UseDisconnectedHandler中去打印日志。
为什么客户端没有打印断开日志呢?客户端似乎自动连接恢复了,这段时间发生了什么,没有接收消息呢?
另:由于另一台机也连接了mqtt服务器,此时有数据,可以排除服务器的原因

  • 写回答

2条回答 默认 最新

  • wanghui0380 2024-05-14 15:21
    关注

    客户端似乎自动连接恢复了,这段时间发生了什么,没有接收消息呢?

    那么这段时间客户端的重连日志呢?? 你说没有进入UseDisconnectedHandler,那么他最终是怎么重连上的?是,我们知道tcp默认保活大概2小时,但是mqtt协议自己写了心跳,没有心跳早断了。

    mqttClientOptionsBuilder.WithKeepAlivePeriod(TimeSpan.FromSeconds(30));//这里就是mqttnet自己的心跳时长

    所以我随便贴一个代码,我不保证能运行,只是在现有代码里随便找的代码段。
    环境:net8+mqttnet(mqttclient)+polly

    /

    /定义polly 重试策略,同时记录相关日志,日志等级debug,因为重试策略日志可能过大,所以置为debug,你可以根据需要调整成warn
      ResiliencePipelineBuilder pipelineBuilder = new ResiliencePipelineBuilder();
      pipelineBuilder.AddRetry(new RetryStrategyOptions()
      {
          MaxRetryAttempts = Int32.MaxValue, //永远重试
          MaxDelay = TimeSpan.FromMinutes(1), //重试最大间隔时间2分钟
          BackoffType = DelayBackoffType.Exponential, //指数退避重试
          Delay = TimeSpan.FromSeconds(2), //最小间隔1秒
          OnRetry = oc =>
          {
              
              _logger.LogDebug($"重试记录:次数{oc.AttemptNumber},上次超时时间{oc.Duration},下次重试延迟{oc.RetryDelay}");
              return default;
          }
      }) //超时重试策略
          .AddCircuitBreaker(new CircuitBreakerStrategyOptions()
          {
              FailureRatio = 1.0,
              SamplingDuration = TimeSpan.FromSeconds(120),//2分钟连续失败6次,进入熔断
              MinimumThroughput =5 ,
              BreakDuration = TimeSpan.FromMinutes(2), //熔断探测间隔2分钟,
              OnClosed = oc =>
              {
                  _logger.LogDebug("关闭熔断,允许请求进入");
                  return default;
              },
              OnOpened = oc =>
              {
                  _logger.LogDebug("触发熔断,屏蔽请求进入");
                  return default;
              },
              OnHalfOpened = oc =>
              {
                  _logger.LogDebug("半开熔断,尝试恢复请求");
                  return default;
              }
          });//熔断器策略
    
    
      this.pollyPipeline = pipelineBuilder.Build();
    
    
    

    连接部分

    private System.Threading.SemaphoreSlim slim = new SemaphoreSlim(1);
    private async Task connMqttclient(CancellationToken stoptoken)
    {
        if (slim.CurrentCount == 0) return;
        await slim.WaitAsync();
        while (!stoptoken.IsCancellationRequested)
        {
            try
            {
                await pollyPipeline.ExecuteAsync(async token =>
                {
                    _logger.LogInformation("准备连接远程mqtt服务");
                    await this.mqttClient.ConnectAsync(this.mqttClientOptions, cancellationToken);
                    _logger.LogInformation("远程mqtt服务成功");
                }, stoptoken);
                break;
            }
            catch (Exception e)
            {
                await Task.Delay(1000, stoptoken);
            }
        }
    
        slim.Release();
    }
    
    

    mqttclient初始部分

    MqttClientOptionsBuilder mqttClientOptionsBuilder = new MqttClientOptionsBuilder();
    
      var uri =CommonConfig.RemoteMqttServerUri;
      mqttClientOptionsBuilder.WithTcpServer(uri.Host, uri.Port);
      if (!string.IsNullOrEmpty(uri.UserInfo))
      {
          string username = "";
          string password = "";
          string[] info = uri.UserInfo.Split(':');
          username = info[0];
          if (info.Length > 2)
          {
              password = info[1];
          }
    
          mqttClientOptionsBuilder.WithCredentials(username, password);
      }
      mqttClientOptionsBuilder.WithCleanSession(true);
      mqttClientOptionsBuilder.WithKeepAlivePeriod(TimeSpan.FromSeconds(30)); //这个就是mqtt自己心跳保活时长
    
      this.mqttClientOptions = mqttClientOptionsBuilder.Build();
    
      mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
      mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
    
    
    
    

    非预料断开处理

    private async Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
     {
         
         if (!cancellationToken.IsCancellationRequested)
         {
             if (slim.CurrentCount > 0)
             {  _logger.LogInformation(arg.Exception," 远程mqtt服务器已断开");
                 connMqttclient(cancellationToken);
             }
         }
     }
    
    

    你可以看到几乎每一个动作我们都有日志,只是等级你根据自己的项目自己设置

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 5月24日
  • 已采纳回答 5月16日
  • 修改了问题 5月14日
  • 修改了问题 5月14日
  • 展开全部

悬赏问题

  • ¥15 网络分析设施点无法识别
  • ¥15 状态图的并发态问题咨询
  • ¥15 PFC3D,plot
  • ¥15 VAE模型编程报错无法解决
  • ¥100 基于SVM的信息粒化时序回归预测,有偿求解!
  • ¥15 物体组批优化问题-数学建模求解答
  • ¥15 微信原生小程序tabBar编译报错
  • ¥350 麦克风声源定位坐标不准
  • ¥15 apifox与swagger使用
  • ¥15 egg异步请求返回404的问题