I'm building a request-response setup in Kafka using the Kafka-go library using the message Key as a correlation ID. My setup works fine without concurrency, but when the messages start being sent in separate goroutines, the reader part skips the correct keys (since other routine probably read it already).
How can I read only a specific key from a topic, considering the connection is being shared by different goroutines?
Client example below (Error evaluation were removed for brevity):
package main
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
)
var wg sync.WaitGroup
func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
currentUUID := uuid.New()
byteUUID := []byte(fmt.Sprintf("%s", currentUUID))
connW.WriteMessages(kafka.Message{
Key: byteUUID,
Value: body,
})
fmt.Println("Posted id " + string(byteUUID))
for {
m, _ := connR.ReadMessage(10e6)
if bytes.Equal(m.Key, byteUUID) {
break
}
}
wg.Done()
fmt.Println("Done " + string(byteUUID))
}
func main() {
iterations := 100
interval := 500 * time.Millisecond
kafkaURL := "kafka:9092"
topic := "benchmarktopic"
partition := 0
connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
defer connW.Close()
connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
defer connR.Close()
for i := 0; i < iterations; i++ {
<-time.After(interval)
go requestMessage(connR, connW, []byte("body"), i)
wg.Add(1)
}
wg.Wait()
}