kafka消费数据老是丢失

WARN TaskSetManager: Lost task 9.0 in stage 26569.0 (TID 812602, 2, 2, 104-250-138-250.static.gorillaservers.com): k): k): ): ): kafka.common.NotLeaderForPForPForPartitionException
有两个groupID消费一个topic,出现上面的警告后,有一个groupID就消费不到数据了

2个回答

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
Kafka消费者组丢失未提交的消息

<div class="post-text" itemprop="text"> <p>I am using consumer group with just one consumer, just one broker ( docker wurstmeister image ). It's decided in a code to commit offset or not - if code returns error then message is not commited. I need to ensure that system does not lose any message - even if that means retrying same msg forever ( for now ;) ). For testing this I have created simple handler which does not commit offset in case of 'error' string send as message to kafka. All other strings are commited. </p> <pre><code>kafka-console-producer --broker-list localhost:9092 --topic test &gt;this will be commited </code></pre> <p>Now running </p> <pre><code>kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe </code></pre> <p>returns</p> <pre><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 13 13 0 </code></pre> <p>so thats ok, there is no lag. Now we pass 'error' string to fake that something bad happened and message is not commited:</p> <pre><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 13 14 1 </code></pre> <p>Current offset stays at right position + there is 1 lagged message. Now if we pass correct message again offset will move on to 15:</p> <p><code>TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG test 0 15 15</code> </p> <p>and message number 14 will not be picked up ever again. Is it default behaviour? Do I need to trace last offset and load message by it+1 manually? I have set commit interval to 0 to hopefully not use any auto.commit mechanism.</p> <p>fetch/commit code:</p> <pre><code>go func() { for { ctx := context.Background() m, err := mr.brokerReader.FetchMessage(ctx) if err != nil { break } if err := msgFunc(m); err != nil { log.Errorf("# messaging # cannot commit a message: %v", err) continue } // commit message if no error if err := mr.brokerReader.CommitMessages(ctx, m); err != nil { // should we do something else to just logging not committed message? log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err) } } }() </code></pre> <p>reader configuration:</p> <pre><code>kafkaReader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, GroupID: groupID, Topic: topic, CommitInterval: 0, MinBytes: 10e3, MaxBytes: 10e6, }) </code></pre> <p>library used: <a href="https://github.com/segmentio/kafka-go" rel="nofollow noreferrer">https://github.com/segmentio/kafka-go</a></p> </div>

kafka 消费者消费不到数据

[root@hzctc-kafka-5d61 ~]# kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group sbs-haodian-message1 --topic Message --zookeeper 10.1.5.61:2181 [2018-04-18 16:43:43,467] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/sbs-haodian-message1/offsets/Message/8. 用kafka的时候 用命令查看消费组消费情况 报这个错误 其他的消费组是正常的 哪位大神知道这是什么原因导致的 我在消费操作的时候加了缓存锁 每次poll操作之后的间隔时间不确定 可能是10S或者20S或者30S 不过我的sessiontimeiut设置了90s。这个会有什么影响吗

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

kafka 消费端 处理数据比较慢,会不会出现数据积压?

如题,kafka消费端接收到数据后 要进行部分业务逻辑操作,可能会有3秒左右,处理很慢 的话,对程序有什么影响呢?新手提问, 望各位大神不吝赐教!

kafka消费不到数据问题

