我需要编写 scala 代码,使用 spark 将 mysql 的 shtd_store 库中的 user_info 表的数据增量抽取到 hudi 的 ods 库中(路径为 /user/hive/warehouse/ods.db)的 user_info 中。根据 ods.user_info 表中的 operate_time 或 create_time 作为增量字段字段(即 MySQL 中每条数据取这两个时间中较大的那个时间作为增量字段去和 ods 里的这两个字段中较大的时间进行比较)。只将新增的数据抽入,字段名称、类型不变,同时添加分区,若 operate_time 为空,则用 create_time 填充,分区字段为 etl_date,类型为 String,且值为当天日期(分区字段格式为 y-M-d)。id 作为 primaryKey,operate_time 作为 preCombineField。
我应该如何编写这段代码,请各位指教,最好能出完整代码,感谢!