dongtedu 2021-03-06 20:04
浏览 73

SparkStreaming + kafka 中将offset存入zookeeper报错的问题

object StreamingKafka {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[3]")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val ssc = new StreamingContext(conf,Seconds(2))
    val zkManager = new KafkaOffsetZKManager("localhost:2181");
//    ssc.checkpoint("hdfs://192.168.71.142:9000/checkpoint")

    val streaminputs =StreamingKafkaTool.getInputStream(ssc,args,zkManager)

    streaminputs.foreachRDD(rdd=>{
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//      zkManager.storeOffsets(offsetRanges,"log_01")                         ----------------------  1---------
//      streaminputs.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })


    val rddStream =streaminputs.map(recordcomsumer=>{

      StreamingKafkaTool.etlData(recordcomsumer)
    }).filter(record=>record.courseId!=0)

//    rddStream.foreachRDD(rdd=>{rdd.foreachPartition(partitions=>partitions.foreach(r=>println(r.courseId)))})
    rddStream.map(line=>{
      (line.time.substring(0,8)+"_"+line.courseId,1)
    }).reduceByKey(_+_)
      .foreachRDD(rdd=>{
        rdd.foreachPartition(partitions=>{
          val list = new ListBuffer[CourseClickCount]
          partitions.foreach(item=>{
            list.append(CourseClickCount(item._1,item._2))
          })
          CourseClickCountDao.save(list)
          zkManager.storeOffsets(offsetRanges,"log_01")      --------------2--------------------
        })

      })
    ssc.start()
    ssc.awaitTermination()

  }

}

 

storeOffsets(offsetRanges,"log_01") 是我自定义的将offset保存到zookeeper的方法。当我把zkManager.storeOffsets(offsetRanges,"log_01") 写在 ---------------------- 1---------处时没问题,代码能正常执行,然而当我放在--------------2--------------------处时却报 Exception in thread "main" org.apache.spark.SparkException: Task not serializable      Caused by: java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
Serialization stack: 的异常 ,小白不懂,求各位大牛指导,不胜感激。

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥50 永磁型步进电机PID算法
    • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
    • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
    • ¥15 如何处理复杂数据表格的除法运算
    • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
    • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
    • ¥200 uniapp长期运行卡死问题解决
    • ¥15 latex怎么处理论文引理引用参考文献
    • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
    • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?