dsolwotv00116 2019-07-02 12:18

Possible reasons for Kafka reconsuming messages

Yesterday I found in the logs that my consumer was reconsuming some messages after the Kafka group coordinator initiated a group rebalance. These messages had already been consumed two days earlier (confirmed from the logs).

Two other rebalances were reported in the log, but they did not cause any messages to be reconsumed. So why did the first rebalance cause messages to be reconsumed? What was the problem?

I am using the Go Kafka client (sarama). Here is the code:

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest 

We handle each message before marking its offset, so it seems we are using at-least-once delivery semantics. We have three brokers on one machine, and only one consumer (a single goroutine) on another machine.

Is there any explanation for this phenomenon? I think the offsets of these messages must have been committed, because the messages were consumed two days earlier. Why else would Kafka keep offsets for more than two days without committing them?

Consuming code sample:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        realHanlder(message)             // handle (consume) the message first
        session.MarkMessage(message, "") // then mark its offset; sarama commits marked offsets in the background
    }
    return nil
}
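The process-then-mark order above is what makes this consumer at-least-once. As a rough illustration, the stdlib-only Go model below (the consumer type and run method are hypothetical, not sarama's API) shows the consequence: if the process stops after handling a message but before its offset is committed, the restarted consumer handles that message again. In sarama, MarkMessage only marks offset message.Offset+1 locally; marked offsets are committed to the broker periodically in the background (every second by default).

```go
package main

import "fmt"

// consumer is a hypothetical model (not sarama's API) of a group member that
// handles a message first and records its offset second, the same order as
// realHanlder(message) followed by session.MarkMessage(message, "").
type consumer struct {
	committed int      // next offset to read, as stored by the group coordinator
	processed []string // every message handled, in order, including duplicates
}

// run consumes msgs starting at the committed offset. If crashAfter is a valid
// offset, the consumer stops right after handling that message and before its
// offset is committed, simulating a crash or restart at the worst moment.
func (c *consumer) run(msgs []string, crashAfter int) {
	for off := c.committed; off < len(msgs); off++ {
		// Handle first.
		c.processed = append(c.processed, msgs[off])
		if off == crashAfter {
			// Handled, but the offset was never committed.
			return
		}
		// Then commit: the committed offset is the next offset to read.
		c.committed = off + 1
	}
}

func main() {
	msgs := []string{"m0", "m1", "m2"}
	c := &consumer{}
	// First run crashes right after handling m1.
	c.run(msgs, 1)
	// The restart resumes from committed offset 1, so m1 is handled again.
	c.run(msgs, -1)
	fmt.Println(c.processed) // [m0 m1 m1 m2]
}
```

This kind of replay is limited to the few messages handled since the last commit, so at-least-once delivery alone does not explain reconsuming messages from two days earlier.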

Additional information:

  1. The rebalance happened after the app was restarted. There were two other restarts that did not cause messages to be reconsumed.

  2. Kafka broker configuration:

    log.retention.check.interval.ms=300000
    log.retention.hours=168
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true
    auto.create.topics.enable=false


1 answer

  • duanjiu4498 2019-08-10 00:56

    After reading the source code of both the Go sarama client and the Kafka server, I finally found the reason:

    1. The consumer group offset retention time was 24 hours, which is Kafka's default (offsets.retention.minutes=1440 on brokers before 2.0), while the log retention was explicitly set by us to 7 days (log.retention.hours=168).

    2. My server app runs in a test environment with very few users, so the producer produces few messages and the consumer group has few messages to consume. As a result, the consumer may not commit any offset for a long time.

    3. When a group's committed offset has not been updated for more than 24 hours, the broker (group coordinator) deletes it because of the offset retention setting. The next time sarama asks the broker for the committed offset, it gets none. Because we set sarama.OffsetOldest as the initial offset, sarama then consumes from the oldest message still retained by the broker, so messages are reconsumed. With a 7-day log retention, this is quite likely to happen.
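The mechanism described in the answer can be sketched with a simplified, stdlib-only Go model. Everything in it is hypothetical (the partition type, the hour-based clock, and the field names are illustrative, not Kafka or sarama APIs); it follows the older broker behavior the answer describes, where a committed offset is dropped once the retention period passes without a new commit, while the log itself is kept much longer.

```go
package main

import "fmt"

// partition is a hypothetical model of one partition plus one group's committed
// offset. Time is measured in whole hours since an arbitrary start.
type partition struct {
	logStart             int64 // oldest offset still retained in the log
	committed            int64 // group's committed offset (next offset to read)
	hasOffset            bool  // false once the coordinator has expired the offset
	lastCommitHour       int64 // hour of the group's last offset commit
	offsetRetentionHours int64 // e.g. 24, the old offsets.retention.minutes default
}

// expireOffsets mimics the coordinator's periodic offset cleanup at hour now.
func (p *partition) expireOffsets(now int64) {
	if p.hasOffset && now-p.lastCommitHour > p.offsetRetentionHours {
		p.hasOffset = false
	}
}

// startOffset is where a restarting consumer begins reading: the committed
// offset if one is stored, otherwise the oldest retained offset, which is the
// effect of config.Consumer.Offsets.Initial = sarama.OffsetOldest.
func (p *partition) startOffset() int64 {
	if p.hasOffset {
		return p.committed
	}
	return p.logStart
}

func main() {
	// 100 messages consumed and committed at hour 0, then no more traffic.
	// With 168-hour log retention, offsets 0..99 stay in the log for days.
	p := &partition{logStart: 0, committed: 100, hasOffset: true,
		lastCommitHour: 0, offsetRetentionHours: 24}

	// Restart 12 hours later: the committed offset still exists.
	p.expireOffsets(12)
	fmt.Println(p.startOffset()) // 100

	// Restart two days later: the offset has expired, so reading restarts
	// at the log start and offsets 0..99 are consumed again.
	p.expireOffsets(48)
	fmt.Println(p.startOffset()) // 0
}
```

On Kafka 2.1 and later, KIP-211 changed offset expiration so that a group's offsets only start to expire after the group has no active members. On older brokers, setting offsets.retention.minutes at least as long as the log retention narrows the window in which this replay can happen.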

    This answer was accepted by the asker as the best answer.