Fork me on GitHub
夸克的博客


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

Redisson实现分布式锁

发表于 2021-07-14 | 分类于 分布式锁 | 热度: ℃
字数统计: 1,221 | 阅读时长 ≈ 6

Redisson实现分布式锁的基本原理

锁实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static int count = 0;
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
// 单机地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379");

// 获取redisson的client
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("distribute-lock-key");
for (int i=0; i < 100; i++) {
new Thread(() -> {
try {
// lock.tryLock(2000, 1000, TimeUnit.MILLISECONDS);
// 写入:redis是hash结构:
// key:distribute-lock-key
// field:随机数:线程id
// value:重入次数
lock.lock(1000, TimeUnit.MILLISECONDS);
count++;
System.out.println("线程" + Thread.currentThread().getName() + "获取到了锁 当前count:" + count);
// 模拟业务线程的执行
Thread.sleep(200);
try {
// 尝试重入 value是重入次数
lock.lock(1000, TimeUnit.MILLISECONDS);
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}

new CountDownLatch(1).await();
}

lock操作加锁

关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

// 运用lua脚本完成加锁逻辑
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
// 传入lua脚本的参数
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

逻辑是运用了redis的lua脚本去完成加锁逻辑。可以分开看:

第一段

1
2
3
4
5
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;
  • 这里KEYS[1]就是锁的key,即redissonClient.getLock(“distribute-lock-key”)中的distribute-lock-key。
  • AVG[2]是field的值,Redisson使用的是redis的hash结构。这里是 uuid:threadId。因为存在多台机器,所以会加上一个uuid。
  • AVG[1]是设置key的过期时间。

这段逻辑流程是:

  • 判断distribute-lock-key 在redis中是否存在,结果为0是不存在、
    • 不存在调用hset key uuid:threadId 1 设置key、uuid:threadId是field、value是1。
    • 设置过期时间
    • 返回null代表成功。

第二段

1
2
3
4
5
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end;
  • 如果存在key、field hash值
    • 此时就是重入场景,把对应的value加一即可。 命令是hincrby
    • 再次设置下过期时间,这里相当于完成了一次锁续期。pexpire
    • 返回null表示成功。

第三段

1
redis.call('pttl', KEYS[1]);
  • 这里的场景是redis中存在key这把锁,但持有锁的不是当前线程,即锁被别的线程持有。这时候调用 pttl命令返回key的过期时间。

这里知道了加锁场景下,如果返回了null说明加锁成功,返回过期时间说明锁被占用。 拿到结果之后,可以根据ttl进行重试加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
    @Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 内部调用了lua加锁逻辑
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 为null 加锁成功返回
return;
}

// 如果ttl不是null 订阅对应key等待锁删除时的通知
// 释放锁的时候会订阅key的客户端可以收到通知
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);

try {
while (true) {
// 再次尝试获取
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 获取到跳出循环
break;
}

// waiting for message
if (ttl >= 0) {
// ttl大于0 去利用semaphore的tryAcquire去阻塞获取 内部是Semaphore(0)
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 小于0去调用 Semaphore的acquire()获取
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}

Redisson加锁流程总结:
image-20220701072404582

unlock解锁流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
rotected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

这段lua脚本的逻辑:

  • 如果hexists命令调用之后发现key、field的hash结构不存在,(lock(leaseTime)没有watchDog机制续约锁),那就是锁超时过期释放了,无需释放直接返回。
  • 如果当前线程持有锁(也就是key、field的hash结构存在)
    • 给锁的的value-1。hincrby key uuid:tid -1命令。 返回null
    • 判断减1之后的值,如果还大于0,说明是锁重入场景的解锁。这里去锁续期一下(默认30s)。返回0
    • 减1之后为0,去删除key,然后发布删除key通知。(redis的发布订阅模式 pub/sub)
    • 返回1代表解锁成功。

image-20220701072421914

redis实现分布式锁

发表于 2021-07-12 | 分类于 分布式锁 | 热度: ℃
字数统计: 1,292 | 阅读时长 ≈ 5

关键点

有四个关键点:

  • 原子性
  • 过期时间
  • 锁续期
  • 正确释放锁

原子性

按照分布式锁的实现比如用两个命令:

1
2
redis.set(key,value);
redis.expire(key, time, unit);

但是这个不是原子性的。比如设置了key和value,但是没有设置失效时间,则会出现死锁的问题。

过期时间

这个没啥可说的,分布式锁防止死锁的做法。

锁续期

当业务执行时间超过超时时间,则可能出现其他客户端获取锁从而并发执行,失去了分布式锁的意义。这时候就是锁续期。

开辟另外一个线程,专门用于锁续期,加锁的时候就起个线程进行死循环续期,核心流程就是判断锁的时间过了三分之一则就重新续期为上锁时间。 比如,加锁时间是3s,执行1s没有释放锁之后,会为这个key的锁重新设置超时时间为3s。

正确释放锁

释放锁可能有问题:

  1. 可能释放别人的锁。其实还是上边提到的问题,比如锁设置了3s的超时时间,执行了4s,但是这时别的客户端或者线程获取了锁,所以可能删除了其他客户端的锁。
  2. 而且别的客户端可能上来就是搞破坏,可能有代码实现的问题,未加锁就解锁。

这两个问题总结起来就是要正确的释放锁。

  • 利用锁续期机制,防止业务没执行完成就释放锁的情况
  • 释放的时候要判断是否为自己加的锁,避免释放别人的锁。

关于锁续期的伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 标识当前锁是否在运行中
private volatile boolean isRunning;

// 抢锁成功
if (RESULT_OK.equals(client.setNxPx(key, value, ttl))) {
// 启动续期线程
renewalTask = new RenewTask(new IRenewalHandler() {
@Override
public void callBack() throws LockException {
// 刷新值
client.expire(key, ttl <= 0 ? 10 : ttl);
}
}, ttl);
renewalTask.start();
}

// 省略释放锁的代码 里面也要维护isRunning

// 续期线程的逻辑
@Override
public void run() {
while (isRunning) {
try {
// 1、续租,刷新值
call.callBack();
LOGGER.info("续租成功!");
// 2、三分之一过期时间续租
TimeUnit.SECONDS.sleep(this.ttl * 1000 / 3);
} catch (InterruptedException e) {
close();
} catch (LockException e) {
close();
}
}
}

public void close() {
isRunning = false;
}

另一个是要在删除时判断出是否为此次加锁,因为是分布式的,线程id可能重复,所以不能单单用线程id作为value,这里建议是用当前业务上下文中的userId或者其他随机数。但这里也要注意一下线程安全问题,因为在解锁(删除锁)时如果是先判断再删除,则有原子性问题:

1
2
3
4
// 代码有原子性问题
if (userId).equals(redis.get(lockKey)) {
redis.delete(key);
}

此原子性问题可能是:比如在get之后,del之前锁超时自动释放,那么可能在执行del的时候还是删除了刚刚加锁的其他客户端。当然如果保证了锁续期,那么这里不会出现此原子性问题,但是还是建议这里判断删除是用lua脚本提供原子性:

1
2
3
4
5
6
// 如果get的值等于传进来的值,就给它del
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

Redis的部署方式

  • 单机
    单机模式下面就是一台redis为分布式锁的存储,优点是方便易部署,缺点是存在单机故障。
  • 哨兵(Sentinel)
    有单点故障,会搞几个slave从节点做备份,redis很好支持了sentinel模式。
    但是这里涉及master和slave的数据一致性问题。锁写到master,没有同步到slave,slave选举为master之后slave中没有锁,相当于此时其他客户端也能加锁。
  • 集群
    集群只是做了slot分片,降低了一定概率,锁还是只写到一个master上,和哨兵一样存在数据一致性的问题。
  • 红锁(RedLock)
    红锁的思路是:搞几个独立的master节点,比如5个,然后挨个加锁,只要超过一半以上(这里是3个)就代表加锁成功,这样一台master成功,还有其他的master节点,不耽误。虽然解决了上面集群模式的问题,但是性能影响很大,且集群维护也需要部署成本。

Redis实现分布式锁的优缺点

优点

  • 只引入了redis,大多数公司都在用redis,社区活跃,问题容错性高。
  • redis相对来说还是很简单的中间件,性能比较高,符合分布式锁的高性能。
  • 多个客户端支持,redisson客户端用法封装(公平锁、可重入锁、读写锁)。

    缺点

  • 存在集群模型下的各种问题,不能百分百安全,
  • 即使是红锁,也是需要牺牲性能(每次写过半节点,多次redis IO通讯),需要进行权衡。

mysql实现分布式锁

发表于 2021-07-11 | 分类于 分布式锁 | 热度: ℃
字数统计: 718 | 阅读时长 ≈ 3

实现原理

  • 利用唯一索引来实现,只要抢占锁就插入了一记录,互斥性体现在其他客户端插入重复数据会报唯一key的异常。
  • 具体的步骤:
    • 客户端1先来加锁,insert一条记录
    • 客户端2来加锁,插入一条数据返回重复key的异常,这时候就进行重试加锁。
    • 持有锁的线程执行完逻辑释放锁,即删除数据库中的记录即可。其他客户端可以进行抢锁了。

但是也有一些问题:

  • 分布式锁的表怎么设计,一个业务一张表?
  • 抢占锁要怎么去做重试
  • 释放锁如果安全的释放锁。

表设计和唯一key

  • 为了每个业务不去都加这个锁的表,则设计一个能兼容多个业务的锁表,抽象锁的业务id为resource_id;且标识是哪个业务,有project_name、method_name,同时在数据库层可以将唯一索引抽象为lock_id,生成规则可能是project_name下划线method_name下划线resource_id;另外为了实现可重入,要记录一个字段entry_count作为重入次数,同时保留加锁的host_ip和thread_id。

比如表DDL:

1
2
3
4
5
6
7
8
9
10
DROP TABLE IF EXISTS `common_lock`;
CREATE TABLE `common_lock` (
`id` int NOT NULL,
`lock_key` varchar(100) NOT NULL,
`thread_id` int NOT NULL,
`entry_count` int NOT NULL,
`host_ip` varchar(30) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `lock_key` (`lock_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

释放锁

上面提到释放锁就是delete这条记录,但是因为有了重入次数,所以释放锁可能是重入次数-1的过程。
image-20220701072146437

防止死锁

如果加锁之后服务端发生宕机未释放锁,则可能造成死锁。当然这里可以记录下超时时间,然后定时任务去扫描超时未释放的数据,然后将其删除。
但这样还是出现一个问题,就是定时任务不能判断出是否业务逻辑超时,如果业务超时时间内未完成,定时任务删除了锁记录,也可能出现问题,mysql这里不太好解决这个问题。(redis解决可以有watchDog机制和续期的线程)。

MySQL实现分布式锁的优缺点:

优点

  • 实现方式简单,加锁是insert记录,重入是重入次数+1,删除锁是update重入次数或者删除记录。
  • 不依赖其他中间件,数据库的高可用保障性强。

缺点

  • 性能很低,支持并发不高。最简单的不考虑重入场景,也需要和数据库进行两次io操作。
  • 线程不安全,为了防止死锁引入定时任务扫描,没办法解决续约的场景。
  • 还是依赖了MySQL,没有百分之百高可用的中间件。
  • 没办法支持读写锁或者公平锁。

分布式锁基础

发表于 2021-07-11 | 分类于 分布式锁 | 热度: ℃
字数统计: 463 | 阅读时长 ≈ 2

为什么需要分布式锁

  • 锁能保证多线程环境下对同一份资源竞争的数据安全性
  • 分布式锁是 用于保证集群内多台机器的多线程并发线程安全的一种手段。
阅读全文 »

zk集群及ZAB

发表于 2021-07-08 | 分类于 zookeeper | 热度: ℃
字数统计: 1,188 | 阅读时长 ≈ 4

集群架构

image-20220701071828582

  • client节点:从业务角度来看,这是分布式应用中的一个节点,通过长连接和zkServer端建立连接,定时发送心跳。从集群角度来看,是集群的客户端,可以连接集群中的一个节点,进行node添加、删除、更新数据、注册Watcher机制等。
  • leader节点:zk的主节点,负责zk集群的写操作,保证集群内部事务处理的顺序性。同时也负责Follower和Observer节点的数据同步。
  • follower节点:ZK集群的从节点,可以接受Client的读请求返回数据,并不处理写请求,而是转发到Leader节点去写数据。follower节点也要参与集群内leader节点的选举。
  • observer节点:特殊节点,只处理读请求,但不参与集群选举。作用是提高zk的吞吐量,因为一直增加follower节点会带来副作用,leader写数据要半数follower节点都响应ack之后才能commit,follower节点太多会影响写吞吐量。因为Observer节点不参加选举,且能响应读请求,所以可以增加读吞吐量。

ZK的消息广播流程

对于写请求,如果Client连接的是Follower节点或者Observer节点,那么会转发给Leader节点进行数据写入和消息广播的流程。Leader节点处理写请求的核心流程:

  1. Leader节点收到写请求之后,为写请求赋予一个全局唯一的zxid(64位自增id),通过zxid的大小可以实现写操作的顺序一致性。
  2. Leader会通过先进先出队列(每个Follower节点创建一个队列,保证发送的顺序),将带有zxid消息的作为一个proposal(提案)分发给所有follower节点。
  3. 当Follower节点收到proposal提案消息之后,将proposal消息写到本地事务日志,写成功后返回给Leader节点 ACK消息。
  4. 当Leader节点收到过半的Follower节点的ACK消息之后,向所有的follower节点去发送Commit消息,并执行本地提交。
  5. Follower节点收到Commit命令之后,去执行本地提交,集群写操作完成。
    image-20220701071839001

Leader节点的重新选举(崩溃恢复)

写数据的流程中,如果Leader宕机,整个ZK集群可能出现两个状态:

  1. Leader节点收到半数Follower节点的ack消息,向各个Follower节点广播Commit命令,在各个Follower节点收到Commit命令之前宕机,剩下的服务器没办法执行本地写commit操作,也就是这次写操作数据还没有commit。
  2. Leader节点在生成proposal之后宕机了,其他Follower没有收到proposal或者未超过半数收到proposal消息,这次写入数据是失败的。

重新选举Leader,ZK集群有两个原则:

  • 对于原Leader提交的proposal(到了执行commit阶段),新的Leader能广播并提交,这样可以选择拥有最大zxid的节点作为新的Leader。
  • 对于原Leader未广播或者少数节点被广播的proposal消息,新的Leader能通知原Leader和已经同步的Follower删除此proposal消息,保证集群数据一致性。

zk的集群选举采用了ZAB协议,一个示例来解释选举新Leader的过程:
image-20220701071927624

启动时的leader选举

每个节点刚启动时处于Looking状态,然后进行选举流程。

  • 每个server发出一个投票,初始状态都对自己投票,投票信息包括epoch、zxid、myid(zk进程号),比如server1代表的票是(myid,zxid)是(1,0)、server2是(2,0),然后发送给集群中参与选举的其他机器。
  • 接收各个服务器的投票,集群中每个服务器收到投票之后,去比较和自己的投票信息:
    • 优先比较epoch,是同一纪元的选票才有效。
    • 再去比较zxid,在启动时都为0。集群选举的时候优先比较
    • 如果zxid相同,最后比较myid。
  • 比较完成之后,按照比较规则修改自己的投票,并且广播投票信息给其他服务器。
  • 超过半数节点统计出相同的票,那么对应节点就是新的leader节点
  • 各个服务器响应leader选举成功消息,改变自己服务器状态为follower节点。

image-20220701071914052

ZAB协议

zab协议是在2pc的基础上zk保证数据一致性的协议。(Zookeeper Atomic Broadcast),包含两个最基本的模式:

  • 消息广播(保证写入数据的一致性)
  • 崩溃恢复(Leader挂了之后的集群恢复)

Zk就在这两个模式之间进行切换。当Leader可用时,就进行消息广播模式;当Leader不可用时进入崩溃恢复模式。

ZK客户端curator的基本使用

发表于 2021-07-03 | 分类于 zookeeper | 热度: ℃
字数统计: 1,373 | 阅读时长 ≈ 7

Curator的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
* curator zk客户端api操作demo
*/
public class CuratorApiDemo {

public static void main(String[] args) throws Exception{
// zk地址 如果是集群 多个节点地址逗号隔开
String address = "127.0.0.1:2181";
// 重试策略 连接不上服务端的时候 会重试 重试间隔递增
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
// 启动zk客户端
client.start();

// createZNode(client);
// testBackGroundCallback(client);
// simpleWatcher(client);
watcherWithCache(client);

new CountDownLatch(1).await();
}


/**
* curator 提供了三种cache包装了watcher机制 内部封装了收到watcher通知之后再次进行watcher事件注册的逻辑 对编程很友好
* @param client
* @throws Exception
*/
static void watcherWithCache(CuratorFramework client) throws Exception{

// 创建NodeCache,监听的是"/user"这个节点
NodeCache nodeCache = new NodeCache(client, "/user");
// start()方法有个boolean类型的参数,默认是false。如果设置为true,
// 那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的
// 数据内容,并保存在Cache中。
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("NodeCache节点初始化数据为:"
+ new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("NodeCache节点数据为空");
}

// 添加监听器
nodeCache.getListenable().addListener(() -> {
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("NodeCache节点路径:" + nodeCache.getCurrentData().getPath()
+ ",节点数据为:" + data);
});

// 创建PathChildrenCache实例,监听的是"user"这个节点
PathChildrenCache childrenCache = new PathChildrenCache(client, "/user", true);
// StartMode指定的初始化的模式
// NORMAL:普通异步初始化
// BUILD_INITIAL_CACHE:同步初始化
// POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// childrenCache.start(PathChildrenCache.StartMode.NORMAL);
List<ChildData> children = childrenCache.getCurrentData();
System.out.println("获取子节点列表:");
// 如果是BUILD_INITIAL_CACHE可以获取这个数据,如果不是就不行
children.forEach(childData -> {
System.out.println(new String(childData.getData()));
});
childrenCache.getListenable().addListener(((client1, event) -> {
System.out.println(LocalDateTime.now() + " " + event.getType());
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
System.out.println("PathChildrenCache:子节点初始化成功...");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
String path = event.getData().getPath();
System.out.println("PathChildrenCache添加子节点:" + event.getData().getPath());
if (event.getData()!= null && event.getData() != null) {
System.out.println("PathChildrenCache子节点数据:" + new String(event.getData().getData()));
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
System.out.println("PathChildrenCache删除子节点:" + event.getData().getPath());
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("PathChildrenCache修改子节点路径:" + event.getData().getPath());
System.out.println("PathChildrenCache修改子节点数据:" + new String(event.getData().getData()));
}
}));

// 创建TreeCache实例监听"user"节点
TreeCache cache = TreeCache.newBuilder(client, "/user").setCacheData(false).build();
cache.getListenable().addListener((c, event) -> {
if (event.getData() != null) {
System.out.println("TreeCache,type=" + event.getType() + " path=" + event.getData().getPath());
} else {
System.out.println("TreeCache,type=" + event.getType());
}
});
cache.start();
}

/**
* 简单的watcher监听机制
* @param client
* @throws Exception
*/
static void simpleWatcher(CuratorFramework client) throws Exception{
client.create().withMode(CreateMode.PERSISTENT).forPath("/test-children-watch");

client.getChildren().usingWatcher(new CuratorWatcher() {
// 为getChildren目录节点的子节点去创建一个watcher 注意 watcher只会被触发一次
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
// type NodeChildrenChanged
System.out.println("收到监听回调事件, type: " + watchedEvent.getType());
System.out.println("回调事件的path: " + watchedEvent.getPath());
}
}).forPath("/test-children-watch");

// 去添加children 来触发watcher回调
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child0");
// watcher是一次性的 不再去注册 不会再收到通知
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child1");
}


static void testBackGroundCallback(CuratorFramework client) throws Exception{
// backGround是异步处理机制 搭配CuratorListener监听器使用

// 添加一个CuratorListener 来处理backGround后台处理之后的回调
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 针对不通过的事件去处理
String path = curatorEvent.getPath();
System.out.println("当前线程名称:" + Thread.currentThread().getName());
switch (curatorEvent.getType()) {
case CREATE:
System.out.println("节点创建,path: "+ path);
break;
case EXISTS:
System.out.println("检查节点是否存在。path:" + path);
break;
case DELETE:
System.out.println("delete节点。path: " + path);
break;
case GET_DATA:
System.out.println("执行getdata path:" + path + " 数据:" + new String(curatorEvent.getData()));
break;
case SET_DATA:
System.out.println("执行setData path: " + path);
break;
case CHILDREN:
System.out.println("执行children path : "+ path);
break;
default:
}
}
});

// inBackGround操作 都是在上边的回调处理
client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/curator-background-api", "test".getBytes(StandardCharsets.UTF_8));
client.checkExists().inBackground().forPath("/curator-background-api");
client.getData().inBackground().forPath("/curator-background-api");
client.setData().inBackground().forPath("/curator-background-api", "test2".getBytes(StandardCharsets.UTF_8));
}

/**
* 创建节点
* @param client
*/
static void createZNode(CuratorFramework client) {
try {
String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/curator-demo", "test".getBytes(StandardCharsets.UTF_8));
System.out.println("创建的持久化节点path是:" + path);

// 检查一个节点是否存在 返回节点的stat信息
Stat stat = client.checkExists().forPath("/curator-demo");
Stat stat1 = client.checkExists().forPath("/curator-demo1");
System.out.println("节点信息" + stat);
System.out.println("不存在的节点 checkExists方法返回值" + stat1);


// 查询节点存储的内容
byte[] bytes = client.getData().forPath("/curator-demo");
System.out.println("节点内容:" + new String(bytes));

// 设置节点存储的内容
client.setData().forPath("/curator-demo", "testChanged".getBytes(StandardCharsets.UTF_8));

// 在子目录下创建多个临时顺序节点
for (int i =0; i < 5 ; i++) {
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-demo/child-");
}
// 获取所有子节点
List<String> strings = client.getChildren().forPath("/curator-demo");
strings.stream().forEach(System.out::println);


// 删除指定的节点 级联删除
client.delete().deletingChildrenIfNeeded().forPath("/curator-demo");
} catch (Exception e) {

}
}
}

ZK简介和节点数据特性

发表于 2021-07-01 | 分类于 zookeeper | 热度: ℃
字数统计: 1,397 | 阅读时长 ≈ 5

ZK相关概念

zk是一个分布式协调框,可以用在分布式系统中的一些数据管理问题,比如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

ZX的核心功能

主要就两个:文件系统数据结构 和 监听通知机制。

1. 文件系统数据结构

image-20220701071359620

每个子目录项都被称作为znode(目录节点)。主要有几种类型的znode节点:

  • Persistent 持久化目录节点:客户端和zk断开连接之后,节点依旧存在,手动不删除永远存在。
  • Persistent_Sequential 持久化顺序编号目录节点:客户端与zk断开连接之后,节点仍然存在,一个有序节点被分配一个唯一的单调递增的整数。
  • Ephemeral 临时目录节点:客户端会话超时或者和zk断开连接之后,节点被删除。
  • Ephemeral——Sequential 临时顺序编号目录节点:客户端和zk断开连接之后,该节点被删除,只是zk给该节点名称进行顺序编号。
  • Container节点(3.5.3版本新增)容器节点:如果Container节点下面没有子节点,则Container节点在未来被Zk自动清除掉,定时任务默认60s检查一次。
  • TTL节点:需要配置开启,自动过期的功能不稳定。

image-20220701071409397

2. 监听通知机制

客户端注册监听它关心的任意节点,或者目录节点及递归子目录节点。ZK使用监听和通知模式来避免客户端轮询,当节点发生更改或者被删除,监听节点的客户端都能收到通知。

无论是对目录变化的监听还是对数据变化的监听,都是一次性的,收到一次事件之后,再次发生变更是不会触发这个客户端的。

  • 主动推送,避免轮询
  • 一次性。再次监听需要再次注册Watcher
  • 顺序性:如果更新了多个Watcher,那么Watcher通知顺序和更新顺序一致。

zk配置文件中的一些配置

image-20220701071420140

  • tickTime:客户端和服务端维持心跳的间隔,单位是毫秒
  • initLimit:容忍的最大心跳失活次数。
  • syncLimit:集群中follower服务器和leader服务器之间的请求、应答最多容忍心跳数量。
  • dataDir:存放数据的目录。

ZK相关命令

create创建znode命令

create path data。 不加参数默认是持久化节点。

  • -e:临时节点
  • -s:顺序节点
  • -c:容器节点
  • -t:给节点加过期时间,需要通过参数启动。

注意临时节点不能有子节点目录。

get查看节点数据

get path

  • -s:查看数据的同时查看节点的state信息
  • -w:针对当前节点数据建立一个watch监听。

ls查看目录

  • -R:递归看子节点
  • -w:建立一个watch监听,监听目录节点变更
  • -s:查看目录信息的同时查看state信息。

state信息

可以查看节点的state信息。
image-20220701071433523

  • cZxid:创建znode的事务id(Zxid的值)
  • mZxid:最后修改znode的事务id。
  • pZxid:最后添加或删除子节点的事务id(子节点列表发生变化才会发生改变)
  • ctime:znode的创建时间
  • mtime:znode的最近修改时间
  • dataVersion:znode的当前数据版本。
  • cversion:znode的子节点结果集版本。(子节点增加、删除都会影响这个版本)
  • aclVersion:对此znode的acl版本
  • ephemeralOwner:znode是临时节点时,表示znode所有者的sessionId。如果znode是持久化节点不是临时节点,此值为null。
  • dataLength:znode数据字段的长度。

顺序节点的创建

create -s /path即可创建顺序节点。

zk中的事件类型

事件监听机制可以监听的事件有:

  • None:连接建立事件
  • NodeCreated:节点创建
  • NodeDeleted:节点删除
  • NodeDataChanged:节点数据变化
  • NodeChildrenChanged:子节点列表变化
  • DataWatchRemoved:节点监听被移除
  • ChildWatchRemoved:子节点监听被移除

注意:一次操作可能触发多个事件:
下边注册了对节点的递归目录节点监听,然后删除一个子节点触发了节点删除和子节点列表变化两个事件。
image-20220701071447892

zk的内存数据和持久化

ZK在内存中的数据

image-20220701071458982

ZK的事务日志

zk会把客户端的事务操作(创建节点、删除节点、变更数据等)记录在事务日志中,对应的目录是在配置文件中用dataLogDir指定,如果没有使用dataDir。

这里能看到如果每个命令都去写对应的文件,那么会频繁造成磁盘IO操作,ZK这里的事务日志不断的追加为文件开辟新的磁盘块。为了提高效率,ZK在创建事务日志文件的时候会进行文件空间的预分配。创建文件的时候,就向操作系统申请一块大一点的磁盘块。

事务日志是写多个文件的,用log.<当时最大事务ID>作为文件名称

image-20220701071509342

ZK的数据快照

因为事务日志恢复速度慢,占用空间大,所以这里是用快照一起做数据日志持久化 的。

快照是某一时间的全量内存数据快照,恢复起来很快。而事务日志数据更全。一般数据恢复时先使用快照恢复某时刻的全量数据,再通过事务日志恢复更全的增量 数据。

redis持久化方式

发表于 2021-07-01 | 分类于 redis | 热度: ℃
字数统计: 844 | 阅读时长 ≈ 3

Redis持久化

RDB快照

redis内存数据库快照保存到dump.rdb二进制文件中。
image-20220701070640391

RDB快照文件体积小,因为进行过压缩,是一个二进制文件,且RDB文件直接恢复即可,恢复速度很快。

在redis.conf配置文件中有关于rdb持久化方式的频率策略:
image-20220701070655378

这里比如 save 60 10000代表的是60s内至少有10000个键被改动时,会自动生成并保存一个RDB快照文件。

可以手动执行RDB快照文件的生成,执行save命令或者bgsave命令即可。

save和bgsave命令

简单来说,save是主线程去写RDB内存快照文件的,而bgsave是fork一个子进程,用子进程去写RDB快照文件,不阻塞主线程去执行客户端发来的命令。(当然bgsave fork子进程的时候是阻塞的)。redis在触发生成RDB时是bgsave的方式。

image-20220701070713216

bgsave的写时复制机制

bgsave是利用COW(copy on write)这个机制来fork一个子进程去执行生成RDB持久化文件,redis本身还可以继续执行命令。子进程运行之后,会读取redis进程的内存数据,写入RDB文件。这时如果主线程还执行写数据命令,那么也会对修改的数据复制一份副本,bgsave子进程会将这个副本数据写入RDB文件。

RDB缺点

  • 虽然恢复速度很快,但是生成快照文件间隔之间redis挂了,那么这时没持久化到RDB文件的数据就会丢失。也就是RDB快照可能丢数据。

AOF持久化 (append only file)

AOF持久化机制,将修改的每一条指令记录进文件appendonly.aof中(先写入到buffer,再定时去fsync刷入磁盘)。当数据恢复时,会按照AOF文件中的写数据命令执行,来重建数据。

image-20220701070730233

这种AOF文件因为存放原生写命令,所以文件会比较大,且恢复时要执行Redis写命令,所以恢复速度比较慢。(相对于RDB快照恢复)当然,这种持久化方式数据安全性更高,因为可以设置fsync的频率,默认是每秒去将aof文件fsync到磁盘中,数据安全性比RDB更高。

AOF重写

AOF文件中可能有比较多的没用指令,比如自增五次,那么可以重写为set key 5即可,redis会定期根据内存最新数据进行aof重写,来压缩下AOF文件。

AOF重写是和bgsave类似,是fork一个子进程去执行AOF文件的重写,不会阻塞其他客户端请求redis。

RDB和AOF的比较

image-20220701070739316

Redis4.0之后的混合持久化方式

Redis 4.0之后提供了混合持久化方式。

1
2
-- 开启redis 混合持久化方式
# aof‐use‐rdb‐preamble yes

如果开启了混合持久化,那AOF在重写的时候,不再单纯去压缩无用的命令,会对此刻内存做RDB快照处理,并将此过程中新的写的命令追加到AOF文件,来生成新的AOF文件,覆盖原来的AOF文件。

image-20220701070749167

这样redis在恢复时,可以先根据RDB快照文件去恢复(速度快、体积小),然后再重放后面增量的写命令(数据安全),效率和安全都得到了保障。

dubbo异步调用原理

发表于 2021-06-01 | 分类于 dubbo | 热度: ℃
字数统计: 1,905 | 阅读时长 ≈ 9

dubbo支持异步调用,在2.7版本之后引入CompletableFuture来优化了异步调用。当然在2.6版本也支持异步调用。基本原理是一样的,只不过2.7版本支持了设置CompletableFuture来回调,细节上有些差别。

阅读全文 »

dubbo负载均衡算法

发表于 2021-05-15 | 分类于 dubbo | 热度: ℃
字数统计: 4,226 | 阅读时长 ≈ 19

dubbo负载均衡

image-20220716000024390

LoadBalance(负载均衡)的职责是将网络请求或者其他形式的负载“均摊”到不同的服务节点上,从而避免服务集群中部分节点压力过大、资源紧张,而另一部分节点比较空闲的情况。

dubbo提供了5种负载均衡实现,

  • 一致性hash的负载均衡算法 ConsistentHashLoadBalance
  • 基于权重随机算法 RandomLoadBalance
  • 基于最小活跃调用数算法 LeastActiveLoadBalance
  • 基于加权轮询算法的 RoundRobinLoadBalance
  • 基于最短响应时间的算法 ShortestResponseLoadBalance

AbstractLoadBalance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public abstract class AbstractLoadBalance implements LoadBalance {

static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 计算权重,随着服务运行时间uptime增大,权重ww的值会慢慢接近配置值weight
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
// 委托给子类实现
return doSelect(invokers, url, invocation);
}

protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);


/**
* AbstractLoadBalance提供的getWeight方法 用于计算Provider节点权重
* @param invoker the invoker
* @param invocation the invocation of this invoker
* @return weight
*/
int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
// provider节点的url
URL url = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
// 如果是RegistryService接口的话 直接获取参数上的权重
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
// 从url中获取 权重
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
// 对刚启动的provider节点 进行降权
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}
}
  • select方法委托给具体的子类实现。
  • 提供了getWeight获取provider节点权重的方法,内部对刚启动的provider节点做了预热处理,即权重在时间内逐步接近配置的权重值。

ConsistenHashLoadBalance 一致性hash负载均衡算法

一致性Hash算法简介

一致性 Hash 负载均衡可以让参数相同的请求每次都路由到相同的服务节点上,这种负载均衡策略可以在某些 Provider 节点下线的时候,让这些节点上的流量平摊到其他 Provider 上,不会引起流量的剧烈波动。

与直接对节点数取模不同,为了避免因一个 Provider 节点宕机,而导致大量请求的处理节点发生变化的情况,我们可以考虑使用一致性 Hash 算法。一致性 Hash 算法的原理也是取模算法,与 Hash 取模的不同之处在于:Hash 取模是对 Provider 节点数量取模,而一致性 Hash 算法是对 2^32 取模。其对Provider地址和请求参数对2^32取模。

1
2
hash(Provider地址) % 2^32
hash(请求参数) % 2^32

image-20220716000052705

一致性hash可能存在流量倾斜的问题,解决方案就是为真实节点设置虚拟槽位,来通过虚拟槽位让节点在逻辑上分布的更加均匀。

dubbo中的一致性hash实现

dubbo为每个dubbo接口(serviceKey做区分)的方法,缓存了一个一致性Hash选择器 ConsistentHashSelector,主要负载均衡去选出具体的Provider节点的过程就是去调用Selector的select方法。

1
2
// key:serviceKey+methodName value:ConsistentHashSelector 这个缓存来为方法调用选择一致性hash选择器
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

可以看到一致性hash负载均衡算法的doSelect的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// key是 service.methodName
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

// using the hashcode of list to compute the hash only pay attention to the elements in the list
// 这是为了在invokers列表发生变化时都会重新生成ConsistentHashSelector对象
int invokersHashCode = invokers.hashCode();
// 根据key从缓存中取
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != invokersHashCode) {
// selector不存在 则去创建一个
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 通过selector 选择一个Invoker对象
return selector.select(invocation);
}

选择器ConsistentHashSelector的初始化构造函数和最终select负载均衡逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
 private static final class ConsistentHashSelector<T> {

// 用于记录虚拟 Invoker 对象的 Hash 环。这里使用 TreeMap 实现 Hash 环,并将虚拟的 Invoker 对象分布在 Hash 环上。
// 虚拟槽是为了节点在分配流量时更加均匀
private final TreeMap<Long, Invoker<T>> virtualInvokers;

// 虚拟Invoker个数
private final int replicaNumber;

// Invoker集合的hashcode值
private final int identityHashCode;

// 需要参与hash计算的参数索引
private final int[] argumentIndex;

ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 初始化virtualInvokers字段,也就是虚拟Hash槽
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
// 记录Invoker集合的hashCode,用该hashCode值来判断Provider列表是否发生了变化
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 获取配置的虚拟槽的个数 默认160个
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 获取参与Hash计算的参数下标值,默认对第一个参数进行Hash运算
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 构建虚拟Hash槽,默认replicaNumber=160,相当于在Hash槽上放160个槽位
for (Invoker<T> invoker : invokers) {
// 循环当前的provider对应的invokers节点
String address = invoker.getUrl().getAddress();
// 外层轮询40次,内层轮询4次,共40*4=160次,也就是同一节点虚拟出160个槽位
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
// 构建Hash槽
virtualInvokers.put(m, invoker);
}
}
}
}

public Invoker<T> select(Invocation invocation) {
// 将参与一致性Hash的参数拼接到一起
String key = toKey(invocation.getArguments());
// 计算key的Hash值
byte[] digest = md5(key);
// 从hash环中去匹配Invoker对象
return selectForKey(hash(digest, 0));
}


private Invoker<T> selectForKey(long hash) {
// 从virtualInvokers集合(TreeMap是按照Key排序的)中查找第一个节点值大于或等于传入Hash值的Invoker对象
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
// 如果Hash值大于Hash环中的所有Invoker,则回到Hash环的开头,返回第一个Invoker对象
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
}

RandomLoadBalance 加权随机负载均衡算法

加权随机算法

RandomLoadBalance是dubbo默认的负载均衡算法。加权随机算法简单高效,可以通过一个例子来说明加权随机算法的核心思想。

image-20220716000109233

dubbo中的加权随机的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Select one invoker between a list using a random criteria
* @param invokers List of possible invokers
* @param url URL
* @param invocation Invocation
* @param <T>
* @return The selected invoker
*/
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;

// the weight of every invokers Invokers对应的权重数组
int[] weights = new int[length];

// the first invoker's weight 计算第一个节点的权重 getWright是在AbstractLoadBalance中提供的公共能力
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;

// The sum of weights 记录总权重
int totalWeight = firstWeight;

for (int i = 1; i < length; i++) {
// 获取每个节点的权重
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
// 标记是否为全部一样的权重
sameWeight = false;
}
}
// 各个Invoker权重值不相等时,计算随机数落在哪个区间上
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 随机获取一个[0, totalWeight)区间的内的数字
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 循环让offset数减去Invoker的权重值,当offset小于0时,返回相应的Invoker
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果总权重为0 或者 权重都相等 则随机返回一个
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

其核心逻辑有三个关键点:

  • 计算每个Invoker对应的权重及总权重值。
  • 每个Invoker权重值不相等时,计算随机数应该落在哪个Invoker区间中,返回对应的Invoker对象。计算的逻辑其实就是生成[0, 总权重)区间内的随机数
    ,然后循环每个节点的长度,第一个减去长度小于0的区间对应的节点即为按权重随机选择出的节点。
  • 如果各个Invoker权重相等,随机返回一个Invoker即可。方法也是生成一个随机数,从Invoker集合中按照随机数作为下标获取。

LeastActiveLoadBalance 最小活跃调用数负载均衡算法

最小活跃数负载均衡算法认为当前活跃请求数越小的Provider节点,剩余的处理能力越多,利用率越高。那么这个Provider会在单位时间内承载更多的请求,优先将这些请求分配给该Provider节点。

LeastActiveLoadBalance需要配合ActiveLimitFilter使用,ActiveLimitFilter会记录每个接口方法的活跃请求数,在进行负载均衡的时候,会根据活跃请求数最少的Invoker集合里挑选Invoker。

当然最小活跃调用数对应的Invoker可能有多个,在dubbo的实现中还像加权随机那样,去根据权重选出有一样最小活跃调用数的Invoker来调用。

dubbo实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();

// The least active value of all invokers
// 最小的活跃请求数
int leastActive = -1;

// The number of invokers having the same least active value (leastActive)
// 记录活跃请求数最小的Invoker集合的个数
int leastCount = 0;

// The index of invokers having the same least active value (leastActive)
// 记录活跃请求数最小的Invoker在invokers数组中的下标位置
int[] leastIndexes = new int[length];

// the weight of every invokers
// 记录活跃请求数最小的Invoker集合中,每个Invoker的权重值
int[] weights = new int[length];

// The sum of the warmup weights of all the least active invokers
int totalWeight = 0; // 总权重

// The weight of the first least active invoker
int firstWeight = 0;

// Every least active invoker has the same weight value?
boolean sameWeight = true; // 是否每个最小活跃数的Invoker有相同的权重


// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker 获取活跃数
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();

// Get the weight of the invoker's configuration. The default value is 100.
// 获取当前Invoker节点的权重
int afterWarmup = getWeight(invoker, invocation);

// save for later use
weights[i] = afterWarmup;

// If it is the first invoker or the active number of the invoker is less than the current least active number
if (leastActive == -1 || active < leastActive) {
// 当前的Invoker是第一个活跃请求数最小的Invoker,则记录如下信息

// Reset the active number of the current invoker to the least active number
// 重新记录最小活跃数
leastActive = active;

// Reset the number of least active invokers
// 重新记录活跃请求数最小的Invoker集合个数
leastCount = 1;

// Put the first least active invoker first in leastIndexes
// 重新记录最小活跃数Invoker索引下标
leastIndexes[0] = i;

// Reset totalWeight 重新记录总权重
totalWeight = afterWarmup;
// Record the weight the first least active invoker 记录第一个最小活跃数的权重
firstWeight = afterWarmup;

// Each invoke has the same weight (only one invoker here)
sameWeight = true;

// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {
// 又找到一个最小活跃请求数的Invoker

// Record the index of the least active invoker in leastIndexes order 记录该Invoker的下标
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker 更新总权重
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && afterWarmup != firstWeight) {
// 更新权重值是否相等
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
if (leastCount == 1) {
// 只有一个最小活跃数 直接返回
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
if (!sameWeight && totalWeight > 0) {
// 下面按照RandomLoadBalance的逻辑,从活跃请求数最小的Invoker集合中,随机选择一个Invoker对象返回
// 即最小活跃数Invoker集合中如果权重不一致 那么按照加权随机算法去选出一个Invoker 具体@see RandomLoadBalance
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 和加权随机一样 如果都有一样的权重 则从最小随机数的Invoker列表中随机选一个
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}

RoundRobinLoadBalance 加权轮询负载均衡算法

轮询指的是将请求轮流分配给每个 Provider。例如,有 A、B、C 三个 Provider 节点,按照普通轮询的方式,我们会将第一个请求分配给 Provider A,将第二个请求分配给 Provider B,第三个请求分配给 Provider C,第四个请求再次分配给 Provider A……如此循环往复。

轮询是一种无状态负载均衡算法,实现简单,适用于集群中所有 Provider 节点性能相近的场景。但现实情况中就很难保证这一点了,因为很容易出现集群中性能最好和最差的 Provider 节点处理同样流量的情况,这就可能导致性能差的 Provider 节点各方面资源非常紧张,甚至无法及时响应了,但是性能好的 Provider 节点的各方面资源使用还较为空闲。这时我们可以通过加权轮询的方式,降低分配到性能较差的 Provider 节点的流量。

加权之后,分配给每个 Provider 节点的流量比会接近或等于它们的权重比。例如,Provider 节点 A、B、C 权重比为 5:1:1,那么在 7 次请求中,节点 A 将收到 5 次请求,节点 B 会收到 1 次请求,节点 C 则会收到 1 次请求。

Dubbo加权轮询负载均衡算法的实现

算法总的描述:
每个Provider节点(Invoker)创建两个权重,一个是配置的weight,在整个过程中不会变化,另一个是currentWeight,在负载均衡的过程中动态调整,初始值为0。

当请求进来的时候,RoundRobinLoadBalance会遍历Invoker列表,并用响应的currentWeight加上其配置的权重。遍历完成之后,再找到最大的currentWeight,将其减去权重总和,然后返回相应的Invoker对象。 新的请求来了之后会重复上面的操作。

原理示意图:
image-20220716000215242

一个示例:
image-20220716000241259

源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 和一致性hash套路一样 缓存serviceKey_methodName 作为key value也是map key是Invoker对象 value是WeightedRoundRobin对象(记录了Invoker节点的配置权重和在轮询过程中的动态当前权重)
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();


protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 为当前服务方法在缓存中初始化 Invoker -> WeightedRoundRobin 映射 存在直接获取
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
// 最大的current值
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 获取invoker的身份String 作为map的key
String identifyString = invoker.getUrl().toIdentityString();
// 获取权重 (配置值)
int weight = getWeight(invoker, invocation);
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
// 检测当前Invoker是否有相应的WeightedRoundRobin对象,没有则进行创建
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});

if (weight != weightedRoundRobin.getWeight()) {
//weight changed 配置的权重发生变化也会重新设置
weightedRoundRobin.setWeight(weight);
}
// current+配置的权重
long cur = weightedRoundRobin.increaseCurrent();
// 设置lastUpdate字段
weightedRoundRobin.setLastUpdate(now);

// 找具有最大currentWeight的Invoker,以及Invoker对应的WeightedRoundRobin
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight; // 计算权重总和
}
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
if (selectedInvoker != null) {
// 用选中的最大权重的Invoker的currentWeight减去totalWeight
selectedWRR.sel(totalWeight);
// 返回选中的Invoker对象
return selectedInvoker;
}
// should not happen here
// 一个兜底 应该不会走到这
return invokers.get(0);
}

123…12
夸克

夸克

愿赌服输

114 日志
32 分类
121 标签
GitHub E-Mail csdn
© 2022 夸克 | Site words total count: 168.9k
|
主题 — NexT.Muse v5.1.4
博客全站共168.9k字

载入天数...载入时分秒...