dongmao7195 2018-10-08 14:51
浏览 623
已采纳

用sarama编写Kafka生产者时的时间戳无效

I have a Kafka instance running (locally, in a Docker) and I created a producer in Go, using the sarama package.

As I want to use Kafka Streams on my topic, the producer has to embed a timestamp in the messages, or I get this ugly error message:

org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = crawler_events, partition = 0, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 187, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {XXX}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

Here is the portion of code sending the message in my Go program:

// Init a connection to the Kafka host,
// create the producer,
// and count successes and errors in delivery
func (c *kafkaClient) init() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    c.config = *config
    var err error
    c.producer, err = sarama.NewAsyncProducer(c.hosts, &c.config)
    if err != nil {
        panic(err)
    }
    go func() {
        for range c.producer.Successes() {
            c.successes++
        }
    }()
    go func() {
        for range c.producer.Errors() {
            c.errors++
        }
    }()
}

// Send a message to the Kafka topic, WITH TIMESTAMP
func (c *kafkaClient) send(event string) {
    message := &sarama.ProducerMessage{
        Topic: c.topic,
        Value: sarama.StringEncoder(event),
        Timestamp: time.Now(),
    }
    c.producer.Input() <- message
    c.enqueued++
}

As you can see, the timestamp I try to send is time.Now().

When I run the console consumer to see the received timestamps:

docker-compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 --topic crawler_events \
  --from-beginning --property print.timestamp=true

I see they are all "-1":

CreateTime:-1   {"XXX"}

When adding a message to the topic with the console producer, I have the expected timestamps like:

CreateTime:1539010180284        hello

What am I doing wrong? Thanks for your help.

  • 写回答

1条回答 默认 最新

  • douyou8266 2018-10-08 17:26
    关注

    Sarama defaults to Kafka Version 0.8.2. It means it will use the old 0.8.2 format requests when talking to brokers.

    As timestamp support was only added in 0.10, if you don't specify a Version >= 0.10 explicitly, your timestamp won't be forwarded to the brokers.

    You need to add config.Version = sarama.V0_10_0_0 to your code and timestamps will work.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
  • ¥15 YoloV5 第三方库的版本对照问题
  • ¥15 请完成下列相关问题!
  • ¥15 drone 推送镜像时候 purge: true 推送完毕后没有删除对应的镜像,手动拷贝到服务器执行结果正确在样才能让指令自动执行成功删除对应镜像,如何解决?
  • ¥15 求daily translation(DT)偏差订正方法的代码
  • ¥15 js调用html页面需要隐藏某个按钮
  • ¥15 ads仿真结果在圆图上是怎么读数的
  • ¥20 Cotex M3的调试和程序执行方式是什么样的?
  • ¥20 java项目连接sqlserver时报ssl相关错误
  • ¥15 一道python难题3