nonono吧 2025-10-21 14:47 采纳率: 0%
浏览 8

大批量文本快速解析最佳实践(🆘)

大家有没有遇到过解析大文本数据(单个文件大概10w~100w),每天有500个文本。需要在5个小时内处理完。我采用的方法是:分批异步处理。即每次读取到3000条,丢线程池进行数据处理及入库。
1、线程池配置为核心线程10。
2、由于库里数据无法删除,不能采用batchInsert进行批量入库。
3、通过xxl-job进行分片广播,多个应用同时处理。
虽然基本上可以达到要求,但是我还是不太满意,不知道大家是否有更好的方法?欢迎一起探讨

  • 写回答

4条回答 默认 最新

  • 阿里嘎多学长 2025-10-21 15:01
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    大批量文本快速解析最佳实践

    感谢您提供的详细信息。根据您的描述,我理解您遇到的问题是:

    • 大量文本数据(每个文件10万到100万条数据)
    • 每天有500个文本需要处理
    • 需要在5个小时内处理完所有数据
    • 您当前使用的方法是分批异步处理,每次读取3000条数据,丢入线程池进行处理和入库

    建议

    1. 线程池:您的方法是正确的,使用线程池可以提高处理效率。但是,您需要考虑线程池的大小和数据分批处理的大小。一般来说,线程池大小应该与CPU核心数相等或略大。数据分批处理的大小应该根据机器的内存和处理能力来决定。
    2. 数据分批处理:您可以尝试使用更大的数据分批处理大小,例如10000或20000条数据。这样可以减少线程池的开销和数据处理的次数。
    3. 数据缓存:如果数据处理和入库的速度不能满足需求,您可以尝试使用数据缓存机制。例如,使用缓存队列(如Java的BlockingQueue)来缓存处理好的数据,等待入库线程池处理。
    4. 多机处理:如果单机处理速度仍然不能满足需求,您可以尝试使用多机处理。例如,使用分布式计算框架(如Hadoop或Spark)来分发数据处理任务到多台机器上。
    5. 数据压缩:如果数据量非常大,您可以尝试使用数据压缩机制来减少数据大小。例如,使用GZIP或LZ4压缩算法来压缩文本数据。

    核心代码

    // 线程池大小
    int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
    
    // 数据分批处理大小
    int batchSize = 10000;
    
    // 线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize);
    
    // 数据缓存队列
    BlockingQueue<String> cacheQueue = new LinkedBlockingQueue<>();
    
    // 数据处理线程
    for (int i = 0; i < threadPoolSize; i++) {
        Thread thread = new Thread(() -> {
            while (true) {
                String data = cacheQueue.poll(10, TimeUnit.SECONDS);
                if (data != null) {
                    // 数据处理和入库
                    processAndInsert(data);
                }
            }
        });
        thread.start();
    }
    
    // 数据读取和分批处理
    while (true) {
        String data = readData();
        if (data != null) {
            cacheQueue.put(data);
        }
    }
    
    // 数据处理和入库
    void processAndInsert(String data) {
        // 数据处理逻辑
        String processedData = process(data);
        // 入库逻辑
        insert(processedData);
    }
    
    // 数据读取
    String readData() {
        // 读取数据逻辑
        return data;
    }
    
    // 数据处理
    String process(String data) {
        // 数据处理逻辑
        return processedData;
    }
    
    // 入库
    void insert(String data) {
        // 入库逻辑
    }
    

    上述代码仅提供了一个基本的示例,需要根据您的具体需求进行调整和扩展。

    评论

报告相同问题?

问题事件

  • 创建了问题 10月21日