douluan5738 2017-11-02 09:07
浏览 177
已采纳

使用Golang Sarama软件包无法使用来自本地运行的Kafka服务器的消息

I am making a simple Telegram bot that would read messages from a local Kafka server and print it out to a chat. Both zookeeper and kafka server config files are at their defaults. Console consumer works. The problem rises when I try to consume messages from code using Golang Sarama package. Before I added these lines:

case err := <-pc.Errors(): log.Panic(err)

the program only printed the messages once, after which it would stall. Now it panics prinitng this to the log: kafka: error while consuming test1/0: kafka: broker not connected

Here's the code:

    type kafkaResponse struct {
        telega  *tgbotapi.Message
        message []byte
    }

    type kafkaRequest struct {
        telega *tgbotapi.Message
        topic  string
    }    
    var kafkaBrokers = []string{"localhost:9092"}
    func main() {
                //channels for request response
                var reqChan = make(chan kafkaRequest)
                var respChan = make(chan kafkaResponse)

                //starting kafka client routine to listen to topic channnel
                go consumer(reqChan, respChan, kafkaBrokers)

                //bot thingy here
                bot, err := tgbotapi.NewBotAPI(token)
                if err != nil {
                    log.Panic(err)
                }
                bot.Debug = true
                log.Printf("Authorized on account %s", bot.Self.UserName)
                u := tgbotapi.NewUpdate(0)
                u.Timeout = 60
                updates, err := bot.GetUpdatesChan(u)
                for {
                    select {
                    case update := <-updates:
                        if update.Message == nil {
                            continue
                        }
                        switch update.Message.Text {

                        case "Topic: test1":
                            topic := "test1"
                            reqChan <- kafkaRequest{update.Message, topic}
                        }
                    case response := <-respChan:
                        bot.Send(tgbotapi.NewMessage(response.telega.Chat.ID, string(response.message)))
                    }

                }

here's the consumer.go:

 func consumer(reqChan chan kafkaRequest, respChan chan kafkaResponse, brokers []string) {
            config := sarama.NewConfig()
            config.Consumer.Return.Errors = true

            // Create new consumer
            consumer, err := sarama.NewConsumer(brokers, config)
            if err != nil {
                panic(err)
            }
            defer func() {
                if err := consumer.Close(); err != nil {
                    panic(err)
                }
            }()

            select {
            case request := <-reqChan:
                //get all partitions on the given topic
                partitionList, err := consumer.Partitions(request.topic)
                if err != nil {
                    fmt.Println("Error retrieving partitionList ", err)
                }

                initialOffset := sarama.OffsetOldest
                for _, partition := range partitionList {
                    pc, _ := consumer.ConsumePartition(request.topic, partition, initialOffset)

                    go func(pc sarama.PartitionConsumer) {
                        for {
                            select {
                            case message := <-pc.Messages():
                                respChan <- kafkaResponse{request.telega, message.Value}
                            case err := <-pc.Errors():
                                log.Panic(err)
                            }
                        }
                    }(pc)
                }
            }
        }
  • 写回答

1条回答 默认 最新

  • douzhong5902 2017-11-02 11:27
    关注

    You are closing your consumer after setting up all the PartitionConsumers in the code

    defer func() {
                if err := consumer.Close(); err != nil {
                    panic(err)
                }
            }()
    

    However, the documentation specifies that you should only close the consumer after all the PartitionConsumers have been closed.

    // Close shuts down the consumer. It must be called after all child
    // PartitionConsumers have already been closed.
    Close() error
    

    I would recommend you add a sync.WaitGroup to the function go func(pc sarama.PartitionConsumer) {

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 如何在node.js中或者java中给wav格式的音频编码成sil格式呢
  • ¥15 不小心不正规的开发公司导致不给我们y码,
  • ¥15 我的代码无法在vc++中运行呀,错误很多
  • ¥50 求一个win系统下运行的可自动抓取arm64架构deb安装包和其依赖包的软件。
  • ¥60 fail to initialize keyboard hotkeys through kernel.0000000000
  • ¥30 ppOCRLabel导出识别结果失败
  • ¥15 Centos7 / PETGEM
  • ¥15 csmar数据进行spss描述性统计分析
  • ¥15 各位请问平行检验趋势图这样要怎么调整?说标准差差异太大了
  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题