go整合kafka
func main() {
//开始连接kafka
fmt.Println("开始连接kafka")
// 新建一个arama配置实例
config := sarama.NewConfig()
config.Version=sarama.V0_10_2_0
config.ClientID="iocserver_go_fcq"
//消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("consumer connect error:", err)
return
}
// WaitForAll waits for all in-sync replicas to commit before responding.
config.Producer.RequiredAcks = sarama.WaitForAll
// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
// 新建一个同步生产者
client, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
fmt.Println("kafka connnect success...")
defer consumer.Close()
//Partitions(topic):该方法返回了该topic的所有分区id
partitions, err := consumer.Partitions("test")
if err != nil {
fmt.Println("geet partitions failed, err:", err)
return
}
for _, p := range partitions {
//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
//如果该分区消费者已经消费了该信息将会返回error
//sarama.OffsetNewest:表明了为最新消息
partitionConsumer, err := consumer.ConsumePartition("test", p, sarama.OffsetNewest)
if err != nil {
fmt.Println("partitionConsumer err:", err)
continue
}
wgkafka.Add(1)
go func(){
for m := range partitionConsumer.Messages() {
fmt.Printf("topic: %s, key: %s, text: %s, offset: %d\n",string(m.Topic), string(m.Key), string(m.Value), m.Offset)
//查询clickhouse
var topics = queryTopicByClickhouse();
}
wgkafka.Done()
}()
}
wgkafka.Wait()
fmt.Println("kafka监听结束")
}
现在没有报错,就是不知道如何动态监听kafka的topic
我的解答思路和尝试过的方法
有没有和Java中的监听kafka的通配符的方式解决呢?