Cd_Empty 2019-03-03 14:37 采纳率: 0%
浏览 445

如何用spark将带有union类型的avro消息存入hive

我现在是用spark streaming 读取kafka中的avro消息,反序列化后希望使用spark sql存入hive或者hdfs
但是不管我是将avro转成json还是case class,即使在DStream.print能够正确打印出来,使用spark.foreachRdd进行后续操作时都会报错。

  1. 我找了第三方包,将avdl文件转成caseclass,由于avro中存在union(record,record,record)的类型,所以case class是带有shapeless的类型的,在转成spark后会报 is not a term的错误
  2. 这一次是使用json4s将对象转换成了json,同样,在DStream.print能够正确打印,但是进入spark.foreachRdd后就会报错,no value for 'BAD', 不知道是不是因为不同的json中有一些属性是null,有一些不是null
  3. 这一次我直接使用avro的tostring,和不同的是,在DStream中能打印,进入foreachRdd也并没有报错,但是部分json显示是corrupt.

第二和第三的json我把打印出来的字符串放入json文件里,使用spark.read.json读都是完全没问题的,我就有点搞不明白了,各位大神,有办法能够处理吗?
大家如果有这种需求一般是如何做的呢?
Code Snippet:

def main {
val kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent,
Subscribe[String, Array[Byte]](topics, kafkaParams))

println("Schema:" + schema)
val stream = deserialize(config, kafkaStream)

process(approach, isPrint, stream)
ssc.start()
ssc.awaitTermination()
}

def process(approach: Int, isPrint: Boolean, stream: DStream[DeserializedFromKafkaRecord]): Unit = {
approach match {
/**
* Approach 1 avrohugger + avro4s
* Convert GenericData into CaseClass
*/
case 1 => {
val mappedStream = stream.map(record => {
val format = RecordFormat[A]
val ennio = format.from(record.value)
ennio
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = rdd.toDF()
df.show()
}

})
}
}
/**
* Approach 2 avro4s + json4s
* Convert CaseClass into Json
*/
case 2 => {
val mappedStream = stream.map(record => {
val format = RecordFormat[A]
val ennio = format.from(record.value)
implicit val formats = DefaultFormats.preservingEmptyValues
write(ennio)
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = spark.read.json(spark.createDataset(rdd))
df.show()
}

})
}
}
/**
* Approach 3
* Convert GenericData into Json
*/
case 3 => {
val mappedStream = stream.mapPartitions(partition => {
partition.map(_.value.toString)
})
if (isPrint) {
mappedStream.print()
}else {
mappedStream.foreachRDD(foreachFunc = rdd => {
if (!rdd.partitions.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
val df = spark.read.json(spark.createDataset(rdd))
df.show()
}

})
}
}
  • 写回答

1条回答 默认 最新

  • 你知我知皆知 2024-08-08 03:52
    关注

    以下回答参考 皆我百晓生券券喵儿 等免费微信小程序相关内容作答,并由本人整理回复。

    Apache Spark和Hive之间的数据导入通常涉及到两种方法:一种是通过Spark SQL的内置函数或UDF(用户定义的函数)来实现,另一种是利用外部工具(例如Parquet、Avro等)来创建一个本地的数据存储格式,然后将其加载到Hive表中。以下是通过Spark SQL的内置函数和UDF来导入AVRO数据的方法:

    1. 首先,你需要安装必要的依赖项,包括avro库。
    pip install pyarrow pandas
    
    1. 然后,你可以使用以下代码将AVRO数据导入Spark DataFrame:
    from pyspark.sql import SparkSession
    from pyarrow import io
    from pyarrow import parquet
    from pyarrow import dataset
    from pyarrow.parquet import write as parquet_write
    
    # 创建SparkSession实例
    spark = SparkSession.builder.appName('LoadAVRO').getOrCreate()
    
    # 使用pyarrow读取AVRO文件
    avro_file_path = 'path/to/your/file.avro'
    dataframe = io.Arrow.open(avro_file_path)
    
    # 将DataFrame写入本地Parquet文件
    parquet_write(dataframe, 'path/to/save/output.parquet')
    

    注意:这个例子使用了pyarrow库,如果你没有安装它,可以使用以下命令安装:

    pip install pyarrow pandas
    

    以上代码首先创建了一个SparkSession实例,并指定了应用名称。然后,它使用pyarrow.open()函数从指定的AVRO文件路径读取数据。最后,它使用parquet_write()函数将读取的数据写入本地的Parquet文件。

    如果你想将AVRO数据导出为Hive表,你可以使用pyspark.sql.HiveContext类,如下所示:

    from pyspark.sql import HiveContext
    
    # 创建HiveContext实例
    hc = HiveContext(sc)
    
    # 将DataFrame写入Hive表
    hc.sql("CREATE TABLE my_table AS SELECT * FROM your_spark_df")
    hc.sql("INSERT INTO my_table SELECT * FROM your_spark_df")
    

    这将会创建一个新的Hive表并插入你的Spark DataFrame。

    评论

报告相同问题?