douzhang1115
2017-02-01 18:07

How do I create a Kafka consumer group in Golang?

One available library is sarama (or its extension, sarama-cluster); however, no consumer group examples are provided, neither in sarama nor in sarama-cluster (https://github.com/bsm/sarama-cluster/issues/105).

I do not understand the API. May I have an example of creating a consumer group for a topic?


2 answers

  • doushoubu5360 2017-02-01 22:12
    Accepted

    The consumer group is specified by the second argument of the cluster consumer "constructor". Here's a very basic sketch:

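    import (
        "log"

        "github.com/Shopify/sarama"
        cluster "github.com/bsm/sarama-cluster"
    )
    
    conf := cluster.NewConfig()
    // add config values, e.g. conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    
    brokers := []string{"kafka-1:9092", "kafka-2:9092"}
    group := "Your-Consumer-Group"
    topics := []string{"topicName"}
    // NewConsumer returns the consumer and an error; check the error before use.
    consumer, err := cluster.NewConsumer(brokers, group, topics, conf)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()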
    

    And so you'll have a consumer belonging to the specified consumer group.

  • dphe5602 2019-06-20 05:49

    There is no need to use the sarama-cluster library. It is deprecated; the original Sarama library itself now provides a way to connect to a Kafka cluster as a member of a consumer group.

    We create a client and then initialize a consumer group from it. The group hands us claims, and we read from each claim's message channel to receive messages.

    Initializing the client:

    kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
    if err != nil {
        log.Println(err)
    }
    
    config := sarama.NewConfig()
    config.Version = kfversion
    config.Consumer.Return.Errors = true
    
    // Start with a client
    client, err := sarama.NewClient([]string{brokerAddr}, config)
    if err != nil {
        log.Println(err)
    }
    defer func() { _ = client.Close() }()
    

    Connecting to the consumer group:

    // Start a new consumer group
    group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
    if err != nil {
        log.Println(err)
    }
    defer func() { _ = group.Close() }()
    

    Consuming messages from the topic's partitions:

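    // Iterate over consumer sessions.
    ctx := context.Background()
    topics := []string{topicName}
    handler := exampleConsumerGroupHandler{}
    for {
        // Consume returns when the session ends (for example on a rebalance),
        // so it has to be called again in a loop.
        err := group.Consume(ctx, topics, handler)
        if err != nil {
            log.Println(err)
        }
    }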
    

    The last part is to read from the message channel. To do that, we need to implement all three methods of the ConsumerGroupHandler interface.

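    // exampleConsumerGroupHandler implements sarama.ConsumerGroupHandler.
    type exampleConsumerGroupHandler struct{}

    // Setup runs at the start of a new session, before ConsumeClaim.
    func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
    // Cleanup runs at the end of a session, once all ConsumeClaim goroutines have exited.
    func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
    // ConsumeClaim processes the messages of one claimed partition.
    func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
            sess.MarkMessage(msg, "") // mark the message as consumed
        }
        return nil
    }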
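
To make the order of the three callbacks easier to see, here is a small simulation of one session: `Setup` runs first, `ConsumeClaim` runs in its own goroutine for each claimed partition, and `Cleanup` runs after all of those goroutines have returned. This is only a sketch; `message`, `handler`, `recorder`, and `runSession` are simplified, hypothetical stand-ins (the real sarama interfaces take session and claim arguments), so the example runs without sarama or a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// message and handler are simplified, hypothetical stand-ins for
// sarama.ConsumerMessage and sarama.ConsumerGroupHandler.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

type handler interface {
	Setup() error
	Cleanup() error
	ConsumeClaim(claim <-chan message) error
}

// recorder notes the order in which the callbacks run.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) note(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Setup() error   { r.note("setup"); return nil }
func (r *recorder) Cleanup() error { r.note("cleanup"); return nil }

func (r *recorder) ConsumeClaim(claim <-chan message) error {
	for msg := range claim { // the loop ends when the claim's channel is closed
		r.note(fmt.Sprintf("msg p%d@%d", msg.Partition, msg.Offset))
	}
	return nil
}

// runSession mimics one session: Setup, one goroutine per claim, then Cleanup.
func runSession(h handler, claims []chan message) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for _, c := range claims {
		wg.Add(1)
		go func(c <-chan message) {
			defer wg.Done()
			_ = h.ConsumeClaim(c)
		}(c)
	}
	wg.Wait() // Cleanup must not start before every ConsumeClaim has returned
	return h.Cleanup()
}

func main() {
	claims := []chan message{make(chan message, 2), make(chan message, 2)}
	claims[0] <- message{"topicName", 0, 10}
	claims[1] <- message{"topicName", 1, 7}
	for _, c := range claims {
		close(c) // closing the channels simulates the end of the session
	}

	r := &recorder{}
	if err := runSession(r, claims); err != nil {
		fmt.Println("session error:", err)
	}
	fmt.Println(r.events)
}
```

Because each claim is handled in its own goroutine, the two message events can appear in either order, while "setup" is always first and "cleanup" always last. In a real handler this means any state shared between `ConsumeClaim` calls needs to be protected, as the mutex in `recorder` does here.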
    

    For more information on using Kafka from Go, see the sarama library documentation.
