douzhigan1687 2019-07-04 17:49 采纳率: 0%
浏览 1620
已采纳

如何使用Sarama在多个goroutine中从Kafka主题消费?

I use https://github.com/Shopify/sarama for interaction with Kafka. I have a topic with, for example, 100 partitions. I have application, which is deployed on 1 host. So, I want to consume from this topic in multiple goroutines.

I see this example - https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go , in which we can see, how to create consumer in specific consumer group.

So, my question is, should I create multiply such consumers, or there is some setting in Sarama, where I can set up needed number of consumer goroutines.

P.S. I see this question - https://github.com/Shopify/sarama/issues/140 - but there is no answer, how to create MultiConsumer.

  • 写回答

1条回答 默认 最新

  • doutuanxiao4619 2019-07-04 19:02
    关注

    This example shows a fully working console application which can consume for all partitions in a topic creating one goroutine per partition:

    https://github.com/Shopify/sarama/blob/master/tools/kafka-console-consumer/kafka-console-consumer.go

    It is linked at the end of the thread you posted in your question.

    It basically creates one consumer:

    c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
    

    Then gets all the partitions for the desired topic:

    func getPartitions(c sarama.Consumer) ([]int32, error) {
        if *partitions == "all" {
            return c.Partitions(*topic)
        }
    ...
    

    Then for each partition it creates a PartitionConsumer and consumes from each partition in a different goroutine:

    for _, partition := range partitionList {
        pc, err := c.ConsumePartition(*topic, partition, initialOffset)
        ....
    
        wg.Add(1)
        go func(pc sarama.PartitionConsumer) {
            defer wg.Done()
            for message := range pc.Messages() {
                messages <- message
            }
        }(pc)
    
    }
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥50 永磁型步进电机PID算法
  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥200 uniapp长期运行卡死问题解决
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?