WJC_SX 2023-03-06 11:26 采纳率: 95%
浏览 58

Java代码提交Flink Job报错

Flink Job 通过Java代码提交报错,java.io.FileNotFoundException: /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250/997044085d23299e9580d424b5b72559 (No such file or directory)

```java

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import java.io.File;
import java.util.concurrent.CompletableFuture;

public class FlinkJobSumbit3 {

    public static void main(String[] args) {

        String jarFilePath = "D:\\BaiduNetdiskDownload\\CDC-1.0-SNAPSHOT-jar-with-dependencies.jar";
        RestClusterClient<StandaloneClusterId> client = null;
        try {
            // 集群信息
            Configuration configuration = new Configuration();
            configuration.setString(JobManagerOptions.ADDRESS, "172.16.2.131");
            configuration.setInteger(JobManagerOptions.PORT, 6123);
            configuration.setInteger(RestOptions.PORT, 8081);
            client = new RestClusterClient<StandaloneClusterId>(configuration, StandaloneClusterId.getInstance());
            int parallelism = 1;
            File jarFile=new File(jarFilePath);
            SavepointRestoreSettings savepointRestoreSettings=SavepointRestoreSettings.none();
            PackagedProgram program = PackagedProgram.newBuilder()
                    .setConfiguration(configuration)
                    .setEntryPointClassName("FlinkCDCToMySQL")
                    .setJarFile(jarFile)
                    .setArguments("[{\"TableId\":1292101841059840,\"TableName\":\"sys_job_collect_copy_copy1\",\"StepType\":\"read\",\"Model\":\"Full\",\"DataSourceId\":1305401857409024},{\"TableId\":1305401903546382,\"TableName\":\"sys_job_collect_copy_copy1_copy1\",\"StepType\":\"writer\",\"DataSourceId\":1305401857409024}]").build();
                    //.setSavepointRestoreSettings(savepointRestoreSettings).build();
            JobGraph jobGraph= PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false);
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            JobID jobId=  result.get();
            System.out.println("提交完成");
            System.out.println("jobId:"+ jobId.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}

报错信息:
2023-03-06 10:45:15,324 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 997044085d23299e9580d424b5b72559 (FlinkCDCWithKaPro).
2023-03-06 10:45:15,324 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 997044085d23299e9580d424b5b72559 (FlinkCDCWithKaPro).
2023-03-06 10:45:15,326 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_36 .
2023-03-06 10:45:15,326 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job FlinkCDCWithKaPro (997044085d23299e9580d424b5b72559).
2023-03-06 10:45:15,327 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for FlinkCDCWithKaPro (997044085d23299e9580d424b5b72559).
2023-03-06 10:45:15,328 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 997044085d23299e9580d424b5b72559 reached globally terminal state FAILED.
2023-03-06 10:45:15,328 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Could not store completed job FlinkCDCWithKaPro(997044085d23299e9580d424b5b72559).
**java.io.FileNotFoundException: /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250/997044085d23299e9580d424b5b72559 (No such file or directory)**
    at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_333]
    at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_333]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_333]
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_333]
    at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.storeArchivedExecutionGraph(FileArchivedExecutionGraphStore.java:276) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore.put(FileArchivedExecutionGraphStore.java:198) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.archiveExecutionGraph(Dispatcher.java:855) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.jobReachedGloballyTerminalState(Dispatcher.java:848) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:446) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_333]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_333]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_333]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.12.2.jar:1.12.2]

```

  • 写回答

2条回答 默认 最新

  • 程序yang 全栈领域优质创作者 2023-03-06 11:33
    关注

    参考:
    检查 Flink 的配置,确保 executionGraphStore 相关的配置项正确设置。

    确认 Flink 安装包中是否包含 /tmp/executionGraphStore-94c8652f-8743-4ba5-b138-21c437af0250 目录。如果没有,则手动创建该目录即可。

    如果以上两个方法都不行,那么尝试重新安装 Flink 或者在官网中寻找解决方法。

    评论

报告相同问题?

问题事件

  • 创建了问题 3月6日

悬赏问题

  • ¥15 不是,这到底错哪儿了😭
  • ¥15 2020长安杯与连接网探
  • ¥15 关于#matlab#的问题:在模糊控制器中选出线路信息,在simulink中根据线路信息生成速度时间目标曲线(初速度为20m/s,15秒后减为0的速度时间图像)我想问线路信息是什么
  • ¥15 banner广告展示设置多少时间不怎么会消耗用户价值
  • ¥16 mybatis的代理对象无法通过@Autowired装填
  • ¥15 可见光定位matlab仿真
  • ¥15 arduino 四自由度机械臂
  • ¥15 wordpress 产品图片 GIF 没法显示
  • ¥15 求三国群英传pl国战时间的修改方法
  • ¥15 matlab代码代写,需写出详细代码,代价私