Web service 按时间条件取数据,之前一直正常,但后来丢失了约 30 万条数据。
package dao;
import redis.clients.jedis.Jedis;
import util.HttpUtil;
import util.SingletonHelper;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import net.sf.json.xml.XMLSerializer;
import org.apache.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
public class QueryAndSave {
private Logger logger = Logger.getLogger(QueryAndSave.class);
SingletonHelper bc = SingletonHelper.getInstance();// 单例
String url = bc.getValue("serverUrl");// keda服务地址
Jedis j = new Jedis(bc.getValue("redis_ip"), Integer.parseInt(bc
.getValue("redis_port")));// redis配置
long ms = Integer.parseInt(bc.getValue("restart_hours"))*10800000;// 规定时间?毫秒后停止并重新获取当前时间并以当前时间为条件查询数据
int cacheSize = Integer.parseInt(bc.getValue("cache_size"));//缓存中的建议条数,超过条数时推入redis
String limit = bc.getValue("recursive_limit");// 每次取数据的条数
String time = txt2String(new File("/home/time.txt")).trim();// 记录开始时间的文本
static Map deviceMap = new HashMap();// 设备map,用以判断设备是电警or卡口
static {
String[] heads = txt2String(new File("/home/head.txt")).split(",");
String[] backs = txt2String(new File("/home/back.txt")).split(",");
for (String s : heads) {
deviceMap.put(s.trim(), "1");
}
for (String s : backs) {
deviceMap.put(s.trim(), "2");
}
}
Map resultMap = new HashMap();
XMLSerializer xml = new XMLSerializer();
int c = 0;
int all = 0;
@SuppressWarnings("unchecked")
public void doCircle(int start) {
Document doc = null;
Element resultSet = null;
Iterator<Element> records = null;
String result = "";// 接口返回的数据
Long begin = new Date().getTime();// 开始查询的时间
while (true) {
try {
Date now = new Date();// 当前时间now-开始时间begin超过规定时间ms时,写入当前时间到记录时间的文本并停止循环
Date nowtime = new Date(now.getTime() - 12000000);
if (ms < (nowtime.getTime() - begin)) {
FileWriter fw = new FileWriter("/home/time.txt", false);
BufferedWriter bw = new BufferedWriter(fw);
bw.write(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(nowtime));
if(resultMap.size() > 0){
System.out.println("pushing into redis...("+resultMap.size()+")");
for(String s : resultMap.keySet()){
j.lpush("_jobs", resultMap.get(s));
}
c += resultMap.size();
resultMap.clear();
}
bw.close();
fw.close();
break;
}
nowtime = null;
// 查询接口及字段名更改
result = doQuery(start).replace("imageURL", "image_url")
.replace("PassTime", "capture_time")
.replace("VehicleType", "plate_type_id")
.replace("TollgateID", "location_id")
.replace("VehicleChannel", "lane_id")
.replace("VehiclePlate", "plate_number")
.replace("VehicleSpeed", "speed")
.replace("Direction", "direction_id");
doc = DocumentHelper.parseText(result);
resultSet = doc.getRootElement().element("ResultSet");
// 处理空值
if (null == resultSet) {
System.out.println("Empty result,sleeping...(60s)");
Thread.sleep(60000);
continue;
}
// 获取返回xml中的Record内的元素转换位json格式存入redis
records = resultSet.elementIterator("Record");
while (records.hasNext()) {
Element record_ = records.next();
Element e = record_.addElement("unit_id");
e.setText("1");
e = record_.addElement("thirdpart_id");
e.setText("1");
e = record_.element("location_id");
String id = e.getTextTrim();
e = record_.addElement("device_id");
e.setText(id);
e = record_.addElement("source_id");
e.setText(deviceMap.get(id) == null ? "0" : deviceMap.get(id));
Iterator<Element> images = record_.element("Images").elementIterator("Images");
if (images.hasNext()) {
Element i= images.next();
resultMap.put(i.elementText("image_url"), xml.read(record_.asXML()).toString());
all++;
}else{
resultMap.put(UUID.randomUUID().toString(), xml.read(record_.asXML()).toString());
all++;
}
record_ = null;
e = null;
start++;
}
if(resultMap.size() > 5000){
System.out.println("pushing into redis...("+resultMap.size()+")");
for(String s : resultMap.keySet()){
j.lpush("_jobs", resultMap.get(s));
}
c += resultMap.size();
resultMap.clear();
}
//log total
if(all % 10000 == 0){
//log total
logger.error(">>>total pushed: " + c + "<<<");
logger.error("***total received: " + all + "***");
logger.error("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("Remote server error,retrying......");
continue;
}
}
}
public String doQuery(int start) {
Map<String, String> p = null;// 用以保存请求数据
StringBuffer sb = null;// 拼接请求的buffer
String res = "";// 接口返回的数据
p = new HashMap<String, String>();
sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>")
.append("<Request>")
.append("<QueryID>64010000002005000001201602251445260001</QueryID>")
.append("<MessageType>VehicleInfoQuery</MessageType>")
.append("<LimitNum>").append(limit).append("</LimitNum>")
.append("<StartNum>").append(start).append("</StartNum>")
.append("<Order>asc</Order>").append("<VehiclePlateQuery>")
.append("<VehiclePlate></VehiclePlate>")
.append("<PlateColor></PlateColor>")
.append("<VehicleColor></VehicleColor>")
.append("<TollgateID></TollgateID>")
.append("<StartTime></StartTime>")
.append("<EndTime></EndTime>").append("<InsertTime>")
.append(time).append("</InsertTime>")
.append("</VehiclePlateQuery>").append("</Request>");
p.put("conditionXml", sb.toString());
try {
res = HttpUtil.post(url, p);
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
/*
* 读取txt文件中的数据
*/
public static String txt2String(File file) {
String result = "";
try {
BufferedReader br = new BufferedReader(new FileReader(file));
String s = null;
while ((s = br.readLine()) != null) {
result = result + "\n" + s;
}
br.close();
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
}
不知道是哪里出了问题。