lookingforwardToHope 2019-12-20 10:18 采纳率: 0%
浏览 728

kafka通过consumer java api实现消费者,KafkaStream打印不出来数据

kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据

package kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerTest extends Thread {

    //在linux环境运行正常
    @Override
    public void run() {
        // TODO Auto-generated method stub
        String topic="powerTopic";
        Properties pro=new Properties();
        pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181");
        pro.put("group.id", "test");
//      pro.put("zookeeper.session.timeout.ms", "4000");
//      pro.put("consumer.timeout.ms", "-1");
        ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro);
        ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig);
        Map<String, Integer> paramMap=new HashMap<String, Integer>();
        paramMap.put(topic,1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap);
        KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0);
//      System.out.println(kafkastream.size());
        System.out.println("hello");
        ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator();
        while(iterator.hasNext()){
//          MessageAndMetadata<byte[], byte[]> message=iterator.next();
//          String topic1=message.topic();
            String msg=new String(iterator.next().message());
            System.out.println(msg);
        }
    }

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        new KafkaConsumerTest().start();
        new MyProducer01().start();
    }

}

kafka环境在centos操作系统,在windows系统的eclipse运行程序,打印不出来数据,也不结束报错:
图片说明

打包后在集群环境运行结果:
图片说明

  • 写回答

1条回答 默认 最新

  • shandongwill 新星创作者: 大数据技术领域 2024-02-05 13:38
    关注

    你生产端的消息是什么内容,感觉没毛病应该是kafka中的消息就是数字 和hello +数字

    评论

报告相同问题?