import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemManager;
import org.apache.commons.vfs2.VFS;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Timer;
import java.util.TimerTask;
public class FTPReader {
// FTP服务器的地址、端口、用户名、密码
private static final String FTP_HOST = "XXX.XXX.XXX.XXX";
private static final int FTP_PORT = 21;
private static final String FTP_USER = "root";
private static final String FTP_PASS = "password";
// FTP服务器上的log文件所在的目录
private static final String FTP_DIR = "/logs";
// 数据库连接对象
private static Connection conn;
// 定时任务的间隔时间,单位是毫秒
private static final long INTERVAL = 1000 * 60 * 10; // 10分钟
public static void main(String[] args) {
// 启动定时任务
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// 连接FTP服务器
FTPClient ftpClient = new FTPClient();
ftpClient.connect(FTP_HOST, FTP_PORT);
ftpClient.login(FTP_USER, FTP_PASS);
// 获取FTP服务器上的log文件列表
ftpClient.changeWorkingDirectory(FTP_DIR);
FTPFile[] ftpFiles = ftpClient.listFiles();
// 遍历log文件列表,按顺序解析每个文件
for (FTPFile ftpFile : ftpFiles) {
// 获取文件名和大小
String fileName = ftpFile.getName();
long fileSize = ftpFile.getSize();
// 查询数据库中是否已经存在该文件
PreparedStatement ps = conn.prepareStatement("select * from ftp_file where file_name = ?");
ps.setString(1, fileName);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
// 如果数据库中已经存在该文件,判断是否有更新
long oldSize = rs.getLong("file_size");
if (fileSize > oldSize) {
// 如果文件有更新,获取上次读取的位置
long offset = rs.getLong("offset");
// 实现断点续读功能,从上次读取的位置开始解析文件内容,并存入数据库
parseLogFile(fileName, offset, fileSize);
// 更新数据库中该文件的大小和位置
ps = conn.prepareStatement("update ftp_file set file_size = ?, offset = ? where file_name = ?");
ps.setLong(1, fileSize);
ps.setLong(2, fileSize);
ps.setString(3, fileName);
ps.executeUpdate();
}
} else {
// 如果数据库中不存在该文件,从头开始解析文件内容,并存入数据库
parseLogFile(fileName, 0, fileSize);
// 插入数据库中该文件的记录
ps = conn.prepareStatement("insert into ftp_file (file_name, file_size, offset) values (?, ?, ?)");
ps.setString(1, fileName);
ps.setLong(2, fileSize);
ps.setLong(3, fileSize);
ps.executeUpdate();
}
}
// 断开FTP服务器连接
ftpClient.logout();
ftpClient.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, INTERVAL);
}
// 解析log文件的内容,并存入数据库
private static void parseLogFile(String fileName, long offset, long fileSize) throws Exception {
// 使用VFS获取FTP服务器上的文件对象
FileSystemManager manager = VFS.getManager();
FileObject fileObject = manager.resolveFile("ftp://" + FTP_USER + ":" + FTP_PASS + "@" + FTP_HOST + FTP_DIR + "/" + fileName);
// 获取文件的输入流,并跳过已经读取过的部分
InputStream inputStream = fileObject.getContent().getInputStream();
inputStream.skip(offset);
// 使用BufferedReader按行读取文件内容
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
while ((line = reader.readLine()) != null) {
// 解析每一行的内容,获取文件名称、唯一线程ID、开始日志、结束日志等信息
// 这里假设每一行的格式是:[文件名称] [线程ID] [开始日志] [结束日志]
// 你可以根据你的实际情况修改这里的解析逻辑
String[] parts = line.split(" ");
String logFileName = parts[0];
String threadId = parts[1];
String startLog = parts[2];
String endLog = parts[3];
// 将解析出来的信息存入数据库
PreparedStatement ps = conn.prepareStatement("insert into log_file (file_name, thread_id, start_log, end_log) values (?, ?, ?, ?)");
ps.setString(1, logFileName);
ps.setString(2, threadId);
ps.setString(3, startLog);
ps.setString(4, endLog);
ps.executeUpdate();
}
// 关闭输入流和文件对象
reader.close();
inputStream.close();
fileObject.close();
}
}