dubbo负载均衡
LoadBalance(负载均衡)的职责是将网络请求或者其他形式的负载“均摊”到不同的服务节点上,从而避免服务集群中部分节点压力过大、资源紧张,而另一部分节点比较空闲的情况。
dubbo提供了5种负载均衡实现,
- 一致性hash的负载均衡算法 ConsistentHashLoadBalance
- 基于权重随机算法 RandomLoadBalance
- 基于最小活跃调用数算法 LeastActiveLoadBalance
- 基于加权轮询算法的 RoundRobinLoadBalance
- 基于最短响应时间的算法 ShortestResponseLoadBalance
AbstractLoadBalance
1 | public abstract class AbstractLoadBalance implements LoadBalance { |
- select方法委托给具体的子类实现。
- 提供了getWeight获取provider节点权重的方法,内部对刚启动的provider节点做了预热处理,即权重在时间内逐步接近配置的权重值。
ConsistenHashLoadBalance 一致性hash负载均衡算法
一致性Hash算法简介
一致性 Hash 负载均衡可以让参数相同的请求每次都路由到相同的服务节点上,这种负载均衡策略可以在某些 Provider 节点下线的时候,让这些节点上的流量平摊到其他 Provider 上,不会引起流量的剧烈波动。
与直接对节点数取模不同,为了避免因一个 Provider 节点宕机,而导致大量请求的处理节点发生变化的情况,我们可以考虑使用一致性 Hash 算法。一致性 Hash 算法的原理也是取模算法,与 Hash 取模的不同之处在于:Hash 取模是对 Provider 节点数量取模,而一致性 Hash 算法是对 2^32 取模。其对Provider地址和请求参数对2^32取模。1
2hash(Provider地址) % 2^32
hash(请求参数) % 2^32
一致性hash可能存在流量倾斜的问题,解决方案就是为真实节点设置虚拟槽位,来通过虚拟槽位让节点在逻辑上分布的更加均匀。
dubbo中的一致性hash实现
dubbo为每个dubbo接口(serviceKey做区分)的方法,缓存了一个一致性Hash选择器 ConsistentHashSelector,主要负载均衡去选出具体的Provider节点的过程就是去调用Selector的select方法。1
2// key:serviceKey+methodName value:ConsistentHashSelector 这个缓存来为方法调用选择一致性hash选择器
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
可以看到一致性hash负载均衡算法的doSelect的实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// key是 service.methodName
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// using the hashcode of list to compute the hash only pay attention to the elements in the list
// 这是为了在invokers列表发生变化时都会重新生成ConsistentHashSelector对象
int invokersHashCode = invokers.hashCode();
// 根据key从缓存中取
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode) {
// selector不存在 则去创建一个
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 通过selector 选择一个Invoker对象
return selector.select(invocation);
}
选择器ConsistentHashSelector的初始化构造函数和最终select负载均衡逻辑:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 private static final class ConsistentHashSelector<T> {
// 用于记录虚拟 Invoker 对象的 Hash 环。这里使用 TreeMap 实现 Hash 环,并将虚拟的 Invoker 对象分布在 Hash 环上。
// 虚拟槽是为了节点在分配流量时更加均匀
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虚拟Invoker个数
private final int replicaNumber;
// Invoker集合的hashcode值
private final int identityHashCode;
// 需要参与hash计算的参数索引
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 初始化virtualInvokers字段,也就是虚拟Hash槽
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
// 记录Invoker集合的hashCode,用该hashCode值来判断Provider列表是否发生了变化
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取配置的虚拟槽的个数 默认160个
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 获取参与Hash计算的参数下标值,默认对第一个参数进行Hash运算
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 构建虚拟Hash槽,默认replicaNumber=160,相当于在Hash槽上放160个槽位
for (Invoker<T> invoker : invokers) {
// 循环当前的provider对应的invokers节点
String address = invoker.getUrl().getAddress();
// 外层轮询40次,内层轮询4次,共40*4=160次,也就是同一节点虚拟出160个槽位
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
// 构建Hash槽
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 将参与一致性Hash的参数拼接到一起
String key = toKey(invocation.getArguments());
// 计算key的Hash值
byte[] digest = md5(key);
// 从hash环中去匹配Invoker对象
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
// 从virtualInvokers集合(TreeMap是按照Key排序的)中查找第一个节点值大于或等于传入Hash值的Invoker对象
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 如果Hash值大于Hash环中的所有Invoker,则回到Hash环的开头,返回第一个Invoker对象
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}
RandomLoadBalance 加权随机负载均衡算法
加权随机算法
RandomLoadBalance是dubbo默认的负载均衡算法。加权随机算法简单高效,可以通过一个例子来说明加权随机算法的核心思想。
dubbo中的加权随机的实现
1 | /** |
其核心逻辑有三个关键点:
- 计算每个Invoker对应的权重及总权重值。
- 每个Invoker权重值不相等时,计算随机数应该落在哪个Invoker区间中,返回对应的Invoker对象。计算的逻辑其实就是生成[0, 总权重)区间内的随机数
,然后循环每个节点的长度,第一个减去长度小于0的区间对应的节点即为按权重随机选择出的节点。 - 如果各个Invoker权重相等,随机返回一个Invoker即可。方法也是生成一个随机数,从Invoker集合中按照随机数作为下标获取。
LeastActiveLoadBalance 最小活跃调用数负载均衡算法
最小活跃数负载均衡算法认为当前活跃请求数越小的Provider节点,剩余的处理能力越多,利用率越高。那么这个Provider会在单位时间内承载更多的请求,优先将这些请求分配给该Provider节点。
LeastActiveLoadBalance需要配合ActiveLimitFilter使用,ActiveLimitFilter会记录每个接口方法的活跃请求数,在进行负载均衡的时候,会根据活跃请求数最少的Invoker集合里挑选Invoker。
当然最小活跃调用数对应的Invoker可能有多个,在dubbo的实现中还像加权随机那样,去根据权重选出有一样最小活跃调用数的Invoker来调用。
dubbo实现
1 | protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { |
RoundRobinLoadBalance 加权轮询负载均衡算法
轮询指的是将请求轮流分配给每个 Provider。例如,有 A、B、C 三个 Provider 节点,按照普通轮询的方式,我们会将第一个请求分配给 Provider A,将第二个请求分配给 Provider B,第三个请求分配给 Provider C,第四个请求再次分配给 Provider A……如此循环往复。
轮询是一种无状态负载均衡算法,实现简单,适用于集群中所有 Provider 节点性能相近的场景。但现实情况中就很难保证这一点了,因为很容易出现集群中性能最好和最差的 Provider 节点处理同样流量的情况,这就可能导致性能差的 Provider 节点各方面资源非常紧张,甚至无法及时响应了,但是性能好的 Provider 节点的各方面资源使用还较为空闲。这时我们可以通过加权轮询的方式,降低分配到性能较差的 Provider 节点的流量。
加权之后,分配给每个 Provider 节点的流量比会接近或等于它们的权重比。例如,Provider 节点 A、B、C 权重比为 5:1:1,那么在 7 次请求中,节点 A 将收到 5 次请求,节点 B 会收到 1 次请求,节点 C 则会收到 1 次请求。
Dubbo加权轮询负载均衡算法的实现
算法总的描述:
每个Provider节点(Invoker)创建两个权重,一个是配置的weight,在整个过程中不会变化,另一个是currentWeight,在负载均衡的过程中动态调整,初始值为0。
当请求进来的时候,RoundRobinLoadBalance会遍历Invoker列表,并用响应的currentWeight加上其配置的权重。遍历完成之后,再找到最大的currentWeight,将其减去权重总和,然后返回相应的Invoker对象。 新的请求来了之后会重复上面的操作。
原理示意图:
一个示例:
源码实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56// 和一致性hash套路一样 缓存serviceKey_methodName 作为key value也是map key是Invoker对象 value是WeightedRoundRobin对象(记录了Invoker节点的配置权重和在轮询过程中的动态当前权重)
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 为当前服务方法在缓存中初始化 Invoker -> WeightedRoundRobin 映射 存在直接获取
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
// 最大的current值
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 获取invoker的身份String 作为map的key
String identifyString = invoker.getUrl().toIdentityString();
// 获取权重 (配置值)
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
// 检测当前Invoker是否有相应的WeightedRoundRobin对象,没有则进行创建
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
if (weight != weightedRoundRobin.getWeight()) {
//weight changed 配置的权重发生变化也会重新设置
weightedRoundRobin.setWeight(weight);
}
// current+配置的权重
long cur = weightedRoundRobin.increaseCurrent();
// 设置lastUpdate字段
weightedRoundRobin.setLastUpdate(now);
// 找具有最大currentWeight的Invoker,以及Invoker对应的WeightedRoundRobin
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight; // 计算权重总和
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
// 用选中的最大权重的Invoker的currentWeight减去totalWeight
selectedWRR.sel(totalWeight);
// 返回选中的Invoker对象
return selectedInvoker;
}
// should not happen here
// 一个兜底 应该不会走到这
return invokers.get(0);
}