I want to write a Kafka consumer that polls Kafka messages into a slice of events, performs a bulk operation over that slice, and blocks the thread until the bulk operation completes, so that I do not miss any records from Kafka.
I am using the confluent-kafka-go library, which lets me poll the consumer with a timeout, but each call returns a single Event. I want to get a slice of all the events polled within the given interval so that I can run batch operations over that slice.
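    // Needs "encoding/json", "fmt" and the confluent-kafka-go kafka package.
    var records []Record // Record: placeholder name for the struct the JSON payload maps to
    maxBatchSize := 500
    for {
        ev := c.Poll(1000)
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Println(string(e.Value))
            var rec Record
            if err := json.Unmarshal(e.Value, &rec); err != nil {
                continue // skip payloads that fail to unmarshal
            }
            records = append(records, rec)
            if len(records) >= maxBatchSize {
                // Commit and reset only after the bulk insert has succeeded,
                // so a failed batch is kept for the next attempt.
                if err := BulkInsert(records); err == nil {
                    _, _ = c.Commit()
                    records = nil
                }
            }
        }
    }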
The code above works as a hack but doesn't solve my problem. I want to avoid using maxBatchSize. I want functionality like this:
    val x = consumer.poll(5000).asScala // returns an iterable of consumer records <K, V> on which I can perform bulk operations