(*^_^*)静一静 2025-09-11 17:26 Acceptance rate: 18.8%

Problems encountered when syncing data with Flink CDC

I. Flink CDC problem 1 (I can't quite make out what the problem is)
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:216)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 49 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:86)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)

Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1757544595000, eventType=EXT_WRITE_ROWS, serverId=153, headerLength=19, dataLength=7807, nextPosition=17737908, flags=0}
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489)

Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1757544595000, eventType=EXT_WRITE_ROWS, serverId=153, headerLength=19, dataLength=7807, nextPosition=17737908, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)

Caused by: java.io.EOFException: Failed to read next byte from position 198195464
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:213)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:52)
at io.debezium.connector.mysql.RowDeserializers.deserializeVarString(RowDeserializers.java:263)
at io.debezium.connector.mysql.RowDeserializers$WriteRowsDeserializer.deserializeVarString(RowDeserializers.java:192)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:189)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:143)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:335)

II. Flink CDC problem 2 (I can't quite make out what this one is either)

java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:216)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 44 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50)
at org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:86)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)

Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1757544659000, eventType=EXT_WRITE_ROWS, serverId=153, headerLength=19, dataLength=8168, nextPosition=315969389, flags=0}
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489)

Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1757544659000, eventType=EXT_WRITE_ROWS, serverId=153, headerLength=19, dataLength=8168, nextPosition=315969389, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:341)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:244)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:259)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1051)

Caused by: java.io.EOFException: Failed to read remaining 8 of 11 bytes from position 29506360. Block length: 5843. Initial block length: 8164.
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.fill(ByteArrayInputStream.java:115)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:105)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeNewDecimal(AbstractRowsEventDataDeserializer.java:258)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeCell(AbstractRowsEventDataDeserializer.java:169)
at com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer.deserializeRow(AbstractRowsEventDataDeserializer.java:143)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserializeRows(WriteRowsEventDataDeserializer.java:64)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:56)
at com.github.shyiko.mysql.binlog.event.deserialization.WriteRowsEventDataDeserializer.deserialize(WriteRowsEventDataDeserializer.java:32)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:335)


4 answers

  • 宝码香车 (Quality creator, JavaScript) 2025-09-11 17:28

    Answer by 【宝码香车】: this answer was written with reference to DeepSeek, then compiled and edited.


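    Hello. Based on the two exception stack traces you provided, both problems share the same core cause: while reading the MySQL binlog, Flink CDC hit event data that it could not parse (deserialize).

    Both errors point to Debezium (the framework Flink CDC builds on), together with the mysql-binlog-connector-java library it uses (the com.github.shyiko.mysql.binlog classes in the traces), failing to parse a MySQL binlog event.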

    Core analysis

    The direct cause: the Debezium connector received an event from the MySQL server's binary log (binlog) that it could not parse.

    Looking at each stack trace:

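    1. Problem 1 ends in java.io.EOFException: Failed to read next byte from position 198195464, thrown while reading a variable-length string column (deserializeVarString). It means the reader ran out of bytes before the event was complete. One possible reason is that the binlog file itself is damaged or incomplete; another is that the event data was cut off before it reached the client. Note that the position in this message appears to be an offset in the byte stream the binlog client has read, not an offset in the binlog file.
    2. Problem 2 ends in java.io.EOFException: Failed to read remaining 8 of 11 bytes from position 29506360.... This one is more specific: it happened while parsing a NEWDECIMAL value. It may indicate that a DECIMAL/NUMERIC field in some row holds data that does not match the table definition (precision, scale), or that the field's encoding in the binlog is abnormal, so the parser cannot read it. Keep in mind, though, that the 11 bytes are dictated by the column's declared precision and scale and only 3 of them were available, so running out of bytes points at least as much to a truncated event as to a bad value.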
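    The "11 bytes" in problem 2 can be traced to MySQL's binary DECIMAL layout, which the MySQL reference manual describes as four bytes per nine decimal digits plus 0 to 4 bytes for leftover digits, computed separately for the integer and fractional parts. Below is a minimal Java sketch of that calculation; DecimalBinarySize is a hypothetical name, not a Flink CDC or Debezium class.

```java
/** Sketch: size in bytes of a MySQL DECIMAL(precision, scale) value in its binary row format. */
class DecimalBinarySize {

    // Bytes needed for 0..8 leftover digits; each full group of 9 digits takes 4 bytes
    private static final int[] LEFTOVER_DIGIT_BYTES = {0, 1, 1, 2, 2, 3, 3, 4, 4};
    private static final int DIGITS_PER_GROUP = 9;
    private static final int BYTES_PER_GROUP = 4;

    /** Bytes used by one part (integer or fractional) holding the given number of digits. */
    private static int partBytes(int digits) {
        return (digits / DIGITS_PER_GROUP) * BYTES_PER_GROUP
                + LEFTOVER_DIGIT_BYTES[digits % DIGITS_PER_GROUP];
    }

    /** Total bytes for DECIMAL(precision, scale): integer part plus fractional part. */
    static int sizeInBytes(int precision, int scale) {
        if (precision < 1 || scale < 0 || scale > precision) {
            throw new IllegalArgumentException("invalid DECIMAL(" + precision + ", " + scale + ")");
        }
        return partBytes(precision - scale) + partBytes(scale);
    }

    public static void main(String[] args) {
        System.out.println(sizeInBytes(10, 2)); // 8 integer digits -> 4 bytes, 2 fractional -> 1 byte: prints 5
        System.out.println(sizeInBytes(18, 9)); // 4 + 4: prints 8
        System.out.println(sizeInBytes(24, 6)); // 8 + 3: prints 11
        System.out.println(sizeInBytes(22, 8)); // 7 + 4: prints 11
    }
}
```

    Under this layout, a column such as DECIMAL(24, 6) or DECIMAL(22, 8) needs exactly 11 bytes, so "remaining 8 of 11 bytes" would mean only 3 bytes of the event were left for a value whose size the column definition fixes in advance. Which definition the failing column actually has is not visible in the trace.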

    解决方案

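    This is a fairly tricky problem and usually has to be tackled on the MySQL server side. Here are several troubleshooting and resolution steps; try them in order: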

    1. Check and fix the MySQL table structure (especially for problem 2)

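    This is the first thing to rule out for problem 2. Check every DECIMAL/NUMERIC column in the tables the Flink CDC job is syncing. Bear in mind that MySQL normally rejects (in strict SQL mode) or clips out-of-range values before a row is written, so a value that violates its own column definition should rarely reach the binlog; checking the definitions is still a quick way to rule this out.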

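    • Confirm precision and scale: make sure the data written by the application does not exceed the column's declared precision (total number of digits) and scale (digits after the decimal point). For example, a DECIMAL(10, 2) column holds at most 8 integer digits, so it cannot store 12345678901.23 (11 integer digits), and 123.456 (3 decimal places) would lose its third decimal place.
    • Fix the data: if any values do not fit the definition, clean up or correct them first.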
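    A minimal sketch of the precision/scale check in step 1, assuming the application has the values as java.math.BigDecimal before writing them. DecimalFitCheck and its three-way result are hypothetical names; the check only compares digit counts against DECIMAL(precision, scale) and does not model every MySQL conversion rule (it assumes half-up rounding when checking whether rounding carries into the integer part).

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Sketch: does a value fit a MySQL DECIMAL(precision, scale) column? */
class DecimalFitCheck {

    enum Fit { FITS, LOSES_FRACTION_DIGITS, INTEGER_OVERFLOW }

    static Fit check(BigDecimal value, int precision, int scale) {
        int maxIntegerDigits = precision - scale;
        // Round to the column's scale first, because rounding can carry into the integer part (99.995 -> 100.00)
        BigDecimal rounded = value.abs().setScale(scale, RoundingMode.HALF_UP);
        // Integer digits = significant digits minus digits after the point (0 for values below 1)
        int integerDigits = Math.max(0, rounded.precision() - rounded.scale());
        if (integerDigits > maxIntegerDigits) {
            return Fit.INTEGER_OVERFLOW;
        }
        // More fractional digits than the scale allows: the stored value would differ from the input
        if (value.stripTrailingZeros().scale() > scale) {
            return Fit.LOSES_FRACTION_DIGITS;
        }
        return Fit.FITS;
    }

    public static void main(String[] args) {
        System.out.println(check(new BigDecimal("12345678901.23"), 10, 2)); // prints INTEGER_OVERFLOW
        System.out.println(check(new BigDecimal("123.456"), 10, 2));        // prints LOSES_FRACTION_DIGITS
        System.out.println(check(new BigDecimal("12345678.99"), 10, 2));    // prints FITS
    }
}
```

    Running such a check on the application side catches bad values before MySQL silently rounds or rejects them; it says nothing about how an already written row is encoded in the binlog.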

    2. Skip the corrupted binlog event (temporary workaround)

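    If the problem is caused by one specific binlog event that cannot be repaired, you can configure Flink CDC to skip that event and continue reading from the next position. This is only a temporary way to get the job running again, and you will lose the data change carried by that event.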

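    In your Flink CDC source code, pass the Debezium option through debeziumProperties:

    import java.util.Properties;
    import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    // Debezium pass-through options; Properties.setProperty does not return the
    // Properties object, so the calls cannot be chained
    Properties debeziumProps = new Properties();
    // Skip events that cannot be deserialized
    debeziumProps.setProperty("event.deserialization.failure.handling.mode", "skip");
    // Or the safer mode: log the event and its binlog offset, then skip it
    // debeziumProps.setProperty("event.deserialization.failure.handling.mode", "warn");

    MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(3306)
        .databaseList("yourDatabase")
        .tableList("yourDatabase.yourTable")
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProps)
        .build();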
    

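    Note: skip mode ignores errors silently. I recommend running in warn mode first and checking the logs to confirm that the skipped event really is the problematic one (locate it using details such as nextPosition=315969389) before deciding whether to switch to skip. Also note that Debezium's MySQL streaming source rethrows a deserialization failure whose cause is an IOException, such as the EOFException in both traces, instead of applying this mode; the traces show the error arriving through onCommunicationFailure, which is consistent with that. So this setting may have no effect on these two particular errors.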
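    To match a warn-mode log entry, or an error like the two above, against the event you expect, a small helper can pull the fields out of the EventHeaderV4{...} text. This is a sketch that assumes the key=value format shown in the two stack traces; EventHeaderParser is a hypothetical class, not part of Flink CDC or Debezium.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: extract the fields of an EventHeaderV4{...} string from an error message. */
class EventHeaderParser {

    // Captures everything between the braces that follow "EventHeaderV4"
    private static final Pattern HEADER = Pattern.compile("EventHeaderV4\\{([^}]*)\\}");

    static Map<String, String> parse(String message) {
        Matcher m = HEADER.matcher(message);
        if (!m.find()) {
            throw new IllegalArgumentException("no EventHeaderV4{...} found");
        }
        Map<String, String> fields = new LinkedHashMap<>();
        // Fields are "key=value" pairs separated by commas
        for (String pair : m.group(1).split(",\\s*")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                fields.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        String msg = "Failed to deserialize data of EventHeaderV4{timestamp=1757544659000, "
                + "eventType=EXT_WRITE_ROWS, serverId=153, headerLength=19, dataLength=8168, "
                + "nextPosition=315969389, flags=0}";
        Map<String, String> h = parse(msg);
        System.out.println(h.get("eventType") + ", nextPosition=" + h.get("nextPosition"));
        // prints: EXT_WRITE_ROWS, nextPosition=315969389
    }
}
```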

    3. Reset the starting offset

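    If skipping the event does not work, or the damaged range is large, you can have the job start reading from a new binlog position and give up all data before that position. This is normally only done for testing or where some data loss is acceptable.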

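    • Note the nextPosition in the error stack trace (for example nextPosition=315969389).
    • In the Flink CDC job configuration, set a starting offset past the failing event (scan.startup.mode = specific-offset), or a GTID. The offset must be the start of an event, not an arbitrary larger number: nextPosition is exactly where the following event begins, so it is a valid choice. Starting at the beginning of the next transaction is safer still, because row events depend on the table-map event that precedes them. The event header does not contain the binlog file name, so you need to determine which binlog file was being read when the error occurred.
    .startupOptions(StartupOptions.specificOffset("binlog-filename.xxx", 315969389L)) // nextPosition of the failing event
    // or start from a timestamp (epoch milliseconds)
    // .startupOptions(StartupOptions.timestamp(1757544659000L + 10000L))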
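    The same header fields also give the failing event's start position in the binlog file, which is handy for inspecting that event with mysqlbinlog --start-position. A minimal sketch, assuming binlog format v4, where nextPosition is the position just past the event and headerLength + dataLength is the full event length (checksum included); BinlogEventPositions is a hypothetical name.

```java
/** Sketch: derive binlog file positions from the EventHeaderV4 fields of a failing event. */
class BinlogEventPositions {

    /** Event length on disk: common header plus event body (the body includes any checksum). */
    static long eventLength(long headerLength, long dataLength) {
        return headerLength + dataLength;
    }

    /** Start of the failing event: nextPosition points just past its last byte. */
    static long eventStart(long nextPosition, long headerLength, long dataLength) {
        return nextPosition - eventLength(headerLength, dataLength);
    }

    public static void main(String[] args) {
        // Values from problem 2: headerLength=19, dataLength=8168, nextPosition=315969389
        long start = eventStart(315969389L, 19L, 8168L);
        System.out.println("failing event spans [" + start + ", 315969389)");
        // prints: failing event spans [315961202, 315969389)
    }
}
```

    For problem 2 this puts the event's start at 315961202. A specific offset anywhere between the two numbers would point into the middle of the event, which is why an arbitrary value past the error position is not a safe choice.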
    

    4. Last resort: rebuild the MySQL replication environment

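    If none of the above works, the binlog file on the MySQL server may be physically damaged. This is the most serious case. Before concluding that, check whether mysqlbinlog can read the binlog file around the failing position; if it can, the file itself is intact and the problem more likely lies in how the event reached Flink CDC (for example, the network connection).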

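    1. Stop the Flink CDC job.
    2. Contact your DBA, or repair the MySQL source yourself. This may involve:
    • Restarting the MySQL service.
    • Running tools such as mysqlcheck to repair tables (mysqlcheck works on tables; it does not check or repair binlog files).
    • The most thorough option: set up a new replica restored from a physical backup, then point Flink CDC at this new, healthy database instance.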

    Summary

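    | Symptom | Possible cause | Recommended solutions |
    |---|---|---|
    | EOFException while reading bytes | Corrupted binlog file | 1. Try skipping the event (solution 2); 2. reset the starting offset (solution 3); 3. repair or rebuild MySQL (solution 4) |
    | Failure while parsing a specific type such as NEWDECIMAL | Table data does not match the column definition | 1. Check and fix the table data (solution 1); 2. skip the event (solution 2) |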

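    I suggest starting with solution 1 and checking the DECIMAL columns and their data in the affected tables. If the problem remains, try solution 2 as a temporary recovery measure. If it keeps recurring, you may need to consider solution 4.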

