把集合拆成10份,通过10个线程把数据写入同一个 FreeMarker 的 .ftl 文件,如何保证数据不被覆盖?求大神们帮忙!
1条回答 默认 最新
- 辽宁吴奇隆 2017-06-22 09:14关注
package test.com.linapex.room;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.linapex.common.util.FileUtils;
import com.linapex.common.util.ZhengzeValidate;public class TBuilderRoomSqlFileTool
{
final static int DATACACHENUM = 10000;static int currThreadCount = 0; static int maxThreadCount = 10; static File roomFilterLogFile = new File("roomFilter.log"); static File sqlFile = new File("roomSql.sql"); static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv"); final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');"; public static BufferedWriter initSQLWrite() throws Exception { if (sqlFile.exists()) { sqlFile.delete(); if (!sqlFile.createNewFile()) { System.err.println("创建文件失败:" + sqlFile.getAbsolutePath()); } } return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8")); } public static void loadCSV(CallBack2 callBack) throws Exception { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(csvFile)); String str = null; int num = 0; while ((str = reader.readLine()) != null) { num++; callBack.call(num, str); } } finally { reader.close(); } } public static void main(String[] args) throws Exception { final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); final List<Future<Object>> threadResultList = new ArrayList<Future<Object>>(); final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM); long begin = System.currentTimeMillis(); loadCSV(new CallBack2() { @Override public void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } String name = strs[0].trim(); if (!ZhengzeValidate.isChina(name)) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } try { String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = 
tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future<Object> fs : threadResultList) { try { fs.get(); currThreadCount--; System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount)); } catch (Exception e) { e.printStackTrace(); } } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); System.out.println(String.format("重新开始提交线程 当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future future = threadPool.submit(new Runnable() { @Override public void run() { try { writeSqlFile.save(); } catch (Exception e) { e.printStackTrace(); } } }); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } } }); writeSqlFile.flush(); threadPool.shutdown(); long end = System.currentTimeMillis() - begin; System.out.println(String.format("任务完成时间:%s", end)); } public static void writeLog(String str, Object... values) { FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { String value = values[index].toString(); if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value); } } return builder.toString(); }
}
/**
 * Buffers SQL statements in memory and writes them to a single {@link BufferedWriter}.
 *
 * <p>{@link #add(String)}, {@link #save()} and {@link #flush()} all run under the same write
 * lock, so producers and background savers never touch the buffer or the writer at the same
 * time. The lock is always released in a {@code finally} block.
 */
class WriteSqlHandle
{
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    WriteLock writeLock = readWriteLock.writeLock();

    /** Statements waiting to be written. */
    List<String> cacheList;

    BufferedWriter bw;

    /** Buffer size at which {@link #add(String)} starts returning true. */
    int dataCacheNum;

    /**
     * Creates a handle with a threshold of 0, so that every {@link #add(String)} reports a
     * full buffer.
     *
     * @param bw destination writer
     */
    public WriteSqlHandle(BufferedWriter bw) {
        this.bw = bw;
        cacheList = new ArrayList<String>();
    }

    /**
     * @param bw           destination writer
     * @param dataCacheNum buffer size at which {@link #add(String)} reports a full buffer;
     *                     also used as the initial capacity of the buffer
     */
    public WriteSqlHandle(BufferedWriter bw, int dataCacheNum) {
        this.bw = bw;
        this.dataCacheNum = dataCacheNum;
        cacheList = new ArrayList<String>(dataCacheNum);
    }

    /**
     * Appends a statement to the buffer.
     *
     * @param sqlStr statement to buffer
     * @return true if the buffer now holds at least {@code dataCacheNum} statements
     */
    public boolean add(String sqlStr) {
        writeLock.lock();
        try {
            cacheList.add(sqlStr);
            // Read the size while still holding the lock so that a concurrent save() cannot
            // change it in between.
            return cacheList.size() >= dataCacheNum;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Writes every buffered statement to the writer and empties the buffer. If a write fails,
     * the buffer is left as it was and the exception is propagated.
     *
     * @throws Exception if writing fails
     */
    public void save() throws Exception {
        writeLock.lock();
        try {
            long begin = System.currentTimeMillis();
            System.out.println(String.format("%s,准备消费 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

            writeBuffered();

            long end = System.currentTimeMillis() - begin;
            System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size()));

            cacheList.clear(); // the buffered statements are on the writer now
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Writes whatever is still buffered and closes the writer. The writer is closed even when
     * writing fails. Statements added afterwards can no longer be written.
     *
     * @throws Exception if writing, flushing or closing fails
     */
    public void flush() throws Exception {
        writeLock.lock();
        try {
            System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));
            try {
                writeBuffered();
                System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));
                cacheList.clear(); // nothing left to write
            } finally {
                closeWrite();
            }
        } finally {
            writeLock.unlock();
        }
    }

    /** Writes each buffered statement followed by CRLF; the caller must hold the write lock. */
    private void writeBuffered() throws Exception {
        for (String str : cacheList) {
            bw.write(str + "\r\n");
        }
    }

    /** Flushes and closes the writer; the close is attempted even if the flush fails. */
    private void closeWrite() throws Exception {
        try {
            bw.flush();
        } finally {
            bw.close();
        }
    }
}
/**
 * Callback that receives the lines of a text file one at a time.
 */
interface CallBack2
{
    /**
     * Handles a single line.
     *
     * @param num number of the line being delivered
     * @param str text of the line
     */
    void call(int num, String str);
}
悬赏问题
- ¥15 #MATLAB仿真#车辆换道路径规划
- ¥15 java 操作 elasticsearch 8.1 实现 索引的重建
- ¥15 数据可视化Python
- ¥15 要给毕业设计添加扫码登录的功能!!有偿
- ¥15 kafka 分区副本增加会导致消息丢失或者不可用吗?
- ¥15 微信公众号自制会员卡没有收款渠道啊
- ¥15 stable diffusion
- ¥100 Jenkins自动化部署—悬赏100元
- ¥15 关于#python#的问题:求帮写python代码
- ¥20 MATLAB画图图形出现上下震荡的线条