如何使用php-rdkafka在kafka中确认消费消息?

我使用php-rdkafka作为php Kafka客户端,并使用测试组成功地生成了测试消息。我使用下面的代码来使用该消息:

$kafkaConsumer = new RdKafka\Consumer();
$kafkaConsumer->addBrokers("127.0.0.1:9292");
$topic = $kafkaConsumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    $msg = $topic->consume(0, 1000);
    if($msg){
    if ($msg->err) {
        echo $msg->errstr(), "
";
        break;
    } else {
        echo $msg->payload, "
";
    }
  }
}

但是,当我再次尝试在测试组中设置消息并尝试使用测试组的消息时,我得到了旧消息和新消息。所以我只想知道如何才能确认旧的信息,这样我才能得到新的信息,而不是旧的信息?有人能给个建议吗??

我的kafka版本是0.11.0.1。

查看全部
doukuo9116
doukuo9116
2017/10/13 13:20
  • php
  • 点赞
  • 收藏
  • 回答
    私信
满意答案
查看全部

1个回复