How to store Avro messages containing union types into Hive with Spark

I am using Spark Streaming to read Avro messages from Kafka. After deserializing them, I want to write them into Hive or HDFS with Spark SQL.
But no matter whether I convert the Avro records to JSON or to case classes, everything prints correctly with DStream.print, yet the follow-up processing inside foreachRDD always throws an error.

  1. First I used a third-party library to generate case classes from the .avdl file. Because the Avro schema contains a union(record, record, record) type, the generated case classes contain shapeless types, and Spark fails with an "is not a term" error when converting them.
  2. Next I used json4s to serialize the objects to JSON. Again DStream.print works, but inside foreachRDD it fails with "no value for 'BAD'". I suspect this happens because some attributes are null in some JSON records but not in others.
  3. Finally I used Avro's toString directly. Unlike the other two attempts, this prints in the DStream and also gets through foreachRDD without an exception, but some of the resulting JSON rows show up as corrupt.
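Regarding point 1: Spark's encoders handle product types (case classes, tuples), but not the shapeless Coproduct that avro4s derives for a union(record, record, record), which is one plausible source of the "is not a term" error. A minimal sketch of a common workaround, with hypothetical record names RecA/RecB/RecC standing in for the real union branches, is to flatten the union into a single case class of Options before handing the data to Spark:

```scala
// Sketch, assuming the Avro union is union(RecA, RecB, RecC); all names here
// are placeholders. Spark's product encoders understand a case class whose
// fields are Options, so map the union branch into the matching slot before
// calling rdd.toDF().
case class RecA(a: Int)
case class RecB(b: String)
case class RecC(c: Double)

case class FlatUnion(recA: Option[RecA], recB: Option[RecB], recC: Option[RecC])

// Put whichever branch the union carried into the matching Option slot.
def flatten(branch: Any): FlatUnion = branch match {
  case r: RecA => FlatUnion(Some(r), None, None)
  case r: RecB => FlatUnion(None, Some(r), None)
  case r: RecC => FlatUnion(None, None, Some(r))
}
```

The resulting DataFrame then has one nullable struct column per union branch, which maps cleanly onto a Hive table.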

For approaches 2 and 3, if I copy the printed strings into a JSON file, spark.read.json reads them back without any problem, which is what confuses me. Does anyone have a way to handle this?
How do people usually implement this kind of requirement?
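One plausible explanation for the file-vs-stream difference: spark.read.json infers the schema separately for every micro-batch, so a batch where some optional field is always null or absent can infer a different schema than a whole file read at once, which surfaces as corrupt records or missing-value errors. A sketch of the usual fix, with placeholder field names, is to declare the schema once up front:

```scala
import org.apache.spark.sql.types._

// Sketch with placeholder field names: declare the JSON schema once so that
// every micro-batch is parsed identically, instead of letting spark.read.json
// re-infer the schema from whatever records that batch happens to contain.
val explicitSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("payload", StringType, nullable = true)
))

// Inside foreachRDD the read would then become:
//   spark.read.schema(explicitSchema).json(spark.createDataset(rdd))
```

With an explicit schema, records that are missing a field simply get null in that column rather than changing the inferred structure between batches.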
Code Snippet:

def main(args: Array[String]): Unit = {
  val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte]](
    ssc, PreferConsistent,
    Subscribe[String, Array[Byte]](topics, kafkaParams))

  println("Schema: " + schema)
  val stream = deserialize(config, kafkaStream)

  process(approach, isPrint, stream)
  ssc.start()
  ssc.awaitTermination()
}

def process(approach: Int, isPrint: Boolean, stream: DStream[DeserializedFromKafkaRecord]): Unit = {
  approach match {
    /**
     * Approach 1: avrohugger + avro4s
     * Convert GenericData into a case class
     */
    case 1 =>
      val mappedStream = stream.map { record =>
        val format = RecordFormat[A]
        format.from(record.value)
      }
      if (isPrint) {
        mappedStream.print()
      } else {
        mappedStream.foreachRDD { rdd =>
          if (!rdd.partitions.isEmpty) {
            val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._
            val df = rdd.toDF()
            df.show()
          }
        }
      }

    /**
     * Approach 2: avro4s + json4s
     * Convert the case class into JSON
     */
    case 2 =>
      val mappedStream = stream.map { record =>
        val format = RecordFormat[A]
        val ennio = format.from(record.value)
        implicit val formats = DefaultFormats.preservingEmptyValues
        write(ennio)
      }
      if (isPrint) {
        mappedStream.print()
      } else {
        mappedStream.foreachRDD { rdd =>
          if (!rdd.partitions.isEmpty) {
            val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._
            val df = spark.read.json(spark.createDataset(rdd))
            df.show()
          }
        }
      }

    /**
     * Approach 3
     * Convert GenericData into JSON via toString
     */
    case 3 =>
      val mappedStream = stream.mapPartitions(partition => partition.map(_.value.toString))
      if (isPrint) {
        mappedStream.print()
      } else {
        mappedStream.foreachRDD { rdd =>
          if (!rdd.partitions.isEmpty) {
            val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._
            val df = spark.read.json(spark.createDataset(rdd))
            df.show()
          }
        }
      }
  }
}