dubbo支持异步调用,在2.7版本之后引入CompletableFuture来优化了异步调用。当然在2.6版本也支持异步调用。基本原理是一样的,只不过2.7版本支持了设置CompletableFuture来回调,细节上有些差别。
异步调用基本使用
2.6版本dubbo
在2.6版本中的使用就是在调用完dubbo接口(dubbo接口配置了异步属性)之后,从RpcContext中获取Future,调用get()方法等待结果返回唤醒。
1 | //1 |
这里看到2.6版本还是用Future来实现的,调用线程阻塞式的等待,且需要手动从RpcContext中拿到调用时的Future对象。也无法聚合多个Future的结果。
2.7版本dubbo
2.7版本的dubbo直接在接口上声明一个返回CompletableFuture的默认方法即可。
声明异步接口
1
2
3
4
5
6
7
8
9
10public interface DemoService {
String sayHello(String name);
// 声明异步接口
default CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.completedFuture(sayHello(name));
}
}
直接调用异步方法即可实现异步调用
1
2
3
4
5
6
7
8
9
10public interface DemoService {
String sayHello(String name);
// 声明异步接口
default CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.completedFuture(sayHello(name));
}
}看到在2.7版本之后,只需要接口声明异步接口,然后调用即可完成异步调用。当然也可以从RpcContext中取出CompletableFuture对象。且因为CompletableFuture特性,可以聚合多个结果。
异步调用原理
异步调用的过程:
- 步骤 1 当服务消费端发起 RPC 调用时候使用的用户线程,用户线程首先使用步骤 2 创建了一个 Future 对象,然后步骤 3 会把请求转换为 IO 线程来执行,步骤 3 为异步过程,所以会马上返回,然后用户线程使用步骤 4 把其创建的 Future 对象设置到 RpcContext 中,其后用户线程就返回了。
- 然后步骤 5 用户线程可以在某个时间点从 RpcContext 中获取设置的 Futrue 对象,并且使用步骤 6 来等待调用结果。
- 步骤 7 当服务提供方返回结果后,调用方线程模型中的线程池中线程则会把结果使用步骤 8 写入到 Future,这时候用户线程就可以得到远程调用结果了。
2.6 和 2.7 版本实现异步的细节不同
2.6版本
1 | protected Result doInvoke(final Invocation invocation) throws Throwable { |
可以从代码看到:
- 异步调用通过在上下文中设置进去Future对象,然后调用线程不会被阻塞返回,当需要异步调用的结果时需要get()从Future中取出结果。
- 而同步调用是通过返回的Future直接get()阻塞实现,注意这里不是永久阻塞,内部是一个包装了timeout时间的get()阻塞。
2.7版本
2.7版本将客户端发起调用时创建的DefaultFuture继承了CompletableFuture,来更好的实现异步调用。
1 | /** |
且在HeaderExchangeChannel的request()方法中发送完请求之后会将这个DefaultFuture封装在返回值AsyncRpcResult中,即都会向上传递这个CompletableFuture。
1 |
|
2.7版本的同步调用会用一个异步转同步的AsyncToSyncInvoker包装DubboInvoker,用来实现同步调用阻塞调用线程等待。如果调用模式是异步调用则不会被转为同步,也就是会把CompletableFuture返回给上层的Invoker,即所以异步接口定义的返回值。
1 | // 调用内部Invoker的invoke方法 获取结果 |
2.7版本在DubboInvoker这里的实现就很简单了,不再区分是否为异步调用,同步调用会直接在AsyncToSyncInvoker中阻塞等待,两种调用方式也都会把CompletableFuture写入到RpcContext中。
1 |
|