Problem symptoms and background
I am using Flink SQL to connect to Hive. Executing tableEnv.executeSql("insert into table1 select * from table2") works fine when run locally. On the cluster in per-job mode, however, if the select query returns an empty result, the job fails with an error that the temporary staging file staging_1646118395295 does not exist; if the result is non-empty, it runs normally.
Relevant code (text, not a screenshot)
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class CodeCorDwmFour {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the Hive catalog and make it the current catalog.
        String name = "myhive";
        String defaultDatabase = "ly_test";
        String hiveConfDir = "/etc/hive/conf";
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");

        // Anti-join: keep rows from the source table that have no match in tmp_01.
        tableEnv.executeSql(
                "insert overwrite dwm_mart.dwm_list_compoutcd_tmp_02" +
                " select t.*" +
                " from dwm_mart.dwm_tq_comp_codecor_e50_cr_tmp t" +
                " left join dwm_mart.dwm_list_compoutcd_tmp_01 tmp" +
                "   on t.outcode = tmp.outcode and t.compname = tmp.compname" +
                " where not (tmp.outcode is not null and tmp.compname is not null)");
    }
}
Runtime result and error output
Caused by: java.io.FileNotFoundException: File hdfs://knowlegene/user/hive/warehouse/ly_test.db/stu/.staging_1646118395295 does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:697) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751) ~[flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:2.6.5-10.0]
at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[flink-connector-hive_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:137) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:93) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:91) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.vertexFinished(DefaultExecutionGraph.java:1088) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
	... 32 more
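Reading the trace: the failure happens on the JobMaster during finalizeGlobal, when FileSystemCommitter lists the .staging_* directory to commit written files. If the query produced no rows, the sink apparently never created that directory, so listStatus throws FileNotFoundException. Until the underlying behavior is fixed, one defensive option is to catch this specific failure on the client side so downstream scheduled tasks still run. The sketch below only shows the exception-matching heuristic; the message-matching strings are an assumption based on the trace above, and in the real job you would apply it to the exception thrown by tableEnv.executeSql(...).await():

```java
public class TolerantCommit {
    // True when the failure looks like the known empty-result staging
    // problem. Walks the cause chain because the FileNotFoundException
    // is usually wrapped several layers deep.
    static boolean isEmptyStagingFailure(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            String msg = c.getMessage();
            if (msg != null && msg.contains(".staging_") && msg.contains("does not exist")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Stand-in for the wrapped exception a failed job submission throws.
        Throwable sample = new RuntimeException(
                new java.io.FileNotFoundException(
                        "File hdfs://knowlegene/.../.staging_1646118395295 does not exist."));
        System.out.println(isEmptyStagingFailure(sample)); // true
    }
}
```

In the job itself the pattern would be: try { tableEnv.executeSql(insertSql).await(); } catch (Exception e) { if (!isEmptyStagingFailure(e)) throw e; } so that only this known failure is swallowed and anything else still fails the job.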
My approach and what I have tried
Desired outcome
How can I make this run without error regardless of whether select * from table2 returns any rows, or at least not fail, so that downstream scheduled tasks can still execute?
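One way to avoid hitting the failing commit path at all is to probe the select for emptiness first and skip the insert when it returns no rows. A minimal sketch of the guard, with the Flink wiring shown only in comments (the "limit 1" probe query is a hypothetical suggestion, not taken from the original job):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class EmptyResultGuard {
    // True when the probe yields at least one row. In the real job the
    // iterator would come from something like (assumed wiring):
    //   tableEnv.executeSql("select 1 from (<the select part>) limit 1").collect()
    // and the insert would only be executed when this returns true.
    static boolean hasRows(Iterator<?> probe) {
        return probe.hasNext();
    }

    public static void main(String[] args) {
        System.out.println(hasRows(Collections.emptyIterator())); // false
        System.out.println(hasRows(List.of("row").iterator()));   // true
    }
}
```

Note the trade-off: because the source is scanned twice (once for the probe, once for the insert), this costs extra I/O, but it keeps the scheduled pipeline alive whether or not the join produces rows. With insert overwrite, skipping the insert also means the target table keeps its previous contents instead of being truncated, which may or may not match what the downstream tasks expect.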