Fork me on GitHub

Redisson分布式限流组件RRateLimiter使用

RRateLimiter的使用Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class RRateLimiterDemo {
static {
// 输出日志info级别
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List<Logger> loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> {
logger.setLevel(Level.INFO);
});
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer()
.setTimeout(1000000)
.setAddress("redis://127.0.0.1:6379");

RedissonClient redisson = Redisson.create(config);
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每1秒钟产生N个令牌
boolean b1 = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
if (b1) {
System.out.println("限流器初始化完成");
}

int allThreadNum = 20;

CountDownLatch latch = new CountDownLatch(allThreadNum);

long startTime = System.currentTimeMillis();
AtomicInteger fail = new AtomicInteger(0);
for (int i = 0; i < allThreadNum; i++) {
new Thread(() -> {
boolean b = rateLimiter.tryAcquire(1);
if (!b) {
System.out.println(Thread.currentThread().getName() + "被拒绝");
fail.incrementAndGet();
}else {
System.out.println(Thread.currentThread().getName());
}
latch.countDown();
}).start();
}
latch.await();
System.out.println("Elapsed " + (System.currentTimeMillis() - startTime) + ", 被限流fail: " + fail.get());
rateLimiter.delete();
redisson.shutdown();

}
}

RRateLimiter的基本原理

setRate设置速率

会向Redis中设计速率、时间间隔、和对应的模式。整个为hash结构,

1
2
3
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); -- 速率 比如1s产生2个令牌 速率就是2
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); -- 时间间隔为1s
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]); -- 模式这里忽略 分布式环境下都是集群

尝试获取一个许可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
-- 速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")

-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]

-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]

-- 单机限流才会用到,集群模式不用关注
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end

-- 原版本有bug(https://github.com/redisson/redisson/issues/3197),最新版将这行代码提前了
-- rate为1 arg1这里是 请求的令牌数量(默认是1)。rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")

-- 第一次执行这里应该是null,会进到else分支
-- 第二次执行到这里由于else分支中已经放了valueName的值进去,所以第二次会进if分支
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息
-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end

-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进
if released > 0 then
-- 移除zset中所有元素,重置周期
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end

-- 这里简单分析下上面这段代码:
-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。
-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3
-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。

-- 如果当前令牌数 < 请求的令牌数
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
local random, permits = struct.unpack("fI", nearest[1])
-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- valueName存的是当前总令牌数,-1表示取走一个
redis.call("decrby", valueName, ARGV[1])
return nil
end
else
-- set一个key-value数据 记录当前限流器的令牌数
redis.call("set", valueName, rate)
-- 建了一个以当前限流器名称相关的zset,并存入 以score为当前时间戳,以lua格式化字符串{当前时间戳为种子的随机数、请求的令牌数}为value的值。
-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体。我的理解就是往zset里记录了最后一次请求的时间戳和请求的令牌数
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 从总共的令牌数 减去 请求的令牌数。
redis.call("decrby", valueName, ARGV[1])
return nil
end

总结一下,redisson用了zset来记录请求的信息,这样可以非常巧妙的通过比较score,也就是请求的时间戳,来判断当前请求距离上一个请求有没有超过一个令牌生产周期。如果超过了,则说明令牌桶中的令牌需要生产,之前用掉了多少个就生产多少个,而之前用掉了多少个令牌的信息也在zset中保存了。

LUA脚本返回后的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
long s = System.currentTimeMillis();
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
future.onComplete((delay, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}

if (delay == null) {
//delay就是lua返回的 还需要多久才会有令牌
promise.trySuccess(true);
return;
}

//没有手动设置超时时间的逻辑
if (timeoutInMillis == -1) {
//延迟delay时间后重新执行一次拿令牌的动作
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
tryAcquireAsync(permits, promise, timeoutInMillis);
}, delay, TimeUnit.MILLISECONDS);
return;
}

//el 请求redis拿令牌的耗时
long el = System.currentTimeMillis() - s;
//如果设置了超时时间,那么应该减去拿令牌的耗时
long remains = timeoutInMillis - el;
if (remains <= 0) {
//如果那令牌的时间比设置的超时时间还要大的话直接就false了
promise.trySuccess(false);
return;
}
//比如设置的的超时时间为1s,delay为1500ms,那么1s后告知失败
if (remains < delay) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
promise.trySuccess(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
//因为这里是异步的,所以真正再次拿令牌之前再检查一下过去了多久时间。如果过去的时间比设置的超时时∂间大的话,直接false
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
promise.trySuccess(false);
return;
}
//再次拿令牌
tryAcquireAsync(permits, promise, remains - elapsed);
}, delay, TimeUnit.MILLISECONDS);
}
});
}

再次总结一下,Java客户端拿到redis返回的下一个令牌生产完成还需要多少时间,也就是delay字段。如果这个delay为null,则表示成功获得令牌,如果没拿到,则过delay时间后通过异步线程再次发起拿令牌的动作。这里也可以看到,redisson的RateLimiter是非公平的,多个线程同时拿不到令牌的话并不保证先请求的会先拿到令牌。

-------------本文结束感谢您的阅读-------------

本文标题:Redisson分布式限流组件RRateLimiter使用

文章作者:夸克

发布时间:2021年02月23日 - 13:02

最后更新:2022年07月23日 - 03:07

原始链接:https://zhanglijun1217.github.io/2021/02/23/Redisson分布式限流组件RRateLimiter使用/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。