douqiong8412 2018-12-07 14:05
Views: 619

Golang consumer receives Kafka messages with a delay after connecting to Kafka

I'm new to Golang and Kafka, so this might seem like a silly question.

After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

The program prints a message right before it starts ranging over consumer.Messages() and prints another message for each message received. The ~20 sec delay is between the first fmt.Println and the second fmt.Println.

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
    cluster "github.com/bsm/sarama-cluster"
)

func main() {

    // Create the consumer and listen for new messages
    consumer := createConsumer()

    // Create a signal channel to know when we are done
    done := make(chan bool)

    // Start processing messages
    go func() { 
        fmt.Println("Start consuming Kafka messages")
        for msg := range consumer.Messages() {
            // msg.Value is a []byte; convert it directly to a string
            s := string(msg.Value)
            fmt.Println("Msg: ", s)
        }
    }()

    <-done

}

func createConsumer() *cluster.Consumer {
    // Define our configuration to the cluster
    config := cluster.NewConfig()
    config.Consumer.Return.Errors = false
    config.Group.Return.Notifications = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    // Create the consumer
    brokers := []string{"127.0.0.1:9092"}
    topics := []string{"orders"}
    consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
    if err != nil {
        log.Fatalf("Unable to connect consumer to Kafka: %v", err)
    }
    // handleErrors and handleNotifications are not shown in the question
    go handleErrors(consumer)
    go handleNotifications(consumer)
    return consumer
}

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: "confluentinc/cp-zookeeper:5.0.1"
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker-1:
    image: "confluentinc/cp-enterprise-kafka:5.0.1"
    hostname: broker-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_BROKER_RACK: rack-a
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://127.0.0.1:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: 'broker-1'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker-1:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_CREATE_TOPICS: "orders:1:1"

1 answer

  • duanjie5570 2019-06-20 07:40

    After my Kafka consumer first connects to the Kafka server, why is there a delay (~ 20 secs) between establishing connection to the Kafka server, and receiving the first message?

    The loop itself should not introduce that much delay, because the consumer exposes a message channel that receives messages from Kafka. As soon as a message is available in the Kafka topic, it is sent to that channel, where the consumer can receive it.

    Looking at your code:

    for msg := range consumer.Messages() {
        s := string(msg.Value[:])
        fmt.Println("Msg: ", s)
    }
    

    consumer.Messages() returns a channel, and the for loop ranges over it, yielding each message as soon as it becomes available on the channel.
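
The channel behaviour described above can be checked in isolation. The following is a minimal sketch using only the standard library: the msgs channel is a stand-in for consumer.Messages(), the drain helper and the 50 ms sleep are hypothetical names and values chosen for illustration, and no Kafka broker is involved. It shows that the range loop blocks until the sender delivers something, so any wait measured before the first message comes from upstream of the channel rather than from the loop.

```go
package main

import (
	"fmt"
	"time"
)

// drain ranges over msgs the same way the question's loop ranges over
// consumer.Messages(). It returns every message received and how long
// the loop waited before the first one arrived. The loop ends only when
// the channel is closed.
func drain(msgs <-chan string) (received []string, firstDelay time.Duration) {
	start := time.Now()
	for m := range msgs {
		if len(received) == 0 {
			firstDelay = time.Since(start)
		}
		received = append(received, m)
	}
	return received, firstDelay
}

func main() {
	// Stand-in for consumer.Messages(); assumption for illustration only.
	msgs := make(chan string)

	go func() {
		// Simulated upstream latency before the first message is delivered.
		time.Sleep(50 * time.Millisecond)
		msgs <- "order-1"
		msgs <- "order-2"
		close(msgs) // closing the channel ends the range loop in drain
	}()

	received, firstDelay := drain(msgs)
	fmt.Println("received:", received)
	fmt.Println("waited before first message:", firstDelay)
}
```

Wrapping the real loop with the same time.Now() / time.Since() pair is one way to confirm where the ~20 sec in the question is actually spent.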

    To connect using sarama alone, refer to the question "How to create a kafka consumer group in Golang?". You don't need sarama-cluster for the connection.
