How do I ensure my consumers only process messages from a kafka topic in order?

I've never used kafka before. I have two test Go programs accessing a local kafka instance: a reader and a writer. I'm trying to tweak my producer, consumer, and kafka server settings to get a particular behavior.

My writer:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    topics := []string{
        "policymanager-100",
        "policymanager-200",
        "policymanager-300",
    }
    progress := make(map[string]int)
    for _, t := range topics {
        progress[t] = 0
    }

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "0",
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    fmt.Println("producing messages...")
    for i := 0; i < 30; i++ {
        index := rand.Intn(len(topics))
        topic := topics[index]
        num := progress[topic]
        num++
        fmt.Printf("%s => %d
", topic, num)
        msg := &kafka.Message{
            Value: []byte(strconv.Itoa(num)),
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
            },
        }
        err = producer.Produce(msg, nil)
        if err != nil {
            panic(err)
        }
        progress[topic] = num
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("DONE")
}

There are three topics that exist on my local kafka: policymanager-100, policymanager-200, policymanager-300. They each only have 1 partition to ensure all messages are sorted by the time kafka receives them. My writer will randomly pick one of those topics and issue a message consisting of a number that increments solely for that topic. When it's done running, I expect the queues to look something like this (topic names shortened for legibility):

100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12
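
For completeness: the three single-partition topics are created ahead of time. A minimal sketch using the confluent-kafka-go AdminClient might look like the following; the AdminClient approach and the replication factor of 1 (fine for a single local broker) are assumptions, not something taken from the programs above.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // The AdminClient only needs to know where the broker is.
    admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }
    defer admin.Close()

    // One partition per topic so the broker preserves arrival order.
    // ReplicationFactor 1 is assumed for a single local broker.
    var specs []kafka.TopicSpecification
    for _, t := range []string{"policymanager-100", "policymanager-200", "policymanager-300"} {
        specs = append(specs, kafka.TopicSpecification{
            Topic:             t,
            NumPartitions:     1,
            ReplicationFactor: 1,
        })
    }

    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    results, err := admin.CreateTopics(ctx, specs)
    if err != nil {
        panic(err)
    }
    for _, r := range results {
        fmt.Printf("%s: %v\n", r.Topic, r.Error)
    }
}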

So far so good. I'm trying to configure things so that any number of consumers can be spun up and consume these messages in order. By "in order" I mean that no consumer should get message 2 for topic 100 until message 1 is COMPLETED (not just started). If message 1 for topic 100 is being worked on, consumers are free to consume from other topics that don't currently have a message being processed. Once a message from a topic has been sent to a consumer, that entire topic should become "locked" until either a timeout assumes that the consumer failed or the consumer commits the message; then the topic is "unlocked" and its next message is made available to be consumed.

My reader:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    count := 2
    for i := 0; i < count; i++ {
        go consumer(i + 1)
    }
    fmt.Println("cosuming...")
    // hold this thread open indefinitely
    select {}
}

func consumer(id int) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "0", // strconv.Itoa(id),
        "enable.auto.commit": "false",
    })
    if err != nil {
        panic(err)
    }

    err = c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
    if err != nil {
        panic(err)
    }
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            panic(err)
        }

        fmt.Printf("%d) Message on %s: %s
", id, msg.TopicPartition, string(msg.Value))
        time.Sleep(time.Second)
        _, err = c.CommitMessage(msg)
        if err != nil {
            fmt.Printf("ERROR commiting: %+v
", err)
        }
    }
}

From my current understanding, the way I'm likely to achieve this is by setting up my consumer properly. I've tried many different variations of this program. I've tried having all my goroutines share the same consumer. I've tried using a different group.id for each goroutine. None of these was the right configuration to get the behavior I'm after.

What the posted code does is empty out one topic at a time. Despite having multiple goroutines, the process will read all of 100, then move to 200, then 300, and only one goroutine actually does all the reading. When I give each goroutine a different group.id, messages get read by multiple goroutines, which I would like to prevent.

My example consumer simply splits things up with goroutines, but when I begin working this into my use case at work, I'll need it to work across multiple Kubernetes instances that won't be talking to each other, so anything that relies on coordination between goroutines won't work as soon as there are 2 instances on 2 kubes. That's why I'm hoping to make Kafka do the gatekeeping I want.

dt3999 (over a year ago): Let us continue this discussion in chat.

duang5049 (over a year ago): More partitions means higher throughput, more parallelism, and better use of cluster resources.

douzhan1935 (over a year ago): Is having only one partition per topic really a big problem? From what I've read, that should enforce ordering. I don't expect any topic to grow very large.

doujiaozhan4397 (over a year ago): Generally speaking, you cannot. Even if you had a single consumer consuming all the partitions of the topic, the partitions would be consumed in a non-deterministic order, and total ordering across all partitions would not be guaranteed.

dongzanghua8422 (over a year ago): Sort of. While I do only want messages processed once, my biggest concern right now is being able to prevent out-of-order processing. If one consumer has received offset=1 but not yet committed it, I don't want another consumer to receive offset=2. I'd want that second consumer to find a different topic, one without a "delivered but uncommitted" message at the head of its queue. If every topic has a message being processed by some other consumer, I'd rather wait than risk processing out of order.

duanchuo7741 (over a year ago): Possible duplicate of stackoverflow.com/questions/42165726/…

duanbing8817 (over a year ago): It sounds like consumer transactions are what I'm after, as long as the consumer moves on to other topics instead of waiting for the next message after a transaction has locked one.

dongliang1873 (over a year ago): librdkafka (which the Confluent Golang client is based on) does not yet fully support exactly-once semantics. You can follow github.com/confluentinc/confluent-kafka-go/issues/104 (and the linked issues) for more information.

1 Answer




Generally speaking, you cannot. Even if you had a single consumer that consumed all the partitions for the topic, the partitions would be consumed in a non-deterministic order and your total ordering across all partitions would not be guaranteed.

Try keyed messages; I think you may find them a good fit for your use case.
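
As a sketch of what keyed production could look like with confluent-kafka-go: use a single topic and put the policy id in the message key, so every message for a given policy hashes to the same partition and keeps its relative order (ordering is guaranteed per key/partition, not across keys). The topic name "policymanager" and the payloads below are illustrative, not taken from the question.

package main

import (
    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    // Hypothetical single topic replacing policymanager-100/200/300.
    topic := "policymanager"
    for i := 1; i <= 5; i++ {
        for _, policyID := range []string{"100", "200", "300"} {
            msg := &kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                // Messages with the same key always land on the same partition,
                // so they are stored and delivered in production order per key.
                Key:   []byte(policyID),
                Value: []byte(fmt.Sprintf("%s => %d", policyID, i)),
            }
            if err := producer.Produce(msg, nil); err != nil {
                panic(err)
            }
        }
    }
    // Wait for outstanding deliveries before exiting.
    producer.Flush(15 * 1000)
}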
