kafkaconsumer数据不能批量消费 50C

我用kafkaconsumer批量消费数据, 无法获取获取批量, 从日志看offset提交也异常, 规律性的每3次提交成功一次, 数据每次获取一条, 无法批量获取。
困扰了两天了, 莫名啊。

public class KafkaManualConsumer
{

    public static void main(String[] args)
    {
        Properties properties = new Properties();
        System.setProperty("java.security.auth.login.config", "c:/kafka_client_jaas.conf"); //配置文件路径
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("bootstrap.servers", "VM_0_16_centos:9092");    //kafka:9092
        properties.put("enable.auto.commit", "false"); 
        //properties.put("session.timeout.ms", 60000);
        properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        properties.put("fetch.max.wait.ms", 5000);
        properties.put("max.poll.records", 5000);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "yuu67u36");
//        properties.put("receive.buffer.bytes", 3276800);
//        properties.put("heartbeat.interval.ms", 59000);
//        properties.put("client.id", "t4t5t234f34f3f");
//        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32*1024*1024);
//        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 64*1024*1024);
//        properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 128*1024*1024);
        //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 2000*1024);

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-attendphotos"));

        //kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-stat"));


        while (true)
        {

            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000L);


            System.out.println("-----------------");
            System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records)
            {

                System.out.println("offset = " + record.offset());
                VideoPhotoOuter dto = JSON.parseObject(record.value(), VideoPhotoOuter.class);
                System.out.println(dto.getPhotos().get(0).getPhotoFmt());
                //System.out.printf("offset = %d, value = %s", record.offset(), record.value());
            }

            try
            {

                kafkaConsumer.commitSync();
                Thread.currentThread().sleep(1000L);
            }
            catch(Exception ex)
            {
                //手动抛出SQLException使用事务回滚
            }


        }
        //kafkaConsumer.close();

    }
}

下面是控制台日志

17:32:06.758 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [VM_0_16_centos:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = t4t5t234f34f3f
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 5000
    fetch.min.bytes = 1
    group.id = yuu67u36
    group.instance.id = null
    heartbeat.interval.ms = 59000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 5000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 3276800
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 60000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = PLAIN
    security.protocol = SASL_PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 60000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
.......................
........................
-----------------
0
17:32:08.068 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0
-----------------
0
17:32:10.095 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0
17:32:12.091 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent a full fetch response that created a new incremental fetch session 725149318 with 1 response partition(s)
17:32:12.092 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90261 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93666, lastStableOffset = 93666, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.bytes-fetched
17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.records-fetched
17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lag
17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lead
17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90262, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null)
17:32:12.122 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=1) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null)
-----------------
1
offset = 90261
JPG
17:32:12.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0
-----------------
0
17:32:14.256 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0
-----------------
0
17:32:16.279 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0
17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent an incremental fetch response for session 725149318 with 1 response partition(s)
17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90262 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93668, lastStableOffset = 93668, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576)
17:32:17.280 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90263, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null)
17:32:17.281 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=2) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
17:32:17.281 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null)
-----------------
1



1个回答

wulfwhuiwhsc
wulfwhuiwhsc [root@VM_0_16_centos bin]# ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topic-video-dev-attendphotos --describe Topic:topic-video-dev-attendphotos PartitionCount:1 ReplicationFactor:1 Configs: Topic: topic-video-dev-attendphotos Partition: 0 Leader: 90 Replicas: 90 Isr: 90 我是童单consumer与单partition
2 个月之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
kafka Consumer消费数据手动commit问题

Consumer设置成手动提交enable.auto.commit=false,但是在处理完消息过后没有进行consumer.commitAsync()。按照我的理解此时消费的offset是没有更新的,如果第一次producer发了五条信息,Consumer处理了这五条信息,第二次Producer又发五条信息,此时Consumer poll数据的时候应该是第一次和第二次相加的10条数据(因为消费的offset没有更新,Consumer应该从第一次发送的数据进行poll),但是我测试的结果是还是五条(Consumer没有重启,一直启动的,producer发多少条消息,Consumer就消费多少条消息)。 我的疑问就是,既然没有commit最新的 offset,那么为什么producer发送新发送的消息,Consumer就能接收到,而不是从原来的offset poll数据。但是,如果重启一下Consumer,poll的数据就是10条。再重启也是最新的没有更新offset的那10条数据。

kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了 package test; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic){ super(); this.topic=topic; } @Override public void run(){ //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka ConsumerConnector consumer = createConsumer(); //Map作用指定获取的topic以及partition Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(topic, 3); //consumer连接器获取消息 Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); //获取对应的topic中的某一个partition中的数据 KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()){ byte[] message = iterator.next().message(); System.out.println("message is:"+new String(message)); } } private ConsumerConnector createConsumer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "XXX:2181"); properties.put("auto.offset.reset", "smallest");//读取旧数据 properties.put("group.id", "333fcdcd"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("testtest").start(); } }

kafka通过consumer java api实现消费者,KafkaStream打印不出来数据

kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据 ``` package kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumerTest extends Thread { //在linux环境运行正常 @Override public void run() { // TODO Auto-generated method stub String topic="powerTopic"; Properties pro=new Properties(); pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181"); pro.put("group.id", "test"); // pro.put("zookeeper.session.timeout.ms", "4000"); // pro.put("consumer.timeout.ms", "-1"); ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro); ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig); Map<String, Integer> paramMap=new HashMap<String, Integer>(); paramMap.put(topic,1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap); KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0); // System.out.println(kafkastream.size()); System.out.println("hello"); ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator(); while(iterator.hasNext()){ // MessageAndMetadata<byte[], byte[]> message=iterator.next(); // String topic1=message.topic(); String msg=new String(iterator.next().message()); System.out.println(msg); } } public static void main(String[] args) { // TODO Auto-generated method stub new KafkaConsumerTest().start(); new MyProducer01().start(); } } ``` kafka环境在centos操作系统,在windows系统的eclipse运行程序,打印不出来数据,也不结束报错: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576807897_599340.jpg) 打包后在集群环境运行结果: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576808400_74063.jpg)

Kafka Consumer 拉取消息

基于0.10,最近在测试consumer端消费集群消息, 设置“一次最大拉取的条数”的参数,但是实际拉取的条数不唯一,如果将其设置成500或600,那么每次拉取的条数就是500或600定值,但是如果设置成1W,那么拉取条数在4k-1W不等,(每条消息的大小是1KB) 所以想请问下,consumer拉取的具体的机制是什么样的,为什么会出现每次拉取的条数是不一样的? 注:消息是之前就已经写入好partition中的。

kafka数据可以写入数据,消费不可数据

kafka的offset值一直不变,可以往里面写数据 会是什么原因呢 手动改变offset的值是可以消费数据的

python消费kafka数据,为什么前面取几次都取不到?

python 消费kafka数据时,刚开始连接时为什么取不到数据? 代码如下: ``` # -*- coding:utf8 -*- from kafka import KafkaConsumer from kafka import TopicPartition import kafka import time # 测试kafka poll方法能拉取多少的记录 consumer = KafkaConsumer( bootstrap_servers=['192.168.13.202:9092'], group_id='group-1', auto_offset_reset='earliest', enable_auto_commit=False) consumer.subscribe('test') print ("t1",time.time()) while True: print("t2", time.time()) msg = consumer.poll(timeout_ms=100, max_records=5) # 从kafka获取消息 # print (len(msg)) for i in msg.values(): for k in i: print(k.offset, k.value) time.sleep(1) ``` 打印的结果却是 ``` t1 1567669170.438951 t2 1567669170.438951 t2 1567669171.8450315 t2 1567669172.945094 t2 1567669174.0471573 t2 1567669175.1472201 0 b'{"ast":"\xe7\x82\xb"}' 1 b'{"ast":"","dm":2}' 2 b'{"ast":"12"}' 3 b'{"ast":"sd"}' 4 b'{"ast":"12ds"}' t2 1567669176.1822793 ``` 为什么连接上kafka之后,会取5次才会取到数据?

kafka 消费者消费不到数据

[root@hzctc-kafka-5d61 ~]# kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group sbs-haodian-message1 --topic Message --zookeeper 10.1.5.61:2181 [2018-04-18 16:43:43,467] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/sbs-haodian-message1/offsets/Message/8. 用kafka的时候 用命令查看消费组消费情况 报这个错误 其他的消费组是正常的 哪位大神知道这是什么原因导致的 我在消费操作的时候加了缓存锁 每次poll操作之后的间隔时间不确定 可能是10S或者20S或者30S 不过我的sessiontimeiut设置了90s。这个会有什么影响吗

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

