天行见地物 2018-11-19 01:51 采纳率: 0%
浏览 2419

map算子里面使用sparkContext 报 java.io.NotSerializableException: org.apache.spark.SparkContext错?

val receiverStream: ReceiverInputDStream[ String ] = RabbitMQUtils.createStream String
receiverStream.print()

receiverStream.map(value => {
 //@transient val sc = spark.sparkContext
  val jsonS = JSON.parseFull(value)
  val mapjson: Map[ String, String ] = regJson(jsonS)
  val alarmContent = mapjson.get("alarmContent").toString.replace("Some(", "").replace(")", "")
  val alarmEventId = mapjson.get("alarmEventId").toString.replace("Some(", "").replace(")", "")
  val alarmLevel = mapjson.get("alarmLevel").toString.replace("Some(", "").replace(")", "")
  val alarmType = mapjson.get("alarmType").toString.replace("Some(", "").replace(")", "")
  val buildingId = mapjson.get("buildingId").toString.replace("Some(", "").replace(")", "")
  val chargesCode = mapjson.get("chargesCode").toString.replace("Some(", "").replace(")", "")
  val createDate = mapjson.get("createDate").toString.replace("Some(", "").replace(")", "").toDouble
  val delFlag = mapjson.get("delFlag").toString.replace("Some(", "").replace(")", "")
  val deviceId = mapjson.get("deviceId").toString.replace("Some(", "").replace(")", "")
  val happenTime = mapjson.get("happenTime").toString.replace("Some(", "").replace(")", "").toDouble
  val isNewRecord = mapjson.get("isNewRecord").toString.replace("Some(", "").replace(")", "").toBoolean
  val page = mapjson.get("page").toString.replace("Some(", "").replace(")", "")
  val producerCode = mapjson.get("producerCode").toString.replace("Some(", "").replace(")", "")
  val sqlMap = mapjson.get("sqlMap").toString.replace("Some(", "").replace(")", "")
  println(alarmEventId)
  val strings: Apple = Apple(alarmContent, alarmEventId, alarmLevel,
    alarmType, buildingId, chargesCode, createDate, delFlag,
    deviceId, happenTime, isNewRecord, page, producerCode, sqlMap)
  val apples: Seq[ Apple ] = Seq(strings)
  //println("走到这里了!")
 println("logs:" + apples)
 // val appRdd: RDD[ Apple ] = sc.makeRDD(apples)
 /* value1.foreachPartition(iter =>{
    import spark.implicits._
    val frameDF: DataFrame = value1.toDF()
    frameDF.createTempView("t_1")
    frameDF.show()
  })*/
 val value1: RDD[ Apple ] = sc.parallelize(apples)
  import spark.implicits._
  val frameDF: DataFrame = value1.toDF()
  frameDF.createTempView("t_1")
  frameDF.show()
}).print()
  • 写回答

1条回答 默认 最新

  • 天行见地物 2018-11-19 01:55
    关注

    报错信息:Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2039)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:679)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:545)
    at example.RabbitMQ2Spark$.main(RabbitMQ2Spark.scala:54)
    at example.RabbitMQ2Spark.main(RabbitMQ2Spark.scala)
    Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@185f7840)
    - field (class: example.RabbitMQ2Spark$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class example.RabbitMQ2Spark$$anonfun$main$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more

    评论

报告相同问题?

悬赏问题

  • ¥15 安卓adb backup备份应用数据失败
  • ¥15 eclipse运行项目时遇到的问题
  • ¥15 关于#c##的问题:最近需要用CAT工具Trados进行一些开发
  • ¥15 南大pa1 小游戏没有界面,并且报了如下错误,尝试过换显卡驱动,但是好像不行
  • ¥15 没有证书,nginx怎么反向代理到只能接受https的公网网站
  • ¥50 成都蓉城足球俱乐部小程序抢票
  • ¥15 yolov7训练自己的数据集
  • ¥15 esp8266与51单片机连接问题(标签-单片机|关键词-串口)(相关搜索:51单片机|单片机|测试代码)
  • ¥15 电力市场出清matlab yalmip kkt 双层优化问题
  • ¥30 ros小车路径规划实现不了,如何解决?(操作系统-ubuntu)