请问FlumeRpcClientExample运行时遇到这种问题该怎么解决?
rg.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Failed to send event
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:249)
at org.example.readcsv.FlumeRpcClientExample.lambda$sendCSVFile$0(FlumeRpcClientExample.java:50)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: 0.0.0.0, port: 44444 }: Avro RPC call returned Status: FAILED
at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:390)
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:296)
at org.apache.flume.api.NettyAvroRpcClient.append(NettyAvroRpcClient.java:237)
... 6 more
0
FlumeRpcClientExample的代码,数据无法发送,到while (!executor.isTerminated()) {} // 等待所有任务完成,就不能正常运行,求各位大神帮帮忙。
package org.example.readcsv;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.io.*;
import java.nio.file.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FlumeRpcClientExample {
private static final String FLUME_HOST = "0.0.0.0";
private static final int[] FLUME_PORTS = {44444, 44445, 44446}; // 替换为Flume监听的端口
private static final String FLUME_HEADERS = ""; // 添加需要的Flume头部信息
public static void main(String[] args) {
String folderPath = "E:\\app\\dist";
sendCSVFile(folderPath);
}
private static void sendCSVFile(String folderPath) {
try {
File folder = new File(folderPath);
File[] listOfFiles = folder.listFiles();
RpcClient[] clients = new RpcClient[3];
clients[0]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[0]);
clients[1]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[1]);
clients[2]=RpcClientFactory.getDefaultInstance(FLUME_HOST, FLUME_PORTS[2]);
// 首先发送所有已经有的数据
int count;
int total=0;
ExecutorService executor = Executors.newFixedThreadPool(30); // 创建一个固定大小的线程池
for (File file : listOfFiles) {
if (file.isFile() && file.getName().endsWith(".csv")) {
executor.submit(() -> { // 在一个新的线程中执行
int count_port = 0;
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk"));
String line = reader.readLine();
count_port = 0;
while (line != null) {
RpcClient client = clients[count_port % FLUME_PORTS.length];
Event flumeEvent = EventBuilder.withBody(line.getBytes());
client.append(flumeEvent);
line = reader.readLine();
count_port++;
}
reader.close();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(count_port);
});
}
}
executor.shutdown(); // 关闭线程池
while (!executor.isTerminated()) {} // 等待所有任务完成
System.out.println(total);
for (File file : listOfFiles) {
if (file.isFile() && file.getName().endsWith(".csv")) {
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "gbk"));
String line = reader.readLine();
count=0;
while (line != null) {
System.out.println("当前的数据: " + line);
RpcClient client = clients[count % FLUME_PORTS.length];
Event flumeEvent = EventBuilder.withBody(line.getBytes());
client.append(flumeEvent);
line = reader.readLine();
count++;
total++;
}
reader.close();
}
System.out.println(total);
}
WatchService watchService = FileSystems.getDefault().newWatchService();
// 注册要监视的路径和事件类型到WatchService对象中
Path dirPath = folder.toPath();
dirPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY);
// 创建一个Map对象,用于存储每个文件的BufferedReader和上一次长度
Map<String, BufferedReader> readers = new HashMap<>();
Map<String, Long> lastLengths = new HashMap<>();
while (true) {
WatchKey watchKey = watchService.take();
// 获取一个WatchEventList对象
for (WatchEvent<?> event : watchKey.pollEvents()) {
// 判断事件类型
if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
// 如果是文件修改事件,则读取文件的新内容
Path filePath = dirPath.resolve((Path) event.context());
if (filePath.toString().endsWith(".csv")) {
File file = filePath.toFile();
long length = file.length();
if (!readers.containsKey(filePath.toString())) {
// 如果是新的文件,则创建一个新的BufferedReader对象
RandomAccessFile raf = new RandomAccessFile(file, "r");
readers.put(filePath.toString(), new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK")));
lastLengths.put(filePath.toString(), 0L);
}
BufferedReader reader;
long lastLength = lastLengths.get(filePath.toString());
if (length > lastLength) {
// 文件有新增的数据
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(lastLength); // 将文件指针移动到上一次读取的位置
reader = new BufferedReader(new InputStreamReader(new FileInputStream(raf.getFD()), "GBK")); // 从上一次读取的位置开始创建一个新的BufferedReader对象
String line;
count=0;
while ((line = reader.readLine()) != null) {
RpcClient client = clients[count % FLUME_PORTS.length];
Event flumeEvent = EventBuilder.withBody(line.getBytes());
client.append(flumeEvent);
System.out.println(line);
count++;
}
// 更新文件的上一次长度
lastLengths.put(filePath.toString(), length);
}
}
}
}
// 重置watchKey状态为ready,以便继续监听事件
if (!watchKey.reset()) {
break;
}
}
} catch (IOException | InterruptedException | EventDeliveryException e) {
e.printStackTrace();
}
}
}