kafka消费数据老是丢失

WARN TaskSetManager: Lost task 9.0 in stage 26569.0 (TID 812602, 2, 2, 104-250-138-250.static.gorillaservers.com): k): k): ): ): kafka.common.NotLeaderForPForPForPartitionException 有两个groupID消费一个topic,出现上面的警告后,有一个groupID就消费不到数据了

如何使用Sarama Go Kafka Consumer从最新的抵消量消耗?

<div class="post-text" itemprop="text"> <p>我有三个问题:</p> <ol> <li>“oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?</li> </ol> <blockquote> <p>// OffsetOldest stands for the oldest offset available on the broker for a<br> // partition.<br> OffsetOldest int64 = -2</p> </blockquote> <ol start="2"> <li><p>假设:</p> <p>A. 三个broker运行在一台机器上<br> B. 使用者群体只有一个使用者线程<br> C. 使用者信任OffsetOldest标志<br> D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息</p> <p>因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?</p></li> <li><p>在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?</p> <pre><code> func (this *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { this.handler(message) session.MarkMessage(message, "") } return nil } ctx := context.Background() conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Return.Errors = true consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf) if err != nil { logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err) return } </code></pre></li> </ol> </div>

kafka 消费端 处理数据比较慢,会不会出现数据积压?

如题,kafka消费端接收到数据后 要进行部分业务逻辑操作,可能会有3秒左右,处理很慢 的话,对程序有什么影响呢?新手提问, 望各位大神不吝赐教!

kafka多个消费者消费同一数据

kafka不同groupid下的消费者,消费同一topic下的某一条数据,为什么offset值不变? 只被消费了一次吗?

kafka consumer 提交offset 如何查看?

现在发现一个问题,我本地kafka消费了 但是却没有提交offset 导致每次一重启就重新开始消费! 请问哪位大神知道该如何查看我是否有提交offset 或者说怎么排查这个问题?我kafak consumer 配置如下: props.put("zookeeper.session.timeout.ms", "10000"); props.put("zookeeper.connection.timeout.ms", "6000"); props.put("zookeeper.sync.time.ms", "2000"); props.put("auto.commit.interval.ms", "5000"); props.put("auto.offset.reset", "smallest"); props.put("auto.commit.enable", "false");

kafka_2.11-1.0.0在控制台kafka-console-consumer消费者消费的时候zookeeper 和 bootstrap-server区别

今天试了两个kafka的版本都存在这个问题 1、创建一个topic > kafka-topics.bat --create --partitions 1 --replication-factor 1 --topic test --zookeepe r localhost:2181 2、对改topic进行消息写入 > kafka-console-producer.bat --broker-list localhost:9092 --topic test 3,控制台形式消费该topic消息,--zookeeper localhost:2181 这种能正常消费消息 > kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 4,同样是控制台消费,--bootstrap-server localhost:9092,这样就收不到消费消息 > kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginn ing 5,今天在windows 和 虚拟机linux环境下 都存在这个问题,而且也试了两个kafka版本 6,stackoverflow也看到有人出现这个问题 > https://stackoverflow.com/questions/41774446/kafka-bootstrap-servers-vs-zookeeper-in-kafka-console-consumer ![老外也有遇到这个问题](https://nim.nosdn.127.net/NDA3MzIzNw==/bmltYV8yMzg4MDEzNjMxXzE1NjU0NDUxOTI1NjdfMmYxMDRjYjUtZTVjNS00YjM4LWFjMzgtOWFlZTdlYWY4ZDdk) 有无人遇到一样的问题,怎么让 --bootstrap-server localhost:9092 这种也能消费到消息

spark读取kafka数据, 缓存当天数据

spark stream从kafka读取数据,10秒间隔;需要缓存当天数据用于业务分析。 思路1:定义static rdd用于union每次接收到的rdd;用window窗口(窗口长1小时,滑动步长20分钟);union之后checkpoint。 但是发现在利用static rdd做业务分析的时候,应该是因为磁盘io,所以执行时间太长。 思路2:一样定义static rdd, context调用remember(24小时)保留数据24小时(数据缓存在哪里了,暂时不清楚,汗);但是在业务分析时,发现static rdd的count结果为0 求教怎么缓存一段时间的rdd 数据放executor内存或分布放个worker都可以,一天的数据量大概在100g,过滤后再5g,机器内存256g

kafka 消费者 获取消息

