刘狗 2018-10-16 07:25 采纳率: 0%
浏览 916

driect自定义偏移量消费数据,但是获取zookeeper上的偏移量的时候出了问题。。

报错如下:
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.offset.reset is overridden to largest
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to testGroup
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to pure:2181,pure_CDH2:2181,pure_CDH3:2181
2018-10-16 15:20:22,192 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:24,258 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:26,266 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:28,304 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:30,314 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:32,336 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:34,345 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:36,351 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: Trying to get the consumption offset of the partition failed....
at news.KafkaManager.getConsumeOffset(KafkaManager.scala:72)
at news.KafkaManager.createDirectStream(KafkaManager.scala:168)
at news.Runs$.createStreamingContext(Runs.scala:45)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:829)
at news.Runs$.main(Runs.scala:30)
at news.Runs.main(Runs.scala)

代码如下:
val topicAndPartitions: Set[TopicAndPartition] = getMetadataForTopicAndPartition()

//尝试获取每个分区最大一条消息的偏移量
val errOrConsumeOffsets = kafkaCluster.getConsumerOffsets(kafkaParams(ParameterContants.GROUP_ID), topicAndPartitions)

if (errOrConsumeOffsets.isLeft) {
  throw new SparkException("Trying to get the consumption offset of the partition failed....")
}
val consumeOffsetsMap: Map[TopicAndPartition, Long] = errOrConsumeOffsets.right.get
consumeOffsetsMap
  • 写回答

0条回答

    报告相同问题?

    悬赏问题

    • ¥15 Power query添加列问题
    • ¥50 Kubernetes&Fission&Eleasticsearch
    • ¥15 有没有帮写代码做实验仿真的
    • ¥15 報錯:Person is not mapped,如何解決?
    • ¥30 vmware exsi重置后登不上
    • ¥15 c++头文件不能识别CDialog
    • ¥15 Excel发现不可读取的内容
    • ¥15 关于#stm32#的问题:CANOpen的PDO同步传输问题
    • ¥20 yolov5自定义Prune报错,如何解决?
    • ¥15 电磁场的matlab仿真