【kafka】kafka消息生产者抛异常

服务器正常运行,kafka本来没有问题,从某一时刻开始,就一直抛异常了,重启服务器后恢复正常。
请问是什么原因?
绝大部分消息抛异常了,还有很小一部分没有抛异常,但也没收到,这些丢失但没有异常的又是什么原因?

已知情况:kafka消息生产者 没有指定partition

kafka消息生产者异常

2个回答

1, 先确认你的网络有没有问题
2,在向服务器发起连接后,在kafka的服务器配置中有zookeeper.connect=xx.xx.xx.xx:2181的配置 这时候kafka会查找zookeeper
那么如果我们的hosts 中没有做hosts的配置 kafka经多次尝试连接不上就会报上面的错误,所有要做host映射,配置hosts文件 做zookeeper服务器的映射配置。
3,出现此种错误 还有一种情况

Hostname the broker will advertise to producers and consumers. If not set, it uses the

value for "host.name" if configured. Otherwise, it will use the value returned from

java.net.InetAddress.getCanonicalHostName().

advertised.host.name=192.168.1.118

远程连接的话 是根据这个配置来找broker的,默认是localhost ,所以如果不是本机运行的话 应该设置此值 来确保通信畅通。

fuweihua123
fuweihua123 回复qq_38834316: 建议你启动kafka用守护模式启动 -daemon **.properties
大约 2 年之前 回复
qq_38834316
qq_38834316 感觉你说的好像很靠谱啊, 我的情况是 服务器的kafka生产者正常运行了几个月了, 突然有问题了,一直抛异常。应该是网络原因导致断了,然后重连没连上。不过领导让我处理下,以后不要出现这个问题,大佬有什么建议吗
大约 2 年之前 回复

可能是服务器缓存问题,重启服务器就好了或者clean下服务器

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
其他相关推荐
kafka里面关于生产者SDK如何推送消息给kafka集群

kafka里面关于生产者SDK如何推送消息给kafka集群,求各位给解答一下,谢谢

在docker容器里kafka生产者(java客户端)消息发送不出去!!

在我们项目实施过程,部署在docker容器里应用需要用kafka客户端发送不出消息,但是同样应用部署在物理机(该容器所在物理机)上,kafka客户端可以发送消息;然后我们在docker容器里的kafka客户端生产者增加调整两个参数linger.ms、batch.size,这样在容器里kafa客户端就可以发送。现在不知道在容器里和在物理机上对于linger.ms、batch.size这两个默认参数有什么区别?

kafka1.0.0的client,生产者生产数据失败

配置: ``` Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "39.108.61.252:9092,39.108.61.252:9093,39.108.61.252:9094"); //请求时候需要验证 props.put("acks", "0"); //请求失败时候需要重试 props.put("retries", 1); //生产者就会尝试将记录组合成一个batch的请求。 这有助于客户端和服务器的性能。不能大于此默认值,否则浪费内存,反而降低吞吐量 //props.put("batch.size", 16384); //汇聚一定时间内的记录一起发出 // props.put("linger.ms", 50); //内存缓存区大小 props.put("buffer.memory", 33554432); //指定消息key序列化方式 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定消息本身的序列化方式 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<>(props); ``` 生产数据 没有Thread.sleep()就不能成功发送数据!,有了就可已在消费者端接受到数据。若把Thread.sleep()删除,在生产末尾加上close()方法也能成功生产 ``` for (int i = 0; i < 10; i++) { try { Thread.sleep(50); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } kafkaProducer.send(new ProducerRecord<>("topic_user_general_info_update", "simpleKey", "value-"+i)); } ``` 困扰很久,不知道配置还是哪里问题。

docker 安装kafka一直无法测试生产者生产消息成功

