kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了
package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

private String topic;

public KafkaConsumer(String topic){
    super();
    this.topic=topic;
}

@Override
public void run(){
    //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka
    ConsumerConnector consumer = createConsumer();
    //Map作用指定获取的topic以及partition
    Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
    topicCountMap.put(topic, 3);
    //consumer连接器获取消息
    Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    //获取对应的topic中的某一个partition中的数据
    KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0);

    ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
    while(iterator.hasNext()){
        byte[] message = iterator.next().message();
        System.out.println("message is:"+new String(message));
    }

}

private ConsumerConnector createConsumer(){
    Properties properties = new Properties();
    properties.put("zookeeper.connect", "XXX:2181");
    properties.put("auto.offset.reset", "smallest");//读取旧数据  
    properties.put("group.id", "333fcdcd");
    return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}

public static void main(String[] args) {
    new KafkaConsumer("testtest").start();
}

}

1
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
Kafka 远程消费者读不到数据
问题描述 &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&amp;amp;amp;nbsp;服务器上使用脚本测试 producer &amp;amp;amp;amp; consumer 可生产 &amp;amp;amp;amp; 消费信息,但在使用 Java 代码远程作为消费者时,代码却卡在 comsumer.poll(long timeout) 不往下进行。 解决方式 &amp;amp;amp;nbsp;&amp;amp;amp;nbsp;&a
kafka 0.9.0.0重复消费问题解决
背景:之前用的kafka客户端版本是0.8,近期升级了kafka客户端的版本,写了新的消费者和生产者的代码,在本地测试没有问题,可以正常消费与生产。但最近的项目中使用了新版的代码,当数据量较大时会出现重复消费的问题。现将问题的排除与解决过程记录下来,避免再次踩坑。 问题发现:由于ConsumerRecord对象可以获取到当前消息的分区与偏移量,故在log日志中将当前消息的分区与偏移量也记录下来了
kafka用java编写消费者 消费不到数据
这是报的异常: Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.com...
SparkStreaming消费Kafka数据遇到的问题
SparkStreaming消费Kafka数据遇到的问题: 查看ZK:
java版kafka消费者取不到消息
本案例使用的当前kafka版本:kafka_2.12-0.10.2.0   zookeeper版本:zookeeper-3.5.2-alpha 现在kafka的版本更新到0.10.2.0了,老的版本生产者和消费者实现起来有点麻烦,使用新的KafkaProducer、KafkaConsumer简化多了。 在生成和消费时一定要启动zookeeper、kafka服务,不然无法进行生成和消费,具体启
kafka无法消费消息
1:[root@localhost bin]# /usr/local/kafka1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic  test 1111 22222 另外一个session有: [root@localhost bin]#bin/kafka-console-consumer.sh -
Kafka某topic无法消费解决方案&Kafka某Topic数据清理
由于项目原因,最近经常碰到Kafka消息队列某topic在集群宕机重启后无法消费的情况。碰到这种情况,有三步去判断原因所在: step A:如果用kafka串口(即console-consumer)是可以正常消费该topic,则排除kafka集群出现故障 step B:若平台业务能正常消费其他topic的消息,则排除平台业务代码逻辑问题 step C:不到万不得已,则只能手动删除
kafka生产的数据没有消费(已解决)
    程序中往kafka里写入数据,但是消费命令没有把数据录入数据库,可能是下面问题: 1、程序中的topic 和 connector  的topic   和数据库表名称 三者要一致。 2、jps看下,schema  kafka  connector   三个进程都启动。 3、connector 数据库配置确保正确。...
