I want to use Flink to write directly to HDFS, but it keeps throwing an error. The code is as follows:
// I then copied the example code from the official docs into this project
String path = "hdfs:///hdfs-01:9000/tmp/flink";
// Checkpointing must be enabled: checkpoint every 10 seconds
env.enableCheckpointing(10000);
// Rolling policy for the part files
DefaultRollingPolicy<String, String> defaultRollingPolicy = DefaultRollingPolicy.builder()
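.withRolloverInterval(TimeUnit.MINUTES.toMillis(30))// roll the part file after it has been open for 30 minutes
.withInactivityInterval(TimeUnit.MINUTES.toMillis(10))// roll the part file if nothing has been written to it for 10 minutes
.withMaxPartSize(1024 * 1024)// roll the part file once it reaches 1 MB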
.build();
// OnCheckpointRollingPolicy.build().shouldRollOnEvent()
OutputFileConfig outputFileConfig = OutputFileConfig
.builder()
.withPartPrefix("log")
.withPartSuffix(".txt")
.build();
// Row-encoded format
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8")) // row encoder; compression is not supported
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH")) // bucketing: one directory per hour
.withRollingPolicy(defaultRollingPolicy)// rolling policy for the part files
.withOutputFileConfig(outputFileConfig)// file naming configuration
.build();
I have already added this to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.8.3-10.0</version>
</dependency>
The error is:
java.io.IOException: The given file system URI (hdfs:///hdfs-01:9000/tmp/flink) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') contains no valid authority component (like hdfs namenode, S3 host, etc)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:158) ~[flink-hadoop-fs-1.12.4.jar:1.12.4]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527) ~[flink-core-1.12.4.jar:1.12.4]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408) ~[flink-core-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) [flink-runtime_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-runtime_2.12-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
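The message says the URI "did not describe the authority". A quick way to see what that means is to parse the same string with plain `java.net.URI`. This is only an approximation: as far as I know Flink's `Path` is backed by a `java.net.URI`, but Flink's own handling may differ in details. The class name `UriAuthorityCheck` and its helper `authorityOf` are hypothetical, written just for this check. With three slashes (`hdfs:///`) the authority between `//` and the next `/` is empty, so `hdfs-01:9000` ends up inside the path. Flink then falls back to the Hadoop default file system (`fs.defaultFS`), which the error says has no valid authority either.

```java
import java.net.URI;

class UriAuthorityCheck {
    // Returns the authority (host:port) that java.net.URI parses from the string, or null if there is none.
    static String authorityOf(String uri) {
        return URI.create(uri).getAuthority();
    }

    public static void main(String[] args) {
        // The path as written in the job: three slashes, so the authority part is empty
        String asWritten = "hdfs:///hdfs-01:9000/tmp/flink";
        // The same path with two slashes: "hdfs-01:9000" is parsed as the authority (NameNode host:port)
        String twoSlashes = "hdfs://hdfs-01:9000/tmp/flink";

        System.out.println(authorityOf(asWritten));             // null
        System.out.println(URI.create(asWritten).getPath());    // /hdfs-01:9000/tmp/flink
        System.out.println(authorityOf(twoSlashes));            // hdfs-01:9000
    }
}
```

If the two-slash form gives the expected authority here, it is the form worth trying in `String path = ...`, assuming `hdfs-01:9000` is indeed the NameNode address.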