activeqmq 都是broker push 到 消费者,消费者 建立 messageListener 监听器 就可以 获取消息,但kafka 是 需要去broker pull消息, 怎么才能知道 broker中 已经 有了对应 topic 呢 ?定时 获取?

kafka的数据存放,想做配置的修改

kafka的默认存放时间为7天,因为在做测试,所以想让他保留的时间长一点。修改了 log.retention.hours=1440 log.retention.bytes=1073741824 这个两个配置。但是今天在做测试的时候,发现kafka 的数据没有啦。只有四条数据啦。我就想知道kafka数据怎么就没有啦,是因为什么没有的。 望大神多多指教!

spring-kafka做了分区,部分分区数据可以正常消费,部分分区始终无法消费?

spring-kafka(版本:1.0.6.RELEASE,kafka-client:0.9.0.1),创建了8个分区,有一个分区的数据始终无法消费,其他分区数据正常消费。有大神知道是为什么?

kafka消费者迭代器卡死

配置kafka+zk在虚拟机上,自己在本机上的生产者可以正常发送消息,但是消费者在 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 之后 iterator 的任何操作都直接卡死了,请问这个是怎么回事?在这个操作之前debug是可以看到变量内部的数值的,这个操作之后就不能看了,value全部清空了。 贴源码 ``` package com.weixinjia.recreation.queue.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerGroupExample extends Thread{ private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // // executor = Executors.newFixedThreadPool(a_numThreads); System.out.println(streams.size()); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); System.out.println(iterator); while(iterator.hasNext()){ MessageAndMetadata<byte[], byte[]> next = iterator.next(); byte[] message = next.message(); String string = new String(message); System.out.println(string); } } System.out.println("消息输出完成"); } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "1500"); props.put("zookeeper.sync.time.ms", "4000"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes", "10240000"); props.put("auto.commit.enable", "true"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "masterServce:2181"; String groupId = "group-1"; String topic = "test2"; int threads = 3; ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } } ``` 消息在kafka那边直接用kafka-console-consumer.sh是可以查询到的

Python数据挖掘简易入门

&nbsp; &nbsp; &nbsp; &nbsp; 本课程为Python数据挖掘方向的入门课程,课程主要以真实数据为基础,详细介绍数据挖掘入门的流程和使用Python实现pandas与numpy在数据挖掘方向的运用,并深入学习如何运用scikit-learn调用常用的数据挖掘算法解决数据挖掘问题,为进一步深入学习数据挖掘打下扎实的基础。

HoloLens2开发入门教程

本课程为HoloLens2开发入门教程,讲解部署开发环境,安装VS2019,Unity版本,Windows SDK,创建Unity项目,讲解如何使用MRTK,编辑器模拟手势交互,打包VS工程并编译部署应用到HoloLens上等。

2019 Python开发者日-培训

本次活动将秉承“只讲技术,拒绝空谈”的理念,邀请十余位身处一线的Python技术专家,重点围绕Web开发、自动化运维、数据分析、人工智能等技术模块,分享真实生产环境中使用Python应对IT挑战的真知灼见。此外,针对不同层次的开发者,大会还安排了深度培训实操环节,为开发者们带来更多深度实战的机会。

Only老K说-爬取妹子图片(简单入门)

安装第三方请求库 requests 被网站禁止了访问 原因是我们是Python过来的 重新给一段 可能还是存在用不了,使用网页的 编写代码 上面注意看匹配内容 User-Agent:请求对象 AppleWebKit:请求内核 Chrome浏览器 //请求网页 import requests import re //正则表达式 就是去不规则的网页里面提取有规律的信息 headers = { 'User-Agent':'存放浏览器里面的' } response = requests.get

2020_五一数学建模_C题_整理后的数据.zip

该数据是我的程序读取的数据,仅供参考,问题的解决方案:https://blog.csdn.net/qq_41228463/article/details/105993051

R语言入门基础

本课程旨在帮助学习者快速入门R语言: 课程系统详细地介绍了使用R语言进行数据处理的基本思路和方法。 课程能够帮助初学者快速入门数据处理。 课程通过大量的案例详细地介绍了如何使用R语言进行数据分析和处理 课程操作实际案例教学,通过编写代码演示R语言的基本使用方法和技巧

人才招聘系统PHP+MySQL源码

