刘狗 2018-10-16 07:25 采纳率: 0%
浏览 916

driect自定义偏移量消费数据,但是获取zookeeper上的偏移量的时候出了问题。。

报错如下:
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.offset.reset is overridden to largest
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to testGroup
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to pure:2181,pure_CDH2:2181,pure_CDH3:2181
2018-10-16 15:20:22,192 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:24,258 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:26,266 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:28,304 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:30,314 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:32,336 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:34,345 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:36,351 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: Trying to get the consumption offset of the partition failed....
at news.KafkaManager.getConsumeOffset(KafkaManager.scala:72)
at news.KafkaManager.createDirectStream(KafkaManager.scala:168)
at news.Runs$.createStreamingContext(Runs.scala:45)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:829)
at news.Runs$.main(Runs.scala:30)
at news.Runs.main(Runs.scala)

代码如下:
val topicAndPartitions: Set[TopicAndPartition] = getMetadataForTopicAndPartition()

//尝试获取每个分区最大一条消息的偏移量
val errOrConsumeOffsets = kafkaCluster.getConsumerOffsets(kafkaParams(ParameterContants.GROUP_ID), topicAndPartitions)

if (errOrConsumeOffsets.isLeft) {
  throw new SparkException("Trying to get the consumption offset of the partition failed....")
}
val consumeOffsetsMap: Map[TopicAndPartition, Long] = errOrConsumeOffsets.right.get
consumeOffsetsMap
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥50 易语言把MYSQL数据库中的数据添加至组合框
    • ¥20 求数据集和代码#有偿答复
    • ¥15 关于下拉菜单选项关联的问题
    • ¥20 java-OJ-健康体检
    • ¥15 rs485的上拉下拉,不会对a-b<-200mv有影响吗,就是接受时,对判断逻辑0有影响吗
    • ¥15 使用phpstudy在云服务器上搭建个人网站
    • ¥15 应该如何判断含间隙的曲柄摇杆机构,轴与轴承是否发生了碰撞?
    • ¥15 vue3+express部署到nginx
    • ¥20 搭建pt1000三线制高精度测温电路
    • ¥15 使用Jdk8自带的算法,和Jdk11自带的加密结果会一样吗,不一样的话有什么解决方案,Jdk不能升级的情况