shikenn(+) 2022-02-16 19:32 采纳率: 100%
浏览 372
已结题

kafka拉取不到数据

最近有需要用kafka接受消息的项目,学习了一下后仿写了一个消费端拉取数据的代码,但是拉取不到数据,想大家看看到底是什么问题,下面附上代码和日志文件
public static void main(String[] args) {
        //1.创建消费者配置信息
        Properties properties = new Properties();
        //2.给配置信息赋值
        //连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "10.100.81.64:29092, 10.100.81.67:29092, 10.100.81.68:29092, 10.100.81.69:29092");
        //开启自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //自动提交offset延迟
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //Key,Value反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KAFKA_GROUP_hcjlctbb");
        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Arrays.asList("BLINK_DATASERVICE_khqfwxt_HIVE_OCS_ACLINESEGMENT_20220214",
                "BLINK_DATASERVICE_khqfwxt_HIVE_PWOCS_BREAKER_20220214"));
        while (true){
            //获取数据
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            //解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"--"+consumerRecord.value());
            }
        }
    }
没有打印数据,只有运行日志,日志文字量太大,没办法直接贴上来,下面贴出链接,请大家查看

日志文件:链接:https://pan.baidu.com/s/12yGdvppj5LnWTFN2oNqfGA?pwd=6wik 提取码:6wik

各位看看到底是哪里写不对
  • 写回答

4条回答 默认 最新

  • Cry丶 2022-02-24 09:42
    关注

    重新开一个topic,然后先启动consumer,再启动producer,再发消息,你这个可能是consumer已经在broker里有了自己的offset,就会读不到之前producer发送到broker里的msg

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(3条)

报告相同问题?

问题事件

  • 系统已结题 3月15日
  • 已采纳回答 3月7日
  • 创建了问题 2月16日

悬赏问题

  • ¥30 求一段fortran代码用IVF编译运行的结果
  • ¥15 深度学习根据CNN网络模型,搭建BP模型并训练MNIST数据集
  • ¥15 lammps拉伸应力应变曲线分析
  • ¥15 C++ 头文件/宏冲突问题解决
  • ¥15 用comsol模拟大气湍流通过底部加热(温度不同)的腔体
  • ¥50 安卓adb backup备份子用户应用数据失败
  • ¥20 有人能用聚类分析帮我分析一下文本内容嘛
  • ¥15 请问Lammps做复合材料拉伸模拟,应力应变曲线问题
  • ¥30 python代码,帮调试,帮帮忙吧
  • ¥15 #MATLAB仿真#车辆换道路径规划