import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

import scala.collection.mutable.ListBuffer

// KafkaOffsetZKManager, StreamingKafkaTool, CourseClickCount and
// CourseClickCountDao are my own helper classes.
import cn.zdl.sparkstudy.utils.KafkaOffsetZKManager

object StreamingKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[3]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Offsets of the current batch, updated in the first foreachRDD below.
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]

    val ssc = new StreamingContext(conf, Seconds(2))
    val zkManager = new KafkaOffsetZKManager("localhost:2181")
    // ssc.checkpoint("hdfs://192.168.71.142:9000/checkpoint")

    val streamInputs = StreamingKafkaTool.getInputStream(ssc, args, zkManager)

    streamInputs.foreachRDD { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // zkManager.storeOffsets(offsetRanges, "log_01")  // ---------- 1 ----------
      // streamInputs.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    val rddStream = streamInputs
      .map(record => StreamingKafkaTool.etlData(record))
      .filter(record => record.courseId != 0)
    // rddStream.foreachRDD(rdd => rdd.foreachPartition(_.foreach(r => println(r.courseId))))

    rddStream
      .map(line => (line.time.substring(0, 8) + "_" + line.courseId, 1))
      .reduceByKey(_ + _)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          val list = new ListBuffer[CourseClickCount]
          partition.foreach(item => list.append(CourseClickCount(item._1, item._2)))
          CourseClickCountDao.save(list)
          zkManager.storeOffsets(offsetRanges, "log_01")  // ---------- 2 ----------
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
storeOffsets(offsetRanges,"log_01") 是我自定义的将offset保存到zookeeper的方法。当我把zkManager.storeOffsets(offsetRanges,"log_01") 写在 ---------------------- 1---------处时没问题,代码能正常执行,然而当我放在--------------2--------------------处时却报 Exception in thread "main" org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
Serialization stack: 的异常 ,小白不懂,求各位大牛指导,不胜感激。
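To make the difference easier to see, here is a stripped-down sketch of just the two call sites, compressed into a single foreachRDD for comparison (streamInputs, zkManager and storeOffsets are the names from my full code above, and the comments are only my rough understanding of where each piece runs, which may be wrong):

streamInputs.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Position 1: called directly in the foreachRDD body. As far as I
  // understand, this runs on the driver, and it works without errors.
  // zkManager.storeOffsets(offsetRanges, "log_01")

  rdd.foreachPartition { partition =>
    // Position 2: called inside the foreachPartition closure. I assume this
    // closure is serialized and sent to the executors, and because it
    // captures zkManager the job fails with
    // java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
    zkManager.storeOffsets(offsetRanges, "log_01")
  }
}

If my reading of where these two blocks execute is wrong, please correct me.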