PHP 5.0及以上 + MySQL 5.0及以上 开发的人才招聘系统完全可运行源码,按照操作说明简单配置即可运行。学习PHPWEB应用的完整系统程序源码。

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

python可视化分析(matplotlib、seaborn、ggplot2)

python可视化分析总结(matplotlib、seaborn、ggplot)一、matplotlib库1、基本绘图命令3、图形参数设置4、特殊统计图的绘制4.1 数学函数图4.2 气泡图4.1 三维曲面图二、seaborn库1、常用统计图1.1 箱线图1.2 小提琴图1.3 点图1.4 条图与计数图1.5 分组图1.6 概率分布图2、联合图3、配对图三、ggplot库1、图层画法+常用图形2、快速绘图 一、matplotlib库 1、基本绘图命令 import matplotlib.pyplot as

Vue.js 2.0之全家桶系列视频课程

基于新的Vue.js 2.3版本, 目前新全的Vue.js教学视频,让你少走弯路,直达技术前沿! 1. 包含Vue.js全家桶(vue.js、vue-router、axios、vuex、vue-cli、webpack、ElementUI等) 2. 采用笔记+代码案例的形式讲解,通俗易懂

初级玩转Linux+Ubuntu(嵌入式开发基础课程)

课程主要面向嵌入式Linux初学者、工程师、学生 主要从一下几方面进行讲解: 1.linux学习路线、基本命令、高级命令 2.shell、vi及vim入门讲解 3.软件安装下载、NFS、Samba、FTP等服务器配置及使用

人工智能-计算机视觉实战之路(必备算法+深度学习+项目实战)

系列课程主要分为3大阶段:(1)首先掌握计算机视觉必备算法原理,结合Opencv进行学习与练手,通过实际视项目进行案例应用展示。(2)进军当下最火的深度学习进行视觉任务实战,掌握深度学习中必备算法原理与网络模型架构。(3)结合经典深度学习框架与实战项目进行实战,基于真实数据集展开业务分析与建模实战。整体风格通俗易懂,项目驱动学习与就业面试。 建议同学们按照下列顺序来进行学习:1.Python入门视频课程 2.Opencv计算机视觉实战(Python版) 3.深度学习框架-PyTorch实战/人工智能框架实战精讲:Keras项目 4.Python-深度学习-物体检测实战 5.后续实战课程按照自己喜好选择就可以

【大总结2】大学两年,写了这篇几十万字的干货总结

本文十天后设置为粉丝可见,喜欢的提前关注 不要白嫖请点赞 不要白嫖请点赞 不要白嫖请点赞 文中提到的书我都有电子版,可以评论邮箱发给你。 文中提到的书我都有电子版,可以评论邮箱发给你。 文中提到的书我都有电子版,可以评论邮箱发给你。 本篇文章应该算是Java后端开发技术栈的,但是大部分是基础知识,所以我觉得对任何方向都是有用的。 1、数据结构 数据结构是计算机存储、...

lena全身原图(非256*256版本,而是全身原图)

lena全身原图(非256*256版本,而是全身原图) lena原图很有意思,我们通常所用的256*256图片是在lena原图上截取了头部部分的256*256正方形得到的. 原图是花花公子杂志上的一个

【项目实战】 图书信息管理系统(Maven,mybatis)(第一个自己独立完成的项目)

《程序设计综合训练实践报告》 此项目为图书信息管理系统,是一个采用了mysql+mybatis框架+java编写的maven项目

图书管理系统(Java + Mysql)我的第一个完全自己做的实训项目

图书管理系统 Java + MySQL 完整实训代码,MVC三层架构组织,包含所有用到的图片资源以及数据库文件,大三上学期实训,注释很详细,按照阿里巴巴Java编程规范编写

Python入门视频精讲

Python入门视频培训课程以通俗易懂的方式讲解Python核心技术,Python基础,Python入门。适合初学者的教程,让你少走弯路! 课程内容包括:1.Python简介和安装 、2.第一个Python程序、PyCharm的使用 、3.Python基础、4.函数、5.高级特性、6.面向对象、7.模块、8.异常处理和IO操作、9.访问数据库MySQL。教学全程采用笔记+代码案例的形式讲解,通俗易懂!!!

20行代码教你用python给证件照换底色

20行代码教你用python给证件照换底色

2018年全国大学生计算机技能应用大赛决赛 大题

2018年全国大学生计算机技能应用大赛决赛大题,程序填空和程序设计(侵删)

