guojw8881 2023-10-27 11:32 采纳率: 100%
浏览 11
已结题

java中使用 Apache common包定时读取FTP中log列表,并按顺序解析列表中文件,实现断点续读功能

java中使用 Apache common包定时读取FTP中log列表,并按顺序解析列表中文件,实现断点续读功能
1,存在2个数据库表(可以根据需要增加相关数据表);
2,一个是主表,存FTP文件目录,文件大小;
3,一个子表存log文件中的 文件名称、唯一线程ID、开始日志、结束日志;
4,方便后续根据log文件名称和线程ID下载该文件中的log片段;
5,该FTP的log文件会定时更新;
6,每次定时读取ftp中的log文件时,需要对自动获取上一次定时任务读取的最后一个log文件,判断该log文件是否存在更新,如果存在更新,那么就需要对该文件实现断点续读的功能,解析该log的日志内容,并存入数据库;解析完该log文件后,继续解析该FTP文件夹中新更新的log文件;

谢谢各位了!

  • 写回答

13条回答 默认 最新

  • 技术宅program 2023-10-27 14:36
    关注
    import org.apache.commons.net.ftp.FTPClient;
    import org.apache.commons.net.ftp.FTPFile;
    import org.apache.commons.vfs2.FileObject;
    import org.apache.commons.vfs2.FileSystemManager;
    import org.apache.commons.vfs2.VFS;
    
    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Timer;
    import java.util.TimerTask;
    
    public class FTPReader {
    
        // FTP服务器的地址、端口、用户名、密码
        private static final String FTP_HOST = "XXX.XXX.XXX.XXX";
        private static final int FTP_PORT = 21;
        private static final String FTP_USER = "root";
        private static final String FTP_PASS = "password";
    
        // FTP服务器上的log文件所在的目录
        private static final String FTP_DIR = "/logs";
    
        // 数据库连接对象
        private static Connection conn;
    
        // 定时任务的间隔时间,单位是毫秒
        private static final long INTERVAL = 1000 * 60 * 10; // 10分钟
    
        public static void main(String[] args) {
            // 启动定时任务
            Timer timer = new Timer();
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        // 连接FTP服务器
                        FTPClient ftpClient = new FTPClient();
                        ftpClient.connect(FTP_HOST, FTP_PORT);
                        ftpClient.login(FTP_USER, FTP_PASS);
    
                        // 获取FTP服务器上的log文件列表
                        ftpClient.changeWorkingDirectory(FTP_DIR);
                        FTPFile[] ftpFiles = ftpClient.listFiles();
    
                        // 遍历log文件列表,按顺序解析每个文件
                        for (FTPFile ftpFile : ftpFiles) {
                            // 获取文件名和大小
                            String fileName = ftpFile.getName();
                            long fileSize = ftpFile.getSize();
    
                            // 查询数据库中是否已经存在该文件
                            PreparedStatement ps = conn.prepareStatement("select * from ftp_file where file_name = ?");
                            ps.setString(1, fileName);
                            ResultSet rs = ps.executeQuery();
    
                            if (rs.next()) {
                                // 如果数据库中已经存在该文件,判断是否有更新
                                long oldSize = rs.getLong("file_size");
                                if (fileSize > oldSize) {
                                    // 如果文件有更新,获取上次读取的位置
                                    long offset = rs.getLong("offset");
    
                                    // 实现断点续读功能,从上次读取的位置开始解析文件内容,并存入数据库
                                    parseLogFile(fileName, offset, fileSize);
    
                                    // 更新数据库中该文件的大小和位置
                                    ps = conn.prepareStatement("update ftp_file set file_size = ?, offset = ? where file_name = ?");
                                    ps.setLong(1, fileSize);
                                    ps.setLong(2, fileSize);
                                    ps.setString(3, fileName);
                                    ps.executeUpdate();
                                }
                            } else {
                                // 如果数据库中不存在该文件,从头开始解析文件内容,并存入数据库
                                parseLogFile(fileName, 0, fileSize);
    
                                // 插入数据库中该文件的记录
                                ps = conn.prepareStatement("insert into ftp_file (file_name, file_size, offset) values (?, ?, ?)");
                                ps.setString(1, fileName);
                                ps.setLong(2, fileSize);
                                ps.setLong(3, fileSize);
                                ps.executeUpdate();
                            }
                        }
    
                        // 断开FTP服务器连接
                        ftpClient.logout();
                        ftpClient.disconnect();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, 0, INTERVAL);
        }
    
        // 解析log文件的内容,并存入数据库
        private static void parseLogFile(String fileName, long offset, long fileSize) throws Exception {
            // 使用VFS获取FTP服务器上的文件对象
            FileSystemManager manager = VFS.getManager();
            FileObject fileObject = manager.resolveFile("ftp://" + FTP_USER + ":" + FTP_PASS + "@" + FTP_HOST + FTP_DIR + "/" + fileName);
    
            // 获取文件的输入流,并跳过已经读取过的部分
            InputStream inputStream = fileObject.getContent().getInputStream();
            inputStream.skip(offset);
    
            // 使用BufferedReader按行读取文件内容
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
            String line = null;
            while ((line = reader.readLine()) != null) {
                // 解析每一行的内容,获取文件名称、唯一线程ID、开始日志、结束日志等信息
                // 这里假设每一行的格式是:[文件名称] [线程ID] [开始日志] [结束日志]
                // 你可以根据你的实际情况修改这里的解析逻辑
                String[] parts = line.split(" ");
                String logFileName = parts[0];
                String threadId = parts[1];
                String startLog = parts[2];
                String endLog = parts[3];
    
                // 将解析出来的信息存入数据库
                PreparedStatement ps = conn.prepareStatement("insert into log_file (file_name, thread_id, start_log, end_log) values (?, ?, ?, ?)");
                ps.setString(1, logFileName);
                ps.setString(2, threadId);
                ps.setString(3, startLog);
                ps.setString(4, endLog);
                ps.executeUpdate();
            }
    
            // 关闭输入流和文件对象
            reader.close();
            inputStream.close();
            fileObject.close();
        }
    }
    
    
    
    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(12条)

报告相同问题?

问题事件

  • 系统已结题 11月10日
  • 已采纳回答 11月2日
  • 创建了问题 10月27日

悬赏问题

  • ¥15 x趋于0时tanx-sinx极限可以拆开算吗
  • ¥500 把面具戴到人脸上,请大家贡献智慧
  • ¥15 任意一个散点图自己下载其js脚本文件并做成独立的案例页面,不要作在线的,要离线状态。
  • ¥15 各位 帮我看看如何写代码,打出来的图形要和如下图呈现的一样,急
  • ¥30 c#打开word开启修订并实时显示批注
  • ¥15 如何解决ldsc的这条报错/index error
  • ¥15 VS2022+WDK驱动开发环境
  • ¥30 关于#java#的问题,请各位专家解答!
  • ¥30 vue+element根据数据循环生成多个table,如何实现最后一列 平均分合并
  • ¥20 pcf8563时钟芯片不启振