I am using segmentio/kafka-go to connect to Kafka.
// to produce messages
topic := "my-topic"
partition := 0
conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
conn.Close()
I am able to produce into my Kafka server using this code.
// to consume messages
topic := "my-topic"
partition := 0
conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
_, err := batch.Read(b)
if err != nil {
// err -> "invalid codec"
break
}
fmt.Println(string(b))
}
batch.Close()
conn.Close()
But I am unable to consume using the above code. I am getting the error invalid codec
. What can be the reason?
In case relevant, I tweaked the minimum batch size to 1 so that it tries to consume something.