因为有需求要记录一张表删除的数据,计划使用flink cdc来做,因为需要过滤,所以使用了debezium.skipped.operations参数,但是这个参数并未生效,还是会把insert update 记录同步到sink,想问下是哪里出问题了呢?下面是代码。
package org.example.sqlcdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.Properties;
public class SqlCdc {
public static void main(String[] args) throws Exception {
// Properties debeziumProperties = new Properties();
// debeziumProperties.put("skipped.operations", "c,u,t");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("create table demo1(" +
" id INT," +
" name STRING," +
" primary key (id) not enforced" +
")WITH(" +
" 'connector' = 'mysql-cdc'," +
" 'hostname' = 'localhost'," +
" 'port' = '3306'," +
" 'username' = 'root'," +
" 'password' = 'root@root'," +
" 'database-name' = 'local'," +
" 'table-name' = 'from_table'," +
" 'debezium.skipped.operations' = 'c,u,t'" +
")");
tableEnv.executeSql("create table demo2(" +
" id INT," +
" name STRING," +
" primary key (id) not enforced" +
")WITH(" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:3306/local?serverTimezone=UTC'," +
" 'username' = 'root'," +
" 'password' = 'root@root'," +
" 'table-name' = 'to_table'," +
" 'driver' = 'com.mysql.cj.jdbc.Driver'," +
" 'scan.fetch-size' = '200'" +
")");
tableEnv.executeSql("insert into demo2 select id,name from demo1");
tableEnv.executeSql("select * from demo1").print();
env.execute("FLinkSql Cdc");
}
}