废柴太乙 2024-05-12 18:28 采纳率: 0%
浏览 16

flume数据无法发送

请问FlumeRpcClientExample运行时遇到这种问题该怎么解决?

img

rg.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Failed to send event
    at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:249)
    at org.example.readcsv.FlumeRpcClientExample.lambda$sendCSVFile$0(FlumeRpcClientExample.java:50)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Avro RPC call returned Status: FAILED
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:390)
    at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:296)
    at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:237)
    ... 6 more
0

FlumeRpcClientExample的代码,数据无法发送,到while (!executor.isTerminated()) {} // 等待所有任务完成,就不能正常运行,求各位大神帮帮忙。

 
package org.example.readcsv;
 
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
 
import java.io.*;
import java.nio.file.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class FlumeRpcClientExample {
    private static final String FLUME_HOST = "0.0.0.0";
    private static final int[] FLUME_PORTS = {44444, 44445, 44446}; // 替换为Flume监听的端口
    private static final String FLUME_HEADERS = ""; // 添加需要的Flume头部信息
 
    public static void main(String[] args) {
        String folderPath = "E:\\app\\dist";
        sendCSVFile(folderPath);
    }
 
    private static void sendCSVFile(String folderPath) {
        try {
            File folder = new File(folderPath);
            File[] listOfFiles = folder.listFiles();
            RpcClient[] clients = new RpcClient[3];
            clients[0]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[0]);
            clients[1]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[1]);
            clients[2]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[2]);
//            首先发送所有已经有的数据
            int count;
            int total=0;
            ExecutorService executor = Executors.newFixedThreadPool(30); // 创建一个固定大小的线程池
            for (File file : listOfFiles) {
                if (file.isFile() && file.getName().endsWith(".csv")) {
                    executor.submit(() -> { // 在一个新的线程中执行
                        int count_port = 0;
                        try {
                            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk"));
                            String line = reader.readLine();
                            count_port = 0;
                            while (line != null) {
                                RpcClient client = clients[count_port % FLUME_PORTS.length];
                                Event flumeEvent = EventBuilder.withBody(line.getBytes());
                                client.append(flumeEvent);
                                line = reader.readLine();
                                count_port++;
                            }
                            reader.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        System.out.println(count_port);
                    });
                }
            }
            executor.shutdown(); // 关闭线程池
            while (!executor.isTerminated()) {} // 等待所有任务完成
            System.out.println(total);
            for (File file : listOfFiles) {
                if (file.isFile() && file.getName().endsWith(".csv")) {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk"));
                    String line = reader.readLine();
                    count=0;
                    while (line != null) {
                        System.out.println("当前的数据: " + line);
                        RpcClient client = clients[count % FLUME_PORTS.length];
                        Event flumeEvent = EventBuilder.withBody(line.getBytes());
                        client.append(flumeEvent);
                        line = reader.readLine();
                        count++;
                        total++;
                    }
                    reader.close();
                }
                System.out.println(total);
            }
 
 
            WatchService watchService = FileSystems.getDefault().newWatchService();
            // 注册要监视的路径和事件类型到WatchService对象中
            Path dirPath = folder.toPath();
            dirPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
            // 创建一个Map对象,用于存储每个文件的BufferedReader和上一次长度
            Map<String, BufferedReader> readers = new HashMap<>();
            Map<String, Long> lastLengths = new HashMap<>();
            while (true) {
                WatchKey watchKey = watchService.take();
                // 获取一个WatchEventList对象
                for (WatchEvent<?> event : watchKey.pollEvents()) {
                    // 判断事件类型
                    if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
                        // 如果是文件修改事件,则读取文件的新内容
                        Path filePath = dirPath.resolve((Path) event.context());
                        if (filePath.toString().endsWith(".csv")) {
                            File file = filePath.toFile();
                            long length = file.length();
                            if (!readers.containsKey(filePath.toString())) {
                                // 如果是新的文件,则创建一个新的BufferedReader对象
                                RandomAccessFile raf = new RandomAccessFile(file, "r");
                                readers.put(filePath.toString(), new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK")));
                                lastLengths.put(filePath.toString(), 0L);
                            }
                            BufferedReader reader;
                            long lastLength = lastLengths.get(filePath.toString());
                            if (length > lastLength) {
                                // 文件有新增的数据
                                RandomAccessFile raf = new RandomAccessFile(file, "r");
                                raf.seek(lastLength);  // 将文件指针移动到上一次读取的位置
                                reader = new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK"));  // 从上一次读取的位置开始创建一个新的BufferedReader对象
                                String line;
                                count=0;
                                while ((line = reader.readLine()) != null) {
                                    RpcClient client = clients[count % FLUME_PORTS.length];
                                    Event flumeEvent = EventBuilder.withBody(line.getBytes());
                                    client.append(flumeEvent);
                                    System.out.println(line);
                                    count++;
                                }
                                // 更新文件的上一次长度
                                lastLengths.put(filePath.toString(), length);
                            }
                        }
                    }
                }
                // 重置watchKey状态为ready,以便继续监听事件
                if (!watchKey.reset()) {
                    break;
                }
            }
        } catch (IOException | InterruptedException | EventDeliveryException e) {
            e.printStackTrace();
        }
    }
 
 
}
 


  • 写回答