Kafka无法消费!?究竟是bug的“沦陷”还是配置的“扭曲”?
在一个月黑风高的夜晚,突然收到现网生产环境Kafka消息积压的告警,梦中惊醒啊,马上起来排查日志。   问题现象 消费请求卡死在查找Coordinator   Coordinator为何物?Coordinator用于管理Consumer Group中各个成员,负责消费offset位移管理和Consumer Rebalance。Consumer在消费时必须先确认Consumer Group...
kafka问题排查之 Java代码不进行消费
发现问题 使用 kafka 在linux系统,通过命令测试消费正常, 但在Java 代码无法正常接收队列消息 控制台提示信息: 15:21:33.804 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler [&quot;http-nio-8886&quot;] 15:21:33.836 [main...
使用python进行kafka消费时遇到的问题
一、NoBrokersAvailable when using bootstrap_server list 这个主要是api的问题,需要指定一个能用的api,不然不能连上borker   二、执行报错,key一定要对   三、如果第一次没有设置offset reset和auto_commit,一旦消费过这些数据,无法用该groupid消费同一组数据。可以采用不同的group。 ...
kafka无法收到flume采集的数据的解决办法
问题重现 在写黑名单那篇博文的时候,我是通过直接copy log日志文件到监控目录下的方式来模拟数据的,在前几次模拟访问日志文件的时候挺正常的,copy进去基本都是秒采集(文件显示直接加了.COMPLETED后缀)。 但到后来再往采集目录下copy log日志文件的时候,待采集目录下的文件并不会显示被采集(文件没有.COMPLETED后缀),kafka也一直收不到flume采集来的数据。但重启...
spark Streaming从kafka消费数据遇到的问题,求解
问题: INFO JobScheduler: Added jobs for time 消费kafka数据的过程中,有时候会一直出现 INFO JobScheduler: Added jobs for time ,百度说的是设置的单线程,但是在程序里面设置的是local[*],偶然跑了下,发现会有Added jobs for time和数据一起出现的情况,求解~~ ...
Storm消费Kafka异常 - topic其中两个分区达到某个值不进行消费,持续阻塞
Kafka消费storm,突然有两个分区无法消费数据(或重复消费无法提交offset) offset是我们自己进行管理,kafka日志也是正常没有报错,storm日志也是没有报错~ 就是卡住了 1.尝试将partition为0,1的offset记录删除,重新跑一遍,结果还是到那个offset处卡住 2.再次尝试手动修改offset跳过卡住的那一条数据(这种做法会丢失一条数据,在数据不允许...
springboot kafka消费者获取不到信息
本地单机部署kafka,测试时发现生产者可以正常生成,但是消费者不能接收到信息: 我的问题是在kafka有一个配置文件consumer.properties,里面有group-id的配置,默认只有一个test-consume-group,如果不配置的话,只有该组的消费者才能接受消息,所以我测试时无法接受到信息。增加后即可正常消费 继续学习- -!
记用 SpringBoot 消费 Kafka 过程中的一次问题排查
1、现象 最近在接受公司Kafka的集群,之前公司重启一台服务器的时候出现重复消费的情况,所以就跟消费端的服务一起联调查册。消费端目前有两个服务,但是在测试的过程中发现其中一个服务可以接受到数据,但是另一个服务端却接受不到数据。于是查看了一下它的日志,关键日志如下: [INFO] org.apache.kafka.clients.consumer.internals.AbstractCoord...
平台搭建---Kafka使用---Kafka重复消费和丢失数据
来源 1、Kafka保证数据不丢失的原理 1.1、kafka消息的位置 用好Kafka,维护其消息偏移量对于避免消息的重复消费与遗漏消费,确保消息的Exactly-once是至关重要的。 kafka的消息所在的位置Topic、Partitions、Offsets三个因素决定。 Kafka消费者消费的消息位置还与consumer的group.id有关。 consumerOffse
Kafka异常处理(消费者不消费数据)
问题生产问题,OffsetMonitor 监控发现运行了一个月的kafka突然间消费有lag.而且消费端不消费数据分析在客户端写try..catch…捕获异常: 2017-08-27 09:47:48,103 ERROR [com.ecar.eoc.message.platform.kafka.Kafka211Context] - [kafka_Exception———->>org.apache.
kafka踩坑之消费者收不到消息
生产者发送消息,客户端始终消费不到 原因: 客户端版本与服务端不一致 解决: 我这里服务端使用的是:kafka_2.10-0.8.2.1.tgz,客户端原来使用的是0.8.1,需要改为: &amp;lt;dependency&amp;gt; &amp;lt;groupId&amp;gt;org.apache.kafka&amp;lt;/groupId&amp;gt; &amp;lt;artifactId&amp;gt;...
flink消费kafka消息
package testMaven.testMaven; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streami
kafka生产者和消费者端的数据不一致
今天测试遇到了问题,kafka生产者和消费者端的数据不一致,而且数据相差还比较大,测试生产10000条数据 消费者有时候能消费10000条 有时候只能消费9多条,最开始以为是ack等等的设置,之后调整了一些生产者的参数,发现还是有问题,最后还是另外一个同事发现了最根本的问题。kafka生产者是异步生产数据,我写了个测试方法在main函数里面 用for循环模拟发送10000条数据,就是因为这个main
Kafka Java客户端无法消费 又不报错
首先kafka java api有新旧两个版本,区别: 旧版数据是存在zookeeper,配置文件是“zookeeper.connect ” 默认2181端口 新版数据存在topic,配置文件是 “bootstrap.servers” 默认是9092端口 推荐用新版 另外连接虚拟机打主机名需要在win10上配置linux的主机名IP映射才能连接 ...
记一次Kafka不能消费故障
背景: kafka集群机器升级,使得部分spark Streaming不能消费读取数据 问题原因: kafka会自动创建一个默认的topic __consumer_offsets,用于保存offset到Kafka系统 由于我们集群kafka节点有7个,当逐渐的下架上架机器后,使得__consumer_offsets  Partition 出现Leader为-1 Kafka将直连Kaf...
kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
kafka(java客户端)消费者取不到消息,生产者消息也没发送成功先说下我使用的各种版本: kafka版本:kafka_2.12-1.0.0 zookeeper版本:3.4.11 pom中使用的客户端版本:0.11.0.0在Ubuntu虚拟机上安装了kafka,zookeepe之后,没有修改过其中的任何配置文件,kafka默认端口是9029,zookeeper默认端口是2181, 分别运行
Kafka重复消费和丢失数据问题
Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交。 原因1:强行kill线程,导致消费后的数据,offset没有提交。 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用&amp;nbsp;consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重...
Kafka重启Consumer后抓不到数据的问题
使用kafka时,经常会遇到这样的问题: 重启某一消费者程序后,一条数据都抓不到了,但是队列中明明还有很多未被消费的消息,这是怎么回事呢? 先说原因,四个字:重复消费 下面进行详细分析: “消费确认”是所有消息中间件都要解决的一个问题,在kafka中涉及到两个消费位置: (1)当前取消息所在的consume offset; (2)程序处理完毕发送ack(确...
kafka consumer不能消费消息及其处理办法
我这里的Kafka Consumer代码是拷贝网上的,就是开一个线程监听kafka topic,一有消息就处理。开始的代码是这样的: public void kafkaStart() { final String topic = hipchatAction.properties.getProperty("kafka.hipchat.topic"); final i
Spark通过receiver方式消费kafka数据时数据积压问题
Receiver方式消费kafka数据积压问题 1.问题 在通过receiver方式接受kafka消息时,发现有大量消息在队列中阻塞最终导致spark任务执行失败。 经过排查发现,利用receiver方式来消费kafka的数据时可能会因为zk的group是首次创建或者有一段时间未消费,在程序启动的时候一次性读取进来大量的数据导致数据积压严重报错oom或者yarn资源不够而崩溃,可...
配置Logstash消费kafka消息
input{ kafka { topics =&amp;gt; &quot;my-log-topic&quot; //这里要和你Java中logback.xml中的&amp;lt;topic&amp;gt;&amp;lt;/topic&amp;gt;一致 type =&amp;gt; &quot;kafka&quot; bootstrap_servers =&amp;gt; &quot;192.168.80.112:9092,1...
Java API 生产和消费Kafka消息
在我的文章 Kafka集群搭建中,展示了如何创建一个单机版的Kafka服务,在此基础上我们可以利用Java程序来对Kafka服务进行生产和消费消息。1. 创建Maven程序首先在 Intellij IDEA中创建一个maven程序,在pom.xml文件中加入如下的依赖和插件: <dependencies> <dependency> <groupId>o
kafka重复消费问题
问题描述 采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据。问题原因 kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而我们项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配part
如何确定Kafka的分区数,key和consumer线程数,以及不消费问题解决
在Kafak中国社区的qq群中,这个问题被提及的比例是相当高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。希望对大家有所帮助。怎么确定分区数?“我应该选择几个分区?”——如果你在Kafka中国社区的群里,这样的问题你会经常碰到的。不过有些遗憾的是,我们似乎并没有很权威的答案能够解答这样的问题。其实这也不奇怪,毕竟这样的问题通常都是没有固定答案的。
kafka主题消费积压问题总结
  故障描述: 12月6日下午运维反馈说,某个主题的一个分区消费积压,由于这个主题非常重要,且已经有用户投诉所以运维很紧张,紧急打印堆栈并Dump堆内存后,就重启了这台机器。   故障分析1: 消费这个主题的集群的业务逻辑相对比较简单,主要就是读取某些主题,然后逻辑判断+DB操作后,分流写入到另外某些主题。运维通过kafka监控平台找到积压的主题,发现主题的某个分区积压了几万消息后,...
Kafka消费者——从 Kafka读取数据
应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法。如果不先理解 这些概念,就难以理解如何使用消费者 API。所以我们接下来先解释这些重要的概念,然 后再举几个例子,横示如何使用消费者 API 实现不同的应用程序。 消费者和消费者群组 假设我们有一个应用程序需要从-个 ...
kafka无法正常生产消费,但依旧可以创建生产者和消费者
kafka无法正常生产和消费了,具体问题也没有定位,重启了下,解决问题;神奇的是,依旧可以创建生产者和消费者
在集成flink和kafka时遇到的问题
addsource(flinkkafka10出错) 如果kafka版本为0.8或者0.9 导入flink-connector-kafka-0.8_2.11-1.6.1.jar/flink-connector-kafka-0.9_2.11-1.6.1.jar 和flink-connector-kafka-base_2.11-1.6.1.jar 即可。 如果kafka版本为0.10 出去flink...
Python脚本消费kafka数据
kafka简介(摘自百度百科) 一、简介: 详见:https://blog.csdn.net/Beyond_F4/article/details/80310507 二、安装 详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689 三、按照官网的样例,先跑一个应用 1、生产者: from kafka...
Kafka如何解决消息丢失问题,消费重复问题
Kafka简介:        Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。                                                           ...
kafka消费者客户端启动之后消费不到消息的原因分析
如果你发现你的一个消费者客户端A已经启动了,但是就是不消费消息,此时你应该检查一下该消费者所在的组中(ConsumerGroup)是否还有其他的消费者,topic的分区可能被组中其他的消费者线程抢走(负载均衡机制),从而导致消费者客户端A连一个分区都没有得到,自然消费不到消息。  ...
使用java代码连接不上kafka的解决方案(生产者与消费者都没能连上)
本篇修改设置等操作是基于上一篇kafka安装配置操作基础之上:https://blog.csdn.net/QYHuiiQ/article/details/86556591 使用命令可以生产并消费成功,但是使用java代码生产的消息没能写入kafka中,尝试以下解决方案,以下是我解决问题时的倒序方法,你也可以尝试着从第一步开始: 1.把代码中的localhost改为服务器IP: 这一步很重要!...
文章热词 设计制作学习 机器学习教程 Objective-C培训 交互设计视频教程 颜色模型
相关热词 mysql关联查询两次本表 native底部 react extjs glyph 图标 消费区块链 java kafka学习