多线程操作集合,把集合拆分开,用多线程把数据插入freemarker的ftl文件

把集合拆成10份,通过十个线程把数据插入同一个freemarker的.ftl文件,保证数据不被覆盖?大神们求助啊!!!

1个回答

package test.com.linapex.room;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.linapex.common.util.FileUtils;
import com.linapex.common.util.ZhengzeValidate;

public class TBuilderRoomSqlFileTool
{
final static int DATACACHENUM = 10000;

static int currThreadCount = 0;
static int maxThreadCount = 10;

static File roomFilterLogFile = new File("roomFilter.log");
static File sqlFile = new File("roomSql.sql");
static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv");

final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

public static BufferedWriter initSQLWrite() throws Exception
{
    if (sqlFile.exists())
    {
        sqlFile.delete();

        if (!sqlFile.createNewFile())
        {
            System.err.println("创建文件失败:" + sqlFile.getAbsolutePath());
        }
    }

    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
}

public static void loadCSV(CallBack2 callBack) throws Exception
{
    BufferedReader reader = null;
    try
    {
        reader = new BufferedReader(new FileReader(csvFile));
        String str = null;

        int num = 0;
        while ((str = reader.readLine()) != null)
        {
            num++;
            callBack.call(num, str);
        }
    } finally
    {
        reader.close();
    }
}

public static void main(String[] args) throws Exception
{
    final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);

    final List<Future<Object>> threadResultList = new ArrayList<Future<Object>>();

    final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM);

    long begin = System.currentTimeMillis();

    loadCSV(new CallBack2()
    {

        @Override
        public void call(int num, String str)
        {
            String[] strs = str.split(",");

            if (strs.length < 8)
            {
                writeLog("此条数据不录入::0", Arrays.toString(strs));
                return;
            }

            String name = strs[0].trim();
            if (!ZhengzeValidate.isChina(name))
            {
                writeLog("此条数据不录入::0", Arrays.toString(strs));
                return;
            }

            try
            {
                String card = strs[4];
                String gender = strs[5];
                String birthday = strs[6];
                String address = strs[7];
                String zip = strs[8];
                String mobile = strs[20];
                String email = strs[22];
                String version = strs[31];

                //生成sql语句
                final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);

                //添加数据,如果超出了缓存数据,则 开始写入文件系统
                if (writeSqlFile.add(tempSql))
                {
                    currThreadCount++;

                    //如果提交的线程过多,则取回之后再提交.
                    if (currThreadCount >= maxThreadCount)
                    {
                        System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                        for (Future<Object> fs : threadResultList)
                        {
                            try
                            {
                                fs.get();
                                currThreadCount--;
                                System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount));
                            } catch (Exception e)
                            {
                                e.printStackTrace();
                            }
                        }

                        threadResultList.clear(); //清空
                        currThreadCount = threadResultList.size();
                        System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
                    }

                    Future future = threadPool.submit(new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            try
                            {
                                writeSqlFile.save();
                            } catch (Exception e)
                            {
                                e.printStackTrace();
                            }
                        }
                    });

                    threadResultList.add(future);
                    //                      System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num));
                }

            } catch (Exception e)
            {
                writeLog("录入错误的数据::0", Arrays.toString(strs));
                writeLog("错误的原因::0", e.getMessage());
            }
        }
    });

    writeSqlFile.flush();

    threadPool.shutdown();

    long end = System.currentTimeMillis() - begin;

    System.out.println(String.format("任务完成时间:%s", end));

}

public static void writeLog(String str, Object... values)
{
    FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
}

public static String tm(String strSource, Object... values)
{
    if (strSource == null)
    {
        return null;
    }

    StringBuilder builder = new StringBuilder(strSource);

    final String prefix = ":";
    for (int index = 0; index < values.length; index++)
    {
        String value = values[index].toString();
        if (value == null)
        {
            continue;
        }

        String key = new StringBuilder(prefix).append(index).toString();

        int i = -1;
        if ((i = builder.indexOf(key, i)) > -1)
        {
            int len = key.length();
            builder.replace(i, i + len, value);
        }
    }

    return builder.toString();
}

}

class WriteSqlHandle
{
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

WriteLock writeLock = readWriteLock.writeLock();

List<String> cacheList;

BufferedWriter bw;

int dataCacheNum;

public WriteSqlHandle(BufferedWriter bw)
{
    this.bw = bw;
    cacheList = new ArrayList<String>();
}

public WriteSqlHandle(BufferedWriter bw, int dataCacheNum)
{
    this.bw = bw;
    this.dataCacheNum = dataCacheNum;
    cacheList = new ArrayList<String>(dataCacheNum);
}

public boolean add(String sqlStr)
{
    writeLock.lock();

    cacheList.add(sqlStr);

    writeLock.unlock();

    return cacheList.size() >= dataCacheNum;
}

public void save() throws Exception
{
    writeLock.lock();

    long begin = System.currentTimeMillis();

    System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

    for (String str : cacheList)
    {
        bw.write(str + "\r\n");
    }

    long end = System.currentTimeMillis() - begin;

    System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size()));

    cacheList.clear(); //清空数据.

    writeLock.unlock();
}

public void flush() throws Exception
{
    System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

    for (String str : cacheList)
    {
        bw.write(str + "\r\n");
    }

    System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));

    cacheList.clear(); //清空数据

    closeWrite();
}

private void closeWrite() throws Exception
{
    bw.flush();
    bw.close();
}

}

interface CallBack2
{
void call(int num, String str);
}

qq_37636951
qq_37636951 谢谢,不好意思,是我表达不清楚,我要做的是把集合拆分开用多线程向freemarker的.ftl插入数据,不能让数据出现覆盖或者混乱
大约 3 年之前 回复
Csdn user default icon
上传中...
上传图片
插入图片
抄袭、复制答案,以达到刷声望分或其他目的的行为,在CSDN问答是严格禁止的,一经发现立刻封号。是时候展现真正的技术了!
立即提问