watchDog核心原理
定时检测业务是否执行完毕,没结束的话观察这个key是否经历的锁超时时间的的三分之一,如果经历了,那么进行到期时间的重新设置,防止业务没有执行完就被释放锁造成的并发场景。
这其中还有一些实现细节,比如每个加锁的方法都去起一个线程去执行锁的续期吗?(Redisson这里不是直接的异步线程,而是借助了Netty的时间轮);再比如所有的加锁场景都会注册触发watchdog的锁续期吗?(这里如果传入了超时时间leaseTime,内部不会注册watchDog续期任务)
WatchDog注册的条件
不是所有的lock方法都会去触发watchDog的自动续期功能。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
// 如果传入的leaseTime不为-1 (即用户设置了leaseTime)
// 去直接调用加锁逻辑
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
// 大前提是leaseTime设置了-1
// 如果返回了null 说明加锁成功 这里去续期
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
可以看到当调用lock(leaseTime)加锁api时,内部不会触发watchDog任务的建立,这里在使用的时候要注意,只有lock()方法才去做了watchDog的续期任务创建,默认超时释放时间是30s。
watchDog任务删除
在执行unlock的时候,会去删除注册的watchDog续期任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
// 解锁成功 取消watchdog任务
cancelExpirationRenewal();
}
}
// cancelExpirationRenewal方法
void cancelExpirationRenewal() {
// map中移除
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
// task去取消 内部是加入到时间轮的取消队列中
task.cancel();
}
}
WatchDog的逻辑
看下scheduleExpirationRenewal方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47private void scheduleExpirationRenewal(final long threadId) {
// watchDog任务的缓存
if (expirationRenewalMap.containsKey(getEntryName())) {
// 缓存中存在 直接返回 等待调度即可。
return;
}
// 利用时间轮去创建一个任务 内部会有一个线程扫描并执行续期操作。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 执行lua脚本去设置新的续期时间
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
// map中移除,只要续期任务被触发一次 在返回中就先删除缓存中的任务
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " + getName() + " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself
// 在返回结果再次递归调用自己去续期
scheduleExpirationRenewal(threadId);
}
}
});
}
// 任务执行的时间间隔
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
// 缓存中存在 任务取消 说明已经注册过续期任务了
task.cancel();
}
}
在执行完续期操作之后,在返回值的回调中又去调用scheduleExpirationRenewal方法注册续期任务,来触发下一次的续期。
续期的lua脚本逻辑:
- 判断key、uuid:tid这个hash结构的锁是否存在,即当前线程是否持有锁
- 如果不存在,返回0。可以直接取消任务(这里应该认为unlock了,取消了续期任务)
- 如果存在,去设置新的超时时间,相当于去续期。
整个任务注册的delay执行时间是三分之一 internalLockLeaseTime,按照默认配置是10s之后的任务。
WatchDog任务的执行
运用了Netty中的时间轮。和Dubbo中运用的时间轮大同小异。这里不是每加锁都会起一个线程去执行watchDog的逻辑,而是集中式的在时间轮中去管理这个定时任务。