Fork me on GitHub

dubbo异步调用原理

dubbo支持异步调用,在2.7版本之后引入CompletableFuture来优化了异步调用。当然在2.6版本也支持异步调用。基本原理是一样的,只不过2.7版本支持了设置CompletableFuture来回调,细节上有些差别。

异步调用基本使用

2.6版本dubbo

在2.6版本中的使用就是在调用完dubbo接口(dubbo接口配置了异步属性)之后,从RpcContext中获取Future,调用get()方法等待结果返回唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
//1
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<GreetingService>();
...
//2\. 设置为异步
referenceConfig.setAsync(true);

//3\. 直接返回 null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello("world"));

//4.等待结果
java.util.concurrent.Future<String> future = RpcContext.getContext().getFuture();
System.out.println(future.get());

这里看到2.6版本还是用Future来实现的,调用线程阻塞式的等待,且需要手动从RpcContext中拿到调用时的Future对象。也无法聚合多个Future的结果。

2.7版本dubbo

2.7版本的dubbo直接在接口上声明一个返回CompletableFuture的默认方法即可。

  1. 声明异步接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface DemoService {

    String sayHello(String name);

    // 声明异步接口
    default CompletableFuture<String> sayHelloAsync(String name) {
    return CompletableFuture.completedFuture(sayHello(name));
    }

    }
  1. 直接调用异步方法即可实现异步调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public interface DemoService {

    String sayHello(String name);

    // 声明异步接口
    default CompletableFuture<String> sayHelloAsync(String name) {
    return CompletableFuture.completedFuture(sayHello(name));
    }

    }

    看到在2.7版本之后,只需要接口声明异步接口,然后调用即可完成异步调用。当然也可以从RpcContext中取出CompletableFuture对象。且因为CompletableFuture特性,可以聚合多个结果。

异步调用原理

image-20220701063807441

异步调用的过程:

  • 步骤 1 当服务消费端发起 RPC 调用时候使用的用户线程,用户线程首先使用步骤 2 创建了一个 Future 对象,然后步骤 3 会把请求转换为 IO 线程来执行,步骤 3 为异步过程,所以会马上返回,然后用户线程使用步骤 4 把其创建的 Future 对象设置到 RpcContext 中,其后用户线程就返回了。
  • 然后步骤 5 用户线程可以在某个时间点从 RpcContext 中获取设置的 Futrue 对象,并且使用步骤 6 来等待调用结果。
  • 步骤 7 当服务提供方返回结果后,调用方线程模型中的线程池中线程则会把结果使用步骤 8 写入到 Future,这时候用户线程就可以得到远程调用结果了。

2.6 和 2.7 版本实现异步的细节不同

2.6版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
// 异步调用
ResponseFuture future = currentClient.request(inv, timeout);
// 内部返回的future对象 设置到上下文中
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
// 同步调用 通过request返回的Future对象的get()方法实现阻塞发起dubbo请求的用户线程
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}

可以从代码看到:

  • 异步调用通过在上下文中设置进去Future对象,然后调用线程不会被阻塞返回,当需要异步调用的结果时需要get()从Future中取出结果。
  • 而同步调用是通过返回的Future直接get()阻塞实现,注意这里不是永久阻塞,内部是一个包装了timeout时间的get()阻塞。

2.7版本

2.7版本将客户端发起调用时创建的DefaultFuture继承了CompletableFuture,来更好的实现异步调用。

1
2
3
4
5
6
/**
* DefaultFuture. dubbo2.7版本继承了CompletableFuture
*/
public class DefaultFuture extends CompletableFuture<Object> {
// 省略代码
}

且在HeaderExchangeChannel的request()方法中发送完请求之后会将这个DefaultFuture封装在返回值AsyncRpcResult中,即都会向上传递这个CompletableFuture。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
// 封装DefaultFuture 返回。
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
// 调用底层Netty进行请求的发送
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

2.7版本的同步调用会用一个异步转同步的AsyncToSyncInvoker包装DubboInvoker,用来实现同步调用阻塞调用线程等待。如果调用模式是异步调用则不会被转为同步,也就是会把CompletableFuture返回给上层的Invoker,即所以异步接口定义的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 调用内部Invoker的invoke方法 获取结果
// 注意DubboInvoker会在发送请求时 创建一个DefaultFuture
Result asyncResult = invoker.invoke(invocation);

try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
// 同步模式 才去异步转同步 调用future.get() 阻塞结果 异步不需要
/**
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
// 省略异常代码
return asyncResult;
}

2.7版本在DubboInvoker这里的实现就很简单了,不再区分是否为异步调用,同步调用会直接在AsyncToSyncInvoker中阻塞等待,两种调用方式也都会把CompletableFuture写入到RpcContext中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
// 是否为oneWay调用?
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 计算配置的timeOut
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
// 如果是OneWay调用
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
// 直接返回一个包装结果为null的AsyncRpcResult
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 非OneWay场景 同步调用或者异步调用
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
// 发送请求 返回CompletableFuture
// 内部会返回一个DefaultFuture(DubboInvoker --> HeaderExchangeChannel) 这里Future内包装为AppResponse
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
// futureContext写入 CompletableFuture
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
// 封装一个AsyncRpcResult 在AsyncToSyncInvoker中 会调用其get方法阻塞等待
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}


// 继承的父类AbstractInvoker的invoke方法
@Override
public Result invoke(Invocation inv) throws RpcException {
// 省略代码...

AsyncRpcResult asyncResult;
try {
// 调用子类的doInvoke方法 比如DubboInvoker 返回AsyncRpcResult
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
// 省略异常处理代码..
}
// RpcContext中设置调用返回的futureRpcContext中设置调用返回的future 用于异步请求的结果返回
// dubbo2.7版本之后是一个CompletableFuture 客户端可以从RpcContext中取出执行CompletableFuture一系列操作。
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
return asyncResult;
}
-------------本文结束感谢您的阅读-------------

本文标题:dubbo异步调用原理

文章作者:夸克

发布时间:2021年06月01日 - 06:06

最后更新:2022年07月01日 - 07:07

原始链接:https://zhanglijun1217.github.io/2021/06/01/dubbo异步调用原理/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。