不想变秃。。。 2020-08-09 10:40 采纳率: 0%
浏览 14

Flume自定义sink报异常错误!!!

求求各路大神指点指点孩子吧,心态崩了,555555555

Flume自定义sink报如下图异常错误
图片说明
但是查看了自己的代码,配置文件,还有启动命令感觉一点问题都没。。。

代码:
package cn.hrb.sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink1 extends AbstractSink implements Configurable {

// 得到logger对象
private Logger logger = LoggerFactory.getLogger(MySink1.class); //!!!

private String prefix = null;
private String subfix = null;

// 定义配置信息
public void configure(Context context) {

    // 读取配置文件内容
    prefix = context.getString("prefix", "zsk-->");
    subfix = context.getString("subfix");
}

// 编写核心代码逻辑(把event写出去)
public Status process() throws EventDeliveryException {

    Status status = null;

    // 得到channel对象
    Channel channel = getChannel();

    // 得到事务对象
    Transaction transaction = channel.getTransaction();

    // 开启事务
    transaction.begin();

        try {

            // take() called when transaction is NEW!
            // 从channel对象中拿事件
            Event takeEvent = channel.take();

            // 判断channel中是否有事件
            if(takeEvent != null) {

                // 解析事件并写出
                String data = new String(takeEvent.getBody());

                logger.info(prefix + data + subfix);
            }

            // 提交事务
            transaction.commit();
            status = Status.READY;

        } catch (Exception e) {

            // 有异常回滚
            transaction.rollback();
            status = Status.BACKOFF;

            e.printStackTrace();

        } finally {

            // 关闭事务
            transaction.close();
        }


    return status;
}

}

配置文件(/opt/module/flume/case3/sink/flume1.conf)

Name the components on this agent

a1.sources = r1
a1.sinks = k1
a1.channels = c1

Describe/configure the source

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

Describe the sink

a1.sinks.k1.type = cn.hrb.sink.MySink1
a1.sinks.k1.prefix = begin-
a1.sinks.k1.subfix = -end

Use a channel which buffers events in memory

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Bind the source and sink to the channel

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动命令
bin/flume-ng agent -c conf/ -f case3/sink/flume1.conf -n a1 -Dflume.root.logger=INFO,console

  • 写回答

1条回答 默认 最新

  • 会唱歌的大笨象 2024-07-25 10:50
    关注

    帮你优化了一下代码:

    package cn.hrb.sink;
    
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySink1 extends AbstractSink implements Configurable {
    
        // 得到logger对象
        private static final Logger logger = LoggerFactory.getLogger(MySink1.class); //!!!
        
        private String prefix = null;
        private String suffix = null; // 修改这里
        
        // 定义配置信息
        public void configure(Context context) {
            // 读取配置文件内容
            prefix = context.getString("prefix", "zsk-->");
            suffix = context.getString("suffix"); // 修改这里
        }
        
        // 编写核心代码逻辑(把event写出去)
        public Status process() throws EventDeliveryException {
            Status status = null;
            
            // 得到channel对象
            Channel channel = getChannel();
            
            // 得到事务对象
            Transaction transaction = channel.getTransaction();
            
            // 开启事务
            transaction.begin();
            
            try {
                // 从channel对象中拿事件
                Event takeEvent = channel.take();
                
                // 判断channel中是否有事件
                if (takeEvent != null) {
                    // 解析事件并写出
                    String data = new String(takeEvent.getBody());
                    logger.info(prefix + data + suffix); // 修改这里
                }
                
                // 提交事务
                transaction.commit();
                status = Status.READY;
                
            } catch (Exception e) {
                // 有异常回滚
                transaction.rollback();
                status = Status.BACKOFF;
                logger.error("Exception in MySink1 process", e);
            } finally {
                // 关闭事务
                transaction.close();
            }
            
            return status;
        }
    }
    
    
    
    评论

报告相同问题?

悬赏问题

  • ¥15 需要手写数字信号处理Dsp三个简单题 不用太复杂
  • ¥15 数字信号处理考试111
  • ¥100 关于#audobe audition#的问题,如何解决?
  • ¥15 allegro17.2生成bom表是空白的
  • ¥15 请问一下怎么打通CAN通讯
  • ¥20 如何在 rocky9.4 部署 CDH6.3.2?
  • ¥35 navicat将excel中的数据导入mysql出错
  • ¥15 rt-thread线程切换的问题
  • ¥15 高通uboot 打印ubi init err 22
  • ¥15 R语言中lasso回归报错