Big monster 2022-10-26 01:34

Spark error when importing data into MongoDB

An error occurs while uploading data to MongoDB; looking for a solution.

Source code

 def main(args: Array[String]): Unit = {
    // define the configuration parameters to be used
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender",
      "es.httpHosts" -> "localhost:9200",
      "es.transportHosts" -> "localhost:9300",
      "es.index" -> "recommender",
      "es.cluster.name" -> "elasticsearch"
    )


    // create a SparkConf
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("DataLoader")
    // create a SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    // load the data
    val userRDD = spark.sparkContext.textFile(USER_DATA_PATH)

    val userDF = userRDD.map(
      item => {
        val attr = item.split("\\n")
        User(attr(0), attr(1).trim)
      }
    ).toDF()

    val hobbyRDD = spark.sparkContext.textFile(HOBBY_DATA_PATH)
    val hobbyDF = hobbyRDD.map(
      item => {
        val attr = item.split("\\n")
        Hobby(attr(0).trim)
      }
    ).toDF()
    val cityRDD = spark.sparkContext.textFile(CITYNAME_DATA_PATH)
    val cityDF = cityRDD.map(
      item => {
        val attr = item.split("\\n")
        Cityname(attr(0).trim)
      }
    ).toDF()

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // save the data to MongoDB
    storeDataInMongDB(userDF, hobbyDF, cityDF)
    // data preprocessing


    // save the data to ES
    storeDataInES()

    spark.stop()
  }

  def storeDataInMongDB(userDF: DataFrame, hobbyDF: DataFrame, cityDF: DataFrame)(implicit mongoConfig: MongoConfig): Unit = {
    // create a new MongoDB client connection
    val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))

    // if the corresponding collections already exist in MongoDB, drop them first
    mongoClient(mongoConfig.db)(MONGODB_USER_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_HOBBY_COLLECTION).dropCollection()
    mongoClient(mongoConfig.db)(MONGODB_CITYNAME_COLLECTION).dropCollection()

    // write the DataFrames to the corresponding MongoDB collections
    userDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_USER_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    hobbyDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_HOBBY_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    cityDF.write
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_CITYNAME_COLLECTION)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    // create indexes on the collections
    mongoClient(mongoConfig.db)(MONGODB_USER_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_HOBBY_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_HOBBY_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_CITYNAME_COLLECTION).createIndex(MongoDBObject("uid" -> 1))
    mongoClient(mongoConfig.db)(MONGODB_CITYNAME_COLLECTION).createIndex(MongoDBObject("mid" -> 1))
    // close the MongoDB connection
    mongoClient.close()
  }

  def storeDataInES(): Unit = {
    // TODO: not implemented yet
  }
}

The error encountered

INFO ---[ main] org.apache.spark.scheduler.DAGScheduler (line: 54) : Job 0 failed: foreachPartition at MongoSpark.scala:130, took 0.194757 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:74)
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:72)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:330)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1336)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:178)
at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.atguigu.recommender.DataLoader$.storeDataInMongDB(DataLoader.scala:121)
at com.atguigu.recommender.DataLoader$.main(DataLoader.scala:96)
at com.atguigu.recommender.DataLoader.main(DataLoader.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:74)
at com.atguigu.recommender.DataLoader$$anonfun$1.apply(DataLoader.scala:72)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:330)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1336)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
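
For reference, the ArrayIndexOutOfBoundsException: 1 is raised inside the map over userRDD (DataLoader.scala:72-74), i.e. at the attr(1) access in User(attr(0), attr(1).trim). Since sparkContext.textFile already returns one record per line, item.split("\\n") never actually splits anything, so attr always has exactly one element and attr(1) is out of bounds. A minimal sketch of a corrected mapping, assuming the user file separates its fields with a delimiter such as a comma (the real file format is not shown, so the delimiter and the length guard are assumptions):

    // sketch: split each line on the actual field delimiter instead of "\\n",
    // and skip lines that do not contain enough fields before building User
    val userDF = userRDD
      .map(_.split(","))               // assumed delimiter; replace with the real one
      .filter(_.length >= 2)           // guard against short or empty lines
      .map(attr => User(attr(0), attr(1).trim))
      .toDF()

The same change would apply to hobbyRDD and cityRDD if those files also contain more than one field per line.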


1 answer

  • CSDN-Ada助手 (CSDN-AI official account) 2022-10-27 15:50
    I don't know whether this problem has been solved yet; if it has not:

    If you have already solved it, please consider sharing the solution as a blog post and leaving the link in the comments to help more people ^-^
