就是为了Ctrl+C+V 2022-02-15 09:53 采纳率: 0%
浏览 22
已结题

golang在整合kafka的过程中,如何实现动态监听kafka的topic,Java中是能够用通配符的,go有没有类似的解决方案呢?

go整合kafka
func main()  {
    //开始连接kafka
    fmt.Println("开始连接kafka")
    // 新建一个arama配置实例
    config := sarama.NewConfig()
    config.Version=sarama.V0_10_2_0
    config.ClientID="iocserver_go_fcq"
    //消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Println("consumer connect error:", err)
        return
    }

    // WaitForAll waits for all in-sync replicas to commit before responding.
    config.Producer.RequiredAcks = sarama.WaitForAll

    // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    config.Producer.Return.Successes = true
    // 新建一个同步生产者
    client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        fmt.Println("producer close, err:", err)
        return
    }
    fmt.Println("kafka connnect success...")
    defer consumer.Close()
    //Partitions(topic):该方法返回了该topic的所有分区id
    partitions, err := consumer.Partitions("test")
    if err != nil {
        fmt.Println("geet partitions failed, err:", err)
        return
    }

    for _, p := range partitions {
        //ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
        //如果该分区消费者已经消费了该信息将会返回error
        //sarama.OffsetNewest:表明了为最新消息
        partitionConsumer, err := consumer.ConsumePartition("test", p, sarama.OffsetNewest)
        if err != nil {
            fmt.Println("partitionConsumer err:", err)
            continue
        }
        wgkafka.Add(1)
        go func(){
            for m := range partitionConsumer.Messages() {
                fmt.Printf("topic: %s, key: %s, text: %s, offset: %d\n",string(m.Topic), string(m.Key), string(m.Value), m.Offset)
                //查询clickhouse
                var topics = queryTopicByClickhouse();
            }
            wgkafka.Done()
        }()
    }
    wgkafka.Wait()
    fmt.Println("kafka监听结束")

}
现在没有报错,就是不知道如何动态监听kafka的topic
我的解答思路和尝试过的方法
有没有和Java中的监听kafka的通配符的方式解决呢?
  • 写回答

0条回答 默认 最新

    报告相同问题?

    问题事件

    • 系统已结题 2月23日
    • 创建了问题 2月15日

    悬赏问题

    • ¥15 搭建Multisim24秒篮球倒计时
    • ¥15 关于#python#的问题,请各位专家解答!
    • ¥20 笔记本电脑 处理器是AMD的r7
    • ¥15 clash节点timeout
    • ¥15 需要在vitis下实现彩调视频图像累加,并输出
    • ¥15 解决不了的LNK2019错误
    • ¥20 MATLAB仿真三相桥式全控整流电路
    • ¥15 EDA技术关于时序电路设计
    • ¥15 百度文心一言流式返回sse失败
    • ¥15 由于远程方已关闭传输流,身份验证失败