把集合拆成10份,通过十个线程把数据插入同一个freemarker的.ftl文件,保证数据不被覆盖?大神们求助啊!!!
1条回答 默认 最新
- 辽宁吴奇隆 2017-06-22 09:14关注
package test.com.linapex.room;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import com.linapex.common.util.FileUtils;
import com.linapex.common.util.ZhengzeValidate;public class TBuilderRoomSqlFileTool
{
final static int DATACACHENUM = 10000;static int currThreadCount = 0; static int maxThreadCount = 10; static File roomFilterLogFile = new File("roomFilter.log"); static File sqlFile = new File("roomSql.sql"); static File csvFile = new File("D:\\baiduyundownload\\asd\\2000W\\1-200W.csv"); final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');"; public static BufferedWriter initSQLWrite() throws Exception { if (sqlFile.exists()) { sqlFile.delete(); if (!sqlFile.createNewFile()) { System.err.println("创建文件失败:" + sqlFile.getAbsolutePath()); } } return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8")); } public static void loadCSV(CallBack2 callBack) throws Exception { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(csvFile)); String str = null; int num = 0; while ((str = reader.readLine()) != null) { num++; callBack.call(num, str); } } finally { reader.close(); } } public static void main(String[] args) throws Exception { final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount); final List<Future<Object>> threadResultList = new ArrayList<Future<Object>>(); final WriteSqlHandle writeSqlFile = new WriteSqlHandle(initSQLWrite(), DATACACHENUM); long begin = System.currentTimeMillis(); loadCSV(new CallBack2() { @Override public void call(int num, String str) { String[] strs = str.split(","); if (strs.length < 8) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } String name = strs[0].trim(); if (!ZhengzeValidate.isChina(name)) { writeLog("此条数据不录入::0", Arrays.toString(strs)); return; } try { String card = strs[4]; String gender = strs[5]; String birthday = strs[6]; String address = strs[7]; String zip = strs[8]; String mobile = strs[20]; String email = strs[22]; String version = strs[31]; //生成sql语句 final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version); //添加数据,如果超出了缓存数据,则 开始写入文件系统 if (writeSqlFile.add(tempSql)) { currThreadCount++; //如果提交的线程过多,则取回之后再提交. if (currThreadCount >= maxThreadCount) { System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); for (Future<Object> fs : threadResultList) { try { fs.get(); currThreadCount--; System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount)); } catch (Exception e) { e.printStackTrace(); } } threadResultList.clear(); //清空 currThreadCount = threadResultList.size(); System.out.println(String.format("重新开始提交线程 当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount)); } Future future = threadPool.submit(new Runnable() { @Override public void run() { try { writeSqlFile.save(); } catch (Exception e) { e.printStackTrace(); } } }); threadResultList.add(future); // System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num)); } } catch (Exception e) { writeLog("录入错误的数据::0", Arrays.toString(strs)); writeLog("错误的原因::0", e.getMessage()); } } }); writeSqlFile.flush(); threadPool.shutdown(); long end = System.currentTimeMillis() - begin; System.out.println(String.format("任务完成时间:%s", end)); } public static void writeLog(String str, Object... values) { FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false); } public static String tm(String strSource, Object... values) { if (strSource == null) { return null; } StringBuilder builder = new StringBuilder(strSource); final String prefix = ":"; for (int index = 0; index < values.length; index++) { String value = values[index].toString(); if (value == null) { continue; } String key = new StringBuilder(prefix).append(index).toString(); int i = -1; if ((i = builder.indexOf(key, i)) > -1) { int len = key.length(); builder.replace(i, i + len, value); } } return builder.toString(); }
}
class WriteSqlHandle
{
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();WriteLock writeLock = readWriteLock.writeLock(); List<String> cacheList; BufferedWriter bw; int dataCacheNum; public WriteSqlHandle(BufferedWriter bw) { this.bw = bw; cacheList = new ArrayList<String>(); } public WriteSqlHandle(BufferedWriter bw, int dataCacheNum) { this.bw = bw; this.dataCacheNum = dataCacheNum; cacheList = new ArrayList<String>(dataCacheNum); } public boolean add(String sqlStr) { writeLock.lock(); cacheList.add(sqlStr); writeLock.unlock(); return cacheList.size() >= dataCacheNum; } public void save() throws Exception { writeLock.lock(); long begin = System.currentTimeMillis(); System.out.println(String.format("%s,准备消费 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } long end = System.currentTimeMillis() - begin; System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), end, cacheList.size())); cacheList.clear(); //清空数据. writeLock.unlock(); } public void flush() throws Exception { System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size())); for (String str : cacheList) { bw.write(str + "\r\n"); } System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size())); cacheList.clear(); //清空数据 closeWrite(); } private void closeWrite() throws Exception { bw.flush(); bw.close(); }
}
interface CallBack2
{
void call(int num, String str);
}解决 无用评论 打赏 举报
悬赏问题
- ¥15 seatunnel-web使用SQL组件时候后台报错,无法找到表格
- ¥15 fpga自动售货机数码管(相关搜索:数字时钟)
- ¥15 用前端向数据库插入数据,通过debug发现数据能走到后端,但是放行之后就会提示错误
- ¥30 3天&7天&&15天&销量如何统计同一行
- ¥30 帮我写一段可以读取LD2450数据并计算距离的Arduino代码
- ¥15 飞机曲面部件如机翼,壁板等具体的孔位模型
- ¥15 vs2019中数据导出问题
- ¥20 云服务Linux系统TCP-MSS值修改?
- ¥20 关于#单片机#的问题:项目:使用模拟iic与ov2640通讯环境:F407问题:读取的ID号总是0xff,自己调了调发现在读从机数据时,SDA线上并未有信号变化(语言-c语言)
- ¥20 怎么在stm32门禁成品上增加查询记录功能