I'm consuming data through the direct approach with custom offsets, but fetching the offsets stored in ZooKeeper fails.

The error output is as follows:
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.offset.reset is overridden to largest
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to testGroup
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to pure:2181,pure_CDH2:2181,pure_CDH3:2181
2018-10-16 15:20:22,192 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:24,258 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:26,266 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:28,304 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:30,314 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:32,336 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:34,345 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:36,351 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: Trying to get the consumption offset of the partition failed....
at news.KafkaManager.getConsumeOffset(KafkaManager.scala:72)
at news.KafkaManager.createDirectStream(KafkaManager.scala:168)
at news.Runs$.createStreamingContext(Runs.scala:45)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:829)
at news.Runs$.main(Runs.scala:30)
at news.Runs.main(Runs.scala)

The code is as follows:
val topicAndPartitions: Set[TopicAndPartition] = getMetadataForTopicAndPartition()

// Try to fetch the consumer group's committed offset for each partition
val errOrConsumeOffsets = kafkaCluster.getConsumerOffsets(kafkaParams(ParameterContants.GROUP_ID), topicAndPartitions)

if (errOrConsumeOffsets.isLeft) {
  throw new SparkException("Trying to get the consumption offset of the partition failed....")
}
val consumeOffsetsMap: Map[TopicAndPartition, Long] = errOrConsumeOffsets.right.get
consumeOffsetsMap
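
One thing that may help with diagnosis: the Left branch above throws a generic message and discards whatever errors getConsumerOffsets returned, so the log never shows the real cause. Below is a minimal sketch of unwrapping the result while keeping those causes. It assumes the Left side holds a collection of Throwables (which is the case if kafkaCluster is Spark's KafkaCluster, whose Err type is an ArrayBuffer[Throwable]). The names OffsetErrors and offsetsOrThrow are hypothetical, and IllegalStateException stands in for SparkException so the snippet compiles without Spark on the classpath.

```scala
object OffsetErrors {
  // Hypothetical helper, not part of Spark or Kafka.
  // Returns the offsets on success; on failure, throws an exception whose
  // message lists every underlying error instead of hiding them.
  def offsetsOrThrow[K](
      result: Either[scala.collection.Seq[Throwable], Map[K, Long]]
  ): Map[K, Long] =
    result match {
      case Right(offsets) => offsets
      case Left(errors) =>
        // Summarize each cause as "ClassName: message".
        val details = errors
          .map(e => s"${e.getClass.getName}: ${e.getMessage}")
          .mkString("; ")
        // Assumption: IllegalStateException replaces SparkException here.
        val ex = new IllegalStateException(
          s"Failed to get consumer offsets: $details")
        // Attach the original errors so their stack traces are printed too.
        errors.foreach(e => ex.addSuppressed(e))
        throw ex
    }
}
```

Under that assumption, the isLeft check in the question could be replaced with `val consumeOffsetsMap = OffsetErrors.offsetsOrThrow(errOrConsumeOffsets)`, and the exception would then report the underlying errors alongside the failure.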