银河小分队 2021-11-05 18:14

Flink 1.12: writing to HDFS/Hadoop from a local Windows environment

I want to write directly to HDFS from Flink, but it keeps failing with an error. The code is as follows:

        // Code copied from the official documentation and adapted
        String path = "hdfs:///hdfs-01:9000/tmp/flink";

        // Checkpointing must be enabled; checkpoint every 10 seconds (value in milliseconds)
        env.enableCheckpointing(10000);

        // Rolling policy for part files
        DefaultRollingPolicy<String, String> defaultRollingPolicy = DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(30)) // roll once a part file has been open for 30 minutes (value in milliseconds)
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(10)) // roll after 10 minutes without any writes
                .withMaxPartSize(1024 * 1024) // roll once a part file reaches 1 MiB
                .build();

//        OnCheckpointRollingPolicy.build().shouldRollOnEvent()

        OutputFileConfig outputFileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("log")
                .withPartSuffix(".txt")
                .build();
        // Row format: one record per line
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8")) // row encoder; compression is not supported
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH")) // bucketing: one directory per hour
                .withRollingPolicy(defaultRollingPolicy) // rolling policy for part files
                .withOutputFileConfig(outputFileConfig) // part file naming
                .build();

The following dependency has already been added to pom.xml:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>

The error is as follows:
java.io.IOException: The given file system URI (hdfs:///hdfs-01:9000/tmp/flink) did not describe the authority (like for example HDFS NameNode address/port or S3 host). The attempt to use a configured default authority failed: Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') contains no valid authority component (like hdfs namenode, S3 host, etc)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:158) ~[flink-hadoop-fs-1.12.4.jar:1.12.4]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:527) ~[flink-core-1.12.4.jar:1.12.4]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:408) ~[flink-core-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink.java:288) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:298) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:469) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:545) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:535) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:575) ~[flink-streaming-java_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) [flink-runtime_2.12-1.12.4.jar:1.12.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-runtime_2.12-1.12.4.jar:1.12.4]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
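
The exception message says the URI "did not describe the authority". A likely cause, judging only from the message and the path string in the code, is the third slash in `hdfs:///hdfs-01:9000/tmp/flink`: with `hdfs:///` the authority is empty and `hdfs-01:9000` becomes part of the path. The sketch below uses plain `java.net.URI` (not Flink's own `Path` class, whose parsing may differ in detail) to show how the two spellings are split. The class name `UriAuthorityCheck` and its helper methods are made up for this illustration.

```java
import java.net.URI;

public class UriAuthorityCheck {

    /** Returns the authority component (host[:port]) of the URI, or null if it has none. */
    public static String authorityOf(String uri) {
        return URI.create(uri).getAuthority();
    }

    /** Returns the path component of the URI. */
    public static String pathOf(String uri) {
        return URI.create(uri).getPath();
    }

    public static void main(String[] args) {
        // The path exactly as posted in the question: three slashes after "hdfs:".
        String asPosted = "hdfs:///hdfs-01:9000/tmp/flink";
        // Same string with two slashes, so that "hdfs-01:9000" is read as host:port.
        String twoSlashes = "hdfs://hdfs-01:9000/tmp/flink";

        System.out.println(authorityOf(asPosted) + " | " + pathOf(asPosted));
        // prints: null | /hdfs-01:9000/tmp/flink
        System.out.println(authorityOf(twoSlashes) + " | " + pathOf(twoSlashes));
        // prints: hdfs-01:9000 | /tmp/flink
    }
}
```

If `hdfs-01:9000` really is the NameNode RPC address (an assumption, since the question does not show the cluster configuration), changing the path to `hdfs://hdfs-01:9000/tmp/flink` should give Flink an explicit authority. The three-slash form only works when a default file system (`fs.defaultFS`) is available from a Hadoop configuration on the client, which the error message reports is not the case here.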
