2016-11-11 13:20 · 119 views

Using Kafka Avro messages in Go

I'm trying to consume Kafka messages in Avro format, but I'm not able to decode the messages from Avro to JSON in Go.

I'm using the Confluent Platform (3.0.1). For example, I produce Avro messages like this:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Now I consume messages with the Go Kafka library sarama. Plain-text messages work fine, but Avro messages have to be decoded. I found two different libraries for that: github.com/linkedin/goavro and github.com/elodina/go-avro.

But after decoding I get JSON without values (with both libraries):



// Attempt 1: github.com/linkedin/goavro
avroSchema := `{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))


// Attempt 2: github.com/elodina/go-avro
schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
if err := reader.Read(decodedRecord, decoder); err != nil {
    log.Fatal(err)
}

// msg is a *sarama.ConsumerMessage from the consumer's Messages() channel

2 answers

  • Accepted answer
    dotymq4217 2016-11-14 09:27

    Just found out (by comparing binary Avro messages) that I had to remove the first 5 bytes of the message byte array, and now everything works :)

    message := msg.Value[5:]

    Maybe someone can explain why?
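
    The fix above can be sketched as a small helper that validates and strips the 5-byte header before handing the remaining bytes to the Avro codec. This is only a sketch; the helper name `stripWireFormat` and the sample bytes are mine, not from the answer:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // stripWireFormat removes the 5-byte Confluent wire-format header
    // (1 magic byte + 4-byte big-endian schema ID) from a Kafka message
    // value and returns the schema ID plus the raw Avro payload.
    func stripWireFormat(value []byte) (schemaID uint32, payload []byte, err error) {
        if len(value) < 5 || value[0] != 0 {
            return 0, nil, fmt.Errorf("value is not in Confluent wire format")
        }
        return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
    }

    func main() {
        // Hypothetical message value: magic byte 0, schema ID 1, then Avro bytes.
        value := []byte{0, 0, 0, 0, 1, 0x06, 'f', 'o', 'o'}
        id, payload, err := stripWireFormat(value)
        if err != nil {
            panic(err)
        }
        fmt.Println(id, len(payload)) // prints: 1 4
        // payload (not value) is what would go to codec.Decode / NewBinaryDecoder.
    }
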

  • duanliang4009 2017-01-16 21:22

    The first byte is a magic byte (0). The following 4 bytes are the Avro schema ID.

    That ID is only really useful if you use the Confluent Schema Registry.
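
    To illustrate the layout this answer describes, here is a minimal sketch that builds such a header with the standard library; the schema ID 21 is a made-up example, in practice it would be assigned by the Schema Registry:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func main() {
        // Confluent wire format: [magic byte 0][4-byte big-endian schema ID][Avro payload].
        schemaID := uint32(21) // hypothetical ID from the Schema Registry
        header := make([]byte, 5)
        header[0] = 0 // magic byte
        binary.BigEndian.PutUint32(header[1:5], schemaID)
        fmt.Printf("% x\n", header) // prints: 00 00 00 00 15
    }
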
