How do you solve a Kafka consumer out-of-memory problem?

Concurrent consumption is causing an out-of-memory error.

1 answer

If the out-of-memory error is not caused by a memory leak in your own code, there are really only two levers:
1. Increase the memory allocated to the program at startup.
2. Lower the consumer's maximum number of records pulled per batch (max-poll-records).
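
For the second lever, a minimal sketch with the plain Java client — the broker address, group id, and the record cap are placeholders; with Spring Boot the equivalent property is spring.kafka.consumer.max-poll-records, and the first lever is simply a larger JVM heap at startup, e.g. -Xmx2g:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BoundedPollConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // cap how many records a single poll() may return, so one batch
        // cannot blow up the heap (the default is 500)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe and poll as usual ...
        consumer.close();
    }
}
```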

Other related questions
Spring-integrated Kafka: why does the consumer's memory usage keep growing at runtime?

I integrated a Kafka consumer with Spring. After deployment, the process's memory usage keeps climbing at runtime until the program eventually dies. Could someone take a look and suggest a fix? Below is my configuration file: ![screenshot](https://img-ask.csdn.net/upload/201810/31/1540978342_260014.png) After running for two days the process occupied 1.4 GB. I dumped the heap with jmap and analyzed it with Eclipse MAT: ![screenshot](https://img-ask.csdn.net/upload/201810/31/1540978543_231966.png) ![screenshot](https://img-ask.csdn.net/upload/201810/31/1540978554_565464.png) The growth is inside the class org.springframework.kafka.listener.KafkaMessageListenerContainer: ![screenshot](https://img-ask.csdn.net/upload/201810/31/1540978671_113331.png) The LinkedBlockingQueue in there looks as if it is never drained. I don't know whether something still needs to be configured; I haven't found a solution.

Kafka consumer cannot consume messages

After deploying the Kafka cluster and the consumer servers in production, logstash sends real-time logs to the Kafka cluster and the consumers initially consume them normally. But after about two minutes the consumers stop consuming. How should I go about locating the problem? 1. I checked the Kafka server logs: logstash is still pushing real-time logs to Kafka; the cluster's log retention is two hours. 2. There are two consumer machines, and both are running at the same time. 3. The cluster has three servers. While investigating I noticed the consumers are connected to only one broker; I don't know whether that is the cause.

Out of memory in logstash when using Kafka as an input source

log4j reports:

```
[2017-09-11T09:39:02.399] ERROR: kafka.network.BoundedByteBufferReceive: OOME with size 20971590
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
    at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
```

How do I solve this problem?
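
The stack trace shows the old Scala consumer trying to allocate a roughly 20 MB fetch buffer (OOME with size 20971590), so the usual options are a larger heap (-Xmx) or a smaller fetch size. On that consumer the knob was fetch.message.max.bytes; for reference, a hedged sketch of the equivalent caps on the modern Java client (broker address and group id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class BoundedFetchConfig {
    // Consumer properties that bound how much memory a single fetch can pin.
    public static Properties boundedFetchProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-consumer-group");      // placeholder group
        // at most 1 MB buffered per partition per fetch
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
        // at most 5 MB per fetch response overall
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 5242880);
        return props;
    }
}
```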

Uneven memory usage across a Kafka cluster

A Kafka cluster of three servers (33 GB of memory each) is configured and started in cluster mode. Two of the machines use only about 3 GB of memory, while the third uses 32 GB, and I see no message backlog. What could the cause be?

What determines Kafka consumer speed?

```java
@KafkaListener(topics = {"CRBKC0002.000"})
public void sendSmsInfoByBizType(String record) {
}
```
Assume a single-node Kafka installation. 1. For a consumer annotated with @KafkaListener, does consumption of a record only count as finished once this method returns? In other words, can a single instance execute this method only once at a time, rather than launching several threads that run it concurrently? 2. If instead consumption counts as finished as soon as the record argument is received, then suppose a producer writes 1,000,000 records per second into Kafka: wouldn't that amount to instantly launching 1,000,000 threads running this method? Tomcat only has about 200 threads.
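
As far as I can tell from spring-kafka's threading model (a hedged sketch, not the poster's actual setup): a listener container invokes the @KafkaListener method serially on each of its consumer threads, so the method never gets one thread per record and cannot explode into a million invocations; throughput is bounded by the container's concurrency and the topic's partition count, not by Tomcat's pool. A sketch of raising that concurrency:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
@EnableKafka
public class ListenerConcurrencyConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // three consumer threads, each invoking the @KafkaListener method serially;
        // effective parallelism is still capped by the partition count, so a
        // single-partition topic stays single-threaded no matter what
        factory.setConcurrency(3);
        return factory;
    }
}
```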

If the Kafka consumer processes data slowly, will messages pile up?

As the title says: after the Kafka consumer receives a record it runs some business logic that can take around 3 seconds. If processing is that slow, what effect does it have on the program? Newbie question; any advice from the experts would be appreciated!
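
Slow handlers do not by themselves lose data — unread messages simply stay in the topic until retention deletes them — but two things matter on the newer clients: the backlog grows whenever production outpaces consumption, and if one poll()-to-poll() cycle takes longer than max.poll.interval.ms the consumer is evicted from the group. A minimal sketch of the arithmetic (broker and group names are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SlowHandlerConfig {
    // Keep (records per poll) x (seconds per record) safely below
    // max.poll.interval.ms, or the broker assumes the consumer died
    // and rebalances the group.
    public static Properties slowHandlerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-handler-group");      // placeholder group
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);                // 50 records x 3 s = 150 s per loop
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);        // 150 s < 300 s, consumer stays alive
        return props;
    }
}
```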

Kafka consumer cannot read any data

Using Kafka, I checked a consumer group's progress with the command below and got this error, while the other consumer groups are fine:

```
[root@hzctc-kafka-5d61 ~]# kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group sbs-haodian-message1 --topic Message --zookeeper 10.1.5.61:2181
[2018-04-18 16:43:43,467] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/sbs-haodian-message1/offsets/Message/8.
```

Does anyone know what causes this? I added a cache lock in my consumption logic, so the interval after each poll() varies — it may be 10 s, 20 s or 30 s — though I set the session timeout to 90 s. Could that have any effect?

How can a slow Kafka consumer process messages faster when adding partitions is not allowed?

As the title says — this is from a recent interview and is meant to be answered from Kafka's theoretical properties; I may have misunderstood the exact requirements, so if you can see the intended answer at a glance, please give me a hint. Searching around, the usual ways to raise consumption throughput are: increase the partition count to get more consumer parallelism (not allowed here), or use multiple threads in the consumer. But if message processing is CPU-bound, does adding threads even help? Perhaps I am misreading the problem, so restated: a producer generates 10,000 messages per second, while all consumers together can only process 5,000 per second, and the processing is pure CPU computation. Without adding partitions, how do you raise the processing speed?
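
For what it's worth, the standard pattern under these constraints is to parallelize the processing rather than the fetching: one consumer drains the partition quickly and hands records to a worker pool sized to the machine's cores, committing only after the batch completes. This helps only while cores are idle; if every core is already saturated, the remaining options are faster code or more hardware. A hedged sketch (broker, group, and topic names are placeholders):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class WorkerPoolConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cpu-bound-group");         // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // one worker per core: the most that helps for CPU-bound work
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cores);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                List<Future<?>> futures = new ArrayList<>();
                for (ConsumerRecord<String, String> r : batch) {
                    futures.add(pool.submit(() -> process(r.value()))); // CPU work in parallel
                }
                for (Future<?> f : futures) f.get(); // wait for the whole batch
                consumer.commitSync();               // at-least-once: commit only after processing
            }
        }
    }

    static void process(String value) { /* CPU-heavy computation */ }
}
```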

Problem: no data can be consumed from Kafka

The Kafka cluster is set up correctly: messages can be produced and consumed through the console without problems, but this Java program just reads nothing, even after switching the consumer group:

```java
package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

    private String topic;

    public KafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        // set the consumer's parameters through Properties and create the connector to Kafka
        ConsumerConnector consumer = createConsumer();
        // the map specifies which topic to read and how many streams to open for it
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 3);
        // obtain the message streams through the connector
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // take the first stream of the topic
        KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
        while (iterator.hasNext()) {
            byte[] message = iterator.next().message();
            System.out.println("message is:" + new String(message));
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "XXX:2181");
        properties.put("auto.offset.reset", "smallest"); // read old data
        properties.put("group.id", "333fcdcd");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new KafkaConsumer("testtest").start();
    }
}
```

Kafka consumer offset reset problem

The Kafka consumer reports the following error:

```
2016-07-12 14:02:45,076 [ConsumerFetcherThread-order-consumer-group_iZ25mmzglleZ-1468299172455-8b8f5ebd-0-3] ERROR [ConsumerFetcherThread:97] - [ConsumerFetcherThread-order-consumer-group_iZ25mmzglleZ-1468299172455-8b8f5ebd-0-3], Current offset 458416 for partition [order-consumer-group,0] out of range; reset offset to 580644
```

How can I make the consumer consume from offset 458416? Configuring auto.offset.reset=smallest has no effect. Is there a good way to handle this?
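
Note that "out of range" usually means offset 458416 has fallen outside the broker's retained log, in which case those messages are already deleted and nothing can fetch them. If the data does still exist, the modern Java client can jump to an explicit offset with seek(); the poster's stack is the old Scala consumer, so this is only an illustrative sketch with a placeholder broker and a hypothetical topic name:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("orders", 0); // hypothetical topic, partition 0
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 458416L); // jump to the explicit offset (only works if still retained)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("%d: %s%n", r.offset(), r.value()));
        }
    }
}
```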

How does a Kafka consumer manually acknowledge messages in Spring Boot 1.5?

With Kafka integrated into Spring Boot 1.5, how does the consumer acknowledge messages itself? That is, how do you use the @KafkaListener annotation together with Acknowledgment so that the consumer commits its own offsets?
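
A minimal sketch, assuming the spring-kafka 1.x line that Boot 1.5 pulls in (the AckMode enum moved to ContainerProperties in later versions) and a hypothetical topic name: switch the container to manual acknowledgment and take an Acknowledgment parameter in the listener; enable.auto.commit must be false on the consumer for this to take effect.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Configuration
@EnableKafka
public class ManualAckConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // hand the commit decision to the listener
        factory.getContainerProperties()
               .setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

@Component
class ManualAckListener {

    @KafkaListener(topics = "some-topic") // hypothetical topic name
    public void listen(String record, Acknowledgment ack) {
        // ... process the record ...
        ack.acknowledge(); // commit the offset only after successful processing
    }
}
```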

Kafka consumption keeps losing data

```
WARN TaskSetManager: Lost task 9.0 in stage 26569.0 (TID 812602, 104-250-138-250.static.gorillaservers.com): kafka.common.NotLeaderForPartitionException
```

Two group IDs consume the same topic. After the warning above appears, one of the groups stops receiving any data.

Kafka consumer iterator hangs

Kafka + ZooKeeper are set up in a virtual machine. The producer on my local machine sends messages fine, but in the consumer, after `ConsumerIterator<byte[], byte[]> iterator = stream.iterator();`, every subsequent operation on `iterator` simply hangs. What is going on? Before this call, the debugger can inspect the variable's internal values; after it, it no longer can — the values all show as cleared. Source below:

```java
package com.weixinjia.recreation.queue.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ConsumerGroupExample extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads (must be initialized before submit() below,
        // otherwise executor is null and submit() throws a NullPointerException)
        executor = Executors.newFixedThreadPool(a_numThreads);
        System.out.println(streams.size());

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // ConsumerTest is the poster's own Runnable
            threadNumber++;
            // NOTE: iterating the same stream here AND inside ConsumerTest means two
            // threads compete for one iterator; hasNext() also blocks indefinitely
            // when no message arrives unless consumer.timeout.ms is set
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            System.out.println(iterator);
            while (iterator.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = iterator.next();
                byte[] message = next.message();
                String string = new String(message);
                System.out.println(string);
            }
        }
        System.out.println("Finished printing messages");
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "1500");
        props.put("zookeeper.sync.time.ms", "4000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.message.max.bytes", "10240000");
        props.put("auto.commit.enable", "true");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = "masterServce:2181";
        String groupId = "group-1";
        String topic = "test2";
        int threads = 3;

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
```

The messages are definitely there: they show up when reading the topic directly with kafka-console-consumer.sh.

A question about Kafka consumers, described below

A question for anyone familiar with Kafka: I need to read a fixed number of partitions of a topic within a group, and consume that data with multiple reader threads. How do I guarantee the data is not consumed twice (I assume the offsets tell you whether something was already read)? Each thread reads a batch, parses it into individual objects, and then further threads are started to write them to the database. Isn't this threads-within-threads design inefficient? And if the application is deployed on several nodes, will the work be split evenly between them? How are problems like this usually solved?

How does a Kafka consumer fetch messages?

With ActiveMQ the broker pushes to the consumer: the consumer just registers a MessageListener and receives messages. Kafka consumers instead have to pull messages from the broker. How does the consumer know that the broker already has messages for the topic it cares about? Fetch on a timer?
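
There is no push-style callback to register: the idiom is an endless poll loop, and the poll itself answers the "is there anything yet?" question. A minimal sketch with the modern Java client (broker, group, and topic names are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-loop-group");         // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic")); // hypothetical topic
            while (true) {
                // poll() plays the role of the listener: it blocks up to the timeout
                // and simply returns an empty batch when the broker has nothing new,
                // so there is no need to know in advance whether messages exist
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}
```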

Kafka consumption problem: consumption times out after running for a while. Asking for the cause

``` 80 DEBUG [YyjkStorageKafkaConsumerThread] YyjkStorageKafkaConsumerThread.value:{"tableName":"sw_segment","operateType":"INSERT","operateId":"4921.43.15759673490360004","indexType":"type","storageType":"elasticsearch","date":1575967330707,"tableData":{"trace_id":"4921.43.15759673490360005","endpoint_name":"/v4/default/registry/microservices/b0b6cb4d62e32b56c3bf8cb4bd2b7aed46cdffbe/instances/72abd3701b1811eaa6ea005056b6530b/heartbeat","latency":3,"end_time":1575967349039,"endpoint_id":189076,"service_instance_id":4921,"version":2,"start_time":1575967349036,"data_binary":"Cg0KC7kmK8SNreHuqv8bEsQBEP///////////wEYrLLl9+4tIK+y5ffuLTCUxQtABFABWANgO3oSCgtodHRwLm1ldGhvZBIDUFVUeogBCgN1cmwSgAEvdjQvZGVmYXVsdC9yZWdpc3RyeS9taWNyb3NlcnZpY2VzL2IwYjZjYjRkNjJlMzJiNTZjM2JmOGNiNGJkMmI3YWVkNDZjZGZmYmUvaW5zdGFuY2VzLzcyYWJkMzcwMWIxODExZWFhNmVhMDA1MDU2YjY1MzBiL2hlYXJ0YmVhdBgNILkm","service_id":13,"time_bucket":20191210164229,"is_error":0,"segment_id":"4921.43.15759673490360004"}} 16:40:05,480 DEBUG [YyjkStorageKafkaConsumerThread] YyjkStorageKafkaConsumerThread.FormatData,tableName:sw_segment|operateId:4921.43.15759673490360004|tableMap:{trace_id=4921.43.15759673490360005, endpoint_name=/v4/default/registry/microservices/b0b6cb4d62e32b56c3bf8cb4bd2b7aed46cdffbe/instances/72abd3701b1811eaa6ea005056b6530b/heartbeat, latency=3, end_time=1575967349039, endpoint_id=189076, service_instance_id=4921, version=2, start_time=1575967349036, data_binary=Cg0KC7kmK8SNreHuqv8bEsQBEP///////////wEYrLLl9+4tIK+y5ffuLTCUxQtABFABWANgO3oSCgtodHRwLm1ldGhvZBIDUFVUeogBCgN1cmwSgAEvdjQvZGVmYXVsdC9yZWdpc3RyeS9taWNyb3NlcnZpY2VzL2IwYjZjYjRkNjJlMzJiNTZjM2JmOGNiNGJkMmI3YWVkNDZjZGZmYmUvaW5zdGFuY2VzLzcyYWJkMzcwMWIxODExZWFhNmVhMDA1MDU2YjY1MzBiL2hlYXJ0YmVhdBgNILkm, service_id=13, time_bucket=20191210164229, is_error=0, segment_id=4921.43.15759673490360004} 16:40:05,480 DEBUG [ElasticSearchClient] Executing bulk [32] with 8 requests 16:40:05,481 DEBUG [MainClientExec] [exchange: 44] start execution 16:40:05,481 DEBUG [RequestAddCookies] CookieSpec selected: default 16:40:05,481 DEBUG [RequestAuthCache] Re-using cached 'basic' auth scheme for http://10.23.11.224:9200 16:40:05,481 DEBUG [RequestAuthCache] No credentials for preemptive authentication 16:40:05,481 DEBUG [InternalHttpAsyncClient] [exchange: 44] Request connection for {}->http://10.23.11.224:9200 16:40:05,481 DEBUG [PoolingNHttpClientConnectionManager] Connection request: [route: {}->http://10.23.11.224:9200][total kept alive: 1; route allocated: 1 of 10; total allocated: 1 of 30] 16:40:05,482 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: Set timeout 0 16:40:05,482 DEBUG [PoolingNHttpClientConnectionManager] Connection leased: [id: http-outgoing-0][route: {}->http://10.23.11.224:9200][total kept alive: 0; route allocated: 1 of 10; total allocated: 1 of 30] 16:40:05,482 DEBUG [InternalHttpAsyncClient] [exchange: 44] Connection allocated: CPoolProxy{http-outgoing-0 [ACTIVE]} 16:40:05,482 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: Set attribute http.nio.exchange-handler 16:40:05,482 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][rw:r]: Event set [w] 16:40:05,482 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] Request ready 16:40:05,482 DEBUG [InternalHttpAsyncClient] Connection route already established 16:40:05,482 DEBUG [MainClientExec] [exchange: 44] Attempt 1 to execute request 16:40:05,482 DEBUG 
[MainClientExec] Target auth state: UNCHALLENGED 16:40:05,482 DEBUG [MainClientExec] Proxy auth state: UNCHALLENGED 16:40:05,482 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][rw:w]: Set timeout 30000 16:40:05,482 DEBUG [headers] http-outgoing-0 >> POST /_bulk?timeout=1m HTTP/1.1 16:40:05,482 DEBUG [headers] http-outgoing-0 >> Content-Length: 6657 16:40:05,482 DEBUG [headers] http-outgoing-0 >> Content-Type: application/json 16:40:05,482 DEBUG [headers] http-outgoing-0 >> Host: 10.23.11.224:9200 16:40:05,482 DEBUG [headers] http-outgoing-0 >> Connection: Keep-Alive 16:40:05,482 DEBUG [headers] http-outgoing-0 >> User-Agent: Apache-HttpAsyncClient/4.1.2 (Java/1.8.0_221) 16:40:05,483 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][rw:w]: Event set [w] 16:40:05,483 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] Output ready 16:40:05,483 DEBUG [MainClientExec] [exchange: 44] produce content 16:40:05,483 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] [content length: 6657; pos: 4096; completed: false] 16:40:05,483 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][rw:w]: 4293 bytes written 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "POST /_bulk?timeout=1m HTTP/1.1[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "Content-Length: 6657[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "Content-Type: application/json[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "Host: 10.23.11.224:9200[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "User-Agent: Apache-HttpAsyncClient/4.1.2 (Java/1.8.0_221)[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "[\r][\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"11.88.15759673496781142"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"11.88.15759673496781143","endpoint_name":"/authentication","latency":71,"end_time":1575967349749,"endpoint_id":150,"service_instance_id":11,"version":2,"start_time":1575967349678,"data_binary":"CgwKCgtY1oK15O6q/xsS3gEIARivt+X37i0gurfl9+4tMIkBQAVQAVgBYCF6DgoHZGIudHlwZRIDc3FsehsKC2RiLmluc3RhbmNlEgx0eWd6cHRfZHpzd2p6kwEKDGRiLnN0YXRlbWVudBKCAXNlbGVjdCB0LmNoZWNrX3RpbWUsdC5leHRlbmRfaW5mbyx0LnVzZXJfbmFtZSx0LmxvZ2luX2NoYW5uZWwgZnJvbSBzc29fdXNlcl9zZXNzaW9uIHQgd2hlcmUgdC50aWNrZXQgPSA/IGFuZCB0LmxvZ291dF90aW1lIGlzIG51bGwSnAEIAhjGt+X37i0g2Lfl9+4tMJUBQAVQAVgBYCF6DgoHZGIudHlwZRIDc3FsehsKC2RiLmluc3RhbmNlEgx0eWd6cHRfZHpzd2p6UgoMZGIuc3RhdGVtZW50EkJ1cGRhdGUgc3NvX3VzZXJfc2Vzc2lvbiB0IHNldCB0LmV4dGVuZF9pbmZvID0gPyB3aGVyZSB0LnRpY2tldCA9ID8SWAgDGNm35ffuLSDrt+X37i0wlAFABVABWAFgIXoOCgdkYi50eXBlEgNzcWx6GwoLZGIuaW5zdGFuY2USDHR5Z3pwdF9kenN3anoOCgxkYi5zdGF0ZW1lbnQSZhD///////////8BGK635ffuLSD1t+X37i0wlgFYA2ABejAKA3VybBIpaHR0cDovL25zc28uZHpzd2pqYy50YXguY24vYXV0aGVudGljYXRpb256EgoLaHR0cC5tZXRob2QSA0dFVBgMIAs=","service_id":12,"time_bucket":20191210164229,"is_error":0,"segment_id":"11.88.15759673496781142"}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"4921.36.15759673457660020"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> 
"{"trace_id":"4921.36.15759673457660021","endpoint_name":"Mysql/JDBI/PreparedStatement/executeQuery","latency":377,"end_time":1575967346143,"endpoint_id":162,"service_instance_id":4921,"version":2,"start_time":1575967345766,"data_binary":"Cg0KC7kmJPSg4dHuqv8bErYBEP///////////wEY5pjl9+4tIN+b5ffuLTCiAUADUAFYAWAheg4KB2RiLnR5cGUSA3NxbHohCgtkYi5pbnN0YW5jZRISdHlnenB0X2R6c3dqX3d3X2tmel0KDGRiLnN0YXRlbWVudBJNc2VsZWN0ICogZnJvbSBxel9kbWIgd2hlcmUgeHlieiA9ICdZJyBBTkQgeXhieiA9ICdZJyBhbmQgbG93ZXIoY29kZSk9bG93ZXIoPykYDSC5Jg==","service_id":13,"time_bucket":20191210164225,"is_error":0,"segment_id":"4921.36.15759673457660020"}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"4921.36.15759673461450022"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"4921.36.15759673461450023","endpoint_name":"Mysql/JDBI/PreparedStatement/executeQuery","latency":69,"end_time":1575967346214,"endpoint_id":162,"service_instance_id":4921,"version":2,"start_time":1575967346145,"data_binary":"Cg0KC7kmJKbKyNPuqv8bErYBEP///////////wEY4Zvl9+4tIKac5ffuLTCiAUADUAFYAWAheg4KB2RiLnR5cGUSA3NxbHohCgtkYi5pbnN0YW5jZRISdHlnenB0X2R6c3dqX3d3X2tmel0KDGRiLnN0YXRlbWVudBJNc2VsZWN0ICogZnJvbSBxel9kbWIgd2hlcmUgeHlieiA9ICdZJyBBTkQgeXhieiA9ICdZJyBhbmQgbG93ZXIoY29kZSk9bG93ZXIoPykYDSC5Jg==","service_id":13,"time_bucket":20191210164226,"is_error":0,"segment_id":"4921.36.15759673461450022"}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"11.37.15759673529984298"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"11.37.15759673529984299","endpoint_name":"Mysql/JDBI/PreparedStatement/executeQuery","latency":14,"end_time":1575967353012,"endpoint_id":137,"service_instance_id":11,"version":2,"start_time":1575967352998,"data_binary":"CgwKCgslqsqf9O6q/xsSsAEQ////////////ARim0eX37i0gtNHl9+4tMIkBQAVQAVgBYCF6DgoHZGIudHlwZRIDc3FsehsKC2RiLmluc3RhbmNlEgx0eWd6cHRfZHpzd2p6XQoMZGIuc3RhdGVtZW50Ek1zZWxlY3QgKiBmcm9tIHF6X2RtYiB3aGVyZSB4eWJ6ID0gJ1knIEFORCB5eGJ6ID0gJ1knIGFuZCBsb3dlcihjb2RlKT1sb3dlcig/KRgMIAs=","service_id":12,"time_bucket":20191210164232,"is_error":0,"segment_id":"11.37.15759673529984298"}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"11.37.15759673530124300"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"11.37.15759673530124301","endpoint_name":"Mysql/JDBI/PreparedStatement/executeQuery","latency":12,"end_time":1575967353024,"endpoint_id":137,"service_instance_id":11,"version":2,"start_time":1575967353012,"data_binary":"CgwKCgsljJCo9O6q/xsSsAEQ////////////ARi00eX37i0gwNHl9+4tMIkBQAVQAVgBYCF6DgoHZGIudHlwZRIDc3FsehsKC2RiLmluc3RhbmNlEgx0eWd6cHRfZHpzd2p6XQoMZGIuc3RhdGVtZW50Ek1zZWxlY3QgKiBmcm9tIHF6X2RtYiB3aGVyZSB4eWJ6ID0gJ1knIEFORCB5eGJ6ID0gJ1knIGFuZCBsb3dlcihjb2RlKT1sb3dlcig/KRgMIAs=","service_id":12,"time_bucket":20191210164233,"is_error":0,"segment_id":"11.37.15759673530124300"}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"11.47.15759673539631576"}}[\n]" 16:40:05,483 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"11.47.15759673539631577","endpoint_name":"/v4/default/registry/mi" 16:40:05,483 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] Output ready 16:40:05,483 DEBUG [MainClientExec] [exchange: 44] produce content 16:40:05,483 DEBUG [MainClientExec] [exchange: 44] Request completed 16:40:05,484 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] [content length: 6657; pos: 6657; 
completed: true] 16:40:05,484 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][rw:w]: 2561 bytes written 16:40:05,484 DEBUG [wire] http-outgoing-0 >> "croservices/62af8840312c4750370c3ea64fd68203bf02d518/instances/1563e3d11a5f11eabd58005056b67cc4/heartbeat","latency":2,"end_time":1575967353965,"endpoint_id":146,"service_instance_id":11,"version":2,"start_time":1575967353963,"data_binary":"CgwKCgsv2LPs+O6q/xsSwwEQ////////////ARjr2OX37i0g7djl9+4tMJIBQAdQAVgDYDt6EgoLaHR0cC5tZXRob2QSA1BVVHqIAQoDdXJsEoABL3Y0L2RlZmF1bHQvcmVnaXN0cnkvbWljcm9zZXJ2aWNlcy82MmFmODg0MDMxMmM0NzUwMzcwYzNlYTY0ZmQ2ODIwM2JmMDJkNTE4L2luc3RhbmNlcy8xNTYzZTNkMTFhNWYxMWVhYmQ1ODAwNTA1NmI2N2NjNC9oZWFydGJlYXQYDCAL","service_id":12,"time_bucket":20191210164233,"is_error":0,"segment_id":"11.47.15759673539631576"}[\n]" 16:40:05,484 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"11.47.15759673539651578"}}[\n]" 16:40:05,484 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"11.47.15759673539631577","endpoint_name":"#/v4/default/registry/microservices/62af8840312c4750370c3ea64fd68203bf02d518/instances/1563e3d11a5f11eabd58005056b67cc4/heartbeat","latency":0,"end_time":1575967353965,"endpoint_id":147,"service_instance_id":11,"version":2,"start_time":1575967353965,"data_binary":"CgwKCgsv+s/t+O6q/xsSwAMQ////////////ARjt2OX37i0g7djl9+4tKpoCCAESDAoKCy/Ys+z47qr/GyALOAtCgAEvdjQvZGVmYXVsdC9yZWdpc3RyeS9taWNyb3NlcnZpY2VzLzYyYWY4ODQwMzEyYzQ3NTAzNzBjM2VhNjRmZDY4MjAzYmYwMmQ1MTgvaW5zdGFuY2VzLzE1NjNlM2QxMWE1ZjExZWFiZDU4MDA1MDU2YjY3Y2M0L2hlYXJ0YmVhdFKAAS92NC9kZWZhdWx0L3JlZ2lzdHJ5L21pY3Jvc2VydmljZXMvNjJhZjg4NDAzMTJjNDc1MDM3MGMzZWE2NGZkNjgyMDNiZjAyZDUxOC9pbnN0YW5jZXMvMTU2M2UzZDExYTVmMTFlYWJkNTgwMDUwNTZiNjdjYzQvaGVhcnRiZWF0OoEBIy92NC9kZWZhdWx0L3JlZ2lzdHJ5L21pY3Jvc2VydmljZXMvNjJhZjg4NDAzMTJjNDc1MDM3MGMzZWE2NGZkNjgyMDNiZjAyZDUxOC9pbnN0YW5jZXMvMTU2M2UzZDExYTVmMTFlYWJkNTgwMDUwNTZiNjdjYzQvaGVhcnRiZWF0UAJYA2A7GAwgCw==","service_id":12,"time_bucket":20191210164233,"is_error":0,"segment_id":"11.47.15759673539651578"}[\n]" 16:40:05,484 DEBUG [wire] http-outgoing-0 >> "{"index":{"_index":"sw_segment","_type":"type","_id":"4921.43.15759673490360004"}}[\n]" 16:40:05,484 DEBUG [wire] http-outgoing-0 >> "{"trace_id":"4921.43.15759673490360005","endpoint_name":"/v4/default/registry/microservices/b0b6cb4d62e32b56c3bf8cb4bd2b7aed46cdffbe/instances/72abd3701b1811eaa6ea005056b6530b/heartbeat","latency":3,"end_time":1575967349039,"endpoint_id":189076,"service_instance_id":4921,"version":2,"start_time":1575967349036,"data_binary":"Cg0KC7kmK8SNreHuqv8bEsQBEP///////////wEYrLLl9+4tIK+y5ffuLTCUxQtABFABWANgO3oSCgtodHRwLm1ldGhvZBIDUFVUeogBCgN1cmwSgAEvdjQvZGVmYXVsdC9yZWdpc3RyeS9taWNyb3NlcnZpY2VzL2IwYjZjYjRkNjJlMzJiNTZjM2JmOGNiNGJkMmI3YWVkNDZjZGZmYmUvaW5zdGFuY2VzLzcyYWJkMzcwMWIxODExZWFhNmVhMDA1MDU2YjY1MzBiL2hlYXJ0YmVhdBgNILkm","service_id":13,"time_bucket":20191210164229,"is_error":0,"segment_id":"4921.43.15759673490360004"}[\n]" 16:40:05,484 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] Request ready 16:40:05,484 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:w]: Event cleared [w] 16:40:06,073 DEBUG [FetchSessionHandler] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Node 0 sent an incremental fetch response for session 520315326 with 0 response partition(s), 1 implied partition(s) 16:40:07,365 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: 1460 bytes read 
16:40:07,365 DEBUG [wire] http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" 16:40:07,365 DEBUG [wire] http-outgoing-0 << "content-type: application/json; charset=UTF-8[\r][\n]" 16:40:07,365 DEBUG [wire] http-outgoing-0 << "content-length: 3697[\r][\n]" 16:40:07,365 DEBUG [wire] http-outgoing-0 << "[\r][\n]" 16:40:07,365 DEBUG [wire] http-outgoing-0 << "{"took":1872,"errors":true,"items":[{"index":{"_index":"sw_segment","_type":"type","_id":"11.88.15759673496781142","status":429,"error":{"type":"es_rejected_execution_exception","reason":"rejected execution of processing of [32196501][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[sw_segment][1]] containing [5] requests, target allocation id: U61PmKwGRPe_wjkdosiWUg, primary term: 1 on EsThreadPoolExecutor[name = JKPT-ES-NODE-001/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@37fc16a1[Running, pool size = 8, active threads = 8, queued tasks = 200, completed tasks = 4765825]]"}}},{"index":{"_index":"sw_segment","_type":"type","_id":"4921.36.15759673457660020","status":429,"error":{"type":"es_rejected_execution_exception","reason":"rejected execution of processing of [32196501][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[sw_segment][1]] containing [5] requests, target allocation id: U61PmKwGRPe_wjkdosiWUg, primary term: 1 on EsThreadPoolExecutor[name = JKPT-ES-NODE-001/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@37fc16a1[Running, pool size = 8, active threads = 8, queued tasks = 200, completed tasks = 4765825]]"}}},{"index":{"_index":"sw_segment","_type":"type","_id":"4921.36.15759673461450022","_version":1,"result":"created","_shards"" 16:40:07,365 DEBUG [headers] http-outgoing-0 << HTTP/1.1 200 OK 16:40:07,365 DEBUG [headers] http-outgoing-0 << content-type: application/json; charset=UTF-8 16:40:07,365 DEBUG [headers] http-outgoing-0 << content-length: 3697 16:40:07,365 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE(1372)] Response received 16:40:07,365 DEBUG [MainClientExec] [exchange: 44] Response received HTTP/1.1 200 OK 16:40:07,365 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE(1372)] Input ready 16:40:07,365 DEBUG [MainClientExec] [exchange: 44] Consume content 16:40:07,365 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: 2325 bytes read 16:40:07,365 DEBUG [wire] http-outgoing-0 << ":{"total":1,"successful":1,"failed":0},"_seq_no":24332,"_primary_term":1,"status":201}},{"index":{"_index":"sw_segment","_type":"type","_id":"11.37.15759673529984298","status":429,"error":{"type":"es_rejected_execution_exception","reason":"rejected execution of processing of [32196501][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[sw_segment][1]] containing [5] requests, target allocation id: U61PmKwGRPe_wjkdosiWUg, primary term: 1 on EsThreadPoolExecutor[name = JKPT-ES-NODE-001/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@37fc16a1[Running, pool size = 8, active threads = 8, queued tasks = 200, completed tasks = 4765825]]"}}},{"index":{"_index":"sw_segment","_type":"type","_id":"11.37.15759673530124300","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":24333,"_primary_term":1,"status":201}},{"index":{"_index":"sw_segment","_type":"type","_id":"11.47.15759673539631576","status":429,"error":{"type":"es_rejected_execution_exception","reason":"rejected execution of processing of 
[32196501][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[sw_segment][1]] containing [5] requests, target allocation id: U61PmKwGRPe_wjkdosiWUg, primary term: 1 on EsThreadPoolExecutor[name = JKPT-ES-NODE-001/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@37fc16a1[Running, pool size = 8, active threads = 8, queued tasks = 200, completed tasks = 4765825]]"}}},{"index":{"_index":"sw_segment","_type":"type","_id":"11.47.15759673539651578","status":429,"error":{"type":"es_rejected_execution_exception","reason":"rejected execution of processing of [32196501][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[sw_segment][1]] containing [5] requests, target allocation id: U61PmKwGRPe_wjkdosiWUg, primary term: 1 on EsThreadPoolExecutor[name = JKPT-ES-NODE-001/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@37fc16a1[Running, pool size = 8, active threads = 8, queued tasks = 200, completed tasks = 4765825]]"}}},{"index":{"_index":"sw_segment","_type":"type","_id":"4921.43.15759673490360004","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":24334,"_primary_term":1,"status":201}}]}" 16:40:07,365 DEBUG [InternalHttpAsyncClient] [exchange: 44] Connection can be kept alive indefinitely 16:40:07,365 DEBUG [MainClientExec] [exchange: 44] Response processed 16:40:07,365 DEBUG [InternalHttpAsyncClient] [exchange: 44] releasing connection 16:40:07,365 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: Remove attribute http.nio.exchange-handler 16:40:07,365 DEBUG [PoolingNHttpClientConnectionManager] Releasing connection: [id: http-outgoing-0][route: {}->http://10.23.11.224:9200][total kept alive: 0; route allocated: 1 of 10; total allocated: 1 of 30] 16:40:07,365 DEBUG [PoolingNHttpClientConnectionManager] Connection [id: http-outgoing-0][route: {}->http://10.23.11.224:9200] can be kept alive indefinitely 16:40:07,365 DEBUG [ManagedNHttpClientConnectionImpl] http-outgoing-0 10.23.6.33:6663<->10.23.11.224:9200[ACTIVE][r:r]: Set timeout 0 16:40:07,365 DEBUG [PoolingNHttpClientConnectionManager] Connection released: [id: http-outgoing-0][route: {}->http://10.23.11.224:9200][total kept alive: 1; route allocated: 1 of 10; total allocated: 1 of 30] 16:40:07,367 DEBUG [RestClient] request [POST http://10.23.11.224:9200/_bulk?timeout=1m] returned [HTTP/1.1 200 OK] 16:40:07,367 DEBUG [InternalIODispatch] http-outgoing-0 [ACTIVE] [content length: 3697; pos: 3697; completed: true] 16:40:08,187 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 16:40:08,389 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 16:40:11,203 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 16:40:11,404 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 16:40:14,221 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 
10.23.11.235:9092 (id: 2147483647 rack: null) 16:41:20,774 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 16:41:23,589 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 16:41:23,790 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response actCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 16:41:38,665 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 16:41:38,867 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 17:13:13,402 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 17:13:13,605 DEBUG [NetworkClient] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Node -1 disconnected. 17:13:13,606 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 17:13:13,707 DEBUG [NetworkClient] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending metadata request (type=MetadataRequest, topics=compute_traceStorage, allowAutoCreate=true) to node 10.23.11.235:9092 (id: 0 rack: null) 17:13:13,907 DEBUG [Metadata] Updating last seen epoch from 0 to 0 for partition compute_traceStorage-0 17:13:13,907 DEBUG [Metadata] Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = yQ_sRlMlSui8hlVtaPl4wg, nodes = [10.23.11.235:9092 (id: 0 rack: null)], partitions = [Partition(topic = compute_traceStorage, partition = 0, leader = 0, replicas = [0], isr = [0], offlineReplicas = [])], controller = 10.23.11.235:9092 (id: 0 rack: null))} 17:13:16,420 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Sending Heartbeat request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 17:13:16, 17:15:02,170 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Received successful Heartbeat response 17:15:04,683 WARN [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 17:15:04,683 INFO [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Member consumer-1-f8b4d0da-f83c-4849-8cfd-74e748aad3c7 sending LeaveGroup request to coordinator 10.23.11.235:9092 (id: 2147483647 rack: null) 17:15:04,683 DEBUG [AbstractCoordinator] [Consumer clientId=consumer-1, groupId=jkpt-transfer-group] Disabling heartbeat thread ```
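
The last lines of the log point at the likely cause: the Elasticsearch bulk responses come back with status 429 (es_rejected_execution_exception, write queue full), the poll loop stalls behind those writes, and the final WARN states that the time between poll() calls exceeded max.poll.interval.ms, so the consumer leaves the group. If the ES back-pressure cannot be removed, the WARN itself suggests the two configuration knobs; a hedged sketch using the broker and group names from the log (the values are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PollTimeoutTuning {
    // Consumer settings that give a slow poll loop more headroom, per the WARN above.
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.23.11.235:9092"); // broker from the log
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "jkpt-transfer-group");        // group from the log
        // allow more time between poll() calls while the ES bulk writes catch up
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // 10 min, up from the 5-min default
        // and hand back smaller batches so each loop iteration finishes sooner
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);         // down from the 500 default
        return props;
    }
}
```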

Kafka consumer group loses uncommitted messages

I am using a consumer group with just one consumer and just one broker (the docker wurstmeister image). Whether to commit the offset is decided in code: if the handler returns an error, the message is not committed. I need to ensure the system does not lose any message — even if that means retrying the same message forever (for now ;) ). To test this I created a simple handler which does not commit the offset when the string 'error' is sent as a message to Kafka. All other strings are committed.

```
kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited
```

Now running

```
kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe
```

returns

```
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0          13              13              0
```

so that's OK, there is no lag. Now we pass the 'error' string to fake that something bad happened, and the message is not committed:

```
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   0          13              14              1
```

The current offset stays at the right position, and there is 1 lagged message. Now if we pass a correct message again, the offset moves on to 15:

```
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test   0          15              15
```

and message number 14 will never be picked up again. Is this the default behaviour? Do I need to track the last offset and load the message at offset+1 manually? I have set the commit interval to 0 to hopefully avoid any auto-commit mechanism.

fetch/commit code:

```
go func() {
    for {
        ctx := context.Background()
        m, err := mr.brokerReader.FetchMessage(ctx)
        if err != nil {
            break
        }
        if err := msgFunc(m); err != nil {
            log.Errorf("# messaging # cannot commit a message: %v", err)
            continue
        }
        // commit message if no error
        if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
            // should we do something else besides just logging the uncommitted message?
            log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v",
                m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
        }
    }
}()
```

reader configuration:

```
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        brokers,
    GroupID:        groupID,
    Topic:          topic,
    CommitInterval: 0,
    MinBytes:       10e3,
    MaxBytes:       10e6,
})
```

library used: https://github.com/segmentio/kafka-go

The Kafka consumer sometimes gets no messages at startup

Sometimes the Kafka consumer cannot fetch any messages when it starts, but it works after a restart — sometimes only after many restarts. I have no idea why; hoping someone can point me in the right direction.

```
[ INFO ] [2016-09-29 14:34:53] org.hibernate.validator.internal.util.Version [30] - HV000001: Hibernate Validator 5.2.4.Final
[ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [48] - Starting ApplicationMain on zhuxiang with PID 1740 (D:\IdeaProjects\green-salad\adx-stat\target\classes started by zhuxiang in D:\IdeaProjects\green-salad)
[ INFO ] [2016-09-29 14:34:53] com.coocaa.salad.stat.ApplicationMain [663] - The following profiles are active: dev
[ INFO ] [2016-09-29 14:34:54] org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext [581] - Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5754de72: startup date [Thu Sep 29 14:34:54 CST 2016]; root of context hierarchy
2016-09-29 14:34:55 JRebel: Monitoring Spring bean definitions in 'D:\IdeaProjects\green-salad\adx-stat\target\classes\spring-integration-consumer.xml'.
[ INFO ] [2016-09-29 14:34:55] org.springframework.beans.factory.xml.XmlBeanDefinitionReader [317] - Loading XML bean definitions from URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml]
[ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-09-29 14:34:56 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'.
[ INFO ] [2016-09-29 14:34:56] org.springframework.integration.config.IntegrationRegistrar [330] - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
[ INFO ] [2016-09-29 14:34:56] org.springframework.beans.factory.support.DefaultListableBeanFactory [843] - Overriding bean definition for bean 'kafkaConsumerService' with a different definition: replacing [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in file [D:\IdeaProjects\green-salad\adx-stat\target\classes\com\coocaa\salad\stat\service\KafkaConsumerService.class]] with [Generic bean: class [com.coocaa.salad.stat.service.KafkaConsumerService]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in URL [file:/D:/IdeaProjects/green-salad/adx-stat/target/classes/spring-integration-consumer.xml]]
[ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [130] - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
[ INFO ] [2016-09-29 14:34:57] org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor [158] - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
[ INFO ] [2016-09-29 14:34:57] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration' of type [class org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration$$EnhancerBySpringCGLIB$$3dea2e76] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:58] org.springframework.beans.factory.config.PropertiesFactoryBean [172] - Loading properties file from URL [jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-09-29 14:34:58 JRebel: Monitoring properties in 'jar:file:/D:/maven-repo2/org/springframework/integration/spring-integration-core/4.3.1.RELEASE/spring-integration-core-4.3.1.RELEASE.jar!/META-INF/spring.integration.default.properties'.
[ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:58] org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker [328] - Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[ INFO ] [2016-09-29 14:34:59] org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer [88] - Tomcat initialized with port(s): 8081 (http)
```

The contents of spring-integration-consumer.xml are as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- topic test conf -->
    <int:channel id="inputFromKafka">
        <int:dispatcher task-executor="kafkaMessageExecutor"/>
    </int:channel>

    <!-- zookeeper configuration; more than one can be configured -->
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="172.20.135.95:2181,172.20.135.95:2182"
        zk-connection-timeout="10000" zk-session-timeout="10000" zk-sync-time="2000"/>

    <!-- channel configuration: auto-startup="true", otherwise no data is received -->
    <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS"/>
    </int-kafka:inbound-channel-adapter>

    <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500"/>

    <bean id="kafkaDecoder"
          class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

    <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
                <prop key="auto.commit.enables">true</prop> <!-- note: likely a typo for auto.commit.enable -->
            </props>
        </property>
    </bean>

    <!-- the bean that receives messages -->
    <bean id="kafkaConsumerService" class="com.coocaa.salad.stat.service.KafkaConsumerService"/>

    <!-- specify the receiving method -->
    <int:outbound-channel-adapter channel="inputFromKafka"
        ref="kafkaConsumerService" method="processMessage"/>

    <int-kafka:consumer-context id="consumerContext" consumer-timeout="1000"
        zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="group-4"
                value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000">
                <!-- two topic configurations -->
                <int-kafka:topic id="clientsRequests2" streams="4"/>
                <!--<int-kafka:topic id="sunneytopic" streams="4" />-->
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
</beans>
```

The Kafka version is 0.10.

A logstash + kafka log service problem. Urgent, please help

I previously built an ElasticSearch + logstash + kafka log-processing system. It ran fine at first, but when preparing to use it in production I found logstash had problems processing messages: either it consumes log messages long after Kafka collected them, or it consumes messages at startup and then stops reading from Kafka entirely. I don't know what's going on. At first I suspected the logstash regex parsing was spending too much time matching and filtering, causing a message backlog, so I stored the logs as JSON and dropped the regex matching — same problem. Here is my logstash configuration:

```
input {
    kafka {
        zk_connect => "192.168.0.153:12200"
        topic_id => "cias-uop"
        group_id => "logstash"
        rebalance_backoff_ms => 5000
        rebalance_max_retries => 10
        queue_size => 80000
        consumer_threads => 4
        fetch_message_max_bytes => 104857600
    }
}
filter {
    multiline {
        pattern => "^\s"
        what => "previous"
    }
    grok {
        match => { }
    }
}
output {
    elasticsearch {
        hosts => ["192.168.0.153:9200"]
        index => "cias-uop-kafka"
        document_type => "cias-uop-%{+YYYY.MM.dd}"
    }
}
```
