WayneSlytherin 2022-08-03 17:05 采纳率: 100%
浏览 88
已结题

关于ElasticSearch7.x 异步并发读写的一些疑问

背景

最近项目组准备重构会员系统,搜索引擎选定为elasticSearch7.6.1。在业务编码中遇到了并发读写的问题。在springboot中使用RestHighLevelClient客户端对ES进行读写。在并发环境下,即便对ES的”写请求“已经通过restHighLevelClient发送给ES,同时也收到了响应成功,但实时上ES似乎也是对这个请求做了异步处理,在响应成功的同时可能写操作并没有执行完毕。

以下是我的代码验证

这里我删除一条记录覆盖到ES中

public synchronized void delTagcRela(T_MS_TAGC_RELA tar ,Integer hrId) throws IOException {
        Integer msSgId = tar.getMsSgId();
        Integer msTagId = tar.getMsTagId();
        VT_MS_SHOPGUI_INFO queryParam = new VT_MS_SHOPGUI_INFO();
        queryParam.setShopGuideHrid(hrId);
        queryParam.setMsSgId(msSgId);
        List<Map<String, Object>> rsts = elasticSearchUtil.searchMemberInner(queryParam);
        Map<String, Object> rst = rsts.get(0);
        ArrayList<HashMap<String,Object>> tags = (ArrayList) rst.get("tags");
        if(tags==null||tags.size()==0){
            throw new BusinessRuntimeException("未查到相关标签绑定记录");
        }else{
            HashMap<String,Object> oldItem = new HashMap<String,Object>();//待删
            JSONObject paramItem = JSONObject.parseObject(JSON.toJSONString(tar));
            for (HashMap<String,Object> item : tags) {
                if (msTagId==item.get("msTagId"))
                    oldItem = item;//定位
            }
            tags.remove(oldItem);
            rst.put("tags",tags);
            long startTime = System.currentTimeMillis();
            Boolean sucFlag = elasticSearchUtil.addMemberToEs(JSONObject.parseObject(JSON.toJSONString(rst)), msSgId.toString());
            boolean continueFlag = true;
            while (continueFlag){
                //判断修改是否执行成功
                if (sucFlag){
                    List<Map<String, Object>> theInfos = elasticSearchUtil.searchMemberInner(queryParam);
                    Map<String, Object> theInfo = theInfos.get(0);
                    ArrayList<HashMap<String,Object>> losTags = (ArrayList) theInfo.get("tags");
                    System.err.println("***剩下的标签:"+losTags);
                    if (losTags.size()==0)
                        continueFlag = false;
                }
            }
            long endTime = System.currentTimeMillis();
            System.err.println("耗时"+(endTime-startTime)+"ms");

        }
    }

elasticSearchUtil部分方法

/**
     * 内部调用查客户信息
     * @param tar
     * @return
     * @throws IOException
     */
    public synchronized List<Map<String,Object>> searchMemberInner(VT_MS_SHOPGUI_INFO tar) throws IOException{
        Integer hrId = tar.getShopGuideHrid();
        Integer msSgId = tar.getMsSgId();
        //条件搜索
        SearchRequest searchRequest = new SearchRequest("shopguide_business_info");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (hrId!=null){
            QueryBuilder queryBuilderSGH = QueryBuilders.matchQuery("shopGuideHrid",hrId);//导购id
            if (msSgId!=null){
                QueryBuilder queryBuilderSgId = QueryBuilders.matchQuery("msSgId",msSgId);//导购前置标识
                boolQueryBuilder.must(queryBuilderSgId);
            }
            boolQueryBuilder.must(queryBuilderSGH);
        }
        sourceBuilder.query(boolQueryBuilder);
        //设置请求超时时间
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
        //执行搜索
        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        //解析结果
        List<Map<String,Object>> list = new ArrayList<>();
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            list.add(hit.getSourceAsMap());
        }
        return list;
    }

/**
     * 导购私有会员数据写入ES
     * @param inJson
     * @param docId
     * @return
     */
    public synchronized Boolean addMemberToEs(JSONObject inJson,String docId){
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("2m");//超时时间2min
        bulkRequest.add(new IndexRequest("shopguide_business_info").source(inJson, XContentType.JSON).id(docId));
        BulkResponse bulkResponse = null;
        try {
            bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("数据写入ES失败:"+e.getMessage());
            throw new BusinessRuntimeException(e.getMessage().length()>30?"服务异常":e.getMessage());
        }
        return !bulkResponse.hasFailures();
    }

运行结果及报错内容

测试过程中,不考虑效率,使用重量级锁的情况下,发现在bulkResponse.hasFailures()返回成功后的292ms才真正的查到了修改后的结果:

img


操作应该删掉的就是[{msSgId=58, msTagcRelaId=35, msTagId=52}]中的json

BulkResponse相关源码

似乎响应成功并不代表修改成功

/**
     * Has anything failed with the execution.
     */
    public boolean hasFailures() {
        for (BulkItemResponse response : responses) {
            if (response.isFailed()) {
                return true;
            }
        }
        return false;
    }

我想要达到的结果

综上,如果是一组有顺序要求的逻辑数据并发写入,就会出现在ES没有修改完成就释放了锁而下一个请求又按既定逻辑读写的情况,进而影响业务的正常运行。
目前我能想到的解决方案就是在写后做循环复读(验证业务数据或es数据的version),验证是否真的写入成功,验证后再释放同步锁。
在此想请教各位是否还有更优雅的解决方案,对于以上陈述如有谬误也希望各位斧正,感激不尽。

  • 写回答

2条回答 默认 最新

  • 三千烦恼丝xzh 2022-08-03 19:05
    关注

    这不是并发的问题好吧,你不看elasticsearch的基本介绍吗,近实时搜索引擎!数据的可见性取决于索引刷盘间隔,正常设置1s刷一次你的写入最差的情况下要1秒后刷盘才可能被检索,但是写入可以通过参数控制强制刷盘来达到立即可见,如果你用的spring的elasticsearchTemplate和elasticsearchRepository默认是会加参数强制刷盘的

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论 编辑记录
查看更多回答(1条)

报告相同问题?

问题事件

  • 系统已结题 8月12日
  • 已采纳回答 8月4日
  • 修改了问题 8月3日
  • 修改了问题 8月3日
  • 展开全部

悬赏问题

  • ¥15 echarts动画效果失效的问题。官网下载的例子。
  • ¥60 许可证msc licensing软件报错显示已有相同版本软件,但是下一步显示无法读取日志目录。
  • ¥15 Attention is all you need 的代码运行
  • ¥15 一个服务器已经有一个系统了如果用usb再装一个系统,原来的系统会被覆盖掉吗
  • ¥15 使用esm_msa1_t12_100M_UR50S蛋白质语言模型进行零样本预测时,终端显示出了sequence handled的进度条,但是并不出结果就自动终止回到命令提示行了是怎么回事:
  • ¥15 前置放大电路与功率放大电路相连放大倍数出现问题
  • ¥30 关于<main>标签页面跳转的问题
  • ¥80 部署运行web自动化项目
  • ¥15 腾讯云如何建立同一个项目中物模型之间的联系
  • ¥30 VMware 云桌面水印如何添加