# 1.docker 安装kafka一直无法测试生产者生产消息成功 docker安装完kafka之后,创建topic没有报错 bash-4.4# kafka-topics.sh --create --zookeeper 118.190.26.133:2181 --replication-factor 1 --partitions 1 --topic mytest_topic WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both. Created topic mytest_topic. 但是运行生产者时候出现了一下错误 bash-4.4# kafka-console-producer.sh --broker-list 118.190.26.133:9092 --topic mytest_topic >[2020-04-20 02:50:33,086] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,087] WARN [Producer clientId=console-producer] Bootstrap broker 118.190.26.133:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,175] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,175] WARN [Producer clientId=console-producer] Bootstrap broker 118.190.26.133:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,277] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,277] WARN [Producer clientId=console-producer] Bootstrap broker 118.190.26.133:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,480] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,480] WARN [Producer clientId=console-producer] Bootstrap broker 118.190.26.133:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,934] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:33,934] WARN [Producer clientId=console-producer] Bootstrap broker 118.190.26.133:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient) [2020-04-20 02:50:34,741] WARN [Producer clientId=console-producer] Connection to node -1 (/118.190.26.133:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) 以下是server.propertise配置文件信息 connect-distributed.properties connect-log4j.properties consumer.properties server.properties zookeeper.properties bash-4.4# vi server.properties # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 advertised.host.name=master ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://0.0.0.0:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://118.190.26.133:9092

springboot结合kafka时0毫秒关闭消息生产者

