gennon 2021-07-01 01:37 采纳率: 0%
浏览 1015

如何在同步代码中获取到 Spring WebFlux Mono 类型的返回值?

在工作中我遇到了这样一个场景:公司内部使用了两个 OLAP 引擎/数据库,一个是 ElasticSearch,存储原始数据;一个是 Doris,对ES中的数据按天进行了统计,因此查询效率会比 ES 好。现在有个BI工具页面有一堆图表,如果调用后端查询ES接口的参数中带有关键词检索则通过网关转发到ES查询,如果不带关键词,则重写路径地址,路由到一个查询 Doris 查询统计好的数据的接口,以实现性能和功能的平衡。

为了应对这个需求,我在 Spring Gateway 网关中继承了 AbstractGatewayFilterFactory 自定义了一个过滤器,当解析到参数列表中指定字段不包含关键词时重写路径地址,转发到另一个查询 Doris 的接口。这个自定义过滤器的实现的核心代码如下:

@Override
    public GatewayFilter apply(Config config) {
        String replacement = config.replacement.replace("$\\", "$");
        return (exchange, chain) -> {
            ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
            ServerWebExchange.Builder builder = exchange.mutate();
            return serverRequest.bodyToMono(JSONObject.class).doOnNext(jsonObject -> {
                log.info("jsonObject:{}", jsonObject);
                //判断参数列表中是否包含关键词,如果不包含重写源查询ES接口的路径,使其指向 Doris 库的接口
                boolean matchAnalysisFields = matchAnalysisFields(jsonObject);
                log.info("matchAnalysisFields:{}", matchAnalysisFields);
                if (!matchAnalysisFields) {
                    ServerHttpRequest req = exchange.getRequest();
                    addOriginalRequestUrl(exchange, req.getURI());
                    String path = req.getURI().getRawPath();
                    String newPath;
                    log.info("path:{},replacement:{},config.regexp:{}", path, replacement, config.regexp);
                    if (StrUtil.isNotBlank(replacement) && !"null".equals(replacement)) {
                        newPath = path.replaceAll(config.regexp, replacement);
                    } else {
                        newPath = path;
                    }
                    log.info("source url:{},new path:{}", path, newPath);
                    ServerHttpRequest request = req.mutate().path(newPath).build();
                    exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());
                    builder.request(request);
                } else {
                    builder.request(exchange.getRequest());
                }
            }).then(Mono.defer(() -> {
                String rawPath = builder.build().getRequest().getURI().getRawPath();
                log.info("rawPath:{}", rawPath);
                return chain.filter(builder.build());
            }));
        };
    }

但是加了这个过滤器后直接导致了整个请求阻塞,最终超时被中断。以下是返回的异常信息:

reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP POST "/serv/v2/api/cV8m/shequndongchav2/volumfenbuv2" [ExceptionHandlingWebHandler]
Stack trace:

我查找了很多资料,也做了很多尝试,依旧没有解决。

按我最初思路,如果是传统的命令式编程获取参数列表以后通过 matchAnalysisFields() 方法判断参数中是否含有关键词然后写对应的处理逻辑。但 serverRequest.bodyToMono(JSONObject.class) 返回的是一个 Mono 类型的对象,而 Mono 是一个非阻塞的异步调用,在主流程里没办法拿到 matchAnalysisFields(jsonObject) 的布尔值并进行接下来的逻辑处理。除了上面这种方式的实现外,我还尝试了使用Mono.subscribe(Consumer<? super T> consumer) 将一个布尔类型局部变量传入赋值等方式,均会造成阻塞,不知道原因。

  • 写回答

2条回答 默认 最新

  • 有问必答小助手 2021-07-06 16:47
    关注

    你好,我是有问必答小助手,非常抱歉,本次您提出的有问必答问题,技术专家团超时未为您做出解答

    本次提问扣除的有问必答次数,将会以问答VIP体验卡(1次有问必答机会、商城购买实体图书享受95折优惠)的形式为您补发到账户。

    ​​​​因为有问必答VIP体验卡有效期仅有1天,您在需要使用的时候【私信】联系我,我会为您补发。

    评论

报告相同问题?

悬赏问题

  • ¥20 思科:Router c3600 MN-4E插槽
  • ¥15 16进制修改视频的元数据
  • ¥15 岛津txt格式文件转nirs格式
  • ¥15 石墨烯磁表面等离子体
  • ¥15 angular 项目无法启动
  • ¥15 安装wampserver,图标绿色,但是无法进入软件
  • ¥15 C++ MFC 标准库 加密解密解惑
  • ¥15 两条数据合并成一条数据
  • ¥15 Ubuntu虚拟机设置
  • ¥15 comsol三维模型中磁场为什么没有“速度(洛伦兹项)”这一选项