//代码
import org.apache.spark.sql.{SaveMode, SparkSession}
object hudi_1 {

  /**
   * Entry point: loads the `customer_inf` table from MySQL over JDBC and
   * appends it to a Hudi COPY_ON_WRITE table stored on HDFS.
   *
   * NOTE(review): the NoSuchMethodError in the attached log
   * (SQLConf$.AVRO_REBASE_MODE_IN_WRITE, thrown from Spark3_1Adapter) points to
   * a binary mismatch between the hudi-spark bundle and the cluster's Spark
   * runtime — fix by using the hudi-sparkX.Y-bundle compiled for the exact
   * Spark minor version; it is not a defect in this code.
   */
  def main(args: Array[String]): Unit = {
    // Hive-enabled session on YARN; Kryo serialization is required by Hudi.
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("HudiExample")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .enableHiveSupport()
      .getOrCreate()

    // JDBC connection settings for the source MySQL database.
    val jdbcOptions = Map(
      "url"      -> "jdbc:mysql://192.168.23.45:3306/ds_db01",
      "user"     -> "root",
      "password" -> "123456",
      "driver"   -> "com.mysql.jdbc.Driver",
      "dbTable"  -> "ds_db01.customer_inf"
    )

    // Load the whole source table into a DataFrame.
    val sourceDf = spark.read
      .format("jdbc")
      .options(jdbcOptions)
      .load()

    // Hudi write configuration: COW table keyed by customer_inf_id,
    // partitioned by etl_date, deduplicated on modified_time.
    val hudiOptions = Map(
      "hoodie.datasource.write.table.type"          -> "COPY_ON_WRITE",
      "hoodie.table.name"                           -> "customer_inf",
      "hoodie.datasource.write.recordkey.field"     -> "customer_inf_id",
      "hoodie.datasource.write.partitionpath.field" -> "etl_date",
      "hoodie.datasource.write.precombine.field"    -> "modified_time",
      "hoodie.datasource.write.operation"           -> "insert"
    )

    // Append the rows into the Hudi table on HDFS.
    sourceDf.write
      .format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("hdfs://192.168.23.45:9000/user/hudi_2/table")
  }
}
//错误
(size: 11.9 KiB, free: 365.7 MiB)
2023-11-22 16:50:52,325 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 13.0 (TID 11) (bigdata1 executor 2): java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:65)
at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroSerializer.<init>(HoodieSpark3_1AvroSerializer.scala:26)
at org.apache.spark.sql.adapter.Spark3_1Adapter.createAvroSerializer(Spark3_1Adapter.scala:45)
at org.apache.hudi.AvroConversionUtils$.createInternalRowToAvroConverter(AvroConversionUtils.scala:81)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:178)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2023-11-22 16:50:52,327 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 13.0 (TID 12) (bigdata1, executor 2, partition 0, PROCESS_LOCAL, 4299 bytes) taskResourceAssignments Map()
2023-11-22 16:50:52,424 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 13.0 (TID 12) on bigdata1, executor 2: java.lang.NoSuchMethodError (org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;) [duplicate 1]
2023-11-22 16:50:52,425 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 13.0 (TID 13) (bigdata1, executor 2, partition 0, PROCESS_LOCAL, 4299 bytes) taskResourceAssignments Map()
2023-11-22 16:50:52,498 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 13.0 (TID 13) on bigdata1, executor 2: java.lang.NoSuchMethodError (org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;) [duplicate 2]
2023-11-22 16:50:52,500 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 13.0 (TID 14) (bigdata2, executor 1, partition 0, PROCESS_LOCAL, 4299 bytes) taskResourceAssignments Map()
2023-11-22 16:50:52,532 INFO storage.BlockManagerInfo: Added broadcast_11_piece0 in memory on bigdata2:42183 (size: 11.9 KiB, free: 366.3 MiB)
2023-11-22 16:51:00,416 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 13.0 (TID 14) on bigdata2, executor 1: java.lang.NoSuchMethodError (org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;) [duplicate 3]
2023-11-22 16:51:00,418 ERROR scheduler.TaskSetManager: Task 0 in stage 13.0 failed 4 times; aborting job
2023-11-22 16:51:00,423 INFO cluster.YarnScheduler: Removed TaskSet 13.0, whose tasks have all completed, from pool
2023-11-22 16:51:00,427 INFO cluster.YarnScheduler: Cancelling stage 13
2023-11-22 16:51:00,427 INFO cluster.YarnScheduler: Killing all running tasks in stage 13: Stage cancelled
2023-11-22 16:51:00,428 INFO scheduler.DAGScheduler: ShuffleMapStage 13 (countByKey at HoodieJavaPairRDD.java:105) failed in 9.605 s due to Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 14) (bigdata2 executor 1): java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:65)
at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroSerializer.<init>(HoodieSpark3_1AvroSerializer.scala:26)
at org.apache.spark.sql.adapter.Spark3_1Adapter.createAvroSerializer(Spark3_1Adapter.scala:45)
at org.apache.hudi.AvroConversionUtils$.createInternalRowToAvroConverter(AvroConversionUtils.scala:81)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:178)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
2023-11-22 16:51:00,431 INFO scheduler.DAGScheduler: Job 9 failed: countByKey at HoodieJavaPairRDD.java:105, took 9.614751 s
Exception in thread "main" org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20231122165012889
at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
at org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor.execute(SparkInsertCommitActionExecutor.java:45)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insert(HoodieSparkCopyOnWriteTable.java:118)
at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.insert(HoodieSparkCopyOnWriteTable.java:97)
at org.apache.hudi.client.SparkRDDWriteClient.insert(SparkRDDWriteClient.java:180)
at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:204)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:329)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at hudi_1$.main(hudi_1.scala:36)
at hudi_1.main(hudi_1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage 13.0 (TID 14) (bigdata2 executor 1): java.lang.NoSuchMethodError: org.apache.spark.sql.internal.SQLConf$.AVRO_REBASE_MODE_IN_WRITE()Lorg/apache/spark/internal/config/ConfigEntry;
at org.apache.hudi.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:65)
at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_1AvroSerializer.<init>(HoodieSpark3_1AvroSerializer.scala:26)
at org.apache.spark.sql.adapter.Spark3_1Adapter.createAvroSerializer(Spark3_1Adapter.scala:45)
at org.apache.hudi.AvroConversionUtils$.createInternalRowToAvroConverter(AvroConversionUtils.scala:81)
at org.apache.hudi.HoodieSparkUtils$.$anonfun$createRdd$2(HoodieSparkUtils.scala:178)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
hudi有人会吗,这是遇到的问题
- 写回答
- 好问题 0 提建议
- 追加酬金
- 关注问题
- 邀请回答
-
2条回答 默认 最新
关注 引自强大而免费的微信小程序:皆我百晓生。
问题分析:
根据错误日志，问题出现在 Hudi 写入数据时，executor 端抛出 NoSuchMethodError。堆栈中出现 Spark3_1Adapter，而运行时 Spark 的 SQLConf 中找不到 AVRO_REBASE_MODE_IN_WRITE 方法，说明所用的 hudi-spark-bundle 是针对另一个 Spark 小版本编译的。也就是说，这是 Hudi 与 Spark 之间的二进制版本不兼容问题，并不是缺少某项 Avro 配置。
解决方案:
- 首先,确认使用的Hudi版本与Spark版本是否兼容。不同版本的Hudi可能需要不同版本的Spark支持。可以查阅相关文档或Hudi官方网站来获取版本兼容性信息。
- 如果版本兼容,尝试通过添加相关的依赖项来解决问题。可以在项目的build.sbt文件中添加以下依赖项:
libraryDependencies += "org.apache.avro" % "avro" % "1.8.2"
- 如果问题仍然存在，请核对集群实际运行的 Spark 版本（spark-submit --version），并改用与之完全匹配的 hudi-sparkX.Y-bundle（例如 Spark 3.1.x 对应 hudi-spark3.1-bundle_2.12），重新打包提交作业。NoSuchMethodError 表明 AVRO_REBASE_MODE_IN_WRITE 在当前 Spark 运行时中不存在，这通常就是 bundle 与 Spark 小版本不一致导致的。
- 如果以上方法都无效，可以尝试使用经过官方兼容性矩阵验证的 Hudi 与 Spark 版本组合，或者考虑其他的数据湖解决方案。
代码示例:
import org.apache.spark.sql.{SaveMode, SparkSession} object HudiExample { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("yarn") .appName("HudiExample") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.hive.convertMetastoreParquet", "false") .enableHiveSupport() .getOrCreate() val mysqlUrl = "jdbc:mysql://192.168.23.45:3306/ds_db01" val mysqlUser = "root" val mysqlPassword = "123456" val mysqlDf = spark.read.format("jdbc") .option("url", mysqlUrl) .option("user", mysqlUser) .option("password", mysqlPassword) .option("driver", "com.mysql.jdbc.Driver") .option("dbTable", "ds_db01.customer_inf") .load() mysqlDf.write .format("org.apache.hudi") .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") .option("hoodie.table.name", "customer_inf") .option("hoodie.datasource.write.recordkey.field", "customer_inf_id") .option("hoodie.datasource.write.partitionpath.field", "etl_date") .option("hoodie.datasource.write.precombine.field", "modified_time") .option("hoodie.datasource.write.operation", "insert") .mode(SaveMode.Append) .save("hdfs://192.168.23.45:9000/user/hudi_2/table") } }
以上是一个简单的使用Hudi写入数据的示例代码,将MySQL中的数据加载到DataFrame中,然后使用Hudi将DataFrame写入到HDFS的表中。注意替换相应的MySQL连接信息和Hudi配置信息。
希望以上解决方案能对你有帮助!如果还有其他问题,请随时提问。
解决 无用评论 打赏 举报 编辑记录
悬赏问题
- ¥15 券商软件上市公司信息获取问题
- ¥100 ensp启动设备蓝屏,代码clock_watchdog_timeout
- ¥15 Android studio AVD启动不了
- ¥15 陆空双模式无人机怎么做
- ¥15 想咨询点问题,与算法转换,负荷预测,数字孪生有关
- ¥15 C#中的编译平台的区别影响
- ¥15 软件供应链安全是跟可靠性有关还是跟安全性有关?
- ¥15 电脑蓝屏logfilessrtsrttrail问题
- ¥20 关于wordpress建站遇到的问题!(语言-php)(相关搜索:云服务器)
- ¥15 【求职】怎么找到一个周围人素质都很高不会欺负他人,并且未来月薪能够达到一万以上(技术岗)的工作?希望可以收到写有具体,可靠,已经实践过了的路径的回答?