dongtedu
2021-03-06 20:04

Spark Streaming + Kafka: error when saving offsets to ZooKeeper

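import cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import scala.collection.mutable.ListBuffer
// StreamingKafkaTool, CourseClickCount and CourseClickCountDao are my own helper classes (imports omitted)

object StreamingKafka {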

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[3]")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val ssc = new StreamingContext(conf,Seconds(2))
    val zkManager = new KafkaOffsetZKManager("localhost:2181") // created on the driver; the class is not Serializable
//    ssc.checkpoint("hdfs://192.168.71.142:9000/checkpoint")

    val streaminputs =StreamingKafkaTool.getInputStream(ssc,args,zkManager)

    // The body of foreachRDD runs on the driver, once per batch
    streaminputs.foreachRDD(rdd=>{
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//      zkManager.storeOffsets(offsetRanges,"log_01")     // <-- position 1
//      streaminputs.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })


    val rddStream =streaminputs.map(consumerRecord=>{

      StreamingKafkaTool.etlData(consumerRecord)
    }).filter(record=>record.courseId!=0)

//    rddStream.foreachRDD(rdd=>{rdd.foreachPartition(partitions=>partitions.foreach(r=>println(r.courseId)))})
    rddStream.map(line=>{
      (line.time.substring(0,8)+"_"+line.courseId,1)
    }).reduceByKey(_+_)
      .foreachRDD(rdd=>{
        // The function passed to foreachPartition is serialized and run on the executors,
        // so everything it references (including zkManager) must be Serializable
        rdd.foreachPartition(partitions=>{
          val list = new ListBuffer[CourseClickCount]
          partitions.foreach(item=>{
            list.append(CourseClickCount(item._1,item._2))
          })
          CourseClickCountDao.save(list)
          zkManager.storeOffsets(offsetRanges,"log_01")     // <-- position 2
        })

      })
    ssc.start()
    ssc.awaitTermination()

  }

}


storeOffsets(offsetRanges, "log_01") is a method I wrote myself to save the offsets to ZooKeeper. When I call zkManager.storeOffsets(offsetRanges, "log_01") at the spot marked 1 in the code, there is no problem and the job runs normally. But when I move it to the spot marked 2, I get this exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
Serialization stack:

I'm a beginner and don't understand why. Any guidance from more experienced people would be greatly appreciated.
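A likely explanation: the body of foreachRDD runs on the driver, but the function passed to rdd.foreachPartition is serialized and shipped to the executors as part of each task. Because that function references zkManager, Spark has to serialize the KafkaOffsetZKManager instance along with it, and since that class does not implement java.io.Serializable the job fails with "Task not serializable". At position 1 the call stays on the driver, so nothing has to be serialized. Note that spark.serializer = KryoSerializer only affects data; Spark serializes task closures with Java serialization regardless. The sketch below reproduces the mechanism in plain Scala without Spark, using Java serialization as a rough stand-in for Spark's closure check. OffsetStore, ClosureSerializationDemo and their methods are hypothetical names (OffsetStore plays the role of KafkaOffsetZKManager), and it assumes Scala 2.12 or later, where function literals are serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for KafkaOffsetZKManager: an ordinary class that
// does NOT extend java.io.Serializable.
class OffsetStore(val zkQuorum: String) {
  def store(offsets: Seq[Long], group: String): Unit =
    println(s"storing ${offsets.mkString(",")} for $group at $zkQuorum")
}

object ClosureSerializationDemo {

  // Java-serializes an object, roughly what happens to a task closure before
  // it is sent to an executor. Returns the byte count or the failure message.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }

  // Like position 2: the function body uses `store`, so the closure captures it.
  def partitionFnUsingStore(store: OffsetStore): Iterator[Int] => Unit =
    (partition: Iterator[Int]) => {
      partition.foreach(_ => ())
      store.store(Seq(42L), "log_01")
    }

  // Like moving the call out of foreachPartition: the closure captures
  // nothing that is not serializable.
  def partitionFnWithoutStore(): Iterator[Int] => Unit =
    (partition: Iterator[Int]) => partition.foreach(_ => ())
}
```

Serializing partitionFnUsingStore(new OffsetStore("localhost:2181")) should fail with a NotSerializableException naming OffsetStore, while partitionFnWithoutStore() serializes fine. If that is the cause, one fix in line with the pattern the Spark Kafka 0-10 integration guide uses for commitAsync is to keep the ZooKeeper write on the driver: call zkManager.storeOffsets(offsetRanges, "log_01") in the outer foreachRDD body, after rdd.foreachPartition(...) has returned, so offsets are stored only once the batch's output has been saved. Creating the manager inside foreachPartition would also avoid the exception, but then every partition would write the same offsets to ZooKeeper from the executors.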
