dongnong7524 2016-12-05 14:09
浏览 338

Golang Kafka不消耗所有偏移量的最新消息

First Batch:- I am trying to pull data from 100 flat file and loading up into an array and inserting them to kafka producer one by one as byte array.

Second Batch:- I am consuming from kafka consumer and then inserting them to NoSQL database.

I use Offsetnewset in the config file of shopify sarama golang package for Kafka.

I can receive and insert messages to kafka but while consuming I am getting only the first message. Since I gave Offset newest in the sarama config. how can I get all the data here.

  • 写回答

1条回答 默认 最新

  • dongzhunqiu4841 2016-12-06 17:35
    关注

    It is difficult to be able to tell something without any code or more in depth explanation about how kafka is configured (i.e.: topics, partitions, ...), so few quick checks come to my mind:

    1. Assuming you start consuming with the OffsetNewest set before you start producing, one thing that maybe happening is that you are not consuming from all partitions on that topic, regarding to sarama docs, you have to consume each partition explicitly by creating PartitionConsumers. From the example in https://godoc.org/github.com/Shopify/sarama#Consumer:

      partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
      if err != nil {
          panic(err)
      }
      
      ...
      
      consumed := 0
      ConsumerLoop:
      for {
          select {
          case msg := <-partitionConsumer.Messages():
              log.Printf("Consumed message offset %d
      ", msg.Offset)
              consumed++
          case <-signals:
              break ConsumerLoop
          }
      }
      
    2. You, in fact, are starting the consuming after producing all the events, and so, the pointer to read them all is not OffsetNewest but OffsetOldest instead.

    I'm sorry to not be able to give you a more useful answer, but maybe if you paste some code or give more details we can help a but more.

    评论

报告相同问题?

悬赏问题

  • ¥15 js调用html页面需要隐藏某个按钮
  • ¥15 ads仿真结果在圆图上是怎么读数的
  • ¥20 Cotex M3的调试和程序执行方式是什么样的?
  • ¥15 一道python难题3
  • ¥15 牛顿斯科特系数表表示
  • ¥15 arduino 步进电机
  • ¥20 程序进入HardFault_Handler
  • ¥15 oracle集群安装出bug
  • ¥15 关于#python#的问题:自动化测试
  • ¥20 问题请教!vue项目关于Nginx配置nonce安全策略的问题