dongtedu 2021-03-06 20:04 采纳率: 0%
浏览 73

SparkStreaming + kafka 中将offset存入zookeeper报错的问题

object StreamingKafka {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[3]")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val ssc = new StreamingContext(conf,Seconds(2))
    val zkManager = new KafkaOffsetZKManager("localhost:2181");
//    ssc.checkpoint("hdfs://192.168.71.142:9000/checkpoint")

    val streaminputs =StreamingKafkaTool.getInputStream(ssc,args,zkManager)

    streaminputs.foreachRDD(rdd=>{
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//      zkManager.storeOffsets(offsetRanges,"log_01")                         ----------------------  1---------
//      streaminputs.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })


    val rddStream =streaminputs.map(recordcomsumer=>{

      StreamingKafkaTool.etlData(recordcomsumer)
    }).filter(record=>record.courseId!=0)

//    rddStream.foreachRDD(rdd=>{rdd.foreachPartition(partitions=>partitions.foreach(r=>println(r.courseId)))})
    rddStream.map(line=>{
      (line.time.substring(0,8)+"_"+line.courseId,1)
    }).reduceByKey(_+_)
      .foreachRDD(rdd=>{
        rdd.foreachPartition(partitions=>{
          val list = new ListBuffer[CourseClickCount]
          partitions.foreach(item=>{
            list.append(CourseClickCount(item._1,item._2))
          })
          CourseClickCountDao.save(list)
          zkManager.storeOffsets(offsetRanges,"log_01")      --------------2--------------------
        })

      })
    ssc.start()
    ssc.awaitTermination()

  }

}

 

storeOffsets(offsetRanges,"log_01") 是我自定义的将offset保存到zookeeper的方法。当我把zkManager.storeOffsets(offsetRanges,"log_01") 写在 ---------------------- 1---------处时没问题,代码能正常执行,然而当我放在--------------2--------------------处时却报 Exception in thread "main" org.apache.spark.SparkException: Task not serializable      Caused by: java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
Serialization stack: 的异常 ,小白不懂,求各位大牛指导,不胜感激。

  • 写回答

1条回答 默认 最新

  • 你知我知皆知 2024-07-24 09:36
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    KafkaOffsetZKManager 类是你的代码中的一部分,它使用了 org.apache.kafka.clients.producer.RecordMetadata 类型的实例作为参数传递给 storeOffsets 方法。这个类并没有实现 serializable 属性,这意味着它无法被序列化成 JSON 格式,并因此抛出 NotSerializableException 异常。

    解决此问题的一种方法是在调用 storeOffsets 方法之前,先检查 KafkaOffsetZKManager 类是否实现了 serializable 属性。如果实现了,那么就可以将对象序列化并存储到 ZooKeeper;如果没有,则可以尝试将其转换为字节流或者直接返回一个字符串表示的 JSON 对象,然后将其存储到 ZooKeeper。

    例如:

    val zkm = new KafkaOffsetZKManager("localhost:2181");
    if (zkm.serializable) {
      // 如果实现了 Serializable 接口,可以直接序列化和存储
    } else {
      // 如果没有实现 Serializable 接口,可以尝试将对象转换为字节流或字符串表示的 JSON 对象
      val json = JSON.stringify(zkm)
      zkManager.storeOffsets(json, "log_01")  // 使用 JSON 字符串存储
    }
    

    以上两种方法都可以用于解决 KafkaOffsetZKManagerstoreOffsets 方法中抛出 NotSerializableException 异常的问题。

    评论

报告相同问题?