Fork me on GitHub

dubbo集群容错Cluster

Cluster接口

Cluster接口提供了集群容错的功能。在 Dubbo 中,通过 Cluster 这个接口把一组可供调用的 Provider 信息组合成为一个统一的 Invoker 供调用方进行调用。经过 Router 过滤、LoadBalance 选址之后,选中一个具体 Provider 进行调用,如果调用失败,则会按照集群的容错策略进行容错处理。

Cluster接口的工作流程

Cluster接口工作流程分为两步:

  • 在Consumer进行服务引用的时候,会创建对应Cluster实现类的集群容错策略对应的ClusterInvoker。也就是说在Cluster接口实现中,都会创建对应的Invoker对象,这些都继承自AbstractClusterInvoker抽象类。
    image-20220716000438172
  • 调用时使用ClusterInvoker实例,内部会实现集群容错的逻辑,且会依赖Directory、Router、LoadBalance等组件得到最终要调用的Invoker对象。

也就是说,因为Cluster在服务引用的过程中将多个Invoker伪装为带有集群容错的ClusterInvoker实现,所以在调用的时候可以在对应集群容错逻辑下,再对Invokers进行服务目录、服务路由过滤、负载均衡选址选出真正调用的Invoker发起远程调用逻辑。

image-20220716000449753

常见的几种集群容错的方式

image-20220716000501970

Dubbo中的AbstractClusterInvoker

了解了 Cluster Invoker 的继承关系之后,我们首先来看 AbstractClusterInvoker,它有两点核心功能:一个是实现的 Invoker 接口,对 Invoker.invoke() 方法进行通用的抽象实现;另一个是实现通用的负载均衡算法。

在 AbstractClusterInvoker.invoke() 方法中,会通过 Directory 获取 Invoker 列表,然后通过 SPI 初始化 LoadBalance,最后调用 doInvoke() 方法执行子类的逻辑。在 Directory.list() 方法返回 Invoker 集合之前,已经使用 Router 进行了一次筛选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Result invoke(final Invocation invocation) throws RpcException {
// 检查当前Invoker是否已销毁
checkWhetherDestroyed();

// binding attachments into invocation.
// 将RpcContext中的attachment添加到Invocation中
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}

// 通过Directory获取Invoker对象列表 RegistryDirectory内部会调用Router完成服务路由的功能
List<Invoker<T>> invokers = list(invocation); // 先去服务目录选择
// 再通过SPI加载对应的负载均衡实现
LoadBalance loadbalance = initLoadBalance(invokers, invocation);// 再负载均衡
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

// 委托给具体的集群容错子类 实现调用
return doInvoke(invocation, invokers, loadbalance);
}

在子类实现的doInvoke方法中,调用了在抽象类中提供的select()方法,来完成负载均衡。这里没有去直接委托给LoadBalance去做负载均衡,而是在 select() 方法中会根据配置决定是否开启粘滞连接特性,如果开启了,则需要将上次使用的 Invoker 缓存起来,只要 Provider 节点可用就直接调用,不会再进行负载均衡。这里知道即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// 方法名称
String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

// 获取sticky配置,sticky表示粘滞连接,所谓粘滞连接是指Consumer会尽可能地调用同一个Provider节点,除非这个Provider无法提供服务
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore concurrency problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}

// doSelect选择新的Invoker
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}

Dubbo中的AbstractCluster

常用的 ClusterInvoker 实现都继承了 AbstractClusterInvoker 类型,对应的 Cluster 扩展实现都继承了 AbstractCluster 抽象类。AbstractCluster 抽象类的核心逻辑是在 ClusterInvoker 外层包装一层 ClusterInterceptor,从而实现类似切面的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 2. 外层对ClusterInvoker进行切面包装 返回InterceptorInvokerNode (继承自AbstractClusterInvoker接口)
return buildClusterInterceptors(
// 1. 先doJoin获取最终要调用的Invoker对象 比如直接返回一个FailOverClusterInvoker
doJoin(directory),
directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY)
);
}

// buildClusterInterceptors方法
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
AbstractClusterInvoker<T> last = clusterInvoker;
// ClusterInterceptor是SPI加载自动激活的扩展实现
List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

if (!interceptors.isEmpty()) {
for (int i = interceptors.size() - 1; i >= 0; i--) {
// 将InterceptorInvokerNode收尾连接到一起,形成调用链
final ClusterInterceptor interceptor = interceptors.get(i);
final AbstractClusterInvoker<T> next = last;
last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
}
}
return last;
}

Dubbo中具体的集群容错实现

FailOverCluster

默认是FailOverCluster,可以看到其实现的doJoin方法就是创建一个对应的FailOverClusterInvoker对象并返回。

1
2
3
4
5
@Override
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
// 直接委托给FailoverClusterInvoker
return new FailoverClusterInvoker<>(directory);
}

而在FailOverClutserInvoker中实现的doList方法中,有对失败重试容错的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
// 默认重试次数是1
len = 1;
}
// retry loop. 循环来实现重试

// 最近的一次异常
RpcException le = null; // last exception.
// 记录已经尝试调用过的Invoker
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.

// 记录执行过的provider地址
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
// 第二次进来就是重试的逻辑 需要再次走一遍 服务目录list(内部会走route服务路由)拉取最新的Invoker列表并检查
checkWhetherDestroyed();
// list重新从服务目录中获取集合
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// AbstractClusterInvoker.select() 有自己的粘连逻辑(优先调用一个provider) 也会根据负载均衡算法来选出provider
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 进行调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception. dubbo内部的biz异常
throw e;
}
// 记录异常
le = e;
} catch (Throwable e) {
// 封装为rpc异常 然后记录下次重试
le = new RpcException(e.getMessage(), e);
} finally {
// 记录已经尝试过的provider地址
providers.add(invoker.getUrl().getAddress());
}
}
// 超过重试次数 抛出异常
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
-------------本文结束感谢您的阅读-------------

本文标题:dubbo集群容错Cluster

文章作者:夸克

发布时间:2020年07月16日 - 00:07

最后更新:2022年07月16日 - 00:07

原始链接:https://zhanglijun1217.github.io/2020/07/16/dubbo集群容错Cluster/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。