Help wanted: Kafka node load imbalance

Problem description

In our platform's Kafka cluster (kafka01, kafka02, kafka03), kafka01 is under high CPU load while the other two nodes, kafka02 and kafka03, stay under low CPU load:

Grafana monitoring is shown below.

CPU: kafka01 is clearly much higher than the other two.
(screenshot)

Traffic: kafka01 is also much higher; its bytes in/out are roughly 5x those of kafka02 and kafka03.

(screenshot)

Troubleshooting

1. Checked the topics: all of our topics have 3 partitions and 3 replicas.

2. Checked the topic partitions: for this 3-partition, 3-replica topic, leader assignment is unbalanced. Two of the partitions have broker 1 as their leader (see the sketch after this list).
(screenshot)

3. Checked topic consumption: the data volume in each of the topic's partitions is roughly the same.
(screenshot)
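
The screenshots are not reproduced here, but the leader layout they show can be verified with `kafka-topics.sh --describe --topic <topic>` or programmatically. Below is a minimal sketch using the Kafka Java AdminClient; the topic name `my-topic` and the bootstrap address `kafka01:9092` are placeholders, not values from the original post:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class LeaderCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; use the real cluster's address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // "my-topic" is a placeholder for the 3-partition, 3-replica topic.
            TopicDescription desc = admin.describeTopics(Collections.singletonList("my-topic"))
                    .values().get("my-topic").get();
            for (TopicPartitionInfo p : desc.partitions()) {
                // The first replica in replicas() is the preferred leader; when the current
                // leader differs from it on several partitions, leadership is skewed.
                System.out.printf("partition=%d leader=%d preferred=%d replicas=%s%n",
                        p.partition(), p.leader().id(), p.replicas().get(0).id(), p.replicas());
            }
        }
    }
}
```

Since clients read from and write to partition leaders only, a broker that leads two of the three partitions naturally carries roughly twice the client traffic of each of the others.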

Could any experts advise where the problem lies and how to solve it? Thanks!
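
For reference, the usual remedy for skewed leadership is to move each partition's leader back to its preferred replica, either with the `kafka-preferred-replica-election.sh` tool (replaced by `kafka-leader-election.sh` in newer releases) or through the AdminClient. A minimal sketch, assuming a Kafka 2.4+ cluster and the same placeholder topic and broker names as above:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class PreferredLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; use the real cluster's address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Partitions whose leadership should move back to the preferred replica.
            // "my-topic" and the partition numbers are placeholders.
            Set<TopicPartition> partitions = new HashSet<>();
            partitions.add(new TopicPartition("my-topic", 1));
            partitions.add(new TopicPartition("my-topic", 2));

            // electLeaders is available in the Java AdminClient from Kafka 2.4 onwards.
            admin.electLeaders(ElectionType.PREFERRED, partitions).partitions().get();
        }
    }
}
```

If `auto.leader.rebalance.enable` is left at its default of `true`, the controller should also move leaders back to the preferred replicas periodically on its own.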
