My Spark Streaming code is as follows:
val lines = FlumeUtils.createStream(ssc, "hdp2.domain", 22222, StorageLevel.MEMORY_AND_DISK_SER_2)
val words = lines.filter(examtep(_))
words.foreachRDD(exam(_))
// some other code

def exam(rdd: RDD[SparkFlumeEvent]): Unit = {
  if (rdd.count() > 0) {
    println("****Something*****")
    val newrdd = rdd.map(sfe => {
      val tmp = new String(sfe.event.getBody.array())
      tmp
    })
    newrdd.saveAsTextFile("/user/spark/appoutput/Temperaturetest")
  }
}
Every time `exam()` runs inside `words.foreachRDD(exam(_))`, `newrdd.saveAsTextFile("/user/''''''")` is executed, but the contents of the `Temperaturetest` directory on HDFS are overwritten each time, so only the output of the last `saveAsTextFile` call survives. How can I make all of the data accumulate in `Temperaturetest`?
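For context, `saveAsTextFile` writes to a fixed directory, so each micro-batch clobbers the previous one. A common workaround (my own sketch, not from the original post) is to make the output path unique per batch by using the two-argument form of `foreachRDD`, which also passes in the batch `Time`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Sketch: take the batch time as a second parameter and fold it into the
// output path, so each micro-batch writes its own subdirectory instead of
// overwriting /user/spark/appoutput/Temperaturetest.
words.foreachRDD((rdd, time) => exam(rdd, time))

def exam(rdd: RDD[SparkFlumeEvent], time: Time): Unit = {
  if (rdd.count() > 0) {
    val newrdd = rdd.map(sfe => new String(sfe.event.getBody.array()))
    // One directory per batch, e.g. .../Temperaturetest-1464000000000
    newrdd.saveAsTextFile(s"/user/spark/appoutput/Temperaturetest-${time.milliseconds}")
  }
}
```

Alternatively, `DStream.saveAsTextFiles(prefix)` does this automatically, generating one directory per batch interval named from the prefix and the batch timestamp.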