Yesterday I found in the logs that our Kafka consumer was reconsuming some messages after the Kafka group coordinator initiated a group rebalance. These messages had already been consumed two days earlier (confirmed from the logs).
Two other rebalances were reported in the logs, but they did not cause any reconsumption. So why did only the first rebalance cause messages to be reconsumed? What could the problem be?
I am using the Go Kafka client (sarama). Here is the configuration code:
config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest
We handle each message before marking it as consumed, so it seems we are using an at-least-once strategy. We have three brokers on one machine and a single consumer goroutine on another machine.
Is there any explanation for this phenomenon? I think the offsets must have been committed, because the messages were consumed two days ago. Otherwise, why would Kafka hold the offsets for more than two days without committing them?
Consuming code sample:
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        realHandler(message)             // handle the message first
        session.MarkMessage(message, "") // then mark its offset as consumed
    }
    return nil
}
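To make the question concrete, here is a toy model of how I understand the offset logic. It is plain Go with no sarama involved, and `partition`, `startOffset` and `consume` are made-up names, not library APIs. As far as I understand, `Offsets.Initial` only applies when the group has no committed offset, so the sketch falls back to the oldest offset only in that case. Whether a missing committed offset is what actually happened in my setup is an assumption, not something the logs confirm.

```go
package main

import "fmt"

// partition is a hypothetical in-memory stand-in for one topic partition.
// It is NOT part of sarama or Kafka; it only illustrates the offset logic.
type partition struct {
	messages  []string
	committed map[string]int // consumer group -> next offset to read
}

// startOffset returns the committed offset for the group if one exists.
// Otherwise it falls back to the oldest offset (0), which mimics setting
// the initial offset to "oldest".
func (p *partition) startOffset(group string) int {
	if off, ok := p.committed[group]; ok {
		return off
	}
	return 0
}

// consume handles each message first and marks it afterwards
// (at-least-once), and returns the messages handled in this run.
func (p *partition) consume(group string) []string {
	var handled []string
	for off := p.startOffset(group); off < len(p.messages); off++ {
		handled = append(handled, p.messages[off]) // handle first
		p.committed[group] = off + 1               // then mark
	}
	return handled
}

func main() {
	p := &partition{
		messages:  []string{"m0", "m1", "m2"},
		committed: map[string]int{},
	}

	fmt.Println(len(p.consume("g"))) // first run: 3 messages handled
	fmt.Println(len(p.consume("g"))) // restart with a committed offset: 0

	// Assumption for illustration: the committed offset is no longer available.
	delete(p.committed, "g")
	fmt.Println(len(p.consume("g"))) // restart without a committed offset: 3 again
}
```

In this model a restart only replays messages when the committed offset is missing, which would match one restart reconsuming and the other two not.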
Added:
The rebalance happened after the app restarted. There were two other restarts that did not cause reconsumption.
Kafka broker configs:
log.retention.check.interval.ms=300000
log.retention.hours=168
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
auto.create.topics.enable=false