import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

import scala.collection.mutable.ListBuffer

import cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
// StreamingKafkaTool, CourseClickCount and CourseClickCountDao are project-local classes (imports omitted).

object StreamingKafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[3]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
    val ssc = new StreamingContext(conf, Seconds(2))
    val zkManager = new KafkaOffsetZKManager("localhost:2181")
    // ssc.checkpoint("hdfs://192.168.71.142:9000/checkpoint")

    val streaminputs = StreamingKafkaTool.getInputStream(ssc, args, zkManager)

    // Record the offset ranges of every batch.
    streaminputs.foreachRDD(rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // zkManager.storeOffsets(offsetRanges, "log_01")                          // ---------- position 1 ----------
      // streaminputs.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    // ETL the raw Kafka records and drop rows without a valid course id.
    val rddStream = streaminputs.map(recordcomsumer => {
      StreamingKafkaTool.etlData(recordcomsumer)
    }).filter(record => record.courseId != 0)
    // rddStream.foreachRDD(rdd => rdd.foreachPartition(partitions => partitions.foreach(r => println(r.courseId))))

    // Count clicks per (day, courseId) and save each partition's counts.
    rddStream.map(line => {
      (line.time.substring(0, 8) + "_" + line.courseId, 1)
    }).reduceByKey(_ + _)
      .foreachRDD(rdd => {
        rdd.foreachPartition(partitions => {
          val list = new ListBuffer[CourseClickCount]
          partitions.foreach(item => {
            list.append(CourseClickCount(item._1, item._2))
          })
          CourseClickCountDao.save(list)
          zkManager.storeOffsets(offsetRanges, "log_01")                         // ---------- position 2 ----------
        })
      })

    ssc.start()
    ssc.awaitTermination()
  }
}
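KafkaOffsetZKManager itself is not shown above. As a rough sketch of what such a ZooKeeper offset store typically looks like (hypothetical: the field and method bodies below are guesses that assume an Apache Curator client, not the real cn.zdl.sparkstudy.utils.KafkaOffsetZKManager):

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.streaming.kafka010.OffsetRange

// Hypothetical sketch only: a ZK-backed offset store that holds a live Curator client.
// A field like `client` below is not serializable.
class KafkaOffsetZKManager(zkConnect: String) {
  private val client: CuratorFramework =
    CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
  client.start()

  // Persist each partition's until-offset under /offsets/<groupId>/<topic>/<partition>.
  def storeOffsets(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    offsetRanges.foreach { range =>
      val path = s"/offsets/$groupId/${range.topic}/${range.partition}"
      val data = range.untilOffset.toString.getBytes("UTF-8")
      if (client.checkExists().forPath(path) == null) {
        client.create().creatingParentsIfNeeded().forPath(path, data)
      } else {
        client.setData().forPath(path, data)
      }
    }
  }
}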
storeOffsets(offsetRanges, "log_01") is my own method for saving offsets to ZooKeeper. When I put zkManager.storeOffsets(offsetRanges, "log_01") at position 1, everything works and the code runs normally. But when I move it to position 2, it fails with:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: cn.zdl.sparkstudy.utils.KafkaOffsetZKManager
Serialization stack:

I'm new to Spark and don't understand why. Any guidance would be greatly appreciated.
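For reference on where the two positions actually execute: the function passed to rdd.foreachPartition at position 2 is serialized and shipped to the executors, so everything it references (including zkManager) must be serializable, whereas the outer body of foreachRDD at position 1 runs on the driver. Below is a rough sketch of a rearrangement that keeps the storeOffsets call on the driver, reusing the same vals as the code above; it only illustrates the distinction and is not necessarily the correct fix for this job:

// Sketch only: same vals as in the code above (rddStream, zkManager, offsetRanges,
// CourseClickCount, CourseClickCountDao). The per-partition save still runs on the
// executors, but the ZooKeeper write stays on the driver, like position 1.
rddStream.map(line => (line.time.substring(0, 8) + "_" + line.courseId, 1))
  .reduceByKey(_ + _)
  .foreachRDD(rdd => {
    rdd.foreachPartition(partitions => {
      val list = new ListBuffer[CourseClickCount]
      partitions.foreach(item => list.append(CourseClickCount(item._1, item._2)))
      CourseClickCountDao.save(list)                 // executor side: no reference to zkManager
    })
    zkManager.storeOffsets(offsetRanges, "log_01")   // driver side: zkManager never enters a task closure
  })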