lookingforwardToHope 2019-12-20 10:18 采纳率: 0%
浏览 727

kafka通过consumer java api实现消费者,KafkaStream打印不出来数据

kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerTest extends Thread {

    //在linux环境运行正常
    @Override
    public void run() {
        // TODO Auto-generated method stub
        String topic="powerTopic";
        Properties pro=new Properties();
        pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181");
        pro.put("group.id", "test");
//      pro.put("zookeeper.session.timeout.ms", "4000");
//      pro.put("consumer.timeout.ms", "-1");
        ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro);
        ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig);
        Map<String, Integer> paramMap=new HashMap<String, Integer>();
        paramMap.put(topic,1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap);
        KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0);
//      System.out.println(kafkastream.size());
        System.out.println("hello");
        ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator();
        while(iterator.hasNext()){
//          MessageAndMetadata<byte[], byte[]> message=iterator.next();
//          String topic1=message.topic();
            String msg=new String(iterator.next().message());
            System.out.println(msg);
        }
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new KafkaConsumerTest().start();
        new MyProducer01().start();
    }

}

kafka环境在centos操作系统,在windows系统的eclipse运行程序,打印不出来数据,也不结束报错:
图片说明

打包后在集群环境运行结果:
图片说明

  • 写回答

1条回答

  • shandongwill 大数据领域新星创作者 2024-02-05 13:38
    关注

    你生产端的消息是什么内容,感觉没毛病应该是kafka中的消息就是数字 和hello +数字

    评论

报告相同问题?

悬赏问题

  • ¥15 对于相关问题的求解与代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作
  • ¥15 求NPF226060磁芯的详细资料
  • ¥15 使用R语言marginaleffects包进行边际效应图绘制
  • ¥20 usb设备兼容性问题
  • ¥15 错误(10048): “调用exui内部功能”库命令的参数“参数4”不能接受空数据。怎么解决啊
  • ¥15 安装svn网络有问题怎么办