4条回答 默认 最新

  • 笑面虎 2024-05-12 18:28
    关注

    以下内容由CHATGPT及笑面虎共同生成、有用望采纳:


    【您想要解决的问题】:您遇到的问题是在使用 Flume 的 FlumeRpcClientExample 运行时,数据无法通过 NettyAvroRpcClient 发送到配置的主机和端口。

    【图片内容】:您上传的图片包含了错误日志信息,显示了 EventDeliveryException 异常,指出在尝试通过 NettyAvroRpcClient 发送事件时失败了。错误发生在 NettyAvroRpcClient.append 方法调用时,并且是由于 Avro RPC 调用返回了 FAILED 状态。

    【问题出现原因】:这个问题可能由以下几个原因引起:

    1. 主机地址错误:日志中显示的主机地址是 0.0.0.0,这通常是一个通配地址,可能不是您想要连接的实际地址。
    2. 端口不可用:端口 44444 可能没有在目标主机上监听,或者防火墙设置阻止了连接。
    3. 网络问题:可能存在网络连接问题,导致客户端无法到达服务器。
    4. Flume 配置问题:Flume 的配置可能不正确,导致无法建立 RPC 连接。

    【问题解决方案】:为了解决这个问题,您可以尝试以下步骤:

    1. 检查主机地址:确保您使用的是正确的主机地址,而不是 0.0.0.0
    2. 检查端口:确认端口 44444 在目标主机上是开放的,并且Flume的配置文件中指定了正确的端口。
    3. 检查网络连接:确保网络连接没有问题,您可以尝试 ping 目标主机以确认连接。
    4. 检查防火墙设置:确保没有防火墙规则阻止了到端口 44444 的连接。
    5. 检查 Flume 配置:确保 Flume 的配置文件(如 flume.confflume.properties)中的相关配置是正确的。

    【提供代码】:由于这个问题是关于配置和环境设置的,而不是代码本身的问题,所以没有具体的代码可以提供。但是,您可能需要检查和修改您的 Flume 配置文件。

    【代码运行方式】:如果您需要修改配置文件,通常您需要做的是:

    1. 找到您的 Flume 配置文件。
    2. 使用文本编辑器打开它并进行必要的修改。
    3. 保存文件并重新启动 Flume 服务。

    【代码预期运行结果】:如果配置正确,网络连接正常,那么您应该能够看到数据成功发送到 Flume 代理,而不会遇到 EventDeliveryException 异常。

    【推荐相关链接】:

    请根据这些步骤检查和调整您的配置,如果问题仍然存在,您可能需要提供更多的信息或者联系 Flume 社区以获得更专业的帮助。

    评论 编辑记录

报告相同问题?

问题事件

  • 修改了问题 5月12日
  • 修改了问题 5月12日
  • 创建了问题 5月12日

悬赏问题

  • ¥15 如何在vue.config.js中读取到public文件夹下window.APP_CONFIG.API_BASE_URL的值
  • ¥50 浦育平台scratch图形化编程
  • ¥20 求这个的原理图 只要原理图
  • ¥15 vue2项目中,如何配置环境,可以在打完包之后修改请求的服务器地址
  • ¥20 微信的店铺小程序如何修改背景图
  • ¥15 UE5.1局部变量对蓝图不可见
  • ¥15 一共有五道问题关于整数幂的运算还有房间号码 还有网络密码的解答?(语言-python)
  • ¥20 sentry如何捕获上传Android ndk 崩溃
  • ¥15 在做logistic回归模型限制性立方条图时候,不能出完整图的困难
  • ¥15 G0系列单片机HAL库中景园gc9307液晶驱动芯片无法使用硬件SPI+DMA驱动,如何解决?