2301_79564221 2024-03-29 09:14 采纳率: 0%
浏览 133
已结题

flink cdc无法实时同步mysql数据

我用这个代码 同步mysql的数据到另一个mysql 只会同步一次 第一次同步完成后后续源表变更目标表没有变化 flink版本是1.13.6 cdc是1.4.0。执行完insert后,web ui日志没有任何报错。用了其他的库尝试是没有问题的,怀疑可能是mysql的binlog日志有问题,但不知道如何解决。。mysql的binlog日志是已经打开了的。
这是代码

img

这是我需要同步的那个库,数据库配置情况

img

img

本来的需求是mysql实时同步mysql,来来回回更换flink和cdc的版本都没有解决,昨天尝试了一下其他的库随便搞了点数据发现是可以正常同步的,不管是mysql或者kafka,刚刚好就是我需要同步的那个库无法正常同步,出现上面的情况。

  • 写回答

19条回答 默认 最新

  • 专家-赤兔[在线] 优质创作者: 编程框架技术领域 2024-03-29 09:14
    关注

    引自免费微信小程序:皆我百晓生

    根据您提供的信息,您正在尝试使用Flink CDC从MySQL数据库实时同步数据到另一个MySQL数据库,但首次同步之后,后续的源表变更并未反映到目标表中。首先,请允许我纠正并梳理一下您提供的配置信息,并提供可能的问题排查步骤及解决方案。

    1. 源表配置(MySQL CDC): 您的配置中似乎缺少了关键的CDC相关配置项,例如debezium.snapshot.modedatabase.history等,这可能导致Flink无法正确监听MySQL binlog事件。以下是修复后的源表配置示例:

      CREATE TABLE flink_cdc (
          name string,
          sex string,
          age string,
          PRIMARY KEY (name) NOT ENFORCED
      ) WITH (
          'connector' = 'debezium-mysql',
          'hostname' = 'mysgl-cds',
          'port' = '3306',
          'username' = 'root',
          'password' = '<your_password>',
          'database-name' = 'museum',
          'table-name' = 'flink_cdc',
          'debezium.snapshot.mode' = 'initial',  -- 或者 'schema_only'/'latest'
          'database.history' = 'io.debezium.relational.history.FileDatabaseHistory',
          'database.history.file.filename' = '/path/to/debezium-history.log'  -- 配置Debezium历史记录文件路径
      );
      
    2. 目标表配置(MySQL Sink): 确保目标表配置无误,特别是URL和driver等信息要完整。

      CREATE TABLE flink_test (
          name string,
          sex string,
          age string,
          PRIMARY KEY (name) NOT ENFORCED
      ) WITH (
          'connector' = 'jdbc',
          'url' = 'jdbc:mysql://<target_mysql_host>:<target_mysql_port>/<target_database_name>',
          'driver' = 'com.mysql.jdbc.Driver',
          'username' = 'root',
          'password' = '<your_password>',
          'table-name' = 'flink_test'
      );
      
    3. 检查MySQL CDC源

      • 确认MySQL的binlog格式设置为ROW模式。
      • 检查MySQL用户是否具有足够的权限来读取binlog。
      • 查看Flink CDC任务的日志,确认是否存在与连接MySQL CDC相关的错误或警告。
    4. 验证实时同步逻辑: 修正配置后,确保使用INSERT/UPDATE/DELETE语句在源MySQL上进行变更测试,并观察目标MySQL表是否实时更新。

    5. 运行Flink作业: 使用INSERT INTO ... SELECT语法将CDC数据流式插入目标表。

      INSERT INTO flink_test SELECT * FROM flink_cdc;
      
    6. 监控与调试: 使用Flink Web UI查看作业状态,以及checkpoint和watermark的进度,确保作业健康且无异常。

    如果按照以上步骤操作后仍然存在问题,请检查目标MySQL服务器的连接和写入权限,并考虑进一步深入到Flink CDC插件的详细日志中查找线索。同时,确保所使用的MySQL驱动版本与MySQL服务器兼容。如果问题依然存在,可以尝试升级或降级Flink CDC插件版本以排除版本不兼容性问题。

    评论 编辑记录

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 4月2日
  • 修改了问题 3月29日
  • 创建了问题 3月29日