shikenn(+) 2022-02-16 19:32 采纳率: 100%
浏览 373
已结题

kafka拉取不到数据

最近有需要用kafka接受消息的项目,学习了一下后仿写了一个消费端拉取数据的代码,但是拉取不到数据,想大家看看到底是什么问题,下面附上代码和日志文件
public static void main(String[] args) {
        //1.创建消费者配置信息
        Properties properties = new Properties();
        //2.给配置信息赋值
        //连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "10.100.81.64:29092, 10.100.81.67:29092, 10.100.81.68:29092, 10.100.81.69:29092");
        //开启自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //自动提交offset延迟
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //Key,Value反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KAFKA_GROUP_hcjlctbb");
        //创建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Arrays.asList("BLINK_DATASERVICE_khqfwxt_HIVE_OCS_ACLINESEGMENT_20220214",
                "BLINK_DATASERVICE_khqfwxt_HIVE_PWOCS_BREAKER_20220214"));
        while (true){
            //获取数据
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            //解析并打印consumerRecords
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"--"+consumerRecord.value());
            }
        }
    }
没有打印数据,只有运行日志,日志文字量太大,没办法直接贴上来,下面贴出链接,请大家查看

日志文件:链接:https://pan.baidu.com/s/12yGdvppj5LnWTFN2oNqfGA?pwd=6wik 提取码:6wik

各位看看到底是哪里写不对
  • 写回答

4条回答 默认 最新

  • Cry丶 2022-02-24 09:42
    关注

    重新开一个topic,然后先启动consumer,再启动producer,再发消息,你这个可能是consumer已经在broker里有了自己的offset,就会读不到之前producer发送到broker里的msg

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(3条)

报告相同问题?

问题事件

  • 系统已结题 3月15日
  • 已采纳回答 3月7日
  • 创建了问题 2月16日

悬赏问题

  • ¥15 nslt的可用模型,或者其他可以进行推理的现有模型
  • ¥15 arduino上连sim900a实现连接mqtt服务器
  • ¥15 vncviewer7.0安装后如何正确注册License许可证,激活使用
  • ¥15 phython如何实现以下功能?查找同一用户名的消费金额合并2
  • ¥66 关于人体营养与饮食规划的线性规划模型
  • ¥15 基于深度学习的快递面单识别系统
  • ¥15 Multisim仿真设计地铁到站提醒电路
  • ¥15 怎么用一个500W电源给5台60W的电脑供电
  • ¥15 请推荐一个轻量级规则引擎,配合流程引擎使用,规则引擎负责判断出符合规则的流程引擎模板id
  • ¥15 Excel表只有年月怎么计算年龄