zjr12138 2023-11-28 14:13 采纳率: 66.7%
浏览 55

使用 scala 语言的 spark.sql 把 mysql 数据库中的数据增量抽取到 hudi 中

我需要编写 scala 代码,使用 spark 将 mysql 的 shtd_store 库中的 user_info 表的数据增量抽取到 hudi 的 ods 库中(路径为 /user/hive/warehouse/ods.db)的 user_info 中。根据 ods.user_info 表中的 operate_time 或 create_time 作为增量字段字段(即 MySQL 中每条数据取这两个时间中较大的那个时间作为增量字段去和 ods 里的这两个字段中较大的时间进行比较)。只将新增的数据抽入,字段名称、类型不变,同时添加分区,若 operate_time 为空,则用 create_time 填充,分区字段为 etl_date,类型为 String,且值为当天日期(分区字段格式为 y-M-d)。id 作为 primaryKey,operate_time 作为 preCombineField。

我应该如何编写这段代码,请各位指教,最好能出完整代码,感谢!

  • 写回答

3条回答 默认 最新

  • 往事随风ing 博客专家认证 2023-11-28 15:01
    关注

    建议:使用 Flink CDC 直接增量实时写入 Hudi 表。
    注意:离线初始化/重置表时,才使用 Spark 拉取。

    代码参考:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

    评论 编辑记录

报告相同问题?

问题事件

  • 创建了问题 11月28日

悬赏问题

  • ¥15 单纯型python实现编译报错
  • ¥15 c++2013读写oracle
  • ¥15 c++ gmssl sm2验签demo
  • ¥15 关于模的完全剩余系(关键词-数学方法)
  • ¥15 有没有人懂这个博图程序怎么写,还要跟SFB连接,真的不会,求帮助
  • ¥15 PVE8.2.7无法成功使用a5000的vGPU,什么原因
  • ¥15 is not in the mmseg::model registry。报错,模型注册表找不到自定义模块。
  • ¥15 安装quartus II18.1时弹出此error,怎么解决?
  • ¥15 keil官网下载psn序列号在哪
  • ¥15 想用adb命令做一个通话软件,播放录音