qq_37636951 2017-06-22 08:53 采纳率: 0%
浏览 1243

多线程操作集合,把集合拆分开,用多线程把数据插入freemarker的ftl文件

把集合拆成10份,通过十个线程把数据插入同一个freemarker的.ftl文件,保证数据不被覆盖?大神们求助啊!!!

  • 写回答

1条回答 默认 最新

  • 辽宁吴奇隆 2017-06-22 09:14
    关注

    package test.com.linapex.room;

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileReader;
    import java.io.OutputStreamWriter;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
    import com.linapex.common.util.FileUtils;
    import com.linapex.common.util.ZhengzeValidate;

    public class TBuilderRoomSqlFileTool
    {
    final static int DATACACHENUM = 10000;

    static int currThreadCount = 0;
    static int maxThreadCount = 10;
    
    static File roomFilterLogFile = new File("roomFilter.log");
    static File sqlFile = new File("roomSql.sql");
    static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv");
    
    final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";
    
    public static BufferedWriter initSQLWrite() throws Exception
    {
        if (sqlFile.exists())
        {
            sqlFile.delete();
    
            if (!sqlFile.createNewFile())
            {
                System.err.println("创建文件失败:" + sqlFile.getAbsolutePath());
            }
        }
    
        return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
    }
    
    public static void loadCSV(CallBack2 callBack) throws Exception
    {
        BufferedReader reader = null;
        try
        {
            reader = new BufferedReader(new FileReader(csvFile));
            String str = null;
    
            int num = 0;
            while ((str = reader.readLine()) != null)
            {
                num++;
                callBack.call(num, str);
            }
        } finally
        {
            reader.close();
        }
    }
    
    public static void main(String[] args) throws Exception
    {
        final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);
    
        final List<Future<Object>> threadResultList = new ArrayList<Future<Object>>();
    
        final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM);
    
        long begin = System.currentTimeMillis();
    
        loadCSV(new CallBack2()
        {
    
            @Override
            public void call(int num, String str)
            {
                String[] strs = str.split(",");
    
                if (strs.length < 8)
                {
                    writeLog("此条数据不录入::0", Arrays.toString(strs));
                    return;
                }
    
                String name = strs[0].trim();
                if (!ZhengzeValidate.isChina(name))
                {
                    writeLog("此条数据不录入::0", Arrays.toString(strs));
                    return;
                }
    
                try
                {
                    String card = strs[4];
                    String gender = strs[5];
                    String birthday = strs[6];
                    String address = strs[7];
                    String zip = strs[8];
                    String mobile = strs[20];
                    String email = strs[22];
                    String version = strs[31];
    
                    //生成sql语句
                    final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);
    
                    //添加数据,如果超出了缓存数据,则 开始写入文件系统
                    if (writeSqlFile.add(tempSql))
                    {
                        currThreadCount++;
    
                        //如果提交的线程过多,则取回之后再提交.
                        if (currThreadCount >= maxThreadCount)
                        {
                            System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                            for (Future<Object> fs : threadResultList)
                            {
                                try
                                {
                                    fs.get();
                                    currThreadCount--;
                                    System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount));
                                } catch (Exception e)
                                {
                                    e.printStackTrace();
                                }
                            }
    
                            threadResultList.clear(); //清空
                            currThreadCount = threadResultList.size();
                            System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                        }
    
                        Future future = threadPool.submit(new Runnable()
                        {
                            @Override
                            public void run()
                            {
                                try
                                {
                                    writeSqlFile.save();
                                } catch (Exception e)
                                {
                                    e.printStackTrace();
                                }
                            }
                        });
    
                        threadResultList.add(future);
                        //                      System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num));
                    }
    
                } catch (Exception e)
                {
                    writeLog("录入错误的数据::0", Arrays.toString(strs));
                    writeLog("错误的原因::0", e.getMessage());
                }
            }
        });
    
        writeSqlFile.flush();
    
        threadPool.shutdown();
    
        long end = System.currentTimeMillis() - begin;
    
        System.out.println(String.format("任务完成时间:%s", end));
    
    }
    
    public static void writeLog(String str, Object... values)
    {
        FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
    }
    
    public static String tm(String strSource, Object... values)
    {
        if (strSource == null)
        {
            return null;
        }
    
        StringBuilder builder = new StringBuilder(strSource);
    
        final String prefix = ":";
        for (int index = 0; index < values.length; index++)
        {
            String value = values[index].toString();
            if (value == null)
            {
                continue;
            }
    
            String key = new StringBuilder(prefix).append(index).toString();
    
            int i = -1;
            if ((i = builder.indexOf(key, i)) > -1)
            {
                int len = key.length();
                builder.replace(i, i + len, value);
            }
        }
    
        return builder.toString();
    }
    

    }

    class WriteSqlHandle
    {
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    WriteLock writeLock = readWriteLock.writeLock();
    
    List<String> cacheList;
    
    BufferedWriter bw;
    
    int dataCacheNum;
    
    public WriteSqlHandle(BufferedWriter bw)
    {
        this.bw = bw;
        cacheList = new ArrayList<String>();
    }
    
    public WriteSqlHandle(BufferedWriter bw, int dataCacheNum)
    {
        this.bw = bw;
        this.dataCacheNum = dataCacheNum;
        cacheList = new ArrayList<String>(dataCacheNum);
    }
    
    public boolean add(String sqlStr)
    {
        writeLock.lock();
    
        cacheList.add(sqlStr);
    
        writeLock.unlock();
    
        return cacheList.size() >= dataCacheNum;
    }
    
    public void save() throws Exception
    {
        writeLock.lock();
    
        long begin = System.currentTimeMillis();
    
        System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));
    
        for (String str : cacheList)
        {
            bw.write(str + "\r\n");
        }
    
        long end = System.currentTimeMillis() - begin;
    
        System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size()));
    
        cacheList.clear(); //清空数据.
    
        writeLock.unlock();
    }
    
    public void flush() throws Exception
    {
        System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));
    
        for (String str : cacheList)
        {
            bw.write(str + "\r\n");
        }
    
        System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));
    
        cacheList.clear(); //清空数据
    
        closeWrite();
    }
    
    private void closeWrite() throws Exception
    {
        bw.flush();
        bw.close();
    }
    

    }

    interface CallBack2
    {
    void call(int num, String str);
    }

    评论

报告相同问题?

悬赏问题

  • ¥15 seatunnel-web使用SQL组件时候后台报错,无法找到表格
  • ¥15 fpga自动售货机数码管(相关搜索:数字时钟)
  • ¥15 用前端向数据库插入数据,通过debug发现数据能走到后端,但是放行之后就会提示错误
  • ¥30 3天&7天&&15天&销量如何统计同一行
  • ¥30 帮我写一段可以读取LD2450数据并计算距离的Arduino代码
  • ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型
  • ¥15 vs2019中数据导出问题
  • ¥20 云服务Linux系统TCP-MSS值修改?
  • ¥20 关于#单片机#的问题:项目:使用模拟iic与ov2640通讯环境:F407问题:读取的ID号总是0xff,自己调了调发现在读从机数据时,SDA线上并未有信号变化(语言-c语言)
  • ¥20 怎么在stm32门禁成品上增加查询记录功能