dtiu94034 2019-07-16 13:33 采纳率: 0%
浏览 1459

如何使用Sarama Go Kafka Consumer从最新的抵消量消耗?

我有三个问题:

  1. “oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?

// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2

  1. 假设:

    A. 三个broker运行在一台机器上
    B. 使用者群体只有一个使用者线程
    C. 使用者信任OffsetOldest标志
    D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息

    因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?

  2. 在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    
  • 写回答

1条回答

  • dragoninasia2014 2019-07-16 14:02
    关注
    1. No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).

    2. It depends on your configuration. Essentially, you have 3 options:

      • Start consuming from the earliest offset
      • Start consuming from the latest offset
      • Start consuming from a specific offset
    3. You have to use sarama.OffsetOldest. From the documentation,

     const (
            // OffsetNewest stands for the log head offset, i.e. the offset that will be
            // assigned to the next message that will be produced to the partition. You
            // can send this to a client's GetOffset method to get this offset, or when
            // calling ConsumePartition to start consuming new messages.
            OffsetNewest int64 = -1
            // OffsetOldest stands for the oldest offset available on the broker for a
            // partition. You can send this to a client's GetOffset method to get this
            // offset, or when calling ConsumePartition to start consuming from the
            // oldest offset that is still available on the broker.
            OffsetOldest int64 = -2
        )
    
    评论

报告相同问题?

悬赏问题

  • ¥30 这是哪个作者做的宝宝起名网站
  • ¥60 版本过低apk如何修改可以兼容新的安卓系统
  • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
  • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
  • ¥50 有数据,怎么用matlab求全要素生产率
  • ¥15 TI的insta-spin例程
  • ¥15 完成下列问题完成下列问题
  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!