I am using consumer group with just one consumer, just one broker ( docker wurstmeister image ). It's decided in a code to commit offset or not - if code returns error then message is not commited. I need to ensure that system does not lose any message - even if that means retrying same msg forever ( for now ;) ). For testing this I have created simple handler which does not commit offset in case of 'error' string send as message to kafka. All other strings are commited.
kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited
Now running
kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe
returns
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 13 0
so thats ok, there is no lag. Now we pass 'error' string to fake that something bad happened and message is not commited:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 14 1
Current offset stays at right position + there is 1 lagged message. Now if we pass correct message again offset will move on to 15:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test 0 15 15
and message number 14 will not be picked up ever again. Is it default behaviour? Do I need to trace last offset and load message by it+1 manually? I have set commit interval to 0 to hopefully not use any auto.commit mechanism.
fetch/commit code:
go func() {
for {
ctx := context.Background()
m, err := mr.brokerReader.FetchMessage(ctx)
if err != nil {
break
}
if err := msgFunc(m); err != nil {
log.Errorf("# messaging # cannot commit a message: %v", err)
continue
}
// commit message if no error
if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
// should we do something else to just logging not committed message?
log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
}
}
}()
reader configuration:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
CommitInterval: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
library used: https://github.com/segmentio/kafka-go