自己在做springboot结合kafka的时候,运行消息生产者,结果console显示了生产者的相关配置以及提示生产者在0毫秒内关闭了消息生产者,然后生产者发送消息失败。想请问下各位大佬这到底是是配置文件的问题还是消息生产者发送消息的时候出现了问题啊。 消息发送者代码: @Component @EnableKafka public class MessageSender { @Autowired private KafkaTemplate<String,String> kafkaTemplate; //private static final MessageSender sender=new MessageSender(); /* * * kafka客户端发送消息 * @param topic 主题 * @param message 消息内容 * @return*/ public boolean sendMessage(String topic,String message) { try { System.out.println("topic"+topic+"message"+message); kafkaTemplate.send(topic, message); } catch (Exception e) { return false; } return true; } } 控制台具体信息如下: 2019-07-09 22:43:34.008 INFO 7916 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 65536 bootstrap.servers = [192.168.2.2:9092] buffer.memory = 524288 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = null key.serializer = class org.apache.kafka.common.serialization.StringDeserializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringDeserializer 2019-07-09 22:43:34.019 INFO 7916 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms. 2019-07-09 22:43:34.040 DEBUG 7916 --- [nio-8080-exec-1] o.s.b.w.s.f.OrderedRequestContextFilter : Cleared thread-bound request context: org.apache.catalina.connector.RequestFacade@2e251d9f 2019-07-09 22:43:34.077 DEBUG 7916 --- [nio-8080-exec-2] o.s.b.w.s.f.OrderedRequestContextFilter : Bound request context to thread: org.apache.catalina.connector.RequestFacade@2e251d9f 2019-07-09 22:43:34.112 DEBUG 7916 --- [nio-8080-exec-2] o.s.b.w.s.f.OrderedRequestContextFilter : Cleared thread-bound request context: org.apache.catalina.connector.RequestFacade@2e251d9f ``` ```

kafka消息传递中文乱码

![图片说明](https://img-ask.csdn.net/upload/201912/13/1576215935_606006.png)cmd操作kafka 生产者与消费者 传递之后是乱码 后通过chcp改为utf-8编码格式 直接就空了 代码控制台输出是没有问题的,请问该怎么解决???

kafka生产者发消息时无法更新元数据

十万火急,求助 kafka的broker能够正常启动,zookeeper没有问题,使用生产者发送消息失败,提示获取元数据失败 [root@newkafka2 ~]# /root/kafka2/bin/kafka-console-producer.sh -broker-list server2:9092 -topic newkafka2 >111111111111 [2017-10-20 15:42:14,048] ERROR Error when sending message to topic newkafka2 with key: null, value: 12 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. >[2017-10-20 15:43:21,289] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2017-10-20 15:45:28,516] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) [2017-10-20 15:47:35,876] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient) ![图片说明](https://img-ask.csdn.net/upload/201710/20/1508485750_814271.png)

kafka消费者无法消费信息

在生产环境部署kafka集群和消费者服务器后,通过logstash向kafka集群发送实时日志,消费者也能正常消费信息。但是两分钟之后消费者就停止消费信息了,想问下各位老师如何排查问题点在哪里。 1:查看了kafka服务器的日志,logstash还在向kafka推实时日志,kafka集群日志留存时间是两个小时。 2:kafka消费者一共有两台,两台都在同时运行。 3:kafka集群有三台服务器,查问题的时候发现,kafka消费者只连接到了一台broker上,不知道这是不是原因所在。

kafka消费者处理慢的情况下如何提高消息处理速度?不允许增加分区

如题.最近的一个面试题,说是考虑kafka理论特性.具体要求我可能有理解错误.如果各位有研究一眼看出是什么问题,谢谢给个提示. 我搜索了下,提高消费性能的有: 增加分区个数(增加消费者并行数)[不允许]; 消费者使用多线程;如果消息处理是CPU密集的加多线程也没用啊; 或许我理解有问题? 换个问题? 生产者1秒生成1W消息.然而此时全部消费者1s只能消费5000,消息处理是纯CPU计算,问:在不添加分区的情况下如何消息处理速度?

windows系统 在java中发送消息到kafka,在控制台显示中文,在kafka中文乱码

![图片说明](https://img-ask.csdn.net/upload/201806/17/1529218043_512138.png)![图片说明](https://img-ask.csdn.net/upload/201806/17/1529218053_109421.png)

如何使用Jaeger追踪Kafka生产者和消费者

<div class="post-text" itemprop="text"> <p>I want to tracing kafka producer and consumer by using jaeger and Go, but I can not get the chain of the span for the producer's tracer and consumer's tracer. Here is the producer's code:</p> <pre class="lang-golang prettyprint-override"><code>func (p *Producer) WriteMessages(ctx context.Context, messages ...kafka.Message) error { topic := p.Stats().Topic for _, message := range messages { // span := opentracing.GlobalTracer().StartSpan("TO_"+topic, ext.SpanKindProducer) span, _ := opentracing.StartSpanFromContext(ctx, "TO_"+topic, ext.SpanKindProducer) ext.Component.Set(span, "golang-kafka") ext.PeerService.Set(span, "kafka") ext.MessageBusDestination.Set(span, topic) headers := make(map[string]string) opentracing.GlobalTracer().Inject( span.Context(), opentracing.TextMap, opentracing.TextMapCarrier(headers), ) for key, value := range headers { header := kafka.Header{ Key: key, Value: []byte(value), } message.Headers = append(message.Headers, header) } span.Finish() } return p.Writer.WriteMessages(ctx, messages...) } </code></pre> <p>and the consumer's code:</p> <pre class="lang-golang prettyprint-override"><code>func (c *Consumer) ReadMessage(ctx context.Context) (kafka.Message, error) { message, err := c.Reader.ReadMessage(ctx) if err != nil { return kafka.Message{}, err } topic, partition, offset := message.Topic, message.Partition, message.Offset headers := make(map[string]string) for _, header := range message.Headers { headers[header.Key] = string(header.Value) } spanContext, _ := opentracing.GlobalTracer().Extract( opentracing.TextMap, opentracing.TextMapCarrier(headers), ) span, _ := opentracing.StartSpanFromContext(ctx, "FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer) // span := opentracing.StartSpan("FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer) ext.Component.Set(span, "golang-kafka") ext.PeerService.Set(span, "kafka") span.SetTag("topic", topic) span.SetTag("partition", partition) span.SetTag("offset", offset) span.Finish() return message, err } </code></pre> <p>I can get the producer chain and the consumer chain, but I can not chain them together.</p> </div>

如何使用php-rdkafka在kafka中确认消费消息?

<div class="post-text" itemprop="text"> <p>我使用<a href="https://github.com/arnaud-lb/php-rdkafka" rel="nofollow noreferrer">php-rdkafka</a>作为php Kafka客户端,并使用测试组成功地生成了测试消息。我使用下面的代码来使用该消息:</p> <pre><code>$kafkaConsumer = new RdKafka\Consumer(); $kafkaConsumer-&gt;addBrokers("127.0.0.1:9292"); $topic = $kafkaConsumer-&gt;newTopic("test"); $topic-&gt;consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $msg = $topic-&gt;consume(0, 1000); if($msg){ if ($msg-&gt;err) { echo $msg-&gt;errstr(), " "; break; } else { echo $msg-&gt;payload, " "; } } } </code></pre> <p>但是,当我再次尝试在测试组中设置消息并尝试使用测试组的消息时,我得到了旧消息和新消息。所以我只想知道如何才能确认旧的信息,这样我才能得到新的信息,而不是旧的信息?有人能给个建议吗??</p> <p>我的kafka版本是0.11.0.1。</p> </div>

kafka的生产消费模型报错

## 消费者启动报错,实在不知道哪里错了,求大神。 生产者: ./kafka-console-producer.sh --broker-list S1PA11:9092,S1PA22:9092,S1PA33:9092 --topic AF_3 12 23 44 5 5 576 [2017-11-12 15:32:57,728] ERROR Error when sending message to topic AF_3 with key: null, value: 2 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) [2017-11-12 15:33:57,731] ERROR Error when sending message to topic AF_3 with key: null, value: 2 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) ^Cgeconline@S1PA33:~/kafka/kafka_2.10-0.9.0.0/bin$ [2017-11-12 15:37:14,884] INFO [Group Metadata Manager on Broker 3]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager) 消费者: geconline@S1PA22:~/kafka/kafka_2.10-0.9.0.0/bin$ ./kafka-console-consumer.sh --zookeeper s1pa11:9092,s1pa22:9092,s1pa33:9092 --from-beginning --topic AF_3 No brokers found in ZK.

kafka多个消费者消费同一数据

kafka不同groupid下的消费者,消费同一topic下的某一条数据,为什么offset值不变? 只被消费了一次吗?

kafka消费者速度与什么有关

@KafkaListener(topics = {"CRBKC0002.000"}) public void sendSmsInfoByBizType(String record) { } 假设单机版的kafka,就一个节点。 1、 @KafkaListener注解接受消费者,是不是等这个方法执行完。 这个消费者进程才算消费结束。是不是一个镜像这个方法同时只能执行一次?就是不能连续起多个线程执行这个方法。 2、如果接受到参数就算消费这进程结束,也就是获取这个record消费者进程就结束了,那假设生产者一秒生产100w数据进入kafka。那这边获取参数就算消费者进程消费结束,那是不是相当于瞬间连续起100w这个方法线程执行。可是tomcat就200线程。

用sarama编写Kafka生产者时的时间戳无效

<div class="post-text" itemprop="text"> <p>I have a Kafka instance running (locally, in a Docker) and I created a producer in Go, using the <a href="https://godoc.org/github.com/Shopify/sarama" rel="nofollow noreferrer">sarama package</a>.</p> <p>As I want to use Kafka Streams on my topic, the producer has to embed a timestamp in the messages, or I get this ugly error message: </p> <blockquote> <p>org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = crawler_events, partition = 0, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 187, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {XXX}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.</p> </blockquote> <p>Here is the portion of code sending the message in my Go program: </p> <pre class="lang-go prettyprint-override"><code>// Init a connection to the Kafka host, // create the producer, // and count successes and errors in delivery func (c *kafkaClient) init() { config := sarama.NewConfig() config.Producer.Return.Successes = true c.config = *config var err error c.producer, err = sarama.NewAsyncProducer(c.hosts, &amp;c.config) if err != nil { panic(err) } go func() { for range c.producer.Successes() { c.successes++ } }() go func() { for range c.producer.Errors() { c.errors++ } }() } // Send a message to the Kafka topic, WITH TIMESTAMP func (c *kafkaClient) send(event string) { message := &amp;sarama.ProducerMessage{ Topic: c.topic, Value: sarama.StringEncoder(event), Timestamp: time.Now(), } c.producer.Input() &lt;- message c.enqueued++ } </code></pre> <p>As you can see, the timestamp I try to send is <code>time.Now()</code>. </p> <p>When I run the console consumer to see the received timestamps:</p> <pre><code>docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 --topic crawler_events \ --from-beginning --property print.timestamp=true </code></pre> <p>I see they are all "-1": </p> <pre><code>CreateTime:-1 {"XXX"} </code></pre> <p>When adding a message to the topic with the console producer, I have the expected timestamps like: </p> <pre><code>CreateTime:1539010180284 hello </code></pre> <p>What am I doing wrong? Thanks for your help. </p> </div>

kafka消费者迭代器卡死

配置kafka+zk在虚拟机上,自己在本机上的生产者可以正常发送消息,但是消费者在 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 之后 iterator 的任何操作都直接卡死了,请问这个是怎么回事?在这个操作之前debug是可以看到变量内部的数值的,这个操作之后就不能看了,value全部清空了。 贴源码 ``` package com.weixinjia.recreation.queue.client; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerGroupExample extends Thread{ private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run(int a_numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(a_numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads // // executor = Executors.newFixedThreadPool(a_numThreads); System.out.println(streams.size()); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream<byte[], byte[]> stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); System.out.println(iterator); while(iterator.hasNext()){ MessageAndMetadata<byte[], byte[]> next = iterator.next(); byte[] message = next.message(); String string = new String(message); System.out.println(string); } } System.out.println("消息输出完成"); } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "1500"); props.put("zookeeper.sync.time.ms", "4000"); props.put("auto.commit.interval.ms", "1000"); props.put("fetch.message.max.bytes", "10240000"); props.put("auto.commit.enable", "true"); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = "masterServce:2181"; String groupId = "group-1"; String topic = "test2"; int threads = 3; ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic); example.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); } } ``` 消息在kafka那边直接用kafka-console-consumer.sh是可以查询到的

有人使用过 nmred/kafka吗 有关nmred/kafka的问题

consumer.php $logger = new Logger('my_logger'); // Now add some handlers // $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9092'); $config->setGroupId('test'); $config->setBrokerVersion('0.10.2.1'); $config->setTopics(array('test')); $config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); }); producer.php $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('127.0.0.1:9092'); $config->setBrokerVersion('0.10.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'dgjll90', 'key' => '', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode) { var_dump($errorCode); }); $producer->send(true); 生产者是没有问题的,能正常发送 但是消费者(consumer)始终得不到数据,查看日志,发现,到最后一直循环 my_logger.DEBUG: Start Request ClientId: kafka-php ApiKey: HeartbeatRequest ApiVersion: 0 [] [] 这句话 但有的时候又能正常返回数据,但一般几率很小,差不多10几次一次能正常返回,其他时候就是一直循环上面那句话 求解答

springboot kafka 多个接收系统订阅多个发送系统,多个消费组多个消费者该如何写代码?

1、kafka结合springboot开发。 2、生产消息的系统有多个,可能会动态的改变消费系统的个数。举例:A、B、C这3个系统生产消息后分别想让不同的系统使用广播模式消费消息。A想要系统1、2、3消费同一个消息,B想要系统2、3、4、5消费同一个消息,C想要系统1、2、5、6消费同一个消息。这样消费组的个数相当于在动态改变着,可能后面新增或减少,实际环境中生产消息的系统有几十个,消费消息的系统也有几十个,这种网上完全找不到代码案例,都是简单的案例或写死的。求指导代码该怎么写!太复杂了完全没思路!

大学四年自学走来,这些私藏的实用工具/学习网站我贡献出来了

大学四年,看课本是不可能一直看课本的了,对于学习,特别是自学,善于搜索网上的一些资源来辅助,还是非常有必要的,下面我就把这几年私藏的各种资源,网站贡献出来给你们。主要有:电子书搜索、实用工具、在线视频学习网站、非视频学习网站、软件下载、面试/求职必备网站。 注意:文中提到的所有资源,文末我都给你整理好了,你们只管拿去,如果觉得不错,转发、分享就是最大的支持了。 一、电子书搜索 对于大部分程序员...

在中国程序员是青春饭吗?

今年,我也32了 ,为了不给大家误导,咨询了猎头、圈内好友,以及年过35岁的几位老程序员……舍了老脸去揭人家伤疤……希望能给大家以帮助,记得帮我点赞哦。 目录: 你以为的人生 一次又一次的伤害 猎头界的真相 如何应对互联网行业的「中年危机」 一、你以为的人生 刚入行时,拿着傲人的工资,想着好好干,以为我们的人生是这样的: 等真到了那一天,你会发现,你的人生很可能是这样的: ...

Java基础知识面试题(2020最新版)

文章目录Java概述何为编程什么是Javajdk1.5之后的三大版本JVM、JRE和JDK的关系什么是跨平台性?原理是什么Java语言有哪些特点什么是字节码?采用字节码的最大好处是什么什么是Java程序的主类?应用程序和小程序的主类有何不同?Java应用程序与小程序之间有那些差别?Java和C++的区别Oracle JDK 和 OpenJDK 的对比基础语法数据类型Java有哪些数据类型switc...

我以为我学懂了数据结构,直到看了这个导图才发现,我错了

数据结构与算法思维导图

技术大佬:我去,你写的 switch 语句也太老土了吧

昨天早上通过远程的方式 review 了两名新来同事的代码,大部分代码都写得很漂亮,严谨的同时注释也很到位,这令我非常满意。但当我看到他们当中有一个人写的 switch 语句时,还是忍不住破口大骂:“我擦,小王,你丫写的 switch 语句也太老土了吧!” 来看看小王写的代码吧,看完不要骂我装逼啊。 private static String createPlayer(PlayerTypes p...

和黑客斗争的 6 天!

互联网公司工作,很难避免不和黑客们打交道,我呆过的两家互联网公司,几乎每月每天每分钟都有黑客在公司网站上扫描。有的是寻找 Sql 注入的缺口,有的是寻找线上服务器可能存在的漏洞,大部分都...

Linux 会成为主流桌面操作系统吗?

整理 |屠敏出品 | CSDN(ID:CSDNnews)2020 年 1 月 14 日,微软正式停止了 Windows 7 系统的扩展支持,这意味着服役十年的 Windows 7,属于...

讲一个程序员如何副业月赚三万的真实故事

loonggg读完需要3分钟速读仅需 1 分钟大家好,我是你们的校长。我之前讲过,这年头,只要肯动脑,肯行动,程序员凭借自己的技术,赚钱的方式还是有很多种的。仅仅靠在公司出卖自己的劳动时...

学习总结之HTML5剑指前端(建议收藏,图文并茂)

前言学习《HTML5与CSS3权威指南》这本书很不错,学完之后我颇有感触,觉得web的世界开明了许多。这本书是需要有一定基础的web前端开发工程师。这本书主要学习HTML5和css3,看...

女程序员,为什么比男程序员少???

昨天看到一档综艺节目,讨论了两个话题:(1)中国学生的数学成绩,平均下来看,会比国外好?为什么?(2)男生的数学成绩,平均下来看,会比女生好?为什么?同时,我又联想到了一个技术圈经常讨...

搜狗输入法也在挑战国人的智商!

故事总是一个接着一个到来...上周写完《鲁大师已经彻底沦为一款垃圾流氓软件!》这篇文章之后,鲁大师的市场工作人员就找到了我,希望把这篇文章删除掉。经过一番沟通我先把这篇文章从公号中删除了...

副业收入是我做程序媛的3倍,工作外的B面人生是怎样的?

提到“程序员”,多数人脑海里首先想到的大约是:为人木讷、薪水超高、工作枯燥…… 然而,当离开工作岗位,撕去层层标签,脱下“程序员”这身外套,有的人生动又有趣,马上展现出了完全不同的A/B面人生! 不论是简单的爱好,还是正经的副业,他们都干得同样出色。偶尔,还能和程序员的特质结合,产生奇妙的“化学反应”。 @Charlotte:平日素颜示人,周末美妆博主 大家都以为程序媛也个个不修边幅,但我们也许...

MySQL数据库面试题(2020最新版)

文章目录数据库基础知识为什么要使用数据库什么是SQL?什么是MySQL?数据库三大范式是什么mysql有关权限的表都有哪几个MySQL的binlog有有几种录入格式?分别有什么区别?数据类型mysql有哪些数据类型引擎MySQL存储引擎MyISAM与InnoDB区别MyISAM索引与InnoDB索引的区别?InnoDB引擎的4大特性存储引擎选择索引什么是索引?索引有哪些优缺点?索引使用场景(重点)...

新一代神器STM32CubeMonitor介绍、下载、安装和使用教程

关注、星标公众号,不错过精彩内容作者:黄工公众号:strongerHuang最近ST官网悄悄新上线了一款比较强大的工具:STM32CubeMonitor V1.0.0。经过我研究和使用之...

记一次腾讯面试,我挂在了最熟悉不过的队列上……

腾讯后台面试,面试官问:如何自己实现队列?

如果你是老板,你会不会踢了这样的员工?

有个好朋友ZS,是技术总监,昨天问我:“有一个老下属,跟了我很多年,做事勤勤恳恳,主动性也很好。但随着公司的发展,他的进步速度,跟不上团队的步伐了,有点...

我入职阿里后,才知道原来简历这么写

私下里,有不少读者问我:“二哥,如何才能写出一份专业的技术简历呢?我总感觉自己写的简历太烂了,所以投了无数份,都石沉大海了。”说实话,我自己好多年没有写过简历了,但我认识的一个同行,他在阿里,给我说了一些他当年写简历的方法论,我感觉太牛逼了,实在是忍不住,就分享了出来,希望能够帮助到你。 01、简历的本质 作为简历的撰写者,你必须要搞清楚一点,简历的本质是什么,它就是为了来销售你的价值主张的。往深...

冒泡排序动画(基于python pygame实现)

本项目效果初始截图如下 动画见本人b站投稿:https://www.bilibili.com/video/av95491382 本项目对应github地址:https://github.com/BigShuang python版本:3.6,pygame版本:1.9.3。(python版本一致应该就没什么问题) 样例gif如下 ======================= 大爽歌作,mad

Redis核心原理与应用实践

Redis核心原理与应用实践 在很多场景下都会使用Redis,但是到了深层次的时候就了解的不是那么深刻,以至于在面试的时候经常会遇到卡壳的现象,学习知识要做到系统和深入,不要把Redis想象的过于复杂,和Mysql一样,是个读取数据的软件。 有一个理解是Redis是key value缓存服务器,更多的优点在于对value的操作更加丰富。 安装 yum install redis #yum安装 b...

现代的 “Hello, World”,可不仅仅是几行代码而已

作者 |Charles R. Martin译者 | 弯月,责编 | 夕颜头图 |付费下载自视觉中国出品 | CSDN(ID:CSDNnews)新手...

带了6个月的徒弟当了面试官,而身为高级工程师的我天天修Bug......

即将毕业的应届毕业生一枚,现在只拿到了两家offer,但最近听到一些消息,其中一个offer,我这个组据说客户很少,很有可能整组被裁掉。 想问大家: 如果我刚入职这个组就被裁了怎么办呢? 大家都是什么时候知道自己要被裁了的? 面试软技能指导: BQ/Project/Resume 试听内容: 除了刷题,还有哪些技能是拿到offer不可或缺的要素 如何提升面试软实力:简历, 行为面试,沟通能...

!大部分程序员只会写3年代码

如果世界上都是这种不思进取的软件公司,那别说大部分程序员只会写 3 年代码,恐怕就没有程序员这种职业。

离职半年了,老东家又发 offer,回不回?

有小伙伴问松哥这个问题,他在上海某公司,在离职了几个月后,前公司的领导联系到他,希望他能够返聘回去,他很纠结要不要回去? 俗话说好马不吃回头草,但是这个小伙伴既然感到纠结了,我觉得至少说明了两个问题:1.曾经的公司还不错;2.现在的日子也不是很如意。否则应该就不会纠结了。 老实说,松哥之前也有过类似的经历,今天就来和小伙伴们聊聊回头草到底吃不吃。 首先一个基本观点,就是离职了也没必要和老东家弄的苦...

2020阿里全球数学大赛:3万名高手、4道题、2天2夜未交卷

阿里巴巴全球数学竞赛( Alibaba Global Mathematics Competition)由马云发起,由中国科学技术协会、阿里巴巴基金会、阿里巴巴达摩院共同举办。大赛不设报名门槛,全世界爱好数学的人都可参与,不论是否出身数学专业、是否投身数学研究。 2020年阿里巴巴达摩院邀请北京大学、剑桥大学、浙江大学等高校的顶尖数学教师组建了出题组。中科院院士、美国艺术与科学院院士、北京国际数学...

为什么你不想学习?只想玩?人是如何一步一步废掉的

不知道是不是只有我这样子,还是你们也有过类似的经历。 上学的时候总有很多光辉历史,学年名列前茅,或者单科目大佬,但是虽然慢慢地长大了,你开始懈怠了,开始废掉了。。。 什么?你说不知道具体的情况是怎么样的? 我来告诉你: 你常常潜意识里或者心理觉得,自己真正的生活或者奋斗还没有开始。总是幻想着自己还拥有大把时间,还有无限的可能,自己还能逆风翻盘,只不是自己还没开始罢了,自己以后肯定会变得特别厉害...

HTTP与HTTPS的区别

面试官问HTTP与HTTPS的区别,我这样回答让他竖起大拇指!

程序员毕业去大公司好还是小公司好?

虽然大公司并不是人人都能进,但我仍建议还未毕业的同学,尽力地通过校招向大公司挤,但凡挤进去,你这一生会容易很多。 大公司哪里好?没能进大公司怎么办?答案都在这里了,记得帮我点赞哦。 目录: 技术氛围 内部晋升与跳槽 啥也没学会,公司倒闭了? 不同的人脉圈,注定会有不同的结果 没能去大厂怎么办? 一、技术氛围 纵观整个程序员技术领域,哪个在行业有所名气的大牛,不是在大厂? 而且众所...

男生更看重女生的身材脸蛋,还是思想?

往往,我们看不进去大段大段的逻辑。深刻的哲理,往往短而精悍,一阵见血。问:产品经理挺漂亮的,有点心动,但不知道合不合得来。男生更看重女生的身材脸蛋,还是...

程序员为什么千万不要瞎努力?

本文作者用对比非常鲜明的两个开发团队的故事,讲解了敏捷开发之道 —— 如果你的团队缺乏统一标准的环境,那么即使勤劳努力,不仅会极其耗时而且成果甚微,使用...

为什么程序员做外包会被瞧不起?

二哥,有个事想询问下您的意见,您觉得应届生值得去外包吗?公司虽然挺大的,中xx,但待遇感觉挺低,马上要报到,挺纠结的。

立即提问
相关内容推荐