陈尘辰 2017-07-06 08:41

Scala code reads data from an HDFS file and batch-inserts it into HBase through Phoenix; it fails with an "object not serializable" error

Could someone please point out where the problem is?
The batch-insert method is as follows:
class PhoenixClient_v2(host: String, port: String) extends Serializable {
  try {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
  } catch {
    case e: ClassNotFoundException => println(e)
  }

  def InsertOfBatch(phoenixSQL: String, Key: Array[String], Column: Array[String],
                    filepath: String = null, dataArr: Array[Map[String, Any]] = null) = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("hdfs2hbase")
    val sc = new SparkContext(sparkConf)

    // Read from a JSON-formatted HDFS file
    if (filepath != null) {
      val url = "jdbc:phoenix:" + host + ":" + port

      val conn = DriverManager.getConnection(url)
      val ps = conn.prepareStatement(phoenixSQL)

      try {
        // Use Spark to read the file from the local file system or HDFS
        val Rdd_file = sc.textFile(filepath)

        var HashKey: String = (hashCode() % 100).toString
        // Pull the needed fields out of the RDD and bind them to ps
        Rdd_file.map { y =>
          Key.foreach(x =>
            if (x == "Optime") HashKey += "|" + new JSONObject(y).getString(x).split(" ")(0)
            else HashKey += "|" + new JSONObject(y).getString(x))
          ps.setString(1, HashKey)
          Column.foreach(x => ps.setString(Column.indexOf(x) + 2, new JSONObject(y).getString(x)))
          ps.addBatch()
        }
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: SQLException => println("Insert into phoenix from file failed:" + e.toString)
      } finally {
        conn.close()
        ps.close()
      }
    }

    // Read from a collection of Map[String, Any] records
    if (dataArr != null) {
      val url = "jdbc:phoenix:" + host + ":" + port

      val conn = DriverManager.getConnection(url)
      val ps = conn.prepareStatement(phoenixSQL)
      try {
        val Rdd_Arr = sc.parallelize(dataArr)
        var HashKey: String = (hashCode() % 100).toString
        Rdd_Arr.map { y =>
          Key.foreach(x =>
            if (x == "Optime") HashKey += "|" + y.get(x).toString.split(" ")(0)
            else HashKey += "|" + y.get(x))
          ps.setString(1, HashKey)
          Column.foreach(x => ps.setString(Column.indexOf(x) + 2, y.get(x).toString))
          ps.addBatch()
        }
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: SQLException => println("Insert into phoenix from dataArr failed:" + e.toString)
      } finally {
        conn.close()
        ps.close()
      }
    }
  }
}
It is called like this:
object PhoenixTest {

  def main(args: Array[String]): Unit = {

    val pc = new PhoenixClient_v2("zk1", "2181")
    println("select result :" + pc.Select_Sql("select * from \"OP_record\"").toString)
    pc.CreateTable("create table if not exists \"OP_record\"(\"pk\" varchar(100) not null primary key, \"cf1\".\"curdonate\" varchar(20),\"cf1\".\"curcharge\" varchar(20),\"cf1\".\"uniqueid\" varchar(20))", "OP_record")

    val KeyArr = Array("serverid", "userid", "Optime")
    val ColumnArr = Array("curdonate", "curcharge", "uniqueid")
    pc.InsertOfBatch("upsert into \"OP_record\" values(?,?,?,?)", KeyArr, ColumnArr, "hdfs://zk1:9000/chen/hive_OP_record_diamonds_20170702.log.1499028228.106")
  }
}
The error output is as follows:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:318)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:317)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.map(RDD.scala:317)
    at scala.Hbase.PhoenixClient_v2.InsertOfBatch(PhoenixClient_v2.scala:123)
    at scala.Hbase.PhoenixTest$.main(PhoenixClient_v2.scala:230)
    at scala.Hbase.PhoenixTest.main(PhoenixClient_v2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.io.NotSerializableException: org.apache.phoenix.jdbc.PhoenixPreparedStatement
Serialization stack:
- object not serializable (class: org.apache.phoenix.jdbc.PhoenixPreparedStatement, value: upsert into "OP_record" values(?,?,?,?))
- field (class: scala.Hbase.PhoenixClient_v2$$anonfun$InsertOfBatch$1, name: ps$1, type: interface java.sql.PreparedStatement)
- object (class scala.Hbase.PhoenixClient_v2$$anonfun$InsertOfBatch$1, )
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 21 more
17/07/06 16:35:26 INFO SparkContext: Invoking stop() from shutdown hook
17/07/06 16:35:26 INFO SparkUI: Stopped Spark web UI at http://192.168.12.243:4040
17/07/06 16:35:26 INFO DAGScheduler: Stopping DAGScheduler
17/07/06 16:35:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/06 16:35:27 INFO MemoryStore: MemoryStore cleared
17/07/06 16:35:27 INFO BlockManager: BlockManager stopped
17/07/06 16:35:27 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/06 16:35:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/07/06 16:35:27 INFO SparkContext: Successfully stopped SparkContext
17/07/06 16:35:27 INFO ShutdownHookManager: Shutdown hook called
17/07/06 16:35:27 INFO ShutdownHookManager: Deleting directory /tmp/spark-18cc3337-cdda-498f-a7f3-124f239a6bfe
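
For context on what the trace is saying: the closure passed to Rdd_file.map references the PhoenixPreparedStatement ps (and, through it, the JDBC connection), so Spark's ClosureCleaner tries to serialize it to ship the task to executors and fails with NotSerializableException before any record is processed. JDBC objects cannot be serialized; a common pattern is to open the connection on the executor side, inside foreachPartition, and batch there (note also that map is a lazy transformation, so the addBatch calls would never execute anyway). Below is a minimal sketch of that pattern, not the asker's exact logic: it assumes the same Phoenix ZooKeeper quorum and upsert statement as in the question, assumes the JSON parser is org.json.JSONObject, and uses a hypothetical input path and row-key layout.

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
import org.json.JSONObject

// Minimal sketch: the JDBC connection and statement are created inside
// foreachPartition, so they live only on the executor and Spark never has
// to serialize them.
object PhoenixBatchUpsertSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("hdfs2hbase"))
    val url = "jdbc:phoenix:zk1:2181"                        // assumed ZK quorum, as in the question
    val upsertSql = "upsert into \"OP_record\" values(?,?,?,?)"

    sc.textFile("hdfs://zk1:9000/chen/sample.log")           // hypothetical input path
      .foreachPartition { lines =>
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
        val conn = DriverManager.getConnection(url)          // one connection per partition
        val ps = conn.prepareStatement(upsertSql)
        try {
          lines.foreach { line =>
            val obj = new JSONObject(line)
            // Row key built from the question's key fields; Optime keeps only the date part
            val rowKey = Seq("serverid", "userid", "Optime")
              .map(k => if (k == "Optime") obj.getString(k).split(" ")(0) else obj.getString(k))
              .mkString("|")
            ps.setString(1, rowKey)
            ps.setString(2, obj.getString("curdonate"))
            ps.setString(3, obj.getString("curcharge"))
            ps.setString(4, obj.getString("uniqueid"))
            ps.addBatch()
          }
          ps.executeBatch()
          conn.commit()
        } finally {
          ps.close()
          conn.close()
        }
      }
    sc.stop()
  }
}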

1 answer

  • zqbnqsdsmd 2018-08-15 16:29
