kraida 2024-05-29 11:13 采纳率: 57.1%
浏览 5
已结题

kafka消费者组异常

程序启动时通过

consumer = kafkaSecondFactory.createConsumer(“testTopic”,null);

consumer.subscribe(Collections.singleton(“test111”));

在一个消费者组内订阅消费者

但是看kafka的展示却是:

Consumer group 'testTopic' has no active members.

怎么解决?

  • 写回答

3条回答 默认 最新

  • 阿里嘎多学长 2024-05-29 11:13
    关注

    以下内容由CHATGPT及阿里嘎多学长共同生成、有用望采纳:


    在 Kafka 中,消费者组(Consumer Group)是一组逻辑上相关联的消费者实例,它们共同消费一个或多个主题(topics)中的消息。如果 Kafka 显示消费者组 “testTopic” 没有活跃成员,这可能意味着消费者实例没有成功加入到消费者组中,或者消费者实例没有正确地发送心跳以维持其在消费者组中的活跃状态。

    以下是一些可能的解决方案和建议:

    1. 检查消费者配置:确保消费者实例的配置正确,特别是 group.id 配置项,它应该与消费者组的名称一致。如果 group.id 配置错误,消费者将无法加入正确的消费者组。

    2. 检查消费者启动逻辑:确保在调用 consumer.subscribe() 方法之后,有逻辑来调用 consumer.poll() 方法,因为 Kafka 消费者通过 poll() 方法来接收消息,并且 poll() 方法也会发送心跳到 Kafka 集群以维持活跃状态。

    3. 检查网络和权限:确保消费者实例能够访问 Kafka 集群,并且有足够的权限来订阅主题和加入消费者组。

    4. 检查消费者组状态:使用 Kafka 提供的命令行工具 kafka-consumer-groups.sh 来检查消费者组的状态。例如,可以使用以下命令来查看消费者组的详细信息:

      kafka-consumer-groups.sh --bootstrap-server <kafka-broker>:<port> --describe --group testTopic
      
    5. 检查消费者的日志:查看消费者实例的日志,可能会有关于为什么消费者没有成为活跃成员的错误或警告信息。

    6. 检查 Kafka 集群状态:确保 Kafka 集群运行正常,没有遇到任何故障或性能问题。

    7. 代码实现:以下是一个简单的 Kafka 消费者示例代码,它订阅了一个主题并持续轮询消息:

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.util.Collections;
      import java.util.Properties;
      
      public class SimpleConsumer {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<kafka-broker>:<port>");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "testTopic");
              props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
              consumer.subscribe(Collections.singletonList("test111"));
      
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(100);
                  for (ConsumerRecord<String, String> record : records) {
                      // 处理记录
                  }
              }
          }
      }
      

    请注意,上述代码是一个简单的示例,实际使用时需要根据你的具体需求进行调整。

    关于参考资料,以下是一些可能对你有帮助的链接:

    希望这些信息能帮助你解决消费者组没有活跃成员的问题。如果问题仍然存在,建议进一步检查 Kafka 集群和消费者实例的配置和状态。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(2条)

报告相同问题?

问题事件

  • 系统已结题 6月7日
  • 已采纳回答 5月30日
  • 创建了问题 5月29日

悬赏问题

  • ¥15 35114 SVAC视频验签的问题
  • ¥15 impedancepy
  • ¥15 在虚拟机环境下完成以下,要求截图!
  • ¥15 求往届大挑得奖作品(ppt…)
  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见