I'm reading data from HDFS in a cluster environment, and I hit a problem while processing it: the objects I append to a collection inside the loop are gone once the loop ends. I'm new to Spark and Scala, so any pointers would be appreciated.
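Each line of tmp.txt is a row of double-quoted, space-separated fields; there are 25 lines like this one:

"000025643 " "20141201" "060912" "0328" "88" "22" ""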
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import scala.collection.mutable.ArrayBuffer

object Test {

  case class Passenger(name: String, txn_date: String, txn_time: String, txn_station: String,
                       ticket_type: String, trans_code: String, sub: String, txn_station_id: String)

  def main(args: Array[String]): Unit = {
    val inputFile = "hdfs://Master:9000/user/hadoop/input/tmp.txt"
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // needed for .toDF() at the bottom

    val text = sc.textFile(inputFile) // the 25 lines in the format shown above, read from HDFS
    val Passengers = new ArrayBuffer[Passenger]()
    for (line <- text) {
      // positions of every double quote in the line
      val points = for (i <- 0 until line.length if line.charAt(i) == '"') yield i
      // text between each pair of quotes; empty fields become the string "null"
      val items = for (i <- 0 until points.length if i % 2 == 0) yield {
        val field = line.slice(points(i) + 1, points(i + 1))
        if (field.nonEmpty) field.trim else "null"
      }
      val tmp = Passenger(items(0).trim, items(1), items(2), items(3), items(4), items(5), "null", items(6))
      println(tmp) // Passenger(000026853,20141201,060921,0325,88,21,null,null) [no problem]
      Passengers.append(tmp)
      println(Passengers.length) // 1, 2, 3, ..., 25 [no problem]
    }
println("----------------------------" + Passengers.length) //it is 0!!!! why?
val passengersArray = Passengers.toArray
val customersRDD = sc.parallelize(passengersArray)
val customersDF = customersRDD.toDF()
}
}
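For what it's worth, the symptom doesn't seem specific to my parsing code. A stripped-down version of the same pattern (a minimal sketch reusing the sc from above; the names nums and buf are just for illustration) shows the same behavior:

    val nums = sc.parallelize(1 to 5)
    val buf = new ArrayBuffer[Int]()
    for (n <- nums) buf.append(n) // desugars to nums.foreach(n => buf.append(n))
    println(buf.length)           // prints 0, just like Passengers.length above

Is this because the foreach closure runs on the executors, so the buffer on the driver never sees the appends?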