doucong4535 2019-02-22 21:09

How to subscribe from the very beginning

I am trying to write a Kafka consumer with group ID foo that subscribes to a certain topic and reads from the very beginning, even if a previous offset exists. I tried to use Subscribe with the rebalance callback, but the callback never seems to be called (I have set the go.application setting).

Is there any example how one would make this work?


EDIT: Added more details

2 answers

  • dsi37923 2019-02-25 14:09

    We ended up setting enable.auto.commit to false. This way no offset is stored for the group, and we consume from the beginning on every run without problems.
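
Why this works can be shown with a small model of how a consumer picks its starting position. The sketch below is not the client library's code: `startOffset` and `noCommittedOffset` are hypothetical names, and the model ignores edge cases such as a committed offset that has fallen out of range. It assumes auto.offset.reset is set to earliest (or its alias beginning), since the policy is only consulted when the group has no stored offset.

```go
package main

import "fmt"

// noCommittedOffset marks a partition for which the group has no stored
// offset. Hypothetical sentinel used only in this sketch.
const noCommittedOffset int64 = -1

// startOffset models where a consumer begins reading one partition.
// committed: the group's stored offset, or noCommittedOffset.
// reset:     the auto.offset.reset policy, "earliest" or "latest".
// low, high: the first available offset and the next offset to be written.
func startOffset(committed int64, reset string, low, high int64) int64 {
	if committed != noCommittedOffset {
		// A stored offset always wins; auto.offset.reset is not consulted.
		return committed
	}
	if reset == "earliest" {
		return low
	}
	return high
}

func main() {
	// An offset was committed on a previous run: resume at 42.
	fmt.Println(startOffset(42, "earliest", 0, 100))
	// Nothing was ever committed: start at the beginning.
	fmt.Println(startOffset(noCommittedOffset, "earliest", 0, 100))
	// Nothing committed, but the policy is "latest": only new messages.
	fmt.Println(startOffset(noCommittedOffset, "latest", 0, 100))
}
```

Under this model, disabling auto commit helps only if the group never had an offset committed in the first place. Offsets stored by an earlier run with auto commit enabled would still take precedence until they are removed or expire.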

    Accepted by the asker as the best answer.
  • duankui6150 2019-02-23 08:53

    This example is from the Confluent Kafka Go client's GitHub repository. You may only need to set auto.offset.reset to kafka.OffsetBeginning.String():

    package main

    /**
     * Copyright 2016 Confluent Inc.
     */

    // consumer_example implements a consumer using the non-channel Poll() API
    // to retrieve messages and events.

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"

        "github.com/confluentinc/confluent-kafka-go/kafka"
    )

    func main() {

        broker := "YOUR_BROKER"
        group := "YOUR_GROUP"
        // SubscribeTopics expects a slice of topic names.
        topics := []string{"YOUR_TOPIC"}
        sigchan := make(chan os.Signal, 1)
        signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":  broker,
            "group.id":           group,
            "session.timeout.ms": 6000,
            // Used only when the group has no committed offset for a partition.
            "auto.offset.reset": kafka.OffsetBeginning.String()})

        if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
            os.Exit(1)
        }

        fmt.Printf("Created Consumer %v\n", c)

        err = c.SubscribeTopics(topics, nil)
        if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
            os.Exit(1)
        }

        run := true

        for run {
            select {
            case sig := <-sigchan:
                fmt.Printf("Caught signal %v: terminating\n", sig)
                run = false
            default:
                // Poll waits up to 100 ms for the next message or event.
                ev := c.Poll(100)
                if ev == nil {
                    continue
                }

                switch e := ev.(type) {
                case *kafka.Message:
                    fmt.Printf("%% Message on %s:\n%s\n",
                        e.TopicPartition, string(e.Value))
                    if e.Headers != nil {
                        fmt.Printf("%% Headers: %v\n", e.Headers)
                    }
                case kafka.Error:
                    // Errors should generally be considered as informational, the client will try to automatically recover
                    fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                default:
                    fmt.Printf("Ignored %v\n", e)
                }
            }
        }

        fmt.Printf("Closing consumer\n")
        c.Close()
    }
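
Because auto.offset.reset applies only when no committed offset exists, it does not by itself cover the question's "even if there is a previous offset" case. One possible approach with this client is to pass a rebalance callback as the second argument of SubscribeTopics and, on a kafka.AssignedPartitions event, set each partition's Offset to kafka.OffsetBeginning before calling Assign. The sketch below models only that offset-rewriting step using the standard library: `partition`, `offsetBeginning`, and `rewind` are hypothetical stand-ins, not the client's types, and the sentinel value is chosen for illustration.

```go
package main

import "fmt"

// partition is a stand-in for the client's kafka.TopicPartition,
// reduced to the fields this sketch needs.
type partition struct {
	Topic  string
	ID     int32
	Offset int64
}

// offsetBeginning is a stand-in for kafka.OffsetBeginning: a logical
// offset meaning "start of the partition". Value chosen for this sketch.
const offsetBeginning int64 = -2

// rewind returns a copy of the assigned partitions with every start
// offset replaced by offsetBeginning, ignoring any committed offset.
// The input slice is left unchanged.
func rewind(assigned []partition) []partition {
	out := make([]partition, len(assigned))
	for i, p := range assigned {
		p.Offset = offsetBeginning // p is a copy, so assigned[i] is untouched
		out[i] = p
	}
	return out
}

func main() {
	// Offsets as they might arrive in an assignment for a group that
	// committed positions on an earlier run.
	assigned := []partition{{"events", 0, 120}, {"events", 1, 87}}
	for _, p := range rewind(assigned) {
		fmt.Printf("%s[%d] starts at %d\n", p.Topic, p.ID, p.Offset)
	}
}
```

In a real consumer, the rewritten slice would be handed to Assign inside the callback, and a kafka.RevokedPartitions event would typically be answered with Unassign.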
    