代码写到35岁 2024-03-14 20:46 采纳率: 9.1%
浏览 13

C# 消费kafka的性能问题

我的kafka配置了同一个topic和同一个group,然后开启10个进程去消费,但是我看执行情况,当启动10个进程时,实际最多同时执行的就3、4个,后面随着整体累计消息的变少,同时执行的进程就下降到1 、2个了,这时什么原因?我想开启10个进程就10进程同时进行消费。

      var cuResult = customer.Consume(cts.Token);
      Console.WriteLine("========================此进程抢到消息===================");
      if (cuResult != null)
      {
          Console.WriteLine("消费结果:" + JsonConvert.SerializeObject(cuResult));
          LogHelper.SetInfoLog("消费结果:" + JsonConvert.SerializeObject(cuResult));
          CreateCadFileInput createCadFileInput = JsonConvert.DeserializeObject<CreateCadFileInput>(cuResult.Value);
          if (createCadFileInput == null)
          {
              LogHelper.SetErrorLog($"执行消息返回空结果:{cuResult.Message}");
              // 没有返回结构,需要将这个消息消费掉,防止死循环.
              customer.Commit(cuResult);
              continue;
          }

          // 注入对象为空 异常 日志
          if (_commonService == null)
          {
              LogHelper.SetErrorLog("ICommonService 注入失败");
              continue;
          }

          // 注入对象为空 异常 日志
          if (_cadAndOssService == null)
          {
              LogHelper.SetErrorLog("ICadAndOssService 注入失败");
              continue;
          }

          // 注入对象为空 异常 日志
          if (createCadFileInput.RequestId == null)
          {
              LogHelper.SetErrorLog("createCadFileInput.RequestId 值为Null");
              continue;
          }

          // 更新Redis
          //string redisKey = _commonService.GetLocalServerRedisKey(createCadFileInput.RequestId);
          string redisKey = Constants.RedisRequestKey; // redis key值-不区分 服务器
          var editRedisRequestInput = new EditRedisRequestInput();
          editRedisRequestInput.IsIntoKafka = true;
          editRedisRequestInput.RequestId = createCadFileInput.RequestId;
          editRedisRequestInput.UserId = createCadFileInput.UserId;
          editRedisRequestInput.UserName = createCadFileInput.UserName;
          editRedisRequestInput.RedisKey = redisKey;
          editRedisRequestInput.RequestTime = createCadFileInput.RequestTime;
          createCadFileInput.IsIntoKafka = true; // 打上进入kafka的标记
          var createResult = await _cadAndOssService.CreateCadAndUpLoadOSSAsync(createCadFileInput);
          // 执行成功
          if (createResult.Code == ResultCodeEnum.Success)
          {
              // 然后再更新正在处理的Redis 请求数据
              _commonService.DeleteRedisAddRequest(editRedisRequestInput);

              // 然后提交Kafka消息的状态
              customer.Commit(cuResult);
              Console.WriteLine($"Kafka消费任务创建CAD文件成功;请求id:{editRedisRequestInput.RequestId}");
              LogHelper.SetInfoLog($"Kafka消费任务创建CAD文件成功;请求id:{editRedisRequestInput.RequestId}");
          }
          else
          {
              Console.WriteLine($"Kafka消费任务创建CAD文件失败:{createResult.Message}");

              // 执行失败,也提交执行的消息,并记录
              LogHelper.SetErrorLog($"Kafka消费任务创建CAD文件失败:{createResult.Message}");

              // 然后提交Kafka消息的状态
              customer.Commit(cuResult);
          }

  • 写回答

5条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2024-03-14 20:47
    关注

    引自免费微信小程序:皆我百晓生

    在C#中使用多个进程消费Kafka中的同一个topic和group时,出现实际并行消费的进程数少于预期的现象,这通常是由于Kafka消费者组内部的分发机制导致的。Kafka为每个消费者组维护了一个“分区”到“消费者”的映射关系,以保证消息的有序性和负载均衡。

    1. 分区数量:首先确认你的topic有多少个分区,因为一个分区在同一时间只能被消费者组内的一个消费者消费。如果你的topic分区数小于10,那么最多就只有那么多的进程能同时消费。

    2. 消费组协调器:Kafka的消费者协调器会控制消费者的分配,确保每个分区只被一个消费者消费。即使你有10个消费者(进程),如果分区数不足,也无法实现10个进程完全并行消费。

    3. 消费者配置:检查消费者的配置项ConsumerConfig.MaxPollRecordsConsumerConfig.SessionTimeoutMs等是否合理。这些参数会影响消费者拉取消息的频率和对消费者活跃状态的判断。

    4. 性能瓶颈:另外,你的消费代码中包含了Redis操作、服务调用(如 _cadAndOssService.CreateCadAndUpLoadOSSAsync)以及日志记录等异步IO操作,这些可能成为性能瓶颈,使得单个进程无法及时处理完从Kafka拉取的消息,进而影响了其他进程的消费速度。

    解决方案:

    • 确保topic有足够的分区数,可以考虑增加分区以支持更多的并发消费者。
    • 调整消费者配置,允许更大的并行度,比如设置合理的MaxPollRecords值,让每个消费者能够一次性获取更多消息进行处理。
    • 优化消费逻辑,减少同步阻塞操作,提高处理效率,确保消息消费的实时性。
    • 如果需要严格保证所有进程都持续有消息消费,可以考虑使用线程池或其他并发模型,在单个进程中创建多个消费者实例,并通过Confluent.Kafka.Consumer.Assign(TopicPartitionOffset[])方法手动指定它们各自订阅不同的分区。但请注意,这种做法可能会破坏Kafka默认提供的负载均衡和故障转移功能。
    // 假设 partitions 是从 Kafka 获取的所有目标 topic 的分区列表
    var tasks = new List<Task>();
    foreach (var partition in partitions)
    {
        var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Assign(new TopicPartition(partition.Topic, partition.Partition));
        
        // 创建并启动一个任务用于消费该分区的消息
        tasks.Add(Task.Run(async () =>
        {
            while (true)
            {
                var consumeResult = consumer.Consume(cts.Token);
                // ... 消费逻辑 ...
            }
        }));
    }
    
    await Task.WhenAll(tasks);
    

    请根据实际情况调整上述代码片段。不过需要注意的是,直接手动分配分区的方式仅适用于特定场景,通常情况下应该依赖Kafka的内置消费者组协调机制来自动分配分区。

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 3月14日

悬赏问题

  • ¥15 Java与Hbase相关问题
  • ¥20 bash代码推送不上去 git fetch origin master #失败了
  • ¥15 LOL外服加入了反作弊系统,现在游戏录像rofl文件离线都无法打开
  • ¥45 工程软件单片机设计课题要求
  • ¥15 在centos7安装conda
  • ¥15 c#调用yolo3 dll文件获取的数据对不上
  • ¥20 WPF 如何实现多语言,label 和cs(live Charts)中是否都能翻译
  • ¥15 STM32F103上电短路问题
  • ¥15 打开软件提示错误:failed to get wglChoosePixelFormatARB
  • ¥15 (标签-python|关键词-char)