念Nian 2017-09-21 08:47 Acceptance rate: 0%
Views: 3687

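Data loss when paging with Elasticsearch Scroll

Could someone take a look? Some documents go missing while I page through the data with Elasticsearch Scroll. The code is as follows: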

private void saveDataFromEsToDB(Client client, String indexName, String type, long bgMillis, long edMillis, String statDate, int hour) {
        LOGGER.info("[select count(*), sum(exps_num) from " + indexName + "/" + type
                + " where time >= {} and time < {} and p_id <> ''] begin...", bgMillis, edMillis);

        SearchResponse searchResponse = client.prepareSearch(indexName)
                .setTypes(type)
                .setSearchType(SearchType.SCAN)
                .setScroll(TimeValue.timeValueMillis(BATCH_MILLIS))
                .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.boolFilter()
                        .mustNot(FilterBuilders.termFilter("p_id", ""))
                        .must(FilterBuilders.rangeFilter("time").gte(bgMillis).lt(edMillis))))
                .addFields("p_id", "fst_cate_id", "exps_num")
                .setSize(BATCH_NUM)
                .get();

        LOGGER.info("searchResponse total hits is:{}...", searchResponse.getHits().getTotalHits());
        SearchHit[] hits = searchResponse.getHits().getHits();
        LOGGER.info("searchResponse hits length is:{}...", hits.length);
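        // With SearchType.SCAN the initial response carries no hits (only the total and a scroll id),
        // so the first pass through this loop processes nothing and simply fetches the first real page.
        do{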
            List<Map<String, SearchHitField>> datas = new ArrayList<>();
            for (SearchHit searchHit : hits) {
                Map<String, SearchHitField> searchHitField = searchHit.getFields();
                if(MapUtils.isEmpty(searchHitField)){
                    LOGGER.warn("searchHitField is empty...");
                    continue;
                }

                datas.add(searchHitField);
                if(datas.size() % BATCH_INSERT_NUM == 0) {
                    try {
                        LOGGER.info("batchInsertOrUpdate begin...");
                        sttRealTimeExposureOutGmvZidDao.batchInsertOrUpdate(statDate, hour, datas);
                        datas.clear();
                        LOGGER.info("batchInsertOrUpdate end...");
                    }catch (Exception e){
                        LOGGER.error("batchInsertOrUpdate error...", e);
                    }
                }
            }

            if(datas.size() > 0){
                try {
                    LOGGER.info("batchInsertOrUpdate last begin, size:{}...", datas.size());
                    sttRealTimeExposureOutGmvZidDao.batchInsertOrUpdate(statDate, hour, datas);
                    datas.clear();
                    LOGGER.info("batchInsertOrUpdate last end...");
                }catch (Exception e){
                    LOGGER.error("batchInsertOrUpdate last error...", e);
                }
            }

            searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                    .setScroll(TimeValue.timeValueMillis(BATCH_MILLIS))
                    .execute().actionGet();

            hits = searchResponse.getHits().getHits();
            LOGGER.info("searchResponse hits length is:{}...", hits.length);
        }while (hits.length != 0);
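        // Building the request alone does nothing; add the scroll id and execute it to release the scroll context.
        client.prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();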

        LOGGER.info("[select count(*), sum(exps_num) from " + indexName + "/" + type
                + " where time >= {} and time < {} and p_id <> ''] end...", bgMillis, edMillis);
    }
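
One way to narrow down a gap like this is to keep a running tally of hits.length across all scroll pages and compare it with getTotalHits() once the loop ends. The following is only a minimal sketch of that loop shape in plain Java, not the code above and not the Elasticsearch API: `PageSource`, `ScrollTally`, `fromSizes`, `drain`, and `shortfall` are hypothetical names, and the page sizes are made-up stand-ins for what the client would return.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a scroll cursor; not part of the Elasticsearch API. */
interface PageSource {
    /** Returns the size of the next page, or 0 once the scroll is exhausted. */
    int nextPageSize();
}

final class ScrollTally {
    /** Builds an in-memory source that hands out the given page sizes in order. */
    static PageSource fromSizes(int... sizes) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int s : sizes) {
            queue.add(s);
        }
        return () -> queue.isEmpty() ? 0 : queue.poll();
    }

    /**
     * Counts every row seen, starting from the first response
     * (which is 0 for a scan-type search) and scrolling until an empty page.
     */
    static long drain(int firstPageSize, PageSource source) {
        long fetched = firstPageSize;
        int size;
        while ((size = source.nextPageSize()) > 0) {
            fetched += size;
        }
        return fetched;
    }

    /** Rows reported by the search but never returned by the scroll. */
    static long shortfall(long totalHits, long fetched) {
        return totalHits - fetched;
    }

    public static void main(String[] args) {
        long totalHits = 8;                               // illustrative value
        long fetched = drain(0, fromSizes(3, 3, 2));      // illustrative pages
        System.out.println("fetched=" + fetched + ", shortfall=" + shortfall(totalHits, fetched));
    }
}
```

If the shortfall is non-zero in the real job, the shard counts on each SearchResponse (for example getFailedShards()) would be the next thing to log, since a scroll page can come back with fewer shards than the initial search.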

Part of the log output:
service.SttRealTimeExposureOutGmvZidService - searchResponse total hits is:927275...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:0...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:80035...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:0...

Discrepancy
total hits: 927275
But the per-batch hit counts logged above add up to: 890035
Missing: 37240
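
The arithmetic behind these figures can be double-checked with a few lines of Java. The batch sizes are copied from the log excerpt; the class and method names (`HitGap`, `sum`) are made up for the illustration.

```java
final class HitGap {
    /** Adds up the per-batch hit counts taken from the log. */
    static long sum(long... batchSizes) {
        long total = 0;
        for (long size : batchSizes) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        long totalHits = 927_275L;
        // Ten log lines: the empty scan response, eight data pages, the empty final page.
        long fetched = sum(0, 120_000, 120_000, 120_000, 120_000,
                           110_000, 110_000, 110_000, 80_035, 0);
        System.out.println("fetched = " + fetched);               // 890035
        System.out.println("missing = " + (totalHits - fetched)); // 37240
    }
}
```

The drop from 120000 to 110000 per batch may also be a clue. With a scan-type search the size applies per shard, so a smaller batch means fewer shards contributed to it, either because they had run out of documents or, possibly, because their scroll context was no longer available.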

2 answers

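  • fox_mt 2017-09-21 09:29

    Are you sure the search conditions behind the total hits figure are the same as the ones behind the result set you are logging? They don't look quite the same to me.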
