RanFps · 2024-06-20 19:14

Flink CDC MySQL-to-PostgreSQL real-time sync error: how do I keep id out of DO UPDATE SET?

I am using Flink CDC to sync MySQL data into PostgreSQL in real time. When the data is written to PostgreSQL, the id column keeps showing up after DO UPDATE SET in the generated statement, and the write fails. How can I exclude id from that clause?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv  = StreamTableEnvironment.create(env);
        // Create the MySQL CDC source table
        String sourceDDL = "CREATE TABLE mysql_source  (" +
                " id INT," +
                " name STRING," +
                " age STRING," +
                " sex STRING," +
                " phone STRING," +
                " email STRING," +
                " birthday STRING," +
                " PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                " 'server-time-zone' = 'UTC'," +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '*'," +
                " 'port' = '3306'," +
                " 'username' = '*'," +
                " 'password' = '*'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'database-name' = '*'," +
                " 'table-name' = '*'," +
                " 'scan.incremental.snapshot.chunk.key-column' = 'id'" +
                ")";
        // Register the source table
        tableEnv.executeSql(sourceDDL);
     
        // Configure the PostgreSQL sink
        String postgresSinkDDL =
                "CREATE TABLE postgres_sink (" +
                        " id INT PRIMARY KEY," +
                        " name STRING," +
                        " age STRING," +
                        " sex STRING," +
                        " phone STRING," +
                        " email STRING," +
                        " birthday STRING" +
                        ") WITH (" +
                        "  'connector' = 'jdbc'," +
                        "  'url' = 'jdbc:postgresql://*:5432/*'," +
                        "  'table-name' = '*'," +
                        "  'username' = '*'," +
                        "  'password' = '*'"+
                        ")";

        tableEnv.executeSql(postgresSinkDDL);
        // Insert statement: copy the MySQL changes into PostgreSQL.
        // For Table API pipelines, executeSql() on an INSERT already submits the
        // job; a trailing env.execute() is unnecessary here and would fail with
        // "No operators defined in streaming topology".
        tableEnv.executeSql("INSERT INTO postgres_sink SELECT * FROM mysql_source");
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:187)
    at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
    at org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer.processElement(ConstraintEnforcer.java:247)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:154)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:162)
    at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:144)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:129)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:111)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:83)
    at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:545)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:785)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:212)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO ods.ods_cdc_user1(id, name, age, sex, phone, email, birthday) VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, age=EXCLUDED.age, sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, birthday=EXCLUDED.birthday was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported  Call getNextException to see other errors in the batch.
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
    ... 8 more
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO ods.ods_cdc_user1(id, name, age, sex, phone, email, birthday) VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000') ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, age=EXCLUDED.age, sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, birthday=EXCLUDED.birthday was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported  Call getNextException to see other errors in the batch.
    at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:199)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:532)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
    at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
    ... 8 more
Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
    ... 16 more


2 answers · sorted by newest

  • 专家-赤兔 · 2024-06-20 19:15

    Judging from the error you posted, the failure happens when Flink writes records into the PostgreSQL table. The stack trace shows a BatchUpdateException, and the underlying error is: "modification of distribution columns in OnConflictUpdate is not supported". Because your sink table declares a primary key, the JDBC connector writes upserts of the form INSERT ... ON CONFLICT (id) DO UPDATE SET ..., and the generated SET list includes the key column itself (id=EXCLUDED.id). Your target database is evidently a distributed PostgreSQL variant in which id is a distribution column, and such databases refuse to assign a distribution column inside ON CONFLICT DO UPDATE.

    First, make sure the primary key on your PostgreSQL table (id in your case) is correctly defined as a primary or unique key. Then look at the ON CONFLICT DO UPDATE part of the generated statement: the fix is to keep distribution columns out of the SET list. In distributed PostgreSQL variants, the distribution column determines a row's physical placement, so it cannot be reassigned during a conflict update.
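    The statement shown in the stack trace includes id=EXCLUDED.id in the DO UPDATE SET list, which is exactly what the database rejects when id is the distribution column. Below is a minimal sketch (plain Java, no Flink dependencies; the class and method names are illustrative, not the connector's API) of building the upsert with the conflict-key columns excluded from the update list:

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    /**
     * Illustrative helper: builds a PostgreSQL upsert whose DO UPDATE SET
     * clause skips the conflict-key columns (e.g. a distribution column).
     */
    public class UpsertSqlBuilder {
        public static String buildUpsert(String table, String[] fields, String[] keyFields) {
            List<String> keys = Arrays.asList(keyFields);
            String columns = String.join(", ", fields);
            String placeholders = Arrays.stream(fields)
                    .map(f -> "?")
                    .collect(Collectors.joining(", "));
            // Only non-key columns go into the update list, so the
            // distribution column (id) is never assigned on conflict.
            String updates = Arrays.stream(fields)
                    .filter(f -> !keys.contains(f))
                    .map(f -> f + "=EXCLUDED." + f)
                    .collect(Collectors.joining(", "));
            return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")"
                 + " ON CONFLICT (" + String.join(", ", keyFields) + ") DO UPDATE SET " + updates;
        }
    }
    ```

    In a real job you would plug this logic in by overriding getUpsertStatement in a custom JdbcDialect (the exact extension point varies between flink-connector-jdbc versions), or avoid the problem entirely by re-creating the target table with a distribution key that Flink never updates.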

    Also double-check your JDBC connection settings, in particular the URL, username, and password, and confirm you are connecting to the intended database and table. Verify as well that your PostgreSQL version (or variant) supports the features you are relying on.

    On the Flink side, make sure the source and sink tables are created correctly and the DDL statements are valid, and that the user has sufficient privileges to read from and write to the tables involved.

    If the problem persists, try the following steps to debug further:

    1. Confirm the PostgreSQL database itself is configured correctly and healthy. Try inserting the same data manually with another tool (such as pgAdmin or psql) and see whether you hit the same error.
    2. Check the Flink configuration, including the JDBC connection parameters and the table schema definitions, and make sure every parameter matches the database.
    3. Read the Flink logs for more detail about the error; other warnings or errors there may point at the root cause.
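    For step 1 above, here is a minimal plain-JDBC sketch that replays the statement from the stack trace directly against the target database, with id removed from the update list, so you can confirm whether the corrected form is accepted. The environment-variable names and connection details are hypothetical placeholders; without them set, the program just prints the statement:

    ```java
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ManualUpsertCheck {
        // The statement from the stack trace, with id=EXCLUDED.id removed
        // from the DO UPDATE SET list.
        static final String UPSERT_SQL =
            "INSERT INTO ods.ods_cdc_user1 (id, name, age, sex, phone, email, birthday) "
          + "VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000') "
          + "ON CONFLICT (id) DO UPDATE SET name=EXCLUDED.name, age=EXCLUDED.age, "
          + "sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, "
          + "birthday=EXCLUDED.birthday";

        public static void main(String[] args) throws Exception {
            // Hypothetical variables; set them to your own connection details.
            String url = System.getenv("PG_URL"); // e.g. jdbc:postgresql://host:5432/db
            if (url == null) {
                // No database configured: just show the statement to run by hand.
                System.out.println(UPSERT_SQL);
                return;
            }
            try (Connection conn = DriverManager.getConnection(
                         url, System.getenv("PG_USER"), System.getenv("PG_PASS"));
                 Statement stmt = conn.createStatement()) {
                stmt.executeUpdate(UPSERT_SQL);
                System.out.println("upsert accepted by the target database");
            }
        }
    }
    ```

    If this statement succeeds manually while the original one (with id=EXCLUDED.id) fails, that confirms the distribution-column restriction is the problem and the fix belongs in the generated upsert, not in Flink's connection settings.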

    If you can share more about your database structure, the table definition, and the exact INSERT statement, I may be able to give more specific advice.


