连接到Kafka后Golang消费者延迟接收Kafka消息

I'm new to Golang and Kafa so this might seem like a silly question.

After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

It prints a message right before consumer.Messages() and print another message for each message received. The ~20 sec delay is between the first fmt.Println and second fmt.Println.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // Create the consumer and listen for new messages
    consumer := createConsumer()

    // Create a signal channel to know when we are done
    done := make(chan bool)

    // Start processing messages
    go func() { 
        fmt.Println("Start consuming Kafka messages")
        for msg := range consumer.Messages() {
            s := string(msg.Value[:])
            fmt.Println("Msg: ", s)
        }
    }()

    <-done

}

func createConsumer() *cluster.Consumer {
    // Define our configuration to the cluster
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = false
    config.Group.Return.Notifications = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Create the consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"orders"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        log.Fatal("Unable to connect consumer to Kafka")
    }
    go handleErrors(consumer)
    go handleNotifications(consumer)
    return consumer
}

docker-compose.yml

version: '2'
services:
zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    hostname: zookeeper
    ports:
    - "2181:2181"
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

broker-1:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    hostname: broker-1
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_BROKER_RACK: rack-a
    KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
    KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
    KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    KAFKA_DELETE_TOPIC_ENABLE: "true"
    KAFKA_JMX_PORT: 9999
    KAFKA_JMX_HOSTNAME: 'broker-1'
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
    CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    CONFLUENT_METRICS_ENABLE: 'true'
    CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    KAFKA_CREATE_TOPICS: "orders:1:1"
dpiz9879
dpiz9879 我在您的应用程序中使用了与您类似的配置。从我所看到的,每当我启动应用程序并尝试连接到Kafka时,Kafka的GroupCoordinator都需要一些时间来重新平衡/重新稳定组消费者。就我而言,这就是延迟的原因,也是Kafka工作的方式...我认为代码没有错。希望这个帮助:)
一年多之前 回复
doulan6245
doulan6245 执行docker-composedown和docker-composeup清除所有Kafka消息,延迟仍然不到20秒。
一年多之前 回复
duanjiwang2927
duanjiwang2927 该主题下的100条以下消息。共有2个主题,第二个主题包含10条以下消息。Kafka和Zookeeper在具有16GB内存的2018i7MacbookPro上的Docker容器中运行
一年多之前 回复
drazvzi741287
drazvzi741287 您的主题中有几则讯息?如果要处理大量数据,Kafka可能需要一段时间才能找到起点。
一年多之前 回复

1个回答



我的Kafka使用者首次连接到Kafka服务器后,为什么在建立与Kafka服务器的连接之间会有延迟(〜20秒) ,并收到第一条消息?</ p>


不会有那么多延迟,因为消费者使用了从kafka接收消息的消息通道。 消息一旦在kafka队列中可用
,它将被发送到消费者可以接收的消息通道。</ p>
</ blockquote>

代码实现:- </ p>

 用于msg:=范围内的消费者。Messages(){
s:=字符串(msg.Value [:])
fmt.Println(“ Msg:”,s )
}
</ code> </ pre>

consumer.Messages()</ code>返回一个通道,并且 for </ code>遍历该通道 </ p>

请参阅此问题如何创建kafka 是Golang的消费者群体?使用sarama进行连接。 您不需要sarama-cluster进行连接。</ p>
</ div>

展开原文

原文

After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

There can not be that much delay because consumer used message channel which receive messages from kafka. As soon as the message is available in kafka queue it will be sent to message channel which consumer can receive.

Coming to you code implementation :-

for msg := range consumer.Messages() {
    s := string(msg.Value[:])
    fmt.Println("Msg: ", s)
}

consumer.Messages() returns a channel and for loops over the channel which returns a message whenever it is available inside channel.

Refer to this question How to create a kafka consumer group in Golang? to connect using sarama. you don't need sarama-cluster for connection.

Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问
相关内容推荐