在工作中我遇到了这样一个场景:公司内部使用了两个 OLAP 引擎/数据库,一个是 ElasticSearch,存储原始数据;一个是 Doris,对ES中的数据按天进行了统计,因此查询效率会比 ES 好。现在有个BI工具页面有一堆图表,如果调用后端查询ES接口的参数中带有关键词检索则通过网关转发到ES查询,如果不带关键词,则重写路径地址,路由到一个查询 Doris 查询统计好的数据的接口,以实现性能和功能的平衡。
为了应对这个需求,我在 Spring Gateway 网关中继承了 AbstractGatewayFilterFactory 自定义了一个过滤器,当解析到参数列表中指定字段不包含关键词时重写路径地址,转发到另一个查询 Doris 的接口。这个自定义过滤器的实现的核心代码如下:
@Override
public GatewayFilter apply(Config config) {
String replacement = config.replacement.replace("$\\", "$");
return (exchange, chain) -> {
ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
ServerWebExchange.Builder builder = exchange.mutate();
return serverRequest.bodyToMono(JSONObject.class).doOnNext(jsonObject -> {
log.info("jsonObject:{}", jsonObject);
//判断参数列表中是否包含关键词,如果不包含重写源查询ES接口的路径,使其指向 Doris 库的接口
boolean matchAnalysisFields = matchAnalysisFields(jsonObject);
log.info("matchAnalysisFields:{}", matchAnalysisFields);
if (!matchAnalysisFields) {
ServerHttpRequest req = exchange.getRequest();
addOriginalRequestUrl(exchange, req.getURI());
String path = req.getURI().getRawPath();
String newPath;
log.info("path:{},replacement:{},config.regexp:{}", path, replacement, config.regexp);
if (StrUtil.isNotBlank(replacement) && !"null".equals(replacement)) {
newPath = path.replaceAll(config.regexp, replacement);
} else {
newPath = path;
}
log.info("source url:{},new path:{}", path, newPath);
ServerHttpRequest request = req.mutate().path(newPath).build();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());
builder.request(request);
} else {
builder.request(exchange.getRequest());
}
}).then(Mono.defer(() -> {
String rawPath = builder.build().getRequest().getURI().getRawPath();
log.info("rawPath:{}", rawPath);
return chain.filter(builder.build());
}));
};
}
但是加了这个过滤器后直接导致了整个请求阻塞,最终超时被中断。以下是返回的异常信息:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/serv/v2/api/cV8m/shequndongchav2/volumfenbuv2" [ExceptionHandlingWebHandler]
Stack trace:
我查找了很多资料,也做了很多尝试,依旧没有解决。
按我最初思路,如果是传统的命令式编程获取参数列表以后通过 matchAnalysisFields() 方法判断参数中是否含有关键词然后写对应的处理逻辑。但 serverRequest.bodyToMono(JSONObject.class) 返回的是一个 Mono 类型的对象,而 Mono 是一个非阻塞的异步调用,在主流程里没办法拿到 matchAnalysisFields(jsonObject) 的布尔值并进行接下来的逻辑处理。除了上面这种方式的实现外,我还尝试了使用Mono.subscribe(Consumer<? super T> consumer) 将一个布尔类型局部变量传入赋值等方式,均会造成阻塞,不知道原因。