学成大数据前不改名! 2024-08-07 11:12 采纳率: 0%
浏览 17
已结题

flume采集日志信息到hdfs

你好,我想问下,我使用flume采集redis的日志信息到hdfs上,但在传输过程中出错了,只能传输两份日志。删掉flume重新下载也只能传输两份数据
问题代码块如下

2024-08-07 10:53:39,237 (PollableSourceRunner-TaildirSource-r1) [INFO - org.apache.flume.source.taildir.ReliableTaildirEventReader.readEvents(ReliableTaildirEventReader.java:192)] Last read was never committed - resetting position
2024-08-07 10:53:39,238 (PollableSourceRunner-TaildirSource-r1) [ERROR - org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:247)] Unable to tail files
com.alibaba.fastjson.JSONException: not close json text, token : string
        at com.alibaba.fastjson.parser.DefaultJSONParser.close(DefaultJSONParser.java:1526)
        at com.alibaba.fastjson.JSON.parse(JSON.java:174)
        at com.alibaba.fastjson.JSON.parse(JSON.java:180)
        at com.alibaba.fastjson.JSON.parse(JSON.java:149)
        at com.alibaba.fastjson.JSON.parseObject(JSON.java:241)
        at cn.wolfcode.flume.interceptor.TimeStampInterceptor.intercept(TimeStampInterceptor.java:28)
        at cn.wolfcode.flume.interceptor.TimeStampInterceptor.intercept(TimeStampInterceptor.java:40)
        at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:148)
        at org.apache.flume.source.taildir.TaildirSource.tailFileProcess(TaildirSource.java:276)
        at org.apache.flume.source.taildir.TaildirSource.process(TaildirSource.java:239)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)

配置文件

a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source 
a1.sources.r1.type = TAILDIR  
a1.sources.r1.filegroups = f1 
a1.sources.r1.filegroups.f1 = /bigdata/data/log/behavior/.*
a1.sources.r1.positionFile =  /bigdata/server/flume/position/behavior/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = cn.wolfcode.flume.interceptor.ETLInterceptor$Builder
a1.sources.r1.interceptors =  i2
a1.sources.r1.interceptors.i2.type = cn.wolfcode.flume.interceptor.TimeStampInterceptor$Builder
## channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /behavior/origin/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 13421772800
a1.sinks.k1.hdfs.rollCount = 0
## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = DataStream  
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

conf/flume.env.sh文件只配置了java

export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
export JAVA_HOME=/bigdata/server/jdk1.8

/etc/profile.d/custom.env.sh

#FLUME_HOME
export FLUME_HOME=/bigdata/server/flume
export PATH=$PATH:$FLUME_HOME/bin

找遍网上方法都不行,麻烦师傅帮忙解答谢谢

  • 写回答

26条回答 默认 最新

  • 专家-郭老师 Java领域新星创作者 2024-08-07 11:17
    关注
    获得0.30元问题酬金

    🕖上午好,题主!目前:🟢[在线]
    📢参考通义千问、由郭老师整理提供!
    📣今日诗号:我心匪石,不可转也。

    您好,根据您提供的错误信息和代码,我分析您在使用 Flume 采集 Redis 日志到 HDFS 时遇到的问题很可能与以下两方面有关:

    1. JSON 解析错误:

    错误信息 com.alibaba.fastjson.JSONException: not close json text, token : string 清晰地表明问题出在 JSON 数据解析环节。Flume 在处理您的日志时,尝试将其解析为 JSON 格式,但发现数据格式不完整或存在错误,导致解析失败。

    2. Flume 内部状态异常:

    Last read was never committed - resetting position 信息暗示 Flume 在读取文件时遇到了问题,导致之前读取的内容没有被正确提交。这可能与 JSON 解析错误间接相关,例如错误的 JSON 格式导致 Flume 无法确定读取位置。

    解决方案:

    1. 检查 Redis 日志格式:

    • 确认 Redis 生成的日志是否是标准 JSON 格式。
    • 检查日志文件中是否存在不完整的 JSON 字符串,例如缺少括号或引号。您可以使用在线 JSON 校验工具或命令行工具(如 jq) 进行验证。
    • 如果日志并非 JSON 格式,您需要修改 Flume 配置,使用合适的解析方式或自定义拦截器处理原始日志数据。

    2. 调整 Flume 配置和代码:

    • 错误处理: 考虑在您的 TimeStampInterceptor 中添加错误处理机制。当 com.alibaba.fastjson.JSONException 发生时,记录错误日志并跳过错误数据,避免阻塞整个 Flume Agent。
    • 文件读取: 尝试调整 a1.sources.r1.positionFile 的路径和文件名,确保 Flume 能够正确记录和读取文件读取位置。 您可以尝试删除之前的 position 文件,让 Flume 重新开始记录。
    • 日志级别: 为了更详细地排查问题,建议您在 Flume 配置文件中将日志级别调整为 DEBUG (logging.level = DEBUG),观察 Flume 运行时的详细日志信息,特别是有关文件读取和数据解析的部分。

    代码示例 (错误处理):

    package cn.wolfcode.flume.interceptor;
    
    import com.alibaba.fastjson.JSON;
    // ... 其他导入语句
    
    public class TimeStampInterceptor implements Interceptor {
        // ... 其他代码 ...
    
        @Override
        public Event intercept(Event event) {
            // ... 获取 event body ...
    
            try {
                // ... 使用 fastjson 解析 JSON 数据 ...
            } catch (com.alibaba.fastjson.JSONException e) {
                // 记录错误日志,例如:
                System.err.println("JSON 解析错误: " + e.getMessage());
                // 可选择性地跳过错误数据,返回 null 或原始 event
                return null; // 或 return event;
            }
    
            // ... 其他处理逻辑 ...
        }
        // ... 其他代码 ...
    }
    

    其他建议:

    • 尝试使用其他 JSON 解析库,例如 Gson 或 Jackson, 确保您选择的库与 Flume 版本兼容。
    • 如果问题仍然存在,请提供更详细的日志信息,包括 Flume Agent 的完整配置文件以及 Redis 生成的日志示例。

    希望以上信息能够帮助您解决问题。 如果您还有其他问题,请随时提出。

    评论

报告相同问题?

问题事件

  • 系统已结题 8月15日
  • 创建了问题 8月7日

悬赏问题

  • ¥15 CCF-CSP 2023 第三题 解压缩(50%)
  • ¥30 comfyui openpose报错
  • ¥20 Wpf Datarid单元格闪烁效果的实现
  • ¥15 图像分割、图像边缘提取
  • ¥15 sqlserver执行存储过程报错
  • ¥100 nuxt、uniapp、ruoyi-vue 相关发布问题
  • ¥15 浮窗和全屏应用同时存在,全屏应用输入法无法弹出
  • ¥100 matlab2009 32位一直初始化
  • ¥15 Expected type 'str | PathLike[str]…… bytes' instead
  • ¥15 三极管电路求解,已知电阻电压和三级关放大倍数