Could someone help me figure out why data goes missing while I page through results with Elasticsearch Scroll? The code is as follows:
private void saveDataFromEsToDB(Client client, String indexName, String type, long bgMillis, long edMillis, String statDate, int hour) {
    LOGGER.info("[select count(*), sum(exps_num) from " + indexName + "/" + type
            + " where time >= {} and time < {} and p_id <> ''] begin...", bgMillis, edMillis);
    // SCAN search: the first response carries only the total hit count and a scroll id;
    // the documents themselves arrive with the later scroll calls. In scan mode, size applies per shard.
    SearchResponse searchResponse = client.prepareSearch(indexName)
            .setTypes(type)
            .setSearchType(SearchType.SCAN)
            .setScroll(TimeValue.timeValueMillis(BATCH_MILLIS))
            .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.boolFilter()
                    .mustNot(FilterBuilders.termFilter("p_id", ""))
                    .must(FilterBuilders.rangeFilter("time").gte(bgMillis).lt(edMillis))))
            .addFields("p_id", "fst_cate_id", "exps_num")
            .setSize(BATCH_NUM)
            .get();
    LOGGER.info("searchResponse total hits is:{}...", searchResponse.getHits().getTotalHits());
    SearchHit[] hits = searchResponse.getHits().getHits();
    LOGGER.info("searchResponse hits length is:{}...", hits.length);
    do {
        List<Map<String, SearchHitField>> datas = new ArrayList<>();
        for (SearchHit searchHit : hits) {
            Map<String, SearchHitField> searchHitField = searchHit.getFields();
            if (MapUtils.isEmpty(searchHitField)) {
                LOGGER.warn("searchHitField is isEmpty...");
                continue;
            }
            datas.add(searchHitField);
            // Flush to the database every BATCH_INSERT_NUM rows.
            if (datas.size() % BATCH_INSERT_NUM == 0) {
                try {
                    LOGGER.info("batchInsertOrUpdate begin...");
                    sttRealTimeExposureOutGmvZidDao.batchInsertOrUpdate(statDate, hour, datas);
                    datas.clear();
                    LOGGER.info("batchInsertOrUpdate end...");
                } catch (Exception e) {
                    LOGGER.error("batchInsertOrUpdate error...", e);
                }
            }
        }
        // Flush whatever is left over from this scroll batch.
        if (datas.size() > 0) {
            try {
                LOGGER.info("batchInsertOrUpdate last begin, size:{}...", datas.size());
                sttRealTimeExposureOutGmvZidDao.batchInsertOrUpdate(statDate, hour, datas);
                datas.clear();
                LOGGER.info("batchInsertOrUpdate last end...");
            } catch (Exception e) {
                LOGGER.error("batchInsertOrUpdate last error...", e);
            }
        }
        // Fetch the next batch, renewing the scroll timeout.
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(TimeValue.timeValueMillis(BATCH_MILLIS))
                .execute().actionGet();
        hits = searchResponse.getHits().getHits();
        LOGGER.info("searchResponse hits length is:{}...", hits.length);
    } while (hits.length != 0);
    // prepareClearScroll() alone only builds a request; add the scroll id and execute it to release the context.
    client.prepareClearScroll().addScrollId(searchResponse.getScrollId()).get();
    LOGGER.info("[select count(*), sum(exps_num) from " + indexName + "/" + type
            + " where time >= {} and time < {} and p_id <> ''] end...", bgMillis, edMillis);
}
Part of the log output:
service.SttRealTimeExposureOutGmvZidService - searchResponse total hits is:927275...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:0...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:120000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:110000...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:80035...
service.SttRealTimeExposureOutGmvZidService - searchResponse hits is:0...
The discrepancy:
total hits: 927275
But the hit counts logged above add up to: 890035
Missing: 37240
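
For anyone who wants to double-check the arithmetic, here is a minimal sketch that adds up the per-batch hit counts copied from the log lines above and subtracts the sum from the reported total. The class and method names (`ScrollBatchGap`, `fetched`, `missing`) are made up for illustration and are not part of the original code.

```java
import java.util.Arrays;

// Illustrative helper only: the numbers are copied from the log excerpt in this post.
class ScrollBatchGap {
    // "searchResponse total hits is:927275"
    static final long TOTAL_HITS = 927_275L;

    // One entry per "searchResponse hits is:N" log line, in order.
    static final long[] BATCH_SIZES = {
        0, 120_000, 120_000, 120_000, 120_000,
        110_000, 110_000, 110_000, 80_035, 0
    };

    // Total number of hits actually returned across all scroll batches.
    static long fetched() {
        return Arrays.stream(BATCH_SIZES).sum();
    }

    // Hits reported by the first response but never returned by any batch.
    static long missing() {
        return TOTAL_HITS - fetched();
    }

    public static void main(String[] args) {
        System.out.println("fetched = " + fetched());
        System.out.println("missing = " + missing());
    }
}
```

Running `main` prints the two figures quoted above, so the gap comes from the batch sizes themselves, before any rows are skipped or written to the database.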