我有三个问题:
- “oldest offset”是什么意思? Oldest offset并不意味着偏移量为0?
// OffsetOldest stands for the oldest offset available on the broker for a
// partition.
OffsetOldest int64 = -2
-
假设:
A. 三个broker运行在一台机器上
B. 使用者群体只有一个使用者线程
C. 使用者信任OffsetOldest标志
D. 已经产生了100条消息,目前使用者线程已经消耗了90条消息因此,如果使用者线程重新启动,那么该使用者将从哪个偏移量开始?是91还是0?
-
在下面的代码中,似乎每次启动使用者时都会重新消耗消息,但实际上这种情况不应该发生。为什么重新消费会紧接着重新启动后发生(不是全部) ?
func (this *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { this.handler(message) session.MarkMessage(message, "") } return nil } ctx := context.Background() conf := sarama.NewConfig() conf.Version = sarama.V2_0_0_0 conf.Consumer.Offsets.Initial = sarama.OffsetOldest conf.Consumer.Return.Errors = true consumer, err := sarama.NewConsumerGroup(strings.Split(app.Config().KafkaBrokers, ","), groupId, conf) if err != nil { logger.Error("NewConsumerGroupFromClient(%s) error: %v", groupId, err) return }