douzhang1115 2017-02-01 18:07
Views: 1924
Accepted

How do I create a Kafka consumer group in Golang?

One available library is sarama (or its extension, sarama-cluster), but neither sarama nor sarama-cluster provides a consumer group example.

I do not understand the API. May I have an example of creating a consumer group for a topic?


2 answers

  • doushoubu5360 2017-02-01 22:12

    The consumer group is specified by the second argument of the cluster consumer "constructor". Here's a very basic sketch:

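    import (
        "log"

        "github.com/Shopify/sarama" // only needed if the config values below reference sarama constants
        cluster "github.com/bsm/sarama-cluster"
    )

    conf := cluster.NewConfig()
    // add config values

    brokers := []string{"kafka-1:9092", "kafka-2:9092"}
    group := "Your-Consumer-Group"
    topics := []string{"topicName"}
    consumer, err := cluster.NewConsumer(brokers, group, topics, conf)
    if err != nil {
        log.Fatalln(err)
    }
    defer consumer.Close()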
    

    And so you'll have a consumer belonging to the specified consumer group.

    This answer was accepted by the asker as the best answer.
  • dphe5602 2019-06-20 05:49

    There is no need to use the sarama-cluster library; it is deprecated for Apache Kafka integration. The original Sarama library itself provides a way to connect to a Kafka cluster using a consumer group.

    We first create a client, then initialize a consumer group from it, and finally call Consume with a handler that receives the claims and reads messages from each claim's message channel.

    Initializing the client:

    // kafkaVersion is the Kafka server version as a string, e.g. "0.11.0.2"
    kfversion, err := sarama.ParseKafkaVersion(kafkaVersion)
    if err != nil {
        log.Fatalln(err)
    }
    
    config := sarama.NewConfig()
    config.Version = kfversion
    config.Consumer.Return.Errors = true
    
    // Start with a client
    client, err := sarama.NewClient([]string{brokerAddr}, config)
    if err != nil {
        log.Fatalln(err) // a nil client cannot be used or closed below
    }
    defer func() { _ = client.Close() }()
    

    Connecting to the consumer group:

    // Start a new consumer group; consumer_group is the group ID string
    group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
    if err != nil {
        log.Fatalln(err) // a nil group cannot be used or closed below
    }
    defer func() { _ = group.Close() }()
    

    Start consuming messages from the topic's partitions:

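    // Iterate over consumer sessions. Consume returns whenever a session ends
    // (for example after a rebalance), so it has to be called in a loop.
    ctx := context.Background()
    topics := []string{topicName}
    handler := exampleConsumerGroupHandler{} // implements sarama.ConsumerGroupHandler
    for {
        err := group.Consume(ctx, topics, handler)
        if err != nil {
            log.Println(err)
        }
    }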
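
    As written, the loop above never exits. A minimal sketch of one way to stop it, using only the standard library: consumeFunc here is a hypothetical stand-in for a call such as group.Consume(ctx, topics, handler), and runSessions and onError are names I made up for illustration, not part of sarama.

    ```go
    package main

    import (
        "context"
        "errors"
    )

    // consumeFunc is a hypothetical stand-in for a call such as
    // group.Consume(ctx, topics, handler): it blocks for one consumer
    // session and returns when that session ends.
    type consumeFunc func(ctx context.Context) error

    // runSessions keeps starting new sessions until ctx is cancelled and
    // returns the number of sessions it started. Errors other than
    // context.Canceled are passed to onError.
    func runSessions(ctx context.Context, consume consumeFunc, onError func(error)) int {
        sessions := 0
        for {
            sessions++
            if err := consume(ctx); err != nil && !errors.Is(err, context.Canceled) {
                onError(err)
            }
            // A cancelled context means the caller asked us to stop;
            // without this check the loop would keep starting sessions.
            if ctx.Err() != nil {
                return sessions
            }
        }
    }
    ```

    In a real program, ctx would typically come from something like signal.NotifyContext(context.Background(), os.Interrupt), and the consume argument would be a small closure around group.Consume.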
    

    The last part is to read from the message channel. To do so, we need to implement all three methods of the ConsumerGroupHandler interface.

    type exampleConsumerGroupHandler struct{}

    func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
    func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
    func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        // The Messages() channel is closed when the session ends, which ends this loop.
        for msg := range claim.Messages() {
            fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
            sess.MarkMessage(msg, "") // mark the message as processed
        }
        return nil
    }
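
    To make the handler lifecycle easier to see without a running broker, here is a rough standard-library-only sketch. The types message, claim, handler, chanClaim and recordingHandler, and the function runSession, are all simplified stand-ins I invented for illustration; they are not sarama's types. Real sarama runs ConsumeClaim in a separate goroutine per claim and passes a session to each method, which this sketch leaves out.

    ```go
    package main

    // message, claim and handler are hypothetical, simplified stand-ins for
    // sarama's ConsumerMessage, ConsumerGroupClaim and ConsumerGroupHandler.
    type message struct {
        Topic     string
        Partition int32
        Offset    int64
    }

    type claim interface {
        Messages() <-chan *message
    }

    type handler interface {
        Setup() error
        ConsumeClaim(c claim) error
        Cleanup() error
    }

    // chanClaim is a fake claim backed by a plain channel.
    type chanClaim struct{ ch chan *message }

    func (c chanClaim) Messages() <-chan *message { return c.ch }

    // recordingHandler records the order of lifecycle calls and the offsets seen.
    type recordingHandler struct {
        events  []string
        offsets []int64
    }

    func (h *recordingHandler) Setup() error {
        h.events = append(h.events, "setup")
        return nil
    }

    func (h *recordingHandler) Cleanup() error {
        h.events = append(h.events, "cleanup")
        return nil
    }

    func (h *recordingHandler) ConsumeClaim(c claim) error {
        h.events = append(h.events, "consume")
        // The loop ends once the channel is closed, i.e. when the session ends.
        for msg := range c.Messages() {
            h.offsets = append(h.offsets, msg.Offset)
        }
        return nil
    }

    // runSession drives one simplified session for a single claim:
    // Setup, then ConsumeClaim, then Cleanup.
    func runSession(h handler, c claim) error {
        if err := h.Setup(); err != nil {
            return err
        }
        consumeErr := h.ConsumeClaim(c)
        if err := h.Cleanup(); err != nil {
            return err
        }
        return consumeErr
    }
    ```

    Feeding a closed, pre-filled channel through runSession shows the point that matters for a real handler: ConsumeClaim should return as soon as the message channel is closed, so that Cleanup can run and the next session can start.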
    

    For more information on using Kafka from Go, see the sarama library documentation.
