如何使用Sarama Go Kafka Consumer从最新的抵消量消耗?

我有三个问题:</ p>


  1. 什么是 最早的偏移量是什么意思? 最旧的偏移量不等于偏移量0?</ li>
    </ ol>


    // OffsetOldest代表经纪人上对

    可用的最旧偏移量 //分区。

    OffsetOldest int64 = -2 </ p>
    </ blockquote>


    1. 假设</ p> \ ñ

      一个。 在同一台计算机上运行的三个代理

      B。 使用者组只有一个使用者线程

      C。 使用者配置OffsetOldest标志。

      D。 产生了100 msg,当前使用方线程已消耗90 msg。 </ p>

      因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始消耗? 是91还是0? </ p> </ li>

    2. 在下面的代码中,似乎每次启动使用者时都会重新使用消息。 但实际上并不会一直发生。 为什么重用只是在重新启动后才发生几次(不是全部)?</ p>

        func(此* consumerGroupHandler)ConsumeClaim(会话
      sarama.ConsumerGroupSession,声称sarama.ConsumerGroupClaim)错误 {
      for message:= range Claim.Messages(){
      this.handler(message)
      session.MarkMessage(message,“”)
      }

      返回nil
      }

      ctx: = context.Background()
      conf:= sarama.NewConfig()

      conf.Version = sarama.V2_0_0_0
      conf.Consumer.Offsets.Initial = sarama.OffsetOldest
      conf.Consumer.Return.Errors = true
      \ nconsumer,err:= sarama.NewConsumerGroup(strings.Split(app.Config()。KafkaBrokers,“,”),groupId,conf)
      if err!= nil {
      logger.Error(“ NewConsumerGroupFromClient(%s)error :%v“,groupId,err)
      返回
      }
      </ code> </ pre> </ li>
      </ ol>
      </ div>

展开原文

原文

我有三个问题:

  1. “oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?

// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2

  1. 假设:

    A. 三个broker运行在一台机器上
    B. 使用者群体只有一个使用者线程
    C. 使用者信任OffsetOldest标志
    D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息

    因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?

  2. 在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?

     func (this *consumerGroupHandler) ConsumeClaim(session 
     sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
              for message := range claim.Messages() {
              this.handler(message)
             session.MarkMessage(message, "")
        }
    
        return nil
    }
    
    ctx := context.Background()
    conf := sarama.NewConfig()
    
    conf.Version = sarama.V2_0_0_0
    conf.Consumer.Offsets.Initial = sarama.OffsetOldest
    conf.Consumer.Return.Errors = true
    
    consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf)
    if err != nil {
        logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err)
        return
    }
    

1个回答




  1. 否。 应用保留策略后,将从主题中删除较旧的消息。 因此,最早的偏移量可能不是第一个偏移量(即 0 </ code>)。 </ p> </ li>

  2. 这取决于您的配置。 本质上,您有3个选项:</ p>


    • 最早的</ code>偏移量开始消耗</ li>
    • 开始消耗 code>最新</ code>偏移量</ li>
    • 从特定偏移量开始消耗</ li>
      </ ul> </ li>
    • 您必须使用< 代码> sarama.OffsetOldest </代码>。 从文档,</ p> </ li>
      </ ol>


        const(
      // OffsetNewest代表日志头的偏移量,即将被分配给下一个将要生成的消息的偏移量
      //

      //可以将其发送到客户端的GetOffset方法以获取此偏移量,或者当
      //调用ConsumePartition开始使用新消息时。
      OffsetNewest int64 = -1
      // OffsetOldest代表最旧的 代理在
      //分区上可用的偏移量。您可以将此偏移量发送到客户端的GetOffset方法以获取此
      //偏移量,或者在调用ConsumePartition以从仍可用的最旧偏移量开始使用时//

      OffsetOldest int64 = -2

      </ code> </ pre>
      </ blockquote>
      </ div>

展开原文

原文

  1. No. When the retention policy is applied, older messages are deleted from the topics. Therefore, the oldest offset might not be the first-ever offset (i.e. 0).

  2. It depends on your configuration. Essentially, you have 3 options:

    • Start consuming from the earliest offset
    • Start consuming from the latest offset
    • Start consuming from a specific offset
  3. You have to use sarama.OffsetOldest. From the documentation,

 const (
        // OffsetNewest stands for the log head offset, i.e. the offset that will be
        // assigned to the next message that will be produced to the partition. You
        // can send this to a client's GetOffset method to get this offset, or when
        // calling ConsumePartition to start consuming new messages.
        OffsetNewest int64 = -1
        // OffsetOldest stands for the oldest offset available on the broker for a
        // partition. You can send this to a client's GetOffset method to get this
        // offset, or when calling ConsumePartition to start consuming from the
        // oldest offset that is still available on the broker.
        OffsetOldest int64 = -2
    )

douqingji3026
douqingji3026 我找到了答案。 对于问题2,除非消耗偏移量过期,否则您不会从最早/最新偏移量开始消耗!!!
大约一年之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问