Hi, I have a question. I'm using Flume to collect Redis log data into HDFS, but the transfer fails partway through: only two log files ever get transferred. Even after deleting Flume and downloading it again, still only two files come across.
The error log is as follows:
2024-08-07 10:53:39,237 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.readEvents(ReliableTaildirEventReader.java:192)] Last read was never committed - resetting position
2024-08-07 10:53:39,238 (PollableSourceRunner-TaildirSource-r1) [ERROR - org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:247)] Unable to tail files
com.alibaba.fastjson.JSONException: not close json text, token : string
at com.alibaba.fastjson.parser.DefaultJSONParser.close(DefaultJSONParser.java:1526)
at com.alibaba.fastjson.JSON.parse(JSON.java:174)
at com.alibaba.fastjson.JSON.parse(JSON.java:180)
at com.alibaba.fastjson.JSON.parse(JSON.java:149)
at com.alibaba.fastjson.JSON.parseObject(JSON.java:241)
at cn.wolfcode.flume.interceptor.TimeStampInterceptor.intercept(TimeStampInterceptor.java:28)
at cn.wolfcode.flume.interceptor.TimeStampInterceptor.intercept(TimeStampInterceptor.java:40)
at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:148)
at org.apache.flume.source.taildir.TaildirSource.tailFileProcess(TaildirSource.java:276)
at org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:239)
at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
at java.lang.Thread.run(Thread.java:748)
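The `not close json text` error means fastjson was handed a truncated line, which fits the preceding `Last read was never committed - resetting position` message: after an uncommitted batch, the Taildir source re-reads from the last committed position and can pick up a half-written line. A minimal, self-contained sketch of a guard that a custom interceptor could apply before calling `JSON.parseObject` is shown below; the class and method names are illustrative, not the actual `cn.wolfcode` code:

```java
// Illustrative guard: returns true only if the line looks like a complete
// JSON object (braces balance outside of string literals, and the object
// closes exactly at the end of the line). A custom interceptor could drop
// lines failing this check instead of letting fastjson throw.
public class JsonGuard {
    public static boolean looksComplete(String line) {
        if (line == null) return false;
        String s = line.trim();
        if (s.isEmpty() || s.charAt(0) != '{') return false;
        int depth = 0;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (inString) {
                if (escaped) escaped = false;
                else if (c == '\\') escaped = true;
                else if (c == '"') inString = false;
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) return i == s.length() - 1; // must close at end
            }
        }
        return false; // opening brace never closed: truncated line
    }

    public static void main(String[] args) {
        System.out.println(looksComplete("{\"ts\":1723000000,\"page\":\"home\"}")); // true
        System.out.println(looksComplete("{\"ts\":1723000000,\"page\":\"ho"));      // false
    }
}
```

With a check like this, a malformed line is skipped (or routed elsewhere) rather than aborting the whole batch, so the source can keep committing progress past the bad line.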
Configuration file:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /bigdata/data/log/behavior/.*
a1.sources.r1.positionFile = /bigdata/server/flume/position/behavior/taildir_position.json
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = cn.wolfcode.flume.interceptor.ETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = cn.wolfcode.flume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /behavior/origin/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 13421772800
a1.sinks.k1.hdfs.rollCount = 0
## Write output files as plain text (DataStream).
a1.sinks.k1.hdfs.fileType = DataStream
## Wiring: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
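Since the HDFS sink path above uses time escapes (`%Y-%m-%d`), every event must carry a `timestamp` header, which is presumably what `TimeStampInterceptor` adds. While debugging the interceptor, a documented fallback is to let the sink stamp events with the agent's local clock instead (this property is from the standard HDFS sink, not from the original post):

```
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```

With this set, the sink no longer fails on events that reach it without a timestamp header, which helps isolate whether the interceptor is the only problem.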
The conf/flume.env.sh file only configures Java:
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
export JAVA_HOME=/bigdata/server/jdk1.8
And /etc/profile.d/custom.env.sh contains:
#FLUME_HOME
export FLUME_HOME=/bigdata/server/flume
export PATH=$PATH:$FLUME_HOME/bin
I've tried every method I could find online and nothing works. Could someone please help me figure this out? Thanks!