qq_34006675 2016-06-23 03:18
浏览 395

web service 通过时间取数据 但是之前正常 后来丢失30万条左右数据

web service 通过时间取数据 但是之前正常 后来丢失30万条左右数据
package dao;

import redis.clients.jedis.Jedis;
import util.HttpUtil;
import util.SingletonHelper;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;

import net.sf.json.xml.XMLSerializer;

import org.apache.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;

public class QueryAndSave {
private Logger logger = Logger.getLogger(QueryAndSave.class);
SingletonHelper bc = SingletonHelper.getInstance();// 单例
String url = bc.getValue("serverUrl");// keda服务地址
Jedis j = new Jedis(bc.getValue("redis_ip"), Integer.parseInt(bc
.getValue("redis_port")));// redis配置
long ms = Integer.parseInt(bc.getValue("restart_hours"))*10800000;// 规定时间?毫秒后停止并重新获取当前时间并以当前时间为条件查询数据
int cacheSize = Integer.parseInt(bc.getValue("cache_size"));//缓存中的建议条数,超过条数时推入redis
String limit = bc.getValue("recursive_limit");// 每次取数据的条数
String time = txt2String(new File("/home/time.txt")).trim();// 记录开始时间的文本
static Map deviceMap = new HashMap();// 设备map,用以判断设备是电警or卡口
static {
String[] heads = txt2String(new File("/home/head.txt")).split(",");
String[] backs = txt2String(new File("/home/back.txt")).split(",");
for (String s : heads) {
deviceMap.put(s.trim(), "1");
}
for (String s : backs) {
deviceMap.put(s.trim(), "2");
}
}
Map resultMap = new HashMap();
XMLSerializer xml = new XMLSerializer();
int c = 0;
int all = 0;

@SuppressWarnings("unchecked")
public void doCircle(int start) {
    Document doc = null;
    Element resultSet = null;
    Iterator<Element> records = null;
    String result = "";// 接口返回的数据
    Long begin = new Date().getTime();// 开始查询的时间

    while (true) {
        try {
            Date now = new Date();// 当前时间now-开始时间begin超过规定时间ms时,写入当前时间到记录时间的文本并停止循环
            Date nowtime = new Date(now.getTime() - 12000000);
            if (ms < (nowtime.getTime() - begin)) {
                FileWriter fw = new FileWriter("/home/time.txt", false);
                BufferedWriter bw = new BufferedWriter(fw);
                bw.write(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(nowtime));
                if(resultMap.size() > 0){
                    System.out.println("pushing into redis...("+resultMap.size()+")");
                    for(String s : resultMap.keySet()){
                        j.lpush("_jobs", resultMap.get(s));
                    }
                    c += resultMap.size();
                    resultMap.clear();
                }
                bw.close();
                fw.close();
                break;
            }
            nowtime = null;
            // 查询接口及字段名更改
            result = doQuery(start).replace("imageURL", "image_url")
                    .replace("PassTime", "capture_time")
                    .replace("VehicleType", "plate_type_id")
                    .replace("TollgateID", "location_id")
                    .replace("VehicleChannel", "lane_id")
                    .replace("VehiclePlate", "plate_number")
                    .replace("VehicleSpeed", "speed")
                    .replace("Direction", "direction_id");
            doc = DocumentHelper.parseText(result);
            resultSet = doc.getRootElement().element("ResultSet");
            // 处理空值
            if (null == resultSet) {
                System.out.println("Empty result,sleeping...(60s)");
                Thread.sleep(60000);
                continue;
            }
            // 获取返回xml中的Record内的元素转换位json格式存入redis
            records = resultSet.elementIterator("Record");
            while (records.hasNext()) {
                Element record_ = records.next();
                Element e = record_.addElement("unit_id");
                e.setText("1");
                e = record_.addElement("thirdpart_id");
                e.setText("1");
                e = record_.element("location_id");
                String id = e.getTextTrim();
                e = record_.addElement("device_id");
                e.setText(id);
                e = record_.addElement("source_id");
                e.setText(deviceMap.get(id) == null ? "0" : deviceMap.get(id));
                Iterator<Element> images = record_.element("Images").elementIterator("Images");
                if (images.hasNext()) {
                    Element i= images.next();
                    resultMap.put(i.elementText("image_url"), xml.read(record_.asXML()).toString());
                    all++;
                }else{
                    resultMap.put(UUID.randomUUID().toString(), xml.read(record_.asXML()).toString());
                    all++;
                }
                record_ = null;
                e = null;
                start++;
            }
            if(resultMap.size() > 5000){
                System.out.println("pushing into redis...("+resultMap.size()+")");
                for(String s : resultMap.keySet()){
                    j.lpush("_jobs", resultMap.get(s));
                }
                c += resultMap.size();
                resultMap.clear();
            }
            //log total
            if(all % 10000 == 0){
                //log total
                logger.error(">>>total pushed: " + c + "<<<");
                logger.error("***total received: " + all + "***");
                logger.error("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Remote server error,retrying......");
            continue;
        }
    }
}

public String doQuery(int start) {
    Map<String, String> p = null;// 用以保存请求数据
    StringBuffer sb = null;// 拼接请求的buffer
    String res = "";// 接口返回的数据
    p = new HashMap<String, String>();
    sb = new StringBuffer("<?xml version=\"1.0\" encoding=\"UTF-8\"?>")
            .append("<Request>")
            .append("<QueryID>64010000002005000001201602251445260001</QueryID>")
            .append("<MessageType>VehicleInfoQuery</MessageType>")
            .append("<LimitNum>").append(limit).append("</LimitNum>")
            .append("<StartNum>").append(start).append("</StartNum>")
            .append("<Order>asc</Order>").append("<VehiclePlateQuery>")
            .append("<VehiclePlate></VehiclePlate>")
            .append("<PlateColor></PlateColor>")
            .append("<VehicleColor></VehicleColor>")
            .append("<TollgateID></TollgateID>")
            .append("<StartTime></StartTime>")
            .append("<EndTime></EndTime>").append("<InsertTime>")
            .append(time).append("</InsertTime>")
            .append("</VehiclePlateQuery>").append("</Request>");
    p.put("conditionXml", sb.toString());
    try {
        res = HttpUtil.post(url, p);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return res;
}

/*
 * 读取txt文件中的数据
 */
public static String txt2String(File file) {
    String result = "";
    try {
        BufferedReader br = new BufferedReader(new FileReader(file));
        String s = null;
        while ((s = br.readLine()) != null) {
            result = result + "\n" + s;
        }
        br.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return result;
}

}
不知道哪里出问题了

  • 写回答

0条回答

    报告相同问题?

    悬赏问题

    • ¥100 有人会搭建GPT-J-6B框架吗?有偿
    • ¥15 求差集那个函数有问题,有无佬可以解决
    • ¥15 【提问】基于Invest的水源涵养
    • ¥20 微信网友居然可以通过vx号找到我绑的手机号
    • ¥15 寻一个支付宝扫码远程授权登录的软件助手app
    • ¥15 解riccati方程组
    • ¥15 display:none;样式在嵌套结构中的已设置了display样式的元素上不起作用?
    • ¥15 使用rabbitMQ 消息队列作为url源进行多线程爬取时,总有几个url没有处理的问题。
    • ¥15 Ubuntu在安装序列比对软件STAR时出现报错如何解决
    • ¥50 树莓派安卓APK系统签名