MySQL数据库从入门到实战应用

限时福利1:购课进答疑群专享柳峰(刘运强)老师答疑服务 限时福利2:购课后添加学习助手(微信号:csdn590),按消息提示即可领取编程大礼包! 为什么说每一个程序员都应该学习MySQL? 根据《2019-2020年中国开发者调查报告》显示,超83%的开发者都在使用MySQL数据库。 使用量大同时,掌握MySQL早已是运维、DBA的必备技能,甚至部分IT开发岗位也要求对数据库使用和原理有深入的了解和掌握。 学习编程,你可能会犹豫选择 C++ 还是 Java;入门数据科学,你可能会纠结于选择 Python 还是 R;但无论如何, MySQL 都是 IT 从业人员不可或缺的技能! 【课程设计】 在本课程中,刘运强老师会结合自己十多年来对MySQL的心得体会,通过课程给你分享一条高效的MySQL入门捷径,让学员少走弯路,彻底搞懂MySQL。 本课程包含3大模块:&nbsp; 一、基础篇: 主要以最新的MySQL8.0安装为例帮助学员解决安装与配置MySQL的问题,并对MySQL8.0的新特性做一定介绍,为后续的课程展开做好环境部署。 二、SQL语言篇: 本篇主要讲解SQL语言的四大部分数据查询语言DQL,数据操纵语言DML,数据定义语言DDL,数据控制语言DCL,学会熟练对库表进行增删改查等必备技能。 三、MySQL进阶篇: 本篇可以帮助学员更加高效的管理线上的MySQL数据库;具备MySQL的日常运维能力,语句调优、备份恢复等思路。 &nbsp;

C/C++学习指南全套教程

C/C++学习的全套教程,从基本语法,基本原理,到界面开发、网络开发、Linux开发、安全算法,应用尽用。由毕业于清华大学的业内人士执课,为C/C++编程爱好者的教程。

C/C++跨平台研发从基础到高阶实战系列套餐

一 专题从基础的C语言核心到c++ 和stl完成基础强化; 二 再到数据结构,设计模式完成专业计算机技能强化; 三 通过跨平台网络编程,linux编程,qt界面编程,mfc编程,windows编程,c++与lua联合编程来完成应用强化 四 最后通过基于ffmpeg的音视频播放器,直播推流,屏幕录像,

我以为我对Mysql事务很熟,直到我遇到了阿里面试官

太惨了,面试又被吊打

专为程序员设计的数学课

<p> 限时福利限时福利,<span>15000+程序员的选择!</span> </p> <p> 购课后添加学习助手(微信号:csdn590),按提示消息领取编程大礼包!并获取讲师答疑服务! </p> <p> <br> </p> <p> 套餐中一共包含5门程序员必学的数学课程(共47讲) </p> <p> 课程1:《零基础入门微积分》 </p> <p> 课程2:《数理统计与概率论》 </p> <p> 课程3:《代码学习线性代数》 </p> <p> 课程4:《数据处理的最优化》 </p> <p> 课程5:《马尔可夫随机过程》 </p> <p> <br> </p> <p> 哪些人适合学习这门课程? </p> <p> 1)大学生,平时只学习了数学理论,并未接触如何应用数学解决编程问题; </p> <p> 2)对算法、数据结构掌握程度薄弱的人,数学可以让你更好的理解算法、数据结构原理及应用; </p> <p> 3)看不懂大牛代码设计思想的人,因为所有的程序设计底层逻辑都是数学; </p> <p> 4)想学习新技术,如:人工智能、机器学习、深度学习等,这门课程是你的必修课程; </p> <p> 5)想修炼更好的编程内功,在遇到问题时可以灵活的应用数学思维解决问题。 </p> <p> <br> </p> <p> 在这门「专为程序员设计的数学课」系列课中,我们保证你能收获到这些:<br> <br> <span> </span> </p> <p class="ql-long-24357476"> <span class="ql-author-24357476">①价值300元编程课程大礼包</span> </p> <p class="ql-long-24357476"> <span class="ql-author-24357476">②应用数学优化代码的实操方法</span> </p> <p class="ql-long-24357476"> <span class="ql-author-24357476">③数学理论在编程实战中的应用</span> </p> <p class="ql-long-24357476"> <span class="ql-author-24357476">④程序员必学的5大数学知识</span> </p> <p class="ql-long-24357476"> <span class="ql-author-24357476">⑤人工智能领域必修数学课</span> </p> <p> <br> 备注:此课程只讲程序员所需要的数学,即使你数学基础薄弱,也能听懂,只需要初中的数学知识就足矣。<br> <br> 如何听课? </p> <p> 1、登录CSDN学院 APP 在我的课程中进行学习; </p> <p> 2、登录CSDN学院官网。 </p> <p> <br> </p> <p> 购课后如何领取免费赠送的编程大礼包和加入答疑群? </p> <p> 购课后,添加助教微信:<span> csdn590</span>,按提示领取编程大礼包,或观看付费视频的第一节内容扫码进群答疑交流! </p> <p> <img src="https://img-bss.csdn.net/201912251155398753.jpg" alt=""> </p>

