在将数据上传到 MongoDB 时出现报错（ArrayIndexOutOfBoundsException），求解决
源代码
def main(args: Array[String]): Unit = {
  // Configuration used by the loader (Spark master, MongoDB and ES endpoints).
  val config = Map(
    "spark.cores" -> "local[*]",
    "mongo.uri" -> "mongodb://localhost:27017/recommender",
    "mongo.db" -> "recommender",
    "es.httpHosts" -> "localhost:9200",
    "es.transportHosts" -> "localhost:9300",
    "es.index" -> "recommender",
    "es.cluster.name" -> "elasticsearch"
  )
  // Build the SparkConf and SparkSession.
  val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  import spark.implicits._

  // Load the raw data files.
  //
  // BUG FIX: `textFile` already yields ONE record per line, so the original
  // `item.split("\\n")` always produced a single-element array and
  // `attr(1)` threw `ArrayIndexOutOfBoundsException: 1` (the reported error).
  // Records must be split on the FIELD delimiter within a line instead.
  // NOTE(review): assumes fields are comma-separated — confirm against the
  // actual data files and adjust the delimiter if needed.
  val userRDD = spark.sparkContext.textFile(USER_DATA_PATH)
  val userDF = userRDD.map { item =>
    val attr = item.split(",")
    User(attr(0), attr(1).trim)
  }.toDF()

  val hobbyRDD = spark.sparkContext.textFile(HOBBY_DATA_PATH)
  val hobbyDF = hobbyRDD.map { item =>
    val attr = item.split(",")
    Hobby(attr(0).trim)
  }.toDF()

  val cityRDD = spark.sparkContext.textFile(CITYNAME_DATA_PATH)
  val cityDF = cityRDD.map { item =>
    val attr = item.split(",")
    Cityname(attr(0).trim)
  }.toDF()

  implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
  // Persist the DataFrames to MongoDB.
  storeDataInMongDB(userDF, hobbyDF, cityDF)
  // Persist to Elasticsearch (currently a stub).
  storeDataInES()
  spark.stop()
}
/**
 * Writes the three DataFrames to MongoDB: drops any pre-existing collections,
 * saves each DataFrame via the MongoDB Spark connector (overwrite mode), then
 * creates the secondary indexes and closes the client connection.
 */
def storeDataInMongDB(userDF: DataFrame, hobbyDF: DataFrame, cityDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
  // Open a client and cache a handle to the target database.
  val client = MongoClient(MongoClientURI(mongoConfig.uri))
  val db = client(mongoConfig.db)

  // Drop the target collections if they already exist.
  Seq(MONGODB_USER_COLLECTION, MONGODB_HOBBY_COLLECTION, MONGODB_CITYNAME_COLLECTION)
    .foreach(name => db(name).dropCollection())

  // Local helper: write one DataFrame into the named collection.
  def writeCollection(df: DataFrame, collection: String): Unit =
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

  writeCollection(userDF, MONGODB_USER_COLLECTION)
  writeCollection(hobbyDF, MONGODB_HOBBY_COLLECTION)
  writeCollection(cityDF, MONGODB_CITYNAME_COLLECTION)

  // Build the secondary indexes on each collection.
  db(MONGODB_USER_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
  db(MONGODB_HOBBY_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
  db(MONGODB_HOBBY_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
  db(MONGODB_CITYNAME_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
  db(MONGODB_CITYNAME_COLLECTION).createIndex(MongoDBObject("mid" -> 1))

  // Release the MongoDB connection.
  client.close()
}
// TODO: not yet implemented — no data is currently written to Elasticsearch,
// even though ES hosts/index/cluster settings are present in the config map.
def storeDataInES():Unit={
}
}
遇到的报错
INFO ---[ main] org.apache.spark.scheduler.DAGScheduler (line: 54) : Job 0 failed: foreachPartition at MongoSpark.scala:130, took 0.194757 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:74)
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:72)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:330)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1336)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:178)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.atguigu.recommender.DataLoader$.storeDataInMongDB(DataLoader.scala:121)
at com.atguigu.recommender.DataLoader$.main(DataLoader.scala:96)
at com.atguigu.recommender.DataLoader.main(DataLoader.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:74)
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:72)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:330)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1336)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)