普通网友 2019-07-26 20:56
浏览 108

在kafka-go中阅读具有特定ID的消息

I'm building a request-response setup in Kafka using the Kafka-go library using the message Key as a correlation ID. My setup works fine without concurrency, but when the messages start being sent in separate goroutines, the reader part skips the correct keys (since other routine probably read it already).

How can I read only a specific key from a topic, considering the connection is being shared by different goroutines?

Client example below (Error evaluation were removed for brevity):

package main

import (
    "bytes"
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/google/uuid"
    kafka "github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func requestMessage(connR *kafka.Conn, connW *kafka.Conn, body []byte, index int) {
    currentUUID := uuid.New()
    byteUUID := []byte(fmt.Sprintf("%s", currentUUID))
    connW.WriteMessages(kafka.Message{
        Key:   byteUUID,
        Value: body,
    })
    fmt.Println("Posted id " + string(byteUUID))
    for {
        m, _ := connR.ReadMessage(10e6)
        if bytes.Equal(m.Key, byteUUID) {
            break
        }
    }

    wg.Done()
    fmt.Println("Done " + string(byteUUID))

}

func main() {
    iterations := 100
    interval := 500 * time.Millisecond
    kafkaURL := "kafka:9092"
    topic := "benchmarktopic"
    partition := 0
    connW, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
    defer connW.Close()
    connR, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic+"response", partition)
    defer connR.Close()
    for i := 0; i < iterations; i++ {
        <-time.After(interval)
        go requestMessage(connR, connW, []byte("body"), i)
        wg.Add(1)
    }
    wg.Wait()
}
  • 写回答

1条回答 默认 最新

  • dongrong5189 2019-07-27 06:28
    关注

    You cannot really read only a specific key from a Kafka topic partition. The thing is that your records will be dispatched to specific partitions based on the hash of the key ( default behavior). So you might have different keys in a same partition. So as long as you have more keys than number of partitions, you'll find a partition containing different keys.

    The only one way I have in mind would be to set N partitions for your topic where N is the number of different keys you could have ( quite a huge number if you use uuid as a key) and assign partition with a static mapping ( key - > partition) to your producers/consumers.

    BTW, you're already assigning the part 0 to your producer, was wondering why?

    Yannick

    评论

报告相同问题?

悬赏问题

  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)
  • ¥20 matlab yalmip kkt 双层优化问题
  • ¥15 如何在3D高斯飞溅的渲染的场景中获得一个可控的旋转物体
  • ¥88 实在没有想法,需要个思路
  • ¥15 MATLAB报错输入参数太多