Fork me on GitHub

dubbo超时机制

预备知识

  • 针对服务端超时,dubb之后在Filter中进行超时的日志输出,但不会阻碍执行。
  • 针对客户端超时,会创建DefaultFuture的时候创建一个基于时间轮的定时任务,在客户端扫描,如果超过了超时时间,则构建一个超时异常让Future返回。

客户端如何知道请求失败?

RPC采用Netty作为底层通讯框架,非阻塞通信方式更高效,非阻塞的特性导致发送数据和接收数据是一个异步的过程,当存在服务端异常、网络问题时(比如超时)客户端是收不到响应的,如何判断一次RPC调用是失败的?

误区一:Dubbo是同步调用的吗?

dubbo在通讯层面因为采用Netty是异步的,呈现给调用者错觉是内部做了阻塞等待,实现了异步转同步。

误区二:channel.writeAndFlush返回一个channelFuture,channelFuture.isSuccess能判断请求成功了。

writeAndFlush只能表示网络缓存区写入成功,不代表发送成功。

那么Dubbo客户端感知到请求失败得自己去造了。

客户端的超时感知

客户端发起一个RPC请求时,会设置一个超时时间,发起调用的时候会开启一个定时器:

  • 如果收到正常响应,删除这个定时器。
  • 定时器倒计时完毕,会被认为超时,在客户端构造一个失败的返回。

Dubbo中的超时判定逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// 创建一个Future时候 进行timeout check 内部是创建一个基于时间轮的定时check任务
timeoutCheck(future);
return future;
}

private static void timeoutCheck(DefaultFuture future) {
// 超时时间的检查任务
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}


private static class TimeoutCheckTask implements TimerTask {

private final Long requestID;

TimeoutCheckTask(Long requestID) {
this.requestID = requestID;
}

// 超时检查的逻辑
@Override
public void run(Timeout timeout) {
// 根据requestId从Future缓存中获取future
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
// 如果future正常返回 则说明没有超时
return;
}
// 否则去响应一个超时
if (future.getExecutor() != null) {
future.getExecutor().execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
}

private void notifyTimeout(DefaultFuture future) {
// create exception response.
// 客户端在超时之后创建一个超时的返回
Response timeoutResponse = new Response(future.getId());
// set timeout status.
// 根据future的isSent确定状态是客户端响应超时 还是 服务端响应超时
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
// 响应超时
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
}

超时即调用失败,一次 RPC 调用的失败,必须以客户端收到失败响应为准

这里关键信息在dubbo发送请求相关代码中。

1
2
3
4
5
6
7
8
9
发起RPC调用 
--> 客户端代理对象
--> 服务目录/负载均衡选择一个可执行的Invoker
-->Invoker客户端Filter链
-->AsyncToSyncInvoker(路由请求)
--> DubboInvoker(委托NettyClient发送请求)
--> HeaderExchangeChannel.request(发送请求 创建一个DefaltFuture)
--> DefaultFuture创建 时会注册一个定时任务检测超时
--> channel.send 真正去发送请求

服务端的超时判断

在服务端,超时是通过Filter机制来完成的。具体代码是在TimeoutFilter中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter implements Filter, Filter.Listener {

private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}

// 执行完Invoker.invoke之后的回调
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// 拿到写在上下文中的timeout统计
Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
if (obj != null) {
TimeoutCountDown countDown = (TimeoutCountDown) obj;
if (countDown.isExpired()) {
// 如果超时 结果调用clear()方法清除
((AppResponse) appResponse).clear(); // clear response in case of timeout.
if (logger.isWarnEnabled()) {
// 打印warn日志
logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
", invoke elapsed " + countDown.elapsedMillis() + " ms.");
}
}
}
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}
}

Filter.Listener接口中的onResponse方法可以认为是在执行完此filter中的invoke方法之后,获取到的Result提供了回调机制,能触发正常返回和异常的两个回调函数。(代码在ProtocolFilterWrapper中)

可以看到这里就是在调用正常返回之后触发了一个检查,如果服务端执行超时,则会打印一个warn日志。

-------------本文结束感谢您的阅读-------------

本文标题:dubbo超时机制

文章作者:夸克

发布时间:2021年04月02日 - 04:04

最后更新:2022年07月02日 - 04:07

原始链接:https://zhanglijun1217.github.io/2021/04/02/dubbo超时机制/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。