Fork me on GitHub

dubbo负载均衡算法

dubbo负载均衡

image-20220716000024390

LoadBalance(负载均衡)的职责是将网络请求或者其他形式的负载“均摊”到不同的服务节点上,从而避免服务集群中部分节点压力过大、资源紧张,而另一部分节点比较空闲的情况。

dubbo提供了5种负载均衡实现,

  • 一致性hash的负载均衡算法 ConsistentHashLoadBalance
  • 基于权重随机算法 RandomLoadBalance
  • 基于最小活跃调用数算法 LeastActiveLoadBalance
  • 基于加权轮询算法的 RoundRobinLoadBalance
  • 基于最短响应时间的算法 ShortestResponseLoadBalance

AbstractLoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class AbstractLoadBalance implements LoadBalance {

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重,随着服务运行时间uptime增大,权重ww的值会慢慢接近配置值weight
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
// 委托给子类实现
return doSelect(invokers, url, invocation);
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);


/**
* AbstractLoadBalance提供的getWeight方法 用于计算Provider节点权重
* @param invoker the invoker
* @param invocation the invocation of this invoker
* @return weight
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
// provider节点的url
URL url = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
// 如果是RegistryService接口的话 直接获取参数上的权重
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
// 从url中获取 权重
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
// 对刚启动的provider节点 进行降权
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}
}
  • select方法委托给具体的子类实现。
  • 提供了getWeight获取provider节点权重的方法,内部对刚启动的provider节点做了预热处理,即权重在时间内逐步接近配置的权重值。

ConsistenHashLoadBalance 一致性hash负载均衡算法

一致性Hash算法简介

一致性 Hash 负载均衡可以让参数相同的请求每次都路由到相同的服务节点上,这种负载均衡策略可以在某些 Provider 节点下线的时候,让这些节点上的流量平摊到其他 Provider 上,不会引起流量的剧烈波动。

与直接对节点数取模不同,为了避免因一个 Provider 节点宕机,而导致大量请求的处理节点发生变化的情况,我们可以考虑使用一致性 Hash 算法。一致性 Hash 算法的原理也是取模算法,与 Hash 取模的不同之处在于:Hash 取模是对 Provider 节点数量取模,而一致性 Hash 算法是对 2^32 取模。其对Provider地址和请求参数对2^32取模。

1
2
hash(Provider地址) % 2^32
hash(请求参数) % 2^32

image-20220716000052705

一致性hash可能存在流量倾斜的问题,解决方案就是为真实节点设置虚拟槽位,来通过虚拟槽位让节点在逻辑上分布的更加均匀。

dubbo中的一致性hash实现

dubbo为每个dubbo接口(serviceKey做区分)的方法,缓存了一个一致性Hash选择器 ConsistentHashSelector,主要负载均衡去选出具体的Provider节点的过程就是去调用Selector的select方法。

1
2
// key:serviceKey+methodName value:ConsistentHashSelector 这个缓存来为方法调用选择一致性hash选择器
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

可以看到一致性hash负载均衡算法的doSelect的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// key是 service.methodName
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

// using the hashcode of list to compute the hash only pay attention to the elements in the list
// 这是为了在invokers列表发生变化时都会重新生成ConsistentHashSelector对象
int invokersHashCode = invokers.hashCode();
// 根据key从缓存中取
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode) {
// selector不存在 则去创建一个
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 通过selector 选择一个Invoker对象
return selector.select(invocation);
}

选择器ConsistentHashSelector的初始化构造函数和最终select负载均衡逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 private static final class ConsistentHashSelector<T> {

// 用于记录虚拟 Invoker 对象的 Hash 环。这里使用 TreeMap 实现 Hash 环,并将虚拟的 Invoker 对象分布在 Hash 环上。
// 虚拟槽是为了节点在分配流量时更加均匀
private final TreeMap<Long, Invoker<T>> virtualInvokers;

// 虚拟Invoker个数
private final int replicaNumber;

// Invoker集合的hashcode值
private final int identityHashCode;

// 需要参与hash计算的参数索引
private final int[] argumentIndex;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 初始化virtualInvokers字段,也就是虚拟Hash槽
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
// 记录Invoker集合的hashCode,用该hashCode值来判断Provider列表是否发生了变化
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取配置的虚拟槽的个数 默认160个
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 获取参与Hash计算的参数下标值,默认对第一个参数进行Hash运算
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 构建虚拟Hash槽,默认replicaNumber=160,相当于在Hash槽上放160个槽位
for (Invoker<T> invoker : invokers) {
// 循环当前的provider对应的invokers节点
String address = invoker.getUrl().getAddress();
// 外层轮询40次,内层轮询4次,共40*4=160次,也就是同一节点虚拟出160个槽位
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
// 构建Hash槽
virtualInvokers.put(m, invoker);
}
}
}
}

public Invoker<T> select(Invocation invocation) {
// 将参与一致性Hash的参数拼接到一起
String key = toKey(invocation.getArguments());
// 计算key的Hash值
byte[] digest = md5(key);
// 从hash环中去匹配Invoker对象
return selectForKey(hash(digest, 0));
}


private Invoker<T> selectForKey(long hash) {
// 从virtualInvokers集合(TreeMap是按照Key排序的)中查找第一个节点值大于或等于传入Hash值的Invoker对象
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 如果Hash值大于Hash环中的所有Invoker,则回到Hash环的开头,返回第一个Invoker对象
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}

RandomLoadBalance 加权随机负载均衡算法

加权随机算法

RandomLoadBalance是dubbo默认的负载均衡算法。加权随机算法简单高效,可以通过一个例子来说明加权随机算法的核心思想。

image-20220716000109233

dubbo中的加权随机的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Select one invoker between a list using a random criteria
* @param invokers List of possible invokers
* @param url URL
* @param invocation Invocation
* @param <T>
* @return The selected invoker
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;

// the weight of every invokers Invokers对应的权重数组
int[] weights = new int[length];

// the first invoker's weight 计算第一个节点的权重 getWright是在AbstractLoadBalance中提供的公共能力
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;

// The sum of weights 记录总权重
int totalWeight = firstWeight;

for (int i = 1; i < length; i++) {
// 获取每个节点的权重
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
// 标记是否为全部一样的权重
sameWeight = false;
}
}
// 各个Invoker权重值不相等时,计算随机数落在哪个区间上
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 随机获取一个[0, totalWeight)区间的内的数字
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 循环让offset数减去Invoker的权重值,当offset小于0时,返回相应的Invoker
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果总权重为0 或者 权重都相等 则随机返回一个
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

其核心逻辑有三个关键点:

  • 计算每个Invoker对应的权重及总权重值。
  • 每个Invoker权重值不相等时,计算随机数应该落在哪个Invoker区间中,返回对应的Invoker对象。计算的逻辑其实就是生成[0, 总权重)区间内的随机数
    ,然后循环每个节点的长度,第一个减去长度小于0的区间对应的节点即为按权重随机选择出的节点。
  • 如果各个Invoker权重相等,随机返回一个Invoker即可。方法也是生成一个随机数,从Invoker集合中按照随机数作为下标获取。

LeastActiveLoadBalance 最小活跃调用数负载均衡算法

最小活跃数负载均衡算法认为当前活跃请求数越小的Provider节点,剩余的处理能力越多,利用率越高。那么这个Provider会在单位时间内承载更多的请求,优先将这些请求分配给该Provider节点。

LeastActiveLoadBalance需要配合ActiveLimitFilter使用,ActiveLimitFilter会记录每个接口方法的活跃请求数,在进行负载均衡的时候,会根据活跃请求数最少的Invoker集合里挑选Invoker。

当然最小活跃调用数对应的Invoker可能有多个,在dubbo的实现中还像加权随机那样,去根据权重选出有一样最小活跃调用数的Invoker来调用。

dubbo实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();

// The least active value of all invokers
// 最小的活跃请求数
int leastActive = -1;

// The number of invokers having the same least active value (leastActive)
// 记录活跃请求数最小的Invoker集合的个数
int leastCount = 0;

// The index of invokers having the same least active value (leastActive)
// 记录活跃请求数最小的Invoker在invokers数组中的下标位置
int[] leastIndexes = new int[length];

// the weight of every invokers
// 记录活跃请求数最小的Invoker集合中,每个Invoker的权重值
int[] weights = new int[length];

// The sum of the warmup weights of all the least active invokers
int totalWeight = 0; // 总权重

// The weight of the first least active invoker
int firstWeight = 0;

// Every least active invoker has the same weight value?
boolean sameWeight = true; // 是否每个最小活跃数的Invoker有相同的权重


// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker 获取活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();

// Get the weight of the invoker's configuration. The default value is 100.
// 获取当前Invoker节点的权重
int afterWarmup = getWeight(invoker, invocation);

// save for later use
weights[i] = afterWarmup;

// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// 当前的Invoker是第一个活跃请求数最小的Invoker,则记录如下信息

// Reset the active number of the current invoker to the least active number
// 重新记录最小活跃数
leastActive = active;

// Reset the number of least active invokers
// 重新记录活跃请求数最小的Invoker集合个数
leastCount = 1;

// Put the first least active invoker first in leastIndexes
// 重新记录最小活跃数Invoker索引下标
leastIndexes[0] = i;

// Reset totalWeight 重新记录总权重
totalWeight = afterWarmup;
// Record the weight the first least active invoker 记录第一个最小活跃数的权重
firstWeight = afterWarmup;

// Each invoke has the same weight (only one invoker here)
sameWeight = true;

// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// 又找到一个最小活跃请求数的Invoker

// Record the index of the least active invoker in leastIndexes order 记录该Invoker的下标
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker 更新总权重
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && afterWarmup != firstWeight) {
// 更新权重值是否相等
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// 只有一个最小活跃数 直接返回
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// 下面按照RandomLoadBalance的逻辑,从活跃请求数最小的Invoker集合中,随机选择一个Invoker对象返回
// 即最小活跃数Invoker集合中如果权重不一致 那么按照加权随机算法去选出一个Invoker 具体@see RandomLoadBalance
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 和加权随机一样 如果都有一样的权重 则从最小随机数的Invoker列表中随机选一个
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

RoundRobinLoadBalance 加权轮询负载均衡算法

轮询指的是将请求轮流分配给每个 Provider。例如,有 A、B、C 三个 Provider 节点,按照普通轮询的方式,我们会将第一个请求分配给 Provider A,将第二个请求分配给 Provider B,第三个请求分配给 Provider C,第四个请求再次分配给 Provider A……如此循环往复。

轮询是一种无状态负载均衡算法,实现简单,适用于集群中所有 Provider 节点性能相近的场景。但现实情况中就很难保证这一点了,因为很容易出现集群中性能最好和最差的 Provider 节点处理同样流量的情况,这就可能导致性能差的 Provider 节点各方面资源非常紧张,甚至无法及时响应了,但是性能好的 Provider 节点的各方面资源使用还较为空闲。这时我们可以通过加权轮询的方式,降低分配到性能较差的 Provider 节点的流量。

加权之后,分配给每个 Provider 节点的流量比会接近或等于它们的权重比。例如,Provider 节点 A、B、C 权重比为 5:1:1,那么在 7 次请求中,节点 A 将收到 5 次请求,节点 B 会收到 1 次请求,节点 C 则会收到 1 次请求。

Dubbo加权轮询负载均衡算法的实现

算法总的描述:
每个Provider节点(Invoker)创建两个权重,一个是配置的weight,在整个过程中不会变化,另一个是currentWeight,在负载均衡的过程中动态调整,初始值为0。

当请求进来的时候,RoundRobinLoadBalance会遍历Invoker列表,并用响应的currentWeight加上其配置的权重。遍历完成之后,再找到最大的currentWeight,将其减去权重总和,然后返回相应的Invoker对象。 新的请求来了之后会重复上面的操作。

原理示意图:
image-20220716000215242

一个示例:
image-20220716000241259

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 和一致性hash套路一样 缓存serviceKey_methodName 作为key value也是map key是Invoker对象 value是WeightedRoundRobin对象(记录了Invoker节点的配置权重和在轮询过程中的动态当前权重)
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();


protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 为当前服务方法在缓存中初始化 Invoker -> WeightedRoundRobin 映射 存在直接获取
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
// 最大的current值
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 获取invoker的身份String 作为map的key
String identifyString = invoker.getUrl().toIdentityString();
// 获取权重 (配置值)
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
// 检测当前Invoker是否有相应的WeightedRoundRobin对象,没有则进行创建
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});

if (weight != weightedRoundRobin.getWeight()) {
//weight changed 配置的权重发生变化也会重新设置
weightedRoundRobin.setWeight(weight);
}
// current+配置的权重
long cur = weightedRoundRobin.increaseCurrent();
// 设置lastUpdate字段
weightedRoundRobin.setLastUpdate(now);

// 找具有最大currentWeight的Invoker,以及Invoker对应的WeightedRoundRobin
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight; // 计算权重总和
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
// 用选中的最大权重的Invoker的currentWeight减去totalWeight
selectedWRR.sel(totalWeight);
// 返回选中的Invoker对象
return selectedInvoker;
}
// should not happen here
// 一个兜底 应该不会走到这
return invokers.get(0);
}

-------------本文结束感谢您的阅读-------------

本文标题:dubbo负载均衡算法

文章作者:夸克

发布时间:2021年05月15日 - 23:05

最后更新:2022年07月16日 - 00:07

原始链接:https://zhanglijun1217.github.io/2021/05/15/dubbo负载均衡算法/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。