laijunlin_data 2023-08-31 15:57 采纳率: 58.3%
浏览 23

flinksql的方式通过flinkcdc将mysql的binlog写入kafka,一定要用upsert-kafka吗

#遇到的问题
flinksql的通过flinkcdc从mysql读取一张表,写入kafka,报错信息:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_t4' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, cdc_t4]], fields=[id, name])

#运行的代码

tableEnv.executeSql(
                "CREATE TABLE cdc_t4 (\n" +
                        "    id bigint,\n" +
                        "    name varchar,\n" +
                        "    PRIMARY KEY (id) NOT ENFORCED\n" +
                        "  ) WITH (\n" +
                        "    'connector' = 'mysql-cdc',\n" +
                        "    'hostname' = 'cnap421',\n" +
                        "    'port' = '3306',\n" +
                        "    'server-time-zone' = 'Asia/Shanghai',\n" +
                        "    'debezium.snapshot.mode' = 'initial',\n" + //initial,earliest,latest
                        "    'username' = 'root',\n" +
                        "    'password' = 'sgs123654.',\n" +
                        "    'database-name' = 'test',\n" +
                        "    'table-name' = 't4'\n" +
                        "  )"
        );

        tableEnv.executeSql(
                "create table kafka_t4(id bigint, name varchar"
                        + ") with(\n"
                        + "'connector' = 'kafka',\n"
                        + "'topic' = 'test_output_sink',\n"
                        + "'properties.bootstrap.servers' = '10.168.136.31:9092,10.168.136.28:9092',\n"
                        + "'format' = 'json'\n"
                        + ")"

        );

        tableEnv.executeSql(
                "insert into kafka_t4 select * from cdc_t4"
        );


#解决思路
将connect的kafka换成upsert-kafka就可以了,我以前一直使用flink的datastream的api方式处理的,没注意这个。
#现在的疑问
flinksql的方式通过flinkcdc将mysql的binlog写入kafka,一定要用upsert-kafka吗

  • 写回答

2条回答 默认 最新

  • CSDN-Ada助手 CSDN-AI 官方账号 2023-08-31 18:54
    关注

    【以下回答由 GPT 生成】

    问题中的报错信息是由于Table sink 'default_catalog.default_database.kafka_t4'不支持消费更新和删除的更改,而节点TableSourceScan(table=[['default_catalog', 'default_database', 'cdc_t4']], fields=[id, name])生成了此类更改。这意味着您所使用的Kafka表不支持CDC的更改日志格式。为了解决这个问题,您可以尝试使用upsert-kafka来正确处理更改数据。

    upsert-kafka是Flink的一个表连接器,可以正确地处理来自CDC的更改数据并写入Kafka。下面是如何在您的代码中使用upsert-kafka的例子:

    首先,确保您的项目的pom.xml文件包含了Flink对应版本的upsert-kafka依赖。例如:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>对应版本</version>
    </dependency>
    

    其次,替换您的代码中Kafka表的创建部分,使用upsert-kafka连接器,并为其指定主键字段。如下所示:

    tableEnv.executeSql(
        "CREATE TABLE kafka_t4 (\n" +
        "    id bigint,\n" +
        "    name varchar,\n" +
        "    PRIMARY KEY (id) NOT ENFORCED\n" +
        ") WITH (\n" +
        "    'connector' = 'upsert-kafka',\n" +
        "    'topic' = 't4',\n" +
        "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "    'properties.group.id' = 'testGroup',\n" +
        "    'format' = 'json',\n" +
        "    'key.fields' = 'id'  // 指定主键字段\n" +
        ")"
    );
    

    在这里,'connector' = 'upsert-kafka'表示使用upsert-kafka连接器,'key.fields' = 'id'指定了您的表的主键字段是'id'。使用upsert-kafka连接器,Flink将正确地处理来自CDC的更改数据,并将其写入Kafka。

    请注意,您需要根据您使用的Flink和upsert-kafka版本来选择相应的依赖和配置。

    希望这个解决方案对您有所帮助,如果还有其他问题请随时提问。



    【相关推荐】



    如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^
    评论

报告相同问题?

问题事件

  • 创建了问题 8月31日

悬赏问题

  • ¥15 下载ctorch报错,求解
  • ¥15 如何入门学习c语言,单片机
  • ¥15 idea 编辑语言的选择
  • ¥15 Windows下部署Asmjit
  • ¥15 请问双层规划模型的上下层目标函数不一致,是如何保证迭代收敛性的
  • ¥15 微信小程序 前端页面内容搜索
  • ¥15 cpu是如何判断当前指令已经执行完毕,然后去执行下条指令的
  • ¥15 安装visual studio2022时visualstudiosetup启动不了,闪退。问题代号0x0和0x1389
  • ¥30 java spring boot2.5.3版本websocket连不上
  • ¥15 angular js调外部链接查看pdf