The error is as follows:
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.offset.reset is overridden to largest
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to testGroup
2018-10-16 15:20:21,156 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to pure:2181,pure_CDH2:2181,pure_CDH3:2181
2018-10-16 15:20:22,192 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:24,258 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:26,266 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:28,304 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:30,314 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:32,336 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:34,345 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2018-10-16 15:20:36,351 (main) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: Trying to get the consumption offset of the partition failed....
at news.KafkaManager.getConsumeOffset(KafkaManager.scala:72)
at news.KafkaManager.createDirectStream(KafkaManager.scala:168)
at news.Runs$.createStreamingContext(Runs.scala:45)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at news.Runs$$anonfun$1.apply(Runs.scala:30)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:829)
at news.Runs$.main(Runs.scala:30)
at news.Runs.main(Runs.scala)
The code is as follows:
val topicAndPartitions: Set[TopicAndPartition] = getMetadataForTopicAndPartition()
// Try to get this consumer group's committed offset for each partition
val errOrConsumeOffsets = kafkaCluster.getConsumerOffsets(kafkaParams(ParameterContants.GROUP_ID), topicAndPartitions)
if (errOrConsumeOffsets.isLeft) {
  throw new SparkException("Trying to get the consumption offset of the partition failed....")
}
val consumeOffsetsMap: Map[TopicAndPartition, Long] = errOrConsumeOffsets.right.get
consumeOffsetsMap
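The snippet above discards the `Left` side of `errOrConsumeOffsets`, so the thrown exception only says that fetching failed, not why. As a minimal sketch, assuming `kafkaCluster.getConsumerOffsets` returns `Either[ArrayBuffer[Throwable], Map[TopicAndPartition, Long]]` (its `Err` type in the spark-streaming-kafka-0-8 `KafkaCluster`), a small helper can rethrow with the collected errors attached. `OffsetErrors` and `offsetsOrThrow` are hypothetical names, and `RuntimeException` stands in for `SparkException` so the sketch compiles without Spark on the classpath.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark or Kafka.
object OffsetErrors {
  // Unwraps an Either shaped like KafkaCluster's result (assumed shape:
  // Left = ArrayBuffer[Throwable], Right = offsets per partition).
  // On failure it throws an exception whose message lists every
  // collected error instead of a generic "failed" message.
  def offsetsOrThrow[K](
      result: Either[ArrayBuffer[Throwable], Map[K, Long]],
      what: String): Map[K, Long] =
    result match {
      case Right(offsets) => offsets
      case Left(errs) =>
        val details = errs
          .map(e => s"${e.getClass.getName}: ${e.getMessage}")
          .mkString("; ")
        // RuntimeException is a stand-in for SparkException (assumption).
        val ex = new RuntimeException(s"$what failed: $details")
        // Keep each original stack trace reachable from the thrown exception.
        errs.foreach(ex.addSuppressed)
        throw ex
    }
}
```

In the original method, the `isLeft` check and `errOrConsumeOffsets.right.get` could then be replaced by `OffsetErrors.offsetsOrThrow(errOrConsumeOffsets, "Fetching consumer offsets")`, so the stack trace would show whatever errors `KafkaCluster` collected (for example, a broker connection failure) rather than only the generic text.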