dtiu94034 2019-07-16 13:33 采纳率: 0%
浏览 1459

如何使用Sarama Go Kafka Consumer从最新的抵消量消耗?

我有三个问题:

  1. “oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?

// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2

  1. 假设:

    A. 三个broker运行在一台机器上
    B. 使用者群体只有一个使用者线程
    C. 使用者信任OffsetOldest标志
    D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息

    因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?

  2. 在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    
  • 写回答

1条回答 默认 最新

  • dragoninasia2014 2019-07-16 14:02
    关注
    1. No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).

    2. It depends on your configuration. Essentially, you have 3 options:

      • Start consuming from the earliest offset
      • Start consuming from the latest offset
      • Start consuming from a specific offset
    3. You have to use sarama.OffsetOldest. From the documentation,

     const (
            // OffsetNewest stands for the log head offset, i.e. the offset that will be
            // assigned to the next message that will be produced to the partition. You
            // can send this to a client's GetOffset method to get this offset, or when
            // calling ConsumePartition to start consuming new messages.
            OffsetNewest int64 = -1
            // OffsetOldest stands for the oldest offset available on the broker for a
            // partition. You can send this to a client's GetOffset method to get this
            // offset, or when calling ConsumePartition to start consuming from the
            // oldest offset that is still available on the broker.
            OffsetOldest int64 = -2
        )
    
    评论

报告相同问题?

悬赏问题

  • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
  • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
  • ¥20 有关区间dp的问题求解
  • ¥15 多电路系统共用电源的串扰问题
  • ¥15 slam rangenet++配置
  • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
  • ¥15 ubuntu子系统密码忘记
  • ¥15 信号傅里叶变换在matlab上遇到的小问题请求帮助
  • ¥15 保护模式-系统加载-段寄存器
  • ¥15 电脑桌面设定一个区域禁止鼠标操作