I'm working on the timeout-retry logic:
long handleTime = 0;
boolean checkPass = false;
for (int i = 0; i < retries; i++) {
    if (handleTime >= timeout) {
        // Previous attempt timed out: retry
        rpcRequest.setReSend(Boolean.TRUE);
        log.warn("call service timeout and retry to call [ rms: {}, tms: {} ]", handleTime, timeout);
        log.info("retry counts:{}", retryCounts.incrementAndGet());
    }
    long startTime = System.currentTimeMillis();
    log.info("request call count: {}", count.incrementAndGet());
    log.info("start calling remote service [requestId: {}, serviceMethod: {}]", rpcRequest.getRequestId(), rpcRequest.getMethodName());
    CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) rpcClient.sendRequest(rpcRequest);
    try {
        rpcResponse = completableFuture.get(asyncTime, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        // Swallow the exception raised by the timeout and handle it ourselves so the program is not interrupted
        log.warn("recommend that asyncTime [ {} ] should be greater than current task runTime [ {} ]", asyncTime, System.currentTimeMillis() - startTime);
        continue;
    }
    long endTime = System.currentTimeMillis();
    handleTime = endTime - startTime;
    if (handleTime < timeout) {
        // Not timed out, so no further retry is needed
        // Validate the response packet further
        checkPass = RpcMessageChecker.check(rpcRequest, rpcResponse);
        if (checkPass) {
            log.info("success counts:{}", successCounts.incrementAndGet());
            log.info("client call success [ rms: {}, tms: {} ]", handleTime, timeout);
            return rpcResponse;
        }
        // The packet was hijacked (check failed): fall through so the timeout-resend mechanism resends it as a safeguard
    }
}
log.info("client call failed [ rms: {}, tms: {} ]", handleTime, timeout);
log.info("fatal counts:{}", fatalCounts.incrementAndGet());
// At this point the client cannot tell whether the server's response was received; it can only be sure the client has discarded this request
unprocessedRequests.remove(rpcRequest.getRequestId());
throw new RetryTimeoutException("The retry call timeout exceeds the threshold, the channel is closed, the thread is interrupted, and an exception is forced to be thrown!");
As the code above shows, a call to the remote service ends in one of three ways: retry, success, or failure.
All three cases should involve a call to rpcClient.sendRequest(rpcRequest).
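However, after extensive testing, whether running 500 tasks on a reused thread pool or calling concurrently from 500 separate, isolated threads, the number of rpcClient.sendRequest(rpcRequest) calls (the count counter) is always greater than the sum of the retry, success, and failure cases (retryCounts + successCounts + fatalCounts).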
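One way to rule out the counters themselves: assuming `count`, `retryCounts`, `successCounts` and `fatalCounts` are shared `AtomicInteger` fields (the `incrementAndGet()` calls suggest so, but their declarations are not shown), the sketch below bumps one such counter from 500 jobs, once on a reused pool and once on 500 dedicated threads, mirroring the two test setups. The class name `CounterHarness`, the pool size of 20 and the 30-second wait are arbitrary choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterHarness {
    // Runs `tasks` jobs that each increment a shared counter once,
    // either on a reused thread pool or on one dedicated thread per job.
    static int run(int tasks, boolean pooled) {
        AtomicInteger sends = new AtomicInteger(); // shared, like `count` in the question
        Runnable job = sends::incrementAndGet;      // each job "sends" exactly once
        try {
            if (pooled) {
                ExecutorService pool = Executors.newFixedThreadPool(20); // reused threads
                for (int i = 0; i < tasks; i++) {
                    pool.execute(job);
                }
                pool.shutdown();
                if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("pool did not finish in time");
                }
            } else {
                List<Thread> threads = new ArrayList<>(); // one isolated thread per job
                for (int i = 0; i < tasks; i++) {
                    Thread t = new Thread(job);
                    threads.add(t);
                    t.start();
                }
                for (Thread t : threads) {
                    t.join();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for jobs", e);
        }
        return sends.get();
    }

    public static void main(String[] args) {
        System.out.println("pooled:   " + run(500, true));
        System.out.println("isolated: " + run(500, false));
    }
}
```

Both lines print 500, because `AtomicInteger.incrementAndGet()` does not lose updates under contention. If the real counters are atomic in the same way, the gap has to come from which code paths increment which counter, not from a race on the counters.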
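With a single thread there is no problem. With multiple threads, each thread's call to this method should be private and isolated, and the variables involved are all local.
In principle, the extra calls can only be triggered by a timeout, and they probably have little to do with the server side.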
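A point worth checking in the loop itself: `count` is incremented on every attempt, but `retryCounts` is only incremented at the top of an iteration when `handleTime >= timeout`. Two paths start another attempt without meeting that condition: the `TimeoutException` branch `continue`s before `handleTime` is updated, and a response that arrives in time but fails `RpcMessageChecker.check` leaves `handleTime < timeout`. The sketch below replays only that control flow with scripted outcomes; the `Outcome` enum, the `Counters` class and the `call` method are hypothetical stand-ins for `sendRequest`, `get(asyncTime, ...)` and the checker, with no real RPC or timing involved.

```java
import java.util.List;

public class RetryAccounting {
    // Hypothetical outcome of one attempt (stands in for sendRequest + get + check)
    enum Outcome { ASYNC_TIMEOUT, SLOW, CHECK_FAILED, OK }

    // Mirrors count / retryCounts / successCounts / fatalCounts from the question
    static final class Counters {
        int sends, retries, successes, fatals;
    }

    // Replays the loop's counting logic only
    static void call(List<Outcome> script, int retries, long timeout, Counters c) {
        long handleTime = 0;
        for (int i = 0; i < retries; i++) {
            if (handleTime >= timeout) {
                c.retries++;              // counted only if the previous attempt was slow
            }
            c.sends++;                    // every attempt reaches sendRequest
            Outcome o = script.get(i);
            if (o == Outcome.ASYNC_TIMEOUT) {
                continue;                 // like the catch branch: handleTime is not updated
            }
            // SLOW: the response arrived but took >= timeout; otherwise it arrived in time
            handleTime = (o == Outcome.SLOW) ? timeout : timeout - 1;
            if (handleTime < timeout && o == Outcome.OK) {
                c.successes++;            // check passed
                return;
            }
            // SLOW and CHECK_FAILED both fall through to the next attempt
        }
        c.fatals++;
    }

    public static void main(String[] args) {
        Counters c = new Counters();
        call(List.of(Outcome.ASYNC_TIMEOUT, Outcome.OK, Outcome.OK), 3, 100, c);
        call(List.of(Outcome.CHECK_FAILED, Outcome.OK, Outcome.OK), 3, 100, c);
        call(List.of(Outcome.SLOW, Outcome.OK, Outcome.OK), 3, 100, c);
        System.out.println("sends=" + c.sends
                + " retry+success+fatal=" + (c.retries + c.successes + c.fatals));
    }
}
```

Running it prints `sends=6 retry+success+fatal=4`: the first two calls each resend once without that resend being counted as a retry, while the third (slow) call is counted correctly. Under concurrent load, `get(asyncTime, ...)` timeouts and failed checks are plausibly more frequent than in a single-threaded run, which would fit the gap appearing only in the multithreaded tests. That is a hypothesis to confirm, for example by comparing the gap with the number of "recommend that asyncTime" warnings in the logs.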