dongyun51582 2018-03-19 10:10

Kafka consumer group loses uncommitted messages

I am using a consumer group with just one consumer and just one broker (the wurstmeister Docker image). The code decides whether to commit an offset: if the handler returns an error, the message is not committed. I need to ensure that the system does not lose any message, even if that means retrying the same message forever (for now ;) ). To test this I created a simple handler that does not commit the offset when the string 'error' is sent as a message to Kafka. All other strings are committed.

kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited

Now running

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe

returns

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              13              0

So that's OK: there is no lag. Now we send the string 'error' to simulate a failure, so the message is not committed:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          13              14              1

The current offset stays where it was, and there is 1 message of lag. Now if we send a valid message again, the offset moves on to 15:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                           0          15              15              0

and message number 14 is never picked up again. Is this the default behaviour? Do I need to track the last offset and manually load the message at that offset + 1? I have set CommitInterval to 0, hoping to avoid any auto-commit mechanism.

fetch/commit code:

go func() {
    for {
        ctx := context.Background()

        m, err := mr.brokerReader.FetchMessage(ctx)
        if err != nil {
            break
        }

        if err := msgFunc(m); err != nil {
            log.Errorf("# messaging # cannot commit a message: %v", err)
            continue
        }

        // commit message if no error
        if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
            // should we do something other than just logging the uncommitted message?
            log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
        }
    }
}()

reader configuration:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers:         brokers,
    GroupID:         groupID,
    Topic:           topic,
    CommitInterval:  0,
    MinBytes:        10e3,
    MaxBytes:        10e6,
})

library used: https://github.com/segmentio/kafka-go


  • doufen1933 2018-03-19 10:45

    In Kafka you commit offsets, not individual messages. If I understand your code correctly (I'm not a Go developer), you just continue after you hit an invalid message. If a valid message arrives after the invalid one, you commit an offset again, and that offset lies beyond the invalid message. I guess that was not your intention.
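A minimal sketch of the "retry in place instead of `continue`" idea, under stated assumptions: `Message` and the `fetchCommitter` interface are hypothetical stand-ins that only mirror the shape of kafka-go's `Reader.FetchMessage` and `Reader.CommitMessages`, and `fakeReader` is an in-memory fake so the example runs without a broker. Like the behaviour described in the question, the fake does not hand back an unprocessed message on the next fetch, so the loop has to keep retrying the message it already holds.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Message is a hypothetical stand-in for kafka.Message (only the fields used here).
type Message struct {
	Offset int64
	Value  []byte
}

// fetchCommitter mirrors the shape of the two kafka-go Reader methods used in
// the question. Assumption: real code would adapt *kafka.Reader to this.
type fetchCommitter interface {
	FetchMessage(ctx context.Context) (Message, error)
	CommitMessages(ctx context.Context, msgs ...Message) error
}

// consumeOne fetches one message, calls handle on that same message until it
// succeeds, and only then commits it. Because it never fetches the next
// message while the current one is failing, no later commit can skip it.
func consumeOne(ctx context.Context, r fetchCommitter, handle func(Message) error, backoff time.Duration) error {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		return err
	}
	for handle(m) != nil {
		select {
		case <-ctx.Done():
			// Give up without committing; a restarted consumer resumes
			// from the last committed offset, i.e. at this message.
			return ctx.Err()
		case <-time.After(backoff):
		}
	}
	return r.CommitMessages(ctx, m)
}

// fakeReader is a hypothetical in-memory partition. FetchMessage always
// advances, whether or not the previous message was committed.
type fakeReader struct {
	log       []Message
	next      int   // index of the next message FetchMessage returns
	committed int64 // committed offset = offset of the next message to read
}

func (f *fakeReader) FetchMessage(ctx context.Context) (Message, error) {
	if f.next >= len(f.log) {
		return Message{}, errors.New("no more messages")
	}
	m := f.log[f.next]
	f.next++
	return m, nil
}

func (f *fakeReader) CommitMessages(ctx context.Context, msgs ...Message) error {
	for _, m := range msgs {
		f.committed = m.Offset + 1
	}
	return nil
}

var errSimulated = errors.New("simulated processing failure")

// drain runs consumeOne until the fake log is exhausted and records the
// committed offset after each successful commit.
func drain(r *fakeReader, handle func(Message) error) []int64 {
	var commits []int64
	for consumeOne(context.Background(), r, handle, time.Millisecond) == nil {
		commits = append(commits, r.committed)
	}
	return commits
}

func main() {
	r := &fakeReader{committed: 13, log: []Message{
		{Offset: 13, Value: []byte("error")},
		{Offset: 14, Value: []byte("this will be commited")},
	}}
	failuresLeft := 2 // hypothetical: the "error" message fails twice, then succeeds
	handle := func(m Message) error {
		if string(m.Value) == "error" && failuresLeft > 0 {
			failuresLeft--
			return errSimulated
		}
		return nil
	}
	fmt.Println(drain(r, handle)) // prints [14 15]
}
```

With a real reader you would pass the question's `msgFunc` as `handle`. If the handler never succeeds, this blocks on that one message forever, which matches the "retry forever" requirement but also stalls the whole partition.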

    Just to make clear what submitting or committing an offset means: your consumer group stores the offset in a dedicated internal Kafka topic (or, on older Kafka versions, in ZooKeeper). An offset identifies a single position within a topic (more precisely, within a partition of a given topic). This means you can only consume a topic in a linear fashion.
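The "single position" point can be checked against the numbers in the question's `--describe` output with a toy model. This is a hypothetical model, not kafka-go code; it assumes the usual Kafka convention, consistent with the question's output, that committing the message at offset 14 stores 15, the offset of the next message to read.

```go
package main

import "fmt"

// partitionState is a hypothetical model of what the consumer group stores
// for one partition: a single committed offset, not a set of acknowledged
// messages.
type partitionState struct {
	committed int64 // CURRENT-OFFSET: offset of the next message to read
	logEnd    int64 // LOG-END-OFFSET: offset the next produced message will get
}

// commit records that everything up to and including msgOffset is consumed.
func (p *partitionState) commit(msgOffset int64) { p.committed = msgOffset + 1 }

// lag is the LAG column: LOG-END-OFFSET - CURRENT-OFFSET.
func (p *partitionState) lag() int64 { return p.logEnd - p.committed }

// isConsumed reports whether the group treats offset off as already consumed.
func (p *partitionState) isConsumed(off int64) bool { return off < p.committed }

func main() {
	p := &partitionState{committed: 13, logEnd: 13}

	p.logEnd++ // "error" is produced at offset 13; the handler fails, no commit
	fmt.Println(p.committed, p.logEnd, p.lag()) // 13 14 1

	p.logEnd++   // a valid message is produced at offset 14...
	p.commit(14) // ...and committed, which stores 15
	fmt.Println(p.committed, p.logEnd, p.lag()) // 15 15 0

	// Offset 13 was never committed on its own, but it now lies below the
	// committed position, so a consumer resuming from it starts at 15.
	fmt.Println(p.isConsumed(13)) // true
}
```

Nothing records that offset 13 failed: once a later offset is committed, the stored position alone says "everything before 15 is done".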

    Here you can see what happens on the Kafka consumer side:

    [Figure: New Kafka Consumer]

    You are consuming from one or (most likely) several stacks of messages, and what you commit is your position (a.k.a. offset) in each topic/partition. So you cannot say "I want to re-consume this specific message". What you can do is stop consuming once you hit an invalid message. Your problem then becomes: how do I get rid of this message? Deleting a single message from a Kafka topic is tricky. A common pattern is to write such messages to some kind of dead-letter topic and deal with them using a different consumer.
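A sketch of the dead-letter pattern, assuming a bounded number of retries is acceptable. `handleWithDLQ`, the retry count and the `toDLQ` sink are hypothetical; in a real consumer the sink could be a kafka-go `Writer` writing to a separate topic (for example a hypothetical `test.dlq`) that a different consumer reads later. The rule the sketch enforces is that an offset is committed only after its message was either processed or safely written to the dead-letter topic.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for kafka.Message.
type Message struct {
	Offset int64
	Value  []byte
}

var errPermanent = errors.New("simulated permanent failure")

// handleWithDLQ tries handle up to maxAttempts times. If every attempt fails,
// the message is handed to toDLQ (a stand-in for writing to a dead-letter
// topic). A nil result means m was either processed or safely parked, so the
// caller may commit it; a non-nil result means it must NOT be committed.
func handleWithDLQ(m Message, handle func(Message) error, maxAttempts int, toDLQ func(Message) error) error {
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if lastErr = handle(m); lastErr == nil {
			return nil
		}
	}
	if err := toDLQ(m); err != nil {
		return fmt.Errorf("offset %d: handler failed (%v) and dead-letter write failed: %w", m.Offset, lastErr, err)
	}
	return nil
}

func main() {
	var dlq []Message // hypothetical in-memory dead-letter "topic"
	toDLQ := func(m Message) error { dlq = append(dlq, m); return nil }
	handle := func(m Message) error {
		if string(m.Value) == "error" {
			return errPermanent
		}
		return nil
	}

	committed := int64(13)
	for _, m := range []Message{
		{Offset: 13, Value: []byte("error")},
		{Offset: 14, Value: []byte("this will be commited")},
	} {
		if err := handleWithDLQ(m, handle, 3, toDLQ); err != nil {
			fmt.Println("stopping without commit:", err)
			break
		}
		committed = m.Offset + 1 // safe: m was processed or dead-lettered
	}
	fmt.Println("committed:", committed, "dead-lettered:", len(dlq)) // committed: 15 dead-lettered: 1
}
```

If the dead-letter write itself fails, the loop stops without committing, so the message is not lost; it falls back to the "stop consuming" behaviour described above.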

    Hope that made things a little bit clearer to you.
