dsolwotv00116 2019-07-02 12:18
175 views
Accepted

Possible causes of Kafka reconsuming messages

Yesterday I found in the log that Kafka was reconsuming some messages after the Kafka group coordinator initiated a group rebalance. These messages had already been consumed two days earlier (confirmed from the log).

There were two other rebalances reported in the log, but they did not reconsume any messages. So why did the first rebalance cause messages to be reconsumed? What was the problem?

I am using the Go Kafka client (sarama). Here is the code:

config := sarama.NewConfig()
config.Version = version
config.Consumer.Offsets.Initial = sarama.OffsetOldest 

and we handle messages before marking them, so it seems we are using the at-least-once delivery strategy for Kafka. We have three brokers on one machine, and only one consumer thread (goroutine) on the other machine.

Any explanation for this phenomenon? I think the messages must have been committed, because they were consumed two days ago; otherwise, why would Kafka keep the offsets for more than two days without them being committed?

Consuming Code sample:

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        realHandler(message)             // consume the data here
        session.MarkMessage(message, "") // mark the offset as processed
    }
    return nil
}

Added:

  1. The rebalancing happened after the app restarted. There were two other restarts which didn't cause reconsuming.

  2. Kafka broker configs:

    log.retention.check.interval.ms=300000
    log.retention.hours=168
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    delete.topic.enable=true
    auto.create.topics.enable=false
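Note that the broker settings above cover log retention (log.retention.hours=168, i.e. 7 days) but not consumer offset retention. On older Kafka brokers (before 2.0), committed consumer offsets expire after 24 hours by default via offsets.retention.minutes. A hedged example of aligning it with log retention (the value below is illustrative, not from the original configs):

```properties
# Assumption: broker older than Kafka 2.0, where committed consumer offsets
# expire after 1440 minutes (24 hours) by default. Raising this to match
# log retention (7 days) avoids the offset being deleted while the log
# still holds the data.
offsets.retention.minutes=10080
```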


1 answer

  • duanjiu4498 2019-08-10 00:56

    By reading the source code of both the Go sarama client and the Kafka server, I finally found the reason, as follows:

    1. The consumer group offset retention time is 24 hours, which is Kafka's default (broker setting offsets.retention.minutes, which defaulted to 24 hours before Kafka 2.0), while log retention is 7 days, explicitly set by us.

    2. My server app runs in a test environment that few people visit, which means few messages are produced, so the consumer group has few messages to consume; thus the consumer may not commit any offset for a long time.

    3. When the consumer offset has not been updated for more than 24 hours, the Kafka broker/coordinator removes the committed offset for the partition because of the offset retention config. The next time sarama queries the broker for the offset, the client of course gets nothing. Since we use sarama.OffsetOldest as the initial value, the sarama client then consumes from the earliest message still kept by the broker, which results in messages being reconsumed; and this is likely to happen because log retention is 7 days.

    This answer was accepted by the asker as the best answer.
