Cd_Empty 2019-03-03 14:37 采纳率: 0%
浏览 445

如何用spark将带有union类型的avro消息存入hive

我现在是用spark streaming 读取kafka中的avro消息,反序列化后希望使用spark sql存入hive或者hdfs
但是不管我是将avro转成json还是case class,即使在DStream.print能够正确打印出来,使用spark.foreachRdd进行后续操作时都会报错。

  1. 我找了第三方包,将avdl文件转成caseclass,由于avro中存在union(record,record,record)的类型,所以case class是带有shapeless的类型的,在转成spark后会报 is not a term的错误
  2. 这一次是使用json4s将对象转换成了json,同样,在DStream.print能够正确打印,但是进入spark.foreachRdd后就会报错,no value for 'BAD', 不知道是不是因为不同的json中有一些属性是null,有一些不是null
  3. 这一次我直接使用avro的tostring,和不同的是,在DStream中能打印,进入foreachRdd也并没有报错,但是部分json显示是corrupt.

第二和第三的json我把打印出来的字符串放入json文件里,使用spark.read.json读都是完全没问题的,我就有点搞不明白了,各位大神,有办法能够处理吗?
大家如果有这种需求一般是如何做的呢?
Code Snippet:

def main {
val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent,
Subscribe[String, Array[Byte]](topics, kafkaParams))

println("Schema:" + schema)
val stream = deserialize(config, kafkaStream)

process(approach, isPrint, stream)
ssc.start()
ssc.awaitTermination()
}

def process(approach: Int, isPrint: Boolean, stream: DStream[DeserializedFromKafkaRecord]): Unit = {
approach match {
/**
* Approach 1 avrohugger + avro4s
* Convert GenericData into CaseClass
*/
case 1 => {
val mappedStream = stream.map(record => {
val format = RecordFormat[A]
val ennio = format.from(record.value)
ennio
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = rdd.toDF()
df.show()
}

})
}
}
/**
* Approach 2 avro4s + json4s
* Convert CaseClass into Json
*/
case 2 => {
val mappedStream = stream.map(record => {
val format = RecordFormat[A]
val ennio = format.from(record.value)
implicit val formats = DefaultFormats.preservingEmptyValues
write(ennio)
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = spark.read.json(spark.createDataset(rdd))
df.show()
}

})
}
}
/**
* Approach 3
* Convert GenericData into Json
*/
case 3 => {
val mappedStream = stream.mapPartitions(partition => {
partition.map(_.value.toString)
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = spark.read.json(spark.createDataset(rdd))
df.show()
}

})
}
}
  • 写回答

0条回答

    报告相同问题?

    悬赏问题

    • ¥30 这是哪个作者做的宝宝起名网站
    • ¥60 版本过低apk如何修改可以兼容新的安卓系统
    • ¥25 由IPR导致的DRIVER_POWER_STATE_FAILURE蓝屏
    • ¥50 有数据,怎么建立模型求影响全要素生产率的因素
    • ¥50 有数据,怎么用matlab求全要素生产率
    • ¥15 TI的insta-spin例程
    • ¥15 完成下列问题完成下列问题
    • ¥15 C#算法问题, 不知道怎么处理这个数据的转换
    • ¥15 YoloV5 第三方库的版本对照问题
    • ¥15 请完成下列相关问题!