kafka集群搭建正常,通过console都能正常生产和消费消息,但是通过JAVA程序就是读取不到消息,更换group都尝试过了 package test; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumer extends Thread { private String topic; public KafkaConsumer(String topic){ super(); this.topic=topic; } @Override public void run(){ //通过properties设置了Consumer的参数,并且创建了连接器,连接到Kafaka ConsumerConnector consumer = createConsumer(); //Map作用指定获取的topic以及partition Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); topicCountMap.put(topic, 3); //consumer连接器获取消息 Map<String,List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); //获取对应的topic中的某一个partition中的数据 KafkaStream<byte[],byte[]> kafkaStream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()){ byte[] message = iterator.next().message(); System.out.println("message is:"+new String(message)); } } private ConsumerConnector createConsumer(){ Properties properties = new Properties(); properties.put("zookeeper.connect", "XXX:2181"); properties.put("auto.offset.reset", "smallest");//读取旧数据 properties.put("group.id", "333fcdcd"); return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); } public static void main(String[] args) { new KafkaConsumer("testtest").start(); } }

kafka数据可以写入数据,消费不可数据

kafka的offset值一直不变,可以往里面写数据 会是什么原因呢 手动改变offset的值是可以消费数据的

kafka 在子线程消费数据,有卡顿的情况?

2个程序消费数据,一个程序在主线程消费就没这个问题,另个一个子线程消费数据,有卡顿的情况?

kafka消费者速度与什么有关

@KafkaListener(topics = {"CRBKC0002.000"}) public void sendSmsInfoByBizType(String record) { } 假设单机版的kafka,就一个节点。 1、 @KafkaListener注解接受消费者,是不是等这个方法执行完。 这个消费者进程才算消费结束。是不是一个镜像这个方法同时只能执行一次?就是不能连续起多个线程执行这个方法。 2、如果接受到参数就算消费这进程结束,也就是获取这个record消费者进程就结束了,那假设生产者一秒生产100w数据进入kafka。那这边获取参数就算消费者进程消费结束,那是不是相当于瞬间连续起100w这个方法线程执行。可是tomcat就200线程。

kafka消费者迭代器卡死

配置kafka+zk在虚拟机上,自己在本机上的生产者可以正常发送消息,但是消费者在 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 之后 iterator 的任何操作都直接卡死了,请问这个是怎么回事?在这个操作之前debug是可以看到变量内部的数值的,这个操作之后就不能看了,value全部清空了。 贴源码 ``` package com.weixinjia.recreation.queue.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerGroupExample extends Thread{ private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // // executor = Executors.newFixedThreadPool(a_numThreads); System.out.println(streams.size()); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); System.out.println(iterator); while(iterator.hasNext()){ MessageAndMetadata<byte[], byte[]> next = iterator.next(); byte[] message = next.message(); String string = new String(message); System.out.println(string); } } System.out.println("消息输出完成"); } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "1500"); props.put("zookeeper.sync.time.ms", "4000"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes", "10240000"); props.put("auto.commit.enable", "true"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "masterServce:2181"; String groupId = "group-1"; String topic = "test2"; int threads = 3; ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } } ``` 消息在kafka那边直接用kafka-console-consumer.sh是可以查询到的

python消费kafka数据,为什么前面取几次都取不到?

python 消费kafka数据时,刚开始连接时为什么取不到数据? 代码如下: ``` # -*- coding:utf8 -*- from kafka import KafkaConsumer from kafka import TopicPartition import kafka import time # 测试kafka poll方法能拉取多少的记录 consumer = KafkaConsumer( bootstrap_servers=['192.168.13.202:9092'], group_id='group-1', auto_offset_reset='earliest', enable_auto_commit=False) consumer.subscribe('test') print ("t1",time.time()) while True: print("t2", time.time()) msg = consumer.poll(timeout_ms=100, max_records=5) # 从kafka获取消息 # print (len(msg)) for i in msg.values(): for k in i: print(k.offset, k.value) time.sleep(1) ``` 打印的结果却是 ``` t1 1567669170.438951 t2 1567669170.438951 t2 1567669171.8450315 t2 1567669172.945094 t2 1567669174.0471573 t2 1567669175.1472201 0 b'{"ast":"\xe7\x82\xb"}' 1 b'{"ast":"","dm":2}' 2 b'{"ast":"12"}' 3 b'{"ast":"sd"}' 4 b'{"ast":"12ds"}' t2 1567669176.1822793 ``` 为什么连接上kafka之后,会取5次才会取到数据?

kafka多个消费者消费同一数据

kafka不同groupid下的消费者,消费同一topic下的某一条数据,为什么offset值不变? 只被消费了一次吗?

【求助】structed streaming 在消费kafka数据时,怎么保证数据完整&有且仅有1次被消费

看了下官方文档,原本streaming在使用direct模式时,可以自己维护offset,感觉上还比较靠谱。 现在structed streaming 使用kafka时,enable.auto.commit 是不可设置的,按照文档说的是,structed streaming 不提交任何offset, 那spark在新版本的消费kafka中,如何保证有且仅有一次,或者是至少被消费一次。

kafka通过consumer java api实现消费者,KafkaStream打印不出来数据

kafka2.2.0 通过consumer java api实现消费者,KafkaStream打印不出来数据 ``` package kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class KafkaConsumerTest extends Thread { //在linux环境运行正常 @Override public void run() { // TODO Auto-generated method stub String topic="powerTopic"; Properties pro=new Properties(); pro.put("zookeeper.connect", "10.2.2.61:2181,10.2.2.62:2181,10.2.2.63:2181"); pro.put("group.id", "test"); // pro.put("zookeeper.session.timeout.ms", "4000"); // pro.put("consumer.timeout.ms", "-1"); ConsumerConfig paramConsumerConfig=new ConsumerConfig(pro); ConsumerConnector cosumerConnector=Consumer.createJavaConsumerConnector(paramConsumerConfig); Map<String, Integer> paramMap=new HashMap<String, Integer>(); paramMap.put(topic,1); Map<String, List<KafkaStream<byte[], byte[]>>> messageStream=cosumerConnector.createMessageStreams(paramMap); KafkaStream<byte[], byte[]> kafkastream=messageStream.get(topic).get(0); // System.out.println(kafkastream.size()); System.out.println("hello"); ConsumerIterator<byte[], byte[]> iterator=kafkastream.iterator(); while(iterator.hasNext()){ // MessageAndMetadata<byte[], byte[]> message=iterator.next(); // String topic1=message.topic(); String msg=new String(iterator.next().message()); System.out.println(msg); } } public static void main(String[] args) { // TODO Auto-generated method stub new KafkaConsumerTest().start(); new MyProducer01().start(); } } ``` kafka环境在centos操作系统,在windows系统的eclipse运行程序,打印不出来数据,也不结束报错: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576807897_599340.jpg) 打包后在集群环境运行结果: ![图片说明](https://img-ask.csdn.net/upload/201912/20/1576808400_74063.jpg)

spark读取kafka数据, 缓存当天数据

spark stream从kafka读取数据,10秒间隔;需要缓存当天数据用于业务分析。 思路1:定义static rdd用于union每次接收到的rdd;用window窗口(窗口长1小时,滑动步长20分钟);union之后checkpoint。 但是发现在利用static rdd做业务分析的时候,应该是因为磁盘io,所以执行时间太长。 思路2:一样定义static rdd, context调用remember(24小时)保留数据24小时(数据缓存在哪里了,暂时不清楚,汗);但是在业务分析时,发现static rdd的count结果为0 求教怎么缓存一段时间的rdd 数据放executor内存或分布放个worker都可以,一天的数据量大概在100g,过滤后再5g,机器内存256g

kafkaconsumer数据不能批量消费

我用kafkaconsumer批量消费数据, 无法获取获取批量, 从日志看offset提交也异常, 规律性的每3次提交成功一次, 数据每次获取一条, 无法批量获取。 困扰了两天了, 莫名啊。 ``` public class KafkaManualConsumer { public static void main(String[] args) { Properties properties = new Properties(); System.setProperty("java.security.auth.login.config", "c:/kafka_client_jaas.conf"); //配置文件路径 properties.put("security.protocol", "SASL_PLAINTEXT"); properties.put("sasl.mechanism", "PLAIN"); properties.put("bootstrap.servers", "VM_0_16_centos:9092"); //kafka:9092 properties.put("enable.auto.commit", "false"); //properties.put("session.timeout.ms", 60000); properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); properties.put("fetch.max.wait.ms", 5000); properties.put("max.poll.records", 5000); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "yuu67u36"); // properties.put("receive.buffer.bytes", 3276800); // properties.put("heartbeat.interval.ms", 59000); // properties.put("client.id", "t4t5t234f34f3f"); // properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32*1024*1024); // properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 64*1024*1024); // properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 128*1024*1024); //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 2000*1024); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-attendphotos")); //kafkaConsumer.subscribe(Arrays.asList("topic-video-dev-stat")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000L); System.out.println("-----------------"); System.out.println(records.count()); for (ConsumerRecord<String, String> record : records) { System.out.println("offset = " + record.offset()); VideoPhotoOuter dto = JSON.parseObject(record.value(), VideoPhotoOuter.class); System.out.println(dto.getPhotos().get(0).getPhotoFmt()); //System.out.printf("offset = %d, value = %s", record.offset(), record.value()); } try { kafkaConsumer.commitSync(); Thread.currentThread().sleep(1000L); } catch(Exception ex) { //手动抛出SQLException使用事务回滚 } } //kafkaConsumer.close(); } } ``` 下面是控制台日志 ``` 17:32:06.758 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [VM_0_16_centos:9092] check.crcs = true client.dns.lookup = default client.id = t4t5t234f34f3f client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 5000 fetch.min.bytes = 1 group.id = yuu67u36 group.instance.id = null heartbeat.interval.ms = 59000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 5000 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 3276800 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 60000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = PLAIN security.protocol = SASL_PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 60000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ....................... ........................ ----------------- 0 17:32:08.068 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:10.095 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90261 for partition topic-video-dev-attendphotos-0 17:32:12.091 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent a full fetch response that created a new incremental fetch session 725149318 with 1 response partition(s) 17:32:12.092 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90261 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93666, lastStableOffset = 93666, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576) 17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.bytes-fetched 17:32:12.120 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.topic-video-dev-attendphotos.records-fetched 17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lag 17:32:12.121 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic-video-dev-attendphotos-0.records-lead 17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90262, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null) 17:32:12.122 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=1) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 17:32:12.122 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null) ----------------- 1 offset = 90261 JPG 17:32:12.239 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:14.256 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 ----------------- 0 17:32:16.279 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Committed offset 90262 for partition topic-video-dev-attendphotos-0 17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Node 90 sent an incremental fetch response for session 725149318 with 1 response partition(s) 17:32:16.603 [kafka-coordinator-heartbeat-thread | yuu67u36] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Fetch READ_UNCOMMITTED at offset 90262 for partition topic-video-dev-attendphotos-0 returned fetch data (error=NONE, highWaterMark=93668, lastStableOffset = 93668, logStartOffset = 5372, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=1048576) 17:32:17.280 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Added READ_UNCOMMITTED fetch request for partition topic-video-dev-attendphotos-0 at position FetchPosition{offset=90263, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=VM_0_16_centos:9092 (id: 90 rack: null), epoch=0}} to node VM_0_16_centos:9092 (id: 90 rack: null) 17:32:17.281 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Built incremental fetch (sessionId=725149318, epoch=2) for node 90. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s) 17:32:17.281 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=t4t5t234f34f3f, groupId=yuu67u36] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(topic-video-dev-attendphotos-0), toForget=(), implied=()) to broker VM_0_16_centos:9092 (id: 90 rack: null) ----------------- 1 ```

spring boot 1.5集成 kafka 消费者怎么自己确认消费

spring boot 1.5集成 kafka 消费者怎么自己确认消费 怎么使用@KafkaListener注解实现Acknowledgment,即消费者怎么自己提交游标

kafka的数据存放,想做配置的修改

kafka的默认存放时间为7天,因为在做测试,所以想让他保留的时间长一点。修改了 log.retention.hours=1440 log.retention.bytes=1073741824 这个两个配置。但是今天在做测试的时候,发现kafka 的数据没有啦。只有四条数据啦。我就想知道kafka数据怎么就没有啦,是因为什么没有的。 望大神多多指教!

监控kafka数据源是否堆积

监控kafka数据源是否堆积,java怎么实现 Kafka版本是0.8.2.1

spring-kafka做了分区,部分分区数据可以正常消费,部分分区始终无法消费?

spring-kafka(版本:1.0.6.RELEASE,kafka-client:0.9.0.1),创建了8个分区,有一个分区的数据始终无法消费,其他分区数据正常消费。有大神知道是为什么?

大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了

大学四年,看课本是不可能一直看课本的了,对于学习,特别是自学,善于搜索网上的一些资源来辅助,还是非常有必要的,下面我就把这几年私藏的各种资源,网站贡献出来给你们。主要有:电子书搜索、实用工具、在线视频学习网站、非视频学习网站、软件下载、面试/求职必备网站。 注意:文中提到的所有资源,文末我都给你整理好了,你们只管拿去,如果觉得不错,转发、分享就是最大的支持了。 一、电子书搜索 对于大部分程序员...

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

程序员请照顾好自己,周末病魔差点一套带走我。

程序员在一个周末的时间,得了重病,差点当场去世,还好及时挽救回来了。

技术大佬:我去,你写的 switch 语句也太老土了吧

昨天早上通过远程的方式 review 了两名新来同事的代码,大部分代码都写得很漂亮,严谨的同时注释也很到位,这令我非常满意。但当我看到他们当中有一个人写的 switch 语句时,还是忍不住破口大骂:“我擦,小王,你丫写的 switch 语句也太老土了吧!” 来看看小王写的代码吧,看完不要骂我装逼啊。 private static String createPlayer(PlayerTypes p...

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

点沙成金:英特尔芯片制造全过程揭密

“亚马逊丛林里的蝴蝶扇动几下翅膀就可能引起两周后美国德州的一次飓风……” 这句人人皆知的话最初用来描述非线性系统中微小参数的变化所引起的系统极大变化。 而在更长的时间尺度内,我们所生活的这个世界就是这样一个异常复杂的非线性系统…… 水泥、穹顶、透视——关于时间与技艺的蝴蝶效应 公元前3000年,古埃及人将尼罗河中挖出的泥浆与纳特龙盐湖中的矿物盐混合,再掺入煅烧石灰石制成的石灰,由此得来了人...

讲一个程序员如何副业月赚三万的真实故事

loonggg读完需要3分钟速读仅需 1 分钟大家好,我是你们的校长。我之前讲过,这年头,只要肯动脑,肯行动,程序员凭借自己的技术,赚钱的方式还是有很多种的。仅仅靠在公司出卖自己的劳动时...

上班一个月,后悔当初着急入职的选择了

最近有个老铁,告诉我说,上班一个月,后悔当初着急入职现在公司了。他之前在美图做手机研发,今年美图那边今年也有一波组织优化调整,他是其中一个,在协商离职后,当时捉急找工作上班,因为有房贷供着,不能没有收入来源。所以匆忙选了一家公司,实际上是一个大型外包公司,主要派遣给其他手机厂商做外包项目。**当时承诺待遇还不错,所以就立马入职去上班了。但是后面入职后,发现薪酬待遇这块并不是HR所说那样,那个HR自...

女程序员,为什么比男程序员少???

昨天看到一档综艺节目,讨论了两个话题:(1)中国学生的数学成绩,平均下来看,会比国外好?为什么?(2)男生的数学成绩,平均下来看,会比女生好?为什么?同时,我又联想到了一个技术圈经常讨...

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

我说我不会算法,阿里把我挂了。

不说了,字节跳动也反手把我挂了。

优雅的替换if-else语句

场景 日常开发,if-else语句写的不少吧??当逻辑分支非常多的时候,if-else套了一层又一层,虽然业务功能倒是实现了,但是看起来是真的很不优雅,尤其是对于我这种有强迫症的程序"猿",看到这么多if-else,脑袋瓜子就嗡嗡的,总想着解锁新姿势:干掉过多的if-else!!!本文将介绍三板斧手段: 优先判断条件,条件不满足的,逻辑及时中断返回; 采用策略模式+工厂模式; 结合注解,锦...

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

为什么你不想学习?只想玩?人是如何一步一步废掉的

不知道是不是只有我这样子,还是你们也有过类似的经历。 上学的时候总有很多光辉历史,学年名列前茅,或者单科目大佬,但是虽然慢慢地长大了,你开始懈怠了,开始废掉了。。。 什么?你说不知道具体的情况是怎么样的? 我来告诉你: 你常常潜意识里或者心理觉得,自己真正的生活或者奋斗还没有开始。总是幻想着自己还拥有大把时间,还有无限的可能,自己还能逆风翻盘,只不是自己还没开始罢了,自己以后肯定会变得特别厉害...

男生更看重女生的身材脸蛋,还是思想?

往往,我们看不进去大段大段的逻辑。深刻的哲理,往往短而精悍,一阵见血。问:产品经理挺漂亮的,有点心动,但不知道合不合得来。男生更看重女生的身材脸蛋,还是...

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

当HR压你价,说你只值7K,你该怎么回答?

当HR压你价,说你只值7K时,你可以流畅地回答,记住,是流畅,不能犹豫。 礼貌地说:“7K是吗?了解了。嗯~其实我对贵司的面试官印象很好。只不过,现在我的手头上已经有一份11K的offer。来面试,主要也是自己对贵司挺有兴趣的,所以过来看看……”(未完) 这段话主要是陪HR互诈的同时,从公司兴趣,公司职员印象上,都给予对方正面的肯定,既能提升HR的好感度,又能让谈判气氛融洽,为后面的发挥留足空间。...

面试:第十六章:Java中级开发(16k)

HashMap底层实现原理,红黑树,B+树,B树的结构原理 Spring的AOP和IOC是什么?它们常见的使用场景有哪些?Spring事务,事务的属性,传播行为,数据库隔离级别 Spring和SpringMVC,MyBatis以及SpringBoot的注解分别有哪些?SpringMVC的工作原理,SpringBoot框架的优点,MyBatis框架的优点 SpringCould组件有哪些,他们...

面试阿里p7,被按在地上摩擦,鬼知道我经历了什么?

面试阿里p7被问到的问题(当时我只知道第一个):@Conditional是做什么的?@Conditional多个条件是什么逻辑关系?条件判断在什么时候执...

你打算用Java 8一辈子都不打算升级到Java 14,真香

我们程序员应该抱着尝鲜、猎奇的心态,否则就容易固步自封,技术停滞不前。

无代码时代来临,程序员如何保住饭碗?

编程语言层出不穷,从最初的机器语言到如今2500种以上的高级语言,程序员们大呼“学到头秃”。程序员一边面临编程语言不断推陈出新,一边面临由于许多代码已存在,程序员编写新应用程序时存在重复“搬砖”的现象。 无代码/低代码编程应运而生。无代码/低代码是一种创建应用的方法,它可以让开发者使用最少的编码知识来快速开发应用程序。开发者通过图形界面中,可视化建模来组装和配置应用程序。这样一来,开发者直...

面试了一个 31 岁程序员,让我有所触动,30岁以上的程序员该何去何从?

最近面试了一个31岁8年经验的程序猿,让我有点感慨,大龄程序猿该何去何从。

大三实习生,字节跳动面经分享,已拿Offer

说实话,自己的算法,我一个不会,太难了吧

程序员垃圾简历长什么样?

已经连续五年参加大厂校招、社招的技术面试工作,简历看的不下于万份 这篇文章会用实例告诉你,什么是差的程序员简历! 疫情快要结束了,各个公司也都开始春招了,作为即将红遍大江南北的新晋UP主,那当然要为小伙伴们做点事(手动狗头)。 就在公众号里公开征简历,义务帮大家看,并一一点评。《启舰:春招在即,义务帮大家看看简历吧》 一石激起千层浪,三天收到两百多封简历。 花光了两个星期的所有空闲时...

《经典算法案例》01-08:如何使用质数设计扫雷(Minesweeper)游戏

我们都玩过Windows操作系统中的经典游戏扫雷(Minesweeper),如果把质数当作一颗雷,那么,表格中红色的数字哪些是雷(质数)?您能找出多少个呢?文中用列表的方式罗列了10000以内的自然数、质数(素数),6的倍数等,方便大家观察质数的分布规律及特性,以便对算法求解有指导意义。另外,判断质数是初学算法,理解算法重要性的一个非常好的案例。

《Oracle Java SE编程自学与面试指南》最佳学习路线图(2020最新版)

正确选择比瞎努力更重要!

一文带你入门Java Stream流,太强了

两个星期以前,就有读者强烈要求我写一篇 Java Stream 流的文章,我说市面上不是已经有很多了吗,结果你猜他怎么说:“就想看你写的啊!”你看你看,多么苍白的喜欢啊。那就“勉为其难”写一篇吧,嘻嘻。 单从“Stream”这个单词上来看,它似乎和 java.io 包下的 InputStream 和 OutputStream 有些关系。实际上呢,没毛关系。Java 8 新增的 Stream 是为...

立即提问
相关内容推荐