du59131 2019-07-16 12:51
浏览 47

将Kafka消息轮询成单片golang

I want to a kafka consumer which polls Kafka message into a slice of events and perform some bulkoperation over that slice and it should block the thread till the bulkoperation is completed just to ensure I am not missing any record from Kafka

I am using confluent-kafka-go library which gives us the flexibility to poll the consumer for some interval of time which returns an Event everytime. I want to get slices of event polled in the interval of time mentioned so that I can run batch operations over that slice.


    var records []struct
    maxBatchsize := 500
    ev := c.Poll(1000)
    switch e := ev.(type) {
    case *kafka.Message:
        fmt.Println(string(e.Value))
        <<<json unmarshalled to struct>>>
        records = append(records, struct)
        if len(records) >= maxBatchsize {
        err := BulkInsert(records)
            if err != nil {
         _, _ = c.Commit()
        records = nil
        }
    }

Above code helps me just for hack but dont solve my problem. I want to avoid using maxBatchSize. I want a functionality like

val x = consumer.poll(5000).asScala \\which returns me and iterable consumer<k,v> on which I can perform bulk operations.
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 昨天挂载了一下u盘,然后拔了
    • ¥30 win from 窗口最大最小化,控件放大缩小,闪烁问题
    • ¥20 易康econgnition精度验证
    • ¥15 msix packaging tool打包问题
    • ¥28 微信小程序开发页面布局没问题,真机调试的时候页面布局就乱了
    • ¥15 python的qt5界面
    • ¥15 无线电能传输系统MATLAB仿真问题
    • ¥50 如何用脚本实现输入法的热键设置
    • ¥20 我想使用一些网络协议或者部分协议也行,主要想实现类似于traceroute的一定步长内的路由拓扑功能
    • ¥30 深度学习,前后端连接