(*^_^*)静一静 2025-09-05 10:12 Acceptance rate: 18.8%

Flink checkpoint failure

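I use Flink to listen to the MySQL binlog for real-time computation. One of the jobs reads a table with only about 20,000 rows, yet every checkpoint fails; the other jobs do not have this problem: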

2025-09-05 09:49:18,724 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 114 for job 07a364cb8fea5fb1953fd9fb148b8925. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2346) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_361]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_361]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_361]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_361]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_361]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_361]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_361]
2025-09-05 09:49:18,725 INFO org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] - checkpoint request time in queue: 660001
2025-09-05 09:49:18,727 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. The latest checkpoint failed due to Checkpoint expired before completing., view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2281) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2260) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$1200(CheckpointCoordinator.java:102) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2346) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:70) ~[flink-dist-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_361]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_361]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_361]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_361]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_361]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_361]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_361]
2025-09-05 09:49:18,728 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 4 tasks will be restarted to recover from a global failure.
2025-09-05 09:49:18,728 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job SapOrderInfo-rt_dwh_ods.ODS_S001_BUT000 (07a364cb8fea5fb1953fd9fb148b8925) switched from state RUNNING to RESTARTING.
2025-09-05 09:49:18,728 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 115 for job 07a364cb8fea5fb1953fd9fb148b8925. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.stopCheckpointScheduler(CheckpointCoordinator.java:2068) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator.jobStatusChanges(CheckpointCoordinatorDeActivator.java:49) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifyJobStatusChange(DefaultExecutionGraph.java:1609) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1167) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.transitionState(DefaultExecutionGraph.java:1139) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.SchedulerBase.transitionExecutionGraphState(SchedulerBase.java:601) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.addVerticesToRestartPending(DefaultScheduler.java:386) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasksWithDelay(DefaultScheduler.java:362) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeRestartTasks(DefaultScheduler.java:330) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:325) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:57) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.failGlobal(DefaultExecutionGraph.java:1092) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph$1.lambda$failJob$0(DefaultExecutionGraph.java:477) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:460) ~[flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:460) ~[flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:225) ~[flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245) [flink-rpc-akka365eff67-1c9c-4c09-a6d0-3bcb63efea84.jar:1.20.1]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_361]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) [?:1.8.0_361]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) [?:1.8.0_361]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) [?:1.8.0_361]


4 answers

  • 紫雾凌寒 (Rising Star Creator: Artificial Intelligence) 2025-09-05 10:12

    Answered by “紫雾凌寒”; this answer was written with the help of DeepSeek.


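    1. Key points:
      - The job reads the MySQL binlog for real-time computation.
      - The table holds only about 20,000 rows, yet this job's checkpoints always fail while the other jobs are fine.
      - The log shows checkpoint 114 failing at 2025-09-05 09:49:18,724 with “Checkpoint expired before completing”, i.e. it did not finish within the checkpoint timeout. Because this exceeded the tolerable checkpoint failure threshold, the job was then restarted.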
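    2. Solutions:
      - Check network connectivity
      • Make sure the network between the Flink cluster and MySQL is stable. You can test connectivity with the ping command, for example by pinging the MySQL host from a node of the Flink cluster.
      • Check the firewall settings and make sure the relevant ports are open. Flink connects to MySQL on a specific port (MySQL's default is 3306), and that port must be allowed through the firewall.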
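      - Check the detailed checkpoint logs
      • Flink usually logs a more detailed reason for a checkpoint failure. Look for the job's log files in Flink's log directory, e.g. “/taskexecutor-*.log”. The exception in the question itself also points to the Checkpoint History tab of the web UI and to the JobManager log.
      • The logs may show specific errors such as “cannot connect to MySQL” or “failed to read binlog”; fix the problem based on that information.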
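      - Check the MySQL configuration
      • Confirm that the binlog is configured correctly: check that “log_bin” is enabled and that the binlog format is compatible with Flink (the Flink CDC MySQL connector expects “binlog_format” to be ROW).
      • Check MySQL's resource usage, such as CPU and memory. If resources are tight, binlog reading may suffer; use MySQL monitoring tools to check the relevant metrics.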
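      - Adjust the checkpoint configuration
      • Increase the checkpoint interval where appropriate, e.g. by setting “execution.checkpointing.interval” in the Flink configuration file.
      • Increase the checkpoint timeout via “execution.checkpointing.timeout” (default 10 minutes). “Checkpoint expired before completing” means the checkpoint did not finish within this timeout, so shortening it would make failures more likely, not less.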
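
    The network and MySQL checks above can be scripted. ping only tests ICMP reachability, so opening a TCP connection to the MySQL port is a more direct test of the firewall rule. Below is a minimal sketch in plain Java 8 (the JDK version in the question's log). The class name, the host “mysql-host” and the timeout are hypothetical placeholders, and the expected binlog values assume the job uses the Flink CDC MySQL connector, whose setup guide asks for log_bin ON, binlog_format ROW and binlog_row_image FULL. Fetching the real values (e.g. with SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')) is left out.

    ```java
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical pre-flight checks for a Flink job that reads the MySQL binlog. */
    public class MySqlPreflight {

        /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
        static boolean canConnect(String host, int port, int timeoutMillis) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMillis);
                return true;
            } catch (IOException e) {
                // Covers unknown hosts, refused connections and timeouts.
                return false;
            }
        }

        /** Expected values, assuming the Flink CDC MySQL connector's documented setup. */
        static Map<String, String> expectedBinlogSettings() {
            Map<String, String> expected = new LinkedHashMap<>();
            expected.put("log_bin", "ON");
            expected.put("binlog_format", "ROW");
            expected.put("binlog_row_image", "FULL");
            return expected;
        }

        /** Compares server variables with the expected values; an empty list means all match. */
        static List<String> checkBinlogSettings(Map<String, String> actual) {
            List<String> problems = new ArrayList<>();
            for (Map.Entry<String, String> e : expectedBinlogSettings().entrySet()) {
                String value = actual.get(e.getKey());
                if (value == null) {
                    problems.add(e.getKey() + " is missing");
                } else if (!value.equalsIgnoreCase(e.getValue())) {
                    problems.add(e.getKey() + " = " + value + ", expected " + e.getValue());
                }
            }
            return problems;
        }

        public static void main(String[] args) {
            // "mysql-host" is a hypothetical placeholder; 3306 is MySQL's default port.
            String host = args.length > 0 ? args[0] : "mysql-host";
            System.out.println("TCP 3306 reachable: " + canConnect(host, 3306, 3000));

            // Illustrative values, as they might come back from SHOW VARIABLES.
            Map<String, String> vars = new LinkedHashMap<>();
            vars.put("log_bin", "ON");
            vars.put("binlog_format", "MIXED");
            vars.put("binlog_row_image", "FULL");
            System.out.println(checkBinlogSettings(vars));
        }
    }
    ```

    With the illustrative values in main, the second line printed is [binlog_format = MIXED, expected ROW].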

    The following simple example shows how to set the checkpoint interval in Flink:
    ```java
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // set the checkpoint interval to 5 seconds
    ```
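
    Because the failure in the question is an expired checkpoint followed by “Exceeded checkpoint tolerable failure threshold”, the checkpoint timeout and the tolerable-failure count are the settings most directly involved. The sketch below uses Flink's CheckpointConfig setters; the class name and all numeric values are illustrative assumptions, not tuned recommendations. It is a configuration fragment that needs Flink on the classpath, and the CDC source and the rest of the pipeline are omitted.

    ```java
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /** Hypothetical helper; the values below are examples only. */
    public class CheckpointSettings {

        static void configure(StreamExecutionEnvironment env) {
            // Trigger a checkpoint every 60 s.
            env.enableCheckpointing(60_000L);

            CheckpointConfig config = env.getCheckpointConfig();
            // Abort a checkpoint only after 20 min (Flink's default timeout is 10 min).
            config.setCheckpointTimeout(20 * 60_000L);
            // Leave at least 30 s between the end of one checkpoint and the start of the next.
            config.setMinPauseBetweenCheckpoints(30_000L);
            // Tolerate up to 3 consecutive checkpoint failures before failing the job.
            // The default of 0 matches the log, where the first expired checkpoint
            // already exceeded the threshold and restarted the job.
            config.setTolerableCheckpointFailureNumber(3);
        }

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            configure(env);
            // Build the CDC source and the rest of the pipeline here, then call env.execute(...).
        }
    }
    ```

    Raising the tolerable-failure count only keeps the job running while checkpoints keep failing; the reason a checkpoint takes longer than the timeout still has to be found, for example in the Checkpoint History tab.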
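    3. Pros and cons of each approach:
      - Check network connectivity
      • Pros: simple and direct; quickly pinpoints network problems, which can then be fixed right away.
      • Cons: only covers network-level problems; it does not help if the cause lies elsewhere.
      - Check the detailed checkpoint logs
      • Pros: reveals the concrete failure reason, so the fix can be targeted precisely.
      • Cons: requires careful reading; analyzing the logs can take considerable time.
      - Check the MySQL configuration
      • Pros: tackles the problem at the source; if the MySQL configuration is the cause, this removes it for good.
      • Cons: requires some MySQL knowledge, and configuration changes may affect other MySQL functionality, so proceed carefully.
      - Adjust the checkpoint configuration
      • Pros: relatively simple; tuning the configuration may quickly stop the checkpoint failures.
      • Cons: may only mitigate the problem temporarily without fixing its root cause.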
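    4. Summary:
    - For checkpoint failures in a Flink job that reads the MySQL binlog, start by investigating network connectivity, the logs, the MySQL configuration and the checkpoint configuration. Check and adjust these step by step to find and fix the specific cause, so that the job runs stably.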
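
    For the log-checking step, the failure entries in the question's JobManager log have a fixed shape: a “Failed to trigger or complete checkpoint N for job …” warning followed by a CheckpointException line carrying the reason. They can therefore be listed with a short scanner. This is a minimal sketch; the class name is hypothetical, and the patterns are based only on the log lines shown in the question, so other Flink versions may word them differently.

    ```java
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    /** Hypothetical helper that lists checkpoint failures found in JobManager log lines. */
    public class CheckpointFailureScanner {

        // The CheckpointFailureManager WARN line, as it appears in the question's log.
        private static final Pattern FAILED =
                Pattern.compile("Failed to trigger or complete checkpoint (\\d+) for job (\\w+)");
        // The exception line that directly follows it and carries the reason.
        private static final Pattern REASON = Pattern.compile("CheckpointException: (.+)$");

        /** Returns "checkpoint <id>: <reason>" for each failure, in log order. */
        static List<String> scan(List<String> lines) {
            List<String> failures = new ArrayList<>();
            for (int i = 0; i < lines.size(); i++) {
                Matcher m = FAILED.matcher(lines.get(i));
                if (!m.find()) {
                    continue;
                }
                String reason = "unknown";
                if (i + 1 < lines.size()) {
                    Matcher r = REASON.matcher(lines.get(i + 1));
                    if (r.find()) {
                        reason = r.group(1).trim();
                    }
                }
                failures.add("checkpoint " + m.group(1) + ": " + reason);
            }
            return failures;
        }

        public static void main(String[] args) throws IOException {
            if (args.length == 0) {
                System.err.println("usage: CheckpointFailureScanner <jobmanager-log-file>");
                return;
            }
            for (String failure : scan(Files.readAllLines(Paths.get(args[0])))) {
                System.out.println(failure);
            }
        }
    }
    ```

    On the log in the question it would report checkpoint 114 with “Checkpoint expired before completing.” and checkpoint 115 with “Checkpoint Coordinator is suspending.”; the second is only a side effect of the restart triggered by the first.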

    I hope this helps. If you have any questions, feel free to ask in the comments.

