liulishc 2022-05-28 10:13 采纳率: 0%
浏览 39

hdfs sink报错

问题遇到的现象和发生背景

flume hdfs sink 同步kafka主题诗句到hdfs总出错误

问题相关代码,请勿粘贴截图

flume配置文件

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 2000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = mini02:9092,mini03:9092,mini04:9092
a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.liuli.flume.interceptor.TimeStampInterceptor$Builder

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false

#控制生成的小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
## a1.sinks.k1.hdfs.fileType = CompressedStream
## a1.sinks.k1.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
运行结果及报错内容
2022-05-28 09:57:54,230 ERROR hdfs.HDFSEventSink: process failed
java.lang.NoSuchMethodError: org.apache.hadoop.io.retry.RetryUtils.getDefaultRetryPolicy(Lorg/apache/hadoop/conf/Configuration;Ljava/lang/String;ZLjava/lang/String;Ljava/lang/String;Ljava/lang/Class;)Lorg/apache/hadoop/io/retry/RetryPolicy;
    at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:318)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:235)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:139)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:510)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:453)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:136)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3354)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:255)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:247)
    at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:727)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:724)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
我的解答思路和尝试过的方法

切换了几种其他的sink都没问题,只有hdfs sink出错

我想要达到的结果

帮忙解决问题

  • 写回答

2条回答 默认 最新

  • liulishc 2022-05-28 11:37
    关注

    是的,主要是org.apache.hadoop.io.retry.RetryUtils.getDefaultRetryPolicy方法找不到,hadoop-common.jar中有这个方法只是参数不一致,我也搞不懂flume怎么调用这个方法的。
    下面是hadoop-common.jar中getDefaultRetryPolicy方的参数列表和调用的不一致
    public static RetryPolicy getDefaultRetryPolicy(
    Configuration conf,
    String retryPolicyEnabledKey,
    boolean defaultRetryPolicyEnabled,
    String retryPolicySpecKey,
    String defaultRetryPolicySpec,
    String remoteExceptionToRetry)

    评论

报告相同问题?

问题事件

  • 创建了问题 5月28日

悬赏问题

  • ¥15 vs2019中数据导出问题
  • ¥20 云服务Linux系统TCP-MSS值修改?
  • ¥20 关于#单片机#的问题:项目:使用模拟iic与ov2640通讯环境:F407问题:读取的ID号总是0xff,自己调了调发现在读从机数据时,SDA线上并未有信号变化(语言-c语言)
  • ¥20 怎么在stm32门禁成品上增加查询记录功能
  • ¥15 Source insight编写代码后使用CCS5.2版本import之后,代码跳到注释行里面
  • ¥50 NT4.0系统 STOP:0X0000007B
  • ¥15 想问一下stata17中这段代码哪里有问题呀
  • ¥15 flink cdc无法实时同步mysql数据
  • ¥100 有人会搭建GPT-J-6B框架吗?有偿
  • ¥15 求差集那个函数有问题,有无佬可以解决