ToFind1996 2023-03-02 01:04 采纳率: 0%
浏览 20

flink cdc 使用debezium 参数不生效

因为有需求要记录一张表删除的数据,计划使用flink cdc来做,因为需要过滤,所以使用了debezium.skipped.operations参数,但是这个参数并未生效,还是会把insert update 记录同步到sink,想问下是哪里出问题了呢?下面是代码。

package org.example.sqlcdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Properties;

public class SqlCdc {
    public static void main(String[] args) throws Exception {
//        Properties debeziumProperties = new Properties();
//        debeziumProperties.put("skipped.operations", "c,u,t");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("create table demo1(" +
                " id INT," +
                " name STRING," +
                " primary key (id) not enforced" +
                ")WITH(" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'localhost'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'root@root'," +
                " 'database-name' = 'local'," +
                " 'table-name' = 'from_table'," +
                " 'debezium.skipped.operations' = 'c,u,t'" +
                ")");

        tableEnv.executeSql("create table demo2(" +
                " id INT," +
                " name STRING," +
                " primary key (id) not enforced" +
                ")WITH(" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:3306/local?serverTimezone=UTC'," +
                " 'username' = 'root'," +
                " 'password' = 'root@root'," +
                " 'table-name' = 'to_table'," +
                " 'driver' = 'com.mysql.cj.jdbc.Driver'," +
                " 'scan.fetch-size' = '200'" +
                ")");
        tableEnv.executeSql("insert into demo2 select id,name from demo1");
        tableEnv.executeSql("select * from demo1").print();
        env.execute("FLinkSql Cdc");

    }
}




  • 写回答

1条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-03-02 02:43
    关注
    评论

报告相同问题?

问题事件

  • 创建了问题 3月2日

悬赏问题

  • ¥15 is not in the mmseg::model registry。报错,模型注册表找不到自定义模块。
  • ¥15 安装quartus II18.1时弹出此error,怎么解决?
  • ¥15 keil官网下载psn序列号在哪
  • ¥15 想用adb命令做一个通话软件,播放录音
  • ¥30 Pytorch深度学习服务器跑不通问题解决?
  • ¥15 部分客户订单定位有误的问题
  • ¥15 如何在maya程序中利用python编写领子和褶裥的模型的方法
  • ¥15 Bug traq 数据包 大概什么价
  • ¥15 在anaconda上pytorch和paddle paddle下载报错
  • ¥25 自动填写QQ腾讯文档收集表