最近有需要用kafka接受消息的项目,学习了一下后仿写了一个消费端拉取数据的代码,但是拉取不到数据,想大家看看到底是什么问题,下面附上代码和日志文件
public static void main(String[] args) {
//1.创建消费者配置信息
Properties properties = new Properties();
//2.给配置信息赋值
//连接的集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"10.100.81.64:29092, 10.100.81.67:29092, 10.100.81.68:29092, 10.100.81.69:29092");
//开启自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//自动提交offset延迟
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//Key,Value反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KAFKA_GROUP_hcjlctbb");
//创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Arrays.asList("BLINK_DATASERVICE_khqfwxt_HIVE_OCS_ACLINESEGMENT_20220214",
"BLINK_DATASERVICE_khqfwxt_HIVE_PWOCS_BREAKER_20220214"));
while (true){
//获取数据
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//解析并打印consumerRecords
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key()+"--"+consumerRecord.value());
}
}
}
没有打印数据,只有运行日志,日志文字量太大,没办法直接贴上来,下面贴出链接,请大家查看
日志文件:链接:https://pan.baidu.com/s/12yGdvppj5LnWTFN2oNqfGA?pwd=6wik 提取码:6wik