Flink MySQL 实时同步到 PostgreSQL。为什么我把数据同步到 PostgreSQL 时，生成的 upsert 语句中 id 一直出现在 DO UPDATE SET 后面？应该怎样才能把它排除掉呢？
// Streams row changes from a MySQL CDC source table into a PostgreSQL table
// through the Flink JDBC sink, using only Table API / SQL statements.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// The job is submitted by executeSql(INSERT ...) below, not by env.execute(),
// so the job name has to be supplied through the pipeline configuration.
tableEnv.getConfig().set("pipeline.name", "MySQL to PostgreSQL CDC Example");
// Create the MySQL CDC source table.
String sourceDDL = "CREATE TABLE mysql_source (" +
" id INT," +
" name STRING," +
" age STRING," +
" sex STRING," +
" phone STRING," +
" email STRING," +
" birthday STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'server-time-zone' = 'UTC'," +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = '*'," +
" 'port' = '3306'," +
" 'username' = '*'," +
" 'password' = '*'," +
" 'scan.startup.mode' = 'latest-offset'," +
" 'database-name' = '*'," +
" 'table-name' = '*'," +
" 'scan.incremental.snapshot.chunk.key-column' = 'id'" +
")";
// Register the source table.
tableEnv.executeSql(sourceDDL);
// Configure the PostgreSQL sink.
// The primary key is declared NOT ENFORCED (same form as the source table);
// it is what makes the JDBC sink write in upsert mode keyed on id.
//
// NOTE(review): the reported failure
//   "modification of distribution columns in OnConflictUpdate is not supported"
// is raised by the database for the connector-generated statement
//   INSERT ... ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, ...
// That SET list is built inside the JDBC connector's dialect, not from this DDL,
// so no option below removes id from it. The message suggests the target is a
// distributed PostgreSQL variant where id is the distribution column - TODO confirm.
// Possible ways forward (verify against the connector version in use): upgrade to a
// flink-connector-jdbc release whose PostgreSQL dialect leaves key columns out of
// the update clause, or plug in a custom dialect that generates such a statement.
String postgresSinkDDL =
"CREATE TABLE postgres_sink (" +
" id INT," +
" name STRING," +
" age STRING," +
" sex STRING," +
" phone STRING," +
" email STRING," +
" birthday STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:postgresql://*:5432/*'," +
" 'table-name' = '*'," +
" 'username' = '*'," +
" 'password' = '*'" +
")";
tableEnv.executeSql(postgresSinkDDL);
// Insert statement: copy the MySQL change stream into PostgreSQL.
// executeSql() submits the streaming job by itself; await() blocks on the returned
// TableResult until the job terminates. A trailing env.execute() is not used because
// no DataStream operators were defined on env, so it would fail with an empty topology.
// await() throws checked exceptions; the surrounding code already had to handle the
// checked Exception declared by env.execute().
tableEnv.executeSql("INSERT INTO postgres_sink SELECT * FROM mysql_source").await();
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:187)
at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:154)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:162)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:144)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:129)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:111)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:83)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:212)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO ods.ods_cdc_user1(id, name, age, sex, phone, email, birthday) VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, age=EXCLUDED.age, sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, birthday=EXCLUDED.birthday was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
... 8 more
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO ods.ods_cdc_user1(id, name, age, sex, phone, email, birthday) VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, age=EXCLUDED.age, sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, birthday=EXCLUDED.birthday was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported Call getNextException to see other errors in the batch.
at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:199)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:532)
at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
... 8 more
Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
... 16 more