watchDog核心原理
定时检测业务是否执行完毕,没结束的话观察这个key是否经历的锁超时时间的的三分之一,如果经历了,那么进行到期时间的重新设置,防止业务没有执行完就被释放锁造成的并发场景。
这其中还有一些实现细节,比如每个加锁的方法都去起一个线程去执行锁的续期吗?(Redisson这里不是直接的异步线程,而是借助了Netty的时间轮);再比如所有的加锁场景都会注册触发watchdog的锁续期吗?(这里如果传入了超时时间leaseTime,内部不会注册watchDog续期任务)
WatchDog注册的条件
不是所有的lock方法都会去触发watchDog的自动续期功能。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
        // 如果传入的leaseTime不为-1 (即用户设置了leaseTime)
        // 去直接调用加锁逻辑
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                // 大前提是leaseTime设置了-1
                // 如果返回了null 说明加锁成功 这里去续期
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
可以看到当调用lock(leaseTime)加锁api时,内部不会触发watchDog任务的建立,这里在使用的时候要注意,只有lock()方法才去做了watchDog的续期任务创建,默认超时释放时间是30s。
watchDog任务删除
在执行unlock的时候,会去删除注册的watchDog续期任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        // 解锁成功 取消watchdog任务
        cancelExpirationRenewal();
    }
}
// cancelExpirationRenewal方法
  void cancelExpirationRenewal() {
  // map中移除
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
    // task去取消 内部是加入到时间轮的取消队列中
        task.cancel();
    }
}
WatchDog的逻辑
看下scheduleExpirationRenewal方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47private void scheduleExpirationRenewal(final long threadId) {
        // watchDog任务的缓存
        if (expirationRenewalMap.containsKey(getEntryName())) {
        // 缓存中存在 直接返回 等待调度即可。
            return;
        }
        // 利用时间轮去创建一个任务 内部会有一个线程扫描并执行续期操作。
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            
            public void run(Timeout timeout) throws Exception {
                
                // 执行lua脚本去设置新的续期时间
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    
                    public void operationComplete(Future<Boolean> future) throws Exception {
                    // map中移除,只要续期任务被触发一次 在返回中就先删除缓存中的任务
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            // 在返回结果再次递归调用自己去续期
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
            // 任务执行的时间间隔
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            // 缓存中存在 任务取消 说明已经注册过续期任务了
            task.cancel();
        }
    }
在执行完续期操作之后,在返回值的回调中又去调用scheduleExpirationRenewal方法注册续期任务,来触发下一次的续期。
续期的lua脚本逻辑:
- 判断key、uuid:tid这个hash结构的锁是否存在,即当前线程是否持有锁
- 如果不存在,返回0。可以直接取消任务(这里应该认为unlock了,取消了续期任务)
 - 如果存在,去设置新的超时时间,相当于去续期。
 
 
整个任务注册的delay执行时间是三分之一 internalLockLeaseTime,按照默认配置是10s之后的任务。
WatchDog任务的执行
运用了Netty中的时间轮。和Dubbo中运用的时间轮大同小异。这里不是每加锁都会起一个线程去执行watchDog的逻辑,而是集中式的在时间轮中去管理这个定时任务。
总结

