kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据
package kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumerTest extends Thread {
//在linux环境运行正常
@Override
public void run() {
// TODO Auto-generated method stub
String topic="powerTopic";
Properties pro=new Properties();
pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181");
pro.put("group.id", "test");
// pro.put("zookeeper.session.timeout.ms", "4000");
// pro.put("consumer.timeout.ms", "-1");
ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro);
ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig);
Map<String, Integer> paramMap=new HashMap<String, Integer>();
paramMap.put(topic,1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap);
KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0);
// System.out.println(kafkastream.size());
System.out.println("hello");
ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator();
while(iterator.hasNext()){
// MessageAndMetadata<byte[], byte[]> message=iterator.next();
// String topic1=message.topic();
String msg=new String(iterator.next().message());
System.out.println(msg);
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
new KafkaConsumerTest().start();
new MyProducer01().start();
}
}