Eclipse archetype-catalog.xml

Eclipse Maven 创建Web 项目报错 Could not resolve archetype org.apache.maven.archetypes:maven-archetype-web

使用TensorFlow+keras快速构建图像分类模型

课程分为两条主线: 1&nbsp;从Tensorflow的基础知识开始,全面介绍Tensorflow和Keras相关内容。通过大量实战,掌握Tensorflow和Keras经常用到的各种建模方式,参数优化方法,自定义参数和模型的手段,以及对训练结果评估与分析的技巧。 2&nbsp;从机器学习基础算法开始,然后进入到图像分类领域,使用MNIST手写数据集和CIFAR10图像数据集,从简单神经网络到深度神经网络,再到卷积神经网络,最终完成复杂模型:残差网络的搭建。完成这条主线,学员将可以自如地使用机器学习的手段来达到图像分类的目的。

Python代码实现飞机大战

文章目录经典飞机大战一.游戏设定二.我方飞机三.敌方飞机四.发射子弹五.发放补给包六.主模块 经典飞机大战 源代码以及素材资料(图片,音频)可从下面的github中下载: 飞机大战源代码以及素材资料github项目地址链接 ————————————————————————————————————————————————————————— 不知道大家有没有打过飞机,喜不喜欢打飞机。当我第一次接触这个东西的时候,我的内心是被震撼到的。第一次接触打飞机的时候作者本人是身心愉悦的,因为周边的朋友都在打飞机, 每

最近面试Java后端开发的感受:如果就以平时项目经验来面试,通过估计很难,不信你来看看

在上周,我密集面试了若干位Java后端的候选人,工作经验在3到5年间。我的标准其实不复杂:第一能干活,第二Java基础要好,第三最好熟悉些分布式框架,我相信其它公司招初级开发时,应该也照着这个标准来面的。 我也知道,不少候选人能力其实不差,但面试时没准备或不会说,这样的人可能在进团队干活后确实能达到期望,但可能就无法通过面试,但面试官总是只根据面试情况来判断。 但现实情况是,大多数人可能面试前没准备,或准备方法不得当。要知道,我们平时干活更偏重于业务,不可能大量接触到算法,数据结构,底层代码这类面试必问

三个项目玩转深度学习(附1G源码)

从事大数据与人工智能开发与实践约十年,钱老师亲自见证了大数据行业的发展与人工智能的从冷到热。事实证明,计算机技术的发展,算力突破,海量数据,机器人技术等,开启了第四次工业革命的序章。深度学习图像分类一直是人工智能的经典任务,是智慧零售、安防、无人驾驶等机器视觉应用领域的核心技术之一,掌握图像分类技术是机器视觉学习的重中之重。针对现有线上学习的特点与实际需求,我们开发了人工智能案例实战系列课程。打造:以项目案例实践为驱动的课程学习方式,覆盖了智能零售,智慧交通等常见领域,通过基础学习、项目案例实践、社群答疑,三维立体的方式,打造最好的学习效果。

微信小程序开发实战之番茄时钟开发

微信小程序番茄时钟视频教程,本课程将带着各位学员开发一个小程序初级实战类项目,针对只看过官方文档而又无从下手的开发者来说,可以作为一个较好的练手项目,对于有小程序开发经验的开发者而言,可以更好加深对小程序各类组件和API 的理解,为更深层次高难度的项目做铺垫。

相关热词 c#设计思想 c#正则表达式 转换 c#form复制 c#写web c# 柱形图 c# wcf 服务库 c#应用程序管理器 c#数组如何赋值给数组 c#序列化应用目的博客园 c# 设置当前标注样式
立即提问