Fork me on GitHub
夸克的博客


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

  • 搜索

redis缓存一致性相关问题

发表于 2021-05-01 | 分类于 redis | 热度: ℃
字数统计: 1,883 | 阅读时长 ≈ 6

https://mp.weixin.qq.com/s/4W7vmICGx6a_WX701zxgPQ

阅读全文 »

redis底层存储数据结构

发表于 2021-04-21 | 分类于 redis | 热度: ℃
字数统计: 2,480 | 阅读时长 ≈ 9

RedisKV设计原理

Redis底层数据结构

Redis底层有SDS(简单动态字符串)、链表(list)、字典(ht-hashtable)、跳跃表(SkipList)、整数集合(intset)、压缩链表(ziplist)、quicklist等数据结构。Redis使用这些底层数据结构实现key、value的存储。

Redis数据库的key和value是怎么存储的

整个Redis中所有的key和value组成了一个全局字典,存储在RedisDb数据结构中。

image-20220701070153549

image-20220701070203576

  • RedisDb:结构包含一个字典dict,还包含一些过期键值对等dict字典值。
  • dict:dict是字典结构,保存键值的抽象数据结构,包含两个hash表,供平时和rehash时候使用。hash表使用链地址法来解决键冲突,对hash表进行扩容或者缩容的时候,为了服务的可用性,rehash的过程不是一次性完成的,而是渐进式的。
  • dictht:即字典的dict的hashtable结构,包含多个dictEntry节点的数组。
  • dicEntry:真正包装key、value的抽象存储结构,包含key(key都是string类型的)和value结构。value即可以通过RedisObject包装为不同的基础数据类型。

Redis中的字典是怎么扩容的。(什么是redis的渐进式rehash)

存放dictEntry的数组在容量不足时,肯定需要扩容,上看可以看到dict中持有两个dictht 哈希表,即ht[0]和ht[1]。在扩容的时候,把ht[0]里的值rehash到ht[1],这是一个渐进式hash的过程。

  • 渐进式hash:是rehash过程并不是一次性、集中式的完成的,而是分多次、渐进式的完成的。

全部rehash完成之后,h[1]就取代h[0]存储字典中的元素。

Redis中的value存储结构

value支持很多数据类型。比如String、list、hash、set、sorted set等。在redis中value(dictEntry中的val指针)是使用RedisObject表示的。

1
2
3
4
5
6
typedef struct  redisObejct{
unsigned type; // value对象类型
unsigned encoding; // 编码方式
lru:LRU_BITS; // 淘汰时用
void *ptr; // 真正底层数据结构指针
}

Redis底层数据结构

常用的value基础数据类型和底层数据结构的映射关系:
image-20220701070220161

Redis中的字符串

redis中所有的key都是用字符串存储的,而且redis没有直接使用c语言中的字符串。而是使用自己实现的简单动态字符串(sds)。

1
2
3
4
5
6
7
8
9
10
11
struct sdshdr {
// 记录buf数组中已经使用的数量
// 等于sdc所保存字符串的长度
unsigned int len;

// 记录buf数组中未使用字节的数量
unsigned int free;

// 字符数组 用于保存字符串
char buf[];
}

image-20220701070236976

Redis为什么不使用c语言字符串进行存储

C语言中的字符串使用了长度为N+1的字符数组来存储表示长度为N的字符串,并且字符数组最后一个元素总是\0。
image-20220701070248344

存在的问题:

  • 获取字符串的效率复杂度高:C不保存字符串数组的长度,要获取长度要遍历数组。
  • 不能杜绝 缓冲区溢出/内存泄漏的问题:C语言一个问题可能造成缓冲区溢出的问题,从而导致内存泄漏。
  • C只能保存文本数据:C中的字符串不是二进制安全的,redis某些场景也要存储音视频文件,如果遇到字符 ‘\0’会截断不安全。

Redis的sds怎么解决的

image-20220701070303502

  1. 在sds中增加了数组长度len,获取长度直接获取。时间复杂度O(1)。
  2. 自动扩展空间:SDS对字符串修改时,根据free能检查是否满足修改所需的空间,不够的话SDS会自动扩容,避免了C语言存在的字符串溢出的问题。
  3. 预分配内存有效降低内存分配次数,避免频繁扩容分配内存。SDS在扩容时是成倍的扩容的,这种预分配的机制避免了字符数组频繁扩容。
  4. 二进制安全:C语言字符串只能保存ASCII码,对于图片、视频等是二进制不安全的。SDS除了兼容了C语言的字符串,同时对写入、读取不做过滤、分割,是二进制安全的。

Redis中的双端链表(linkedlist)

redis的数组结构的value可以通过list结构来实现。(list底层数据结构还有ziplist压缩列表实现)

Redis中的list是一个双向无环链表结构,每个链表的节点用listNode结构,每个节点有前驱节点和后置节点方便遍历。

image-20220701070319748

  • listNode:list中的一个节点,包含value(值)和前驱和后置节点。
  • list:list结构,包含head和tail节点,是维护节点。持有了链表头尾的指针。

Redis中的ziplist

压缩列表是为了节约内存而开发的线性顺序数据结构,可以包含多个节点,每个节点可以保留一个字节数组或者整数值。

ziplist是一个为Redis专门提供的底层数据结构之一,本身可以有序也可以无序。当作为list和hash的底层实现时,节点之间没有顺序;当作为zset的底层实现时,节点之间会按照大小顺序排列。

image-20220701070329909

Redis中的quicklist

Redis早期版本list列表是通过压缩列表和普通的linkedlist来实现的,当元素少的时候用linkedlist,元素多的时候用ziplist。

而链表的附加空间比较高(prev和next指针),且链表内存中不连续,会有内存碎片化的问题。

在Redis 3.2版本之后对list结构的数据进行了改造,使用quicklist代替了ziplist和linkedlist。结合了空间和时间效率的考量。

quicklist其实也是由ziplist和linkedlist组成:
image-20220701070340512

  • quickList:本质还是一个linkedlist的维护节点,有头尾节点,连接的节点都是quicklistNode
  • quickListNode:有前驱和后继节点,但真正的内容不再是list中的一个值,而是一个ziplist。

这样两种数据结构的优点都能使用到。

Redis中的字典table结构

Redis中的hashtable就是dictht结构,和整体Redis kv数据存储是一个数据结构。当value是hash结构即代表value也是类型是dictht,内部每个dictEntry结构存储。

当hash结构的value元素比较少时,底层用ziplist存储(ziplist是有序的 dictht是无序的),元素数量比较大的时候会由字典来实现。由元素个数和元素大小的阈值来控制是否转变为字典结构。
image-20220701070403339

image-20220701070418240

Redis中的skiplist跳表

Redis中实现有序集合,底层选用的是hashtable和跳表作为数据结构。

和hash基础数据结构一样,当数据量小的时候,zset也可以用ziplist数据结构实现,当数据量大之后采用hashtabel+skiplist。

当zset满足:

  • 元素个数小于128
  • 所有成员的长度都小于64字节
    这两个条件时候,会采用ziplist作为底层存储结构。

当不满足任一条件时,会采用dict(hashtable) + skiplist来实现:

1
2
3
4
5
6
7
/* zset结构体 */
typedef struct zset {
// 字典,维护元素值和分值的映射关系
dict *dict;
// 按分值对元素值排序,支持O(logN)数量级的查找操作
zskiplist *zsl;
} zset;

  • hashtable:支持zrank、zscore等直接根据key查找的命令。但hash结构无序,不能很好支持快速排序。
  • skiplist:支持zrange、zrevrange等范围查找。跳表可以简单理解为简单的多层级排序链表,搜索的时间复杂度是O(LogN)。

skipList在Redis中的应用:在链表增加、删除节点O(1)的时间复杂度基础上,能加入多层级的索引节点,来实现近似二分查找的时间复杂度O(LogN)数据结构。

为什么选用skiplist,而不去选择其他结构

  • 有序和无序:skipList和各种平衡树(AVL、红黑树)都是有序排列的,而hash不是有序的。在hash表上只能做单key的查找,不适合范围查询。
  • 范围查找:在进行范围查找的时候,树结构比跳表的操作更复杂。虽然平衡树中序遍历能找到指定范围的小指,但需要再次中序遍历才能查找到范围的大值。而跳表只需要按照上层索引节点跳到第一层(原始链表)再次遍历即可。
  • 插入和删除:平衡树可能涉及到树的转换,子树的调整,实现复杂。而skiplist多层级都是链表,只需要维护对应的指针即可。

skipList的维护过程

image-20220701070445955

跳表的时间复杂度和优缺点

跳表是空间换时间的一种链表基础上优化的数据结构。

链表查找的时间复杂度是O(N),借鉴数据库索引的思想,提取出关键节点(索引),先在关键节点上查找,然后下层链表继续查找。

跳表的时间复杂度是O(logN)。空间复杂度是O(N)。在新增和删除的节点的过程中,因为要维护上层索引,且排序,时间复杂度也是logN。

优点:在数据量大且读多写少的场景下,能让链表查找的时间复杂度降低,Redis底层的链表是双向链表,很好的支持了zset的范围查询。

缺点:需要更多的空间,每次插入删除都要维护上层索引,写多的场景下效率存在折中。

Redis中的整数集合intset

用于保存整数值的抽象数据结构,不会出现重复元素。

Redis中的set value默认是用值为null的字典ht实现的。(和java中HashMap、HashSet很像),当值元素都是整数时,使用此intset结构来存储value。

image-20220701070459745

1
2
3
4
5
6
7
8
9
10
typedef struct intset {
//编码方式
uint32_t encoding;

// 数组长度
uint32_t length;

// 保存元素的数组
int8_t contents[]
}

image-20220701070511491

redis底层存储结构

发表于 2021-04-03 | 分类于 redis | 热度: ℃
字数统计: 0 | 阅读时长 ≈ 1

image-20220701065712069

dubbo超时机制

发表于 2021-04-02 | 分类于 dubbo | 热度: ℃
字数统计: 1,169 | 阅读时长 ≈ 5

预备知识

  • 针对服务端超时,dubb之后在Filter中进行超时的日志输出,但不会阻碍执行。
  • 针对客户端超时,会创建DefaultFuture的时候创建一个基于时间轮的定时任务,在客户端扫描,如果超过了超时时间,则构建一个超时异常让Future返回。

客户端如何知道请求失败?

RPC采用Netty作为底层通讯框架,非阻塞通信方式更高效,非阻塞的特性导致发送数据和接收数据是一个异步的过程,当存在服务端异常、网络问题时(比如超时)客户端是收不到响应的,如何判断一次RPC调用是失败的?

误区一:Dubbo是同步调用的吗?

dubbo在通讯层面因为采用Netty是异步的,呈现给调用者错觉是内部做了阻塞等待,实现了异步转同步。

误区二:channel.writeAndFlush返回一个channelFuture,channelFuture.isSuccess能判断请求成功了。

writeAndFlush只能表示网络缓存区写入成功,不代表发送成功。

那么Dubbo客户端感知到请求失败得自己去造了。

客户端的超时感知

客户端发起一个RPC请求时,会设置一个超时时间,发起调用的时候会开启一个定时器:

  • 如果收到正常响应,删除这个定时器。
  • 定时器倒计时完毕,会被认为超时,在客户端构造一个失败的返回。

Dubbo中的超时判定逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// 创建一个Future时候 进行timeout check 内部是创建一个基于时间轮的定时check任务
timeoutCheck(future);
return future;
}

private static void timeoutCheck(DefaultFuture future) {
// 超时时间的检查任务
TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}


private static class TimeoutCheckTask implements TimerTask {

private final Long requestID;

TimeoutCheckTask(Long requestID) {
this.requestID = requestID;
}

// 超时检查的逻辑
@Override
public void run(Timeout timeout) {
// 根据requestId从Future缓存中获取future
DefaultFuture future = DefaultFuture.getFuture(requestID);
if (future == null || future.isDone()) {
// 如果future正常返回 则说明没有超时
return;
}
// 否则去响应一个超时
if (future.getExecutor() != null) {
future.getExecutor().execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
}

private void notifyTimeout(DefaultFuture future) {
// create exception response.
// 客户端在超时之后创建一个超时的返回
Response timeoutResponse = new Response(future.getId());
// set timeout status.
// 根据future的isSent确定状态是客户端响应超时 还是 服务端响应超时
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
// 响应超时
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
}

超时即调用失败,一次 RPC 调用的失败,必须以客户端收到失败响应为准。

这里关键信息在dubbo发送请求相关代码中。

1
2
3
4
5
6
7
8
9
发起RPC调用 
--> 客户端代理对象
--> 服务目录/负载均衡选择一个可执行的Invoker
-->Invoker客户端Filter链
-->AsyncToSyncInvoker(路由请求)
--> DubboInvoker(委托NettyClient发送请求)
--> HeaderExchangeChannel.request(发送请求 创建一个DefaltFuture)
--> DefaultFuture创建 时会注册一个定时任务检测超时
--> channel.send 真正去发送请求

服务端的超时判断

在服务端,超时是通过Filter机制来完成的。具体代码是在TimeoutFilter中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Activate(group = CommonConstants.PROVIDER)
public class TimeoutFilter implements Filter, Filter.Listener {

private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}

// 执行完Invoker.invoke之后的回调
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// 拿到写在上下文中的timeout统计
Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
if (obj != null) {
TimeoutCountDown countDown = (TimeoutCountDown) obj;
if (countDown.isExpired()) {
// 如果超时 结果调用clear()方法清除
((AppResponse) appResponse).clear(); // clear response in case of timeout.
if (logger.isWarnEnabled()) {
// 打印warn日志
logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
", invoke elapsed " + countDown.elapsedMillis() + " ms.");
}
}
}
}

@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {

}
}

Filter.Listener接口中的onResponse方法可以认为是在执行完此filter中的invoke方法之后,获取到的Result提供了回调机制,能触发正常返回和异常的两个回调函数。(代码在ProtocolFilterWrapper中)

可以看到这里就是在调用正常返回之后触发了一个检查,如果服务端执行超时,则会打印一个warn日志。

redis的单线程和io多路复用

发表于 2021-04-01 | 分类于 redis | 热度: ℃
字数统计: 1,408 | 阅读时长 ≈ 5

redis是全部只有一个线程吗?

Redis是单线程,主要是指Redis的网络IO和键值读写是一个线程完成的,这也是Redis对外提供键值存储服务的主要流程。但Redis的其他功能,比如持久化、异步删除、集群数据同步等,是由额外的线程执行的(Redis 4.0增加功能)。

阅读全文 »

mybatis连接池的实现

发表于 2021-03-22 | 分类于 mybatis | 热度: ℃
字数统计: 60 | 阅读时长 ≈ 1

前言

最近看了Mybatis自带的数据源连接池的实现,在对比其他数据源的复杂实现,mybatis比较轻量简单的实现了数据库连接池化管理。这里写下来记录下。

本地消息表的一种实现方式

发表于 2021-03-04 | 分类于 分布式事务 | 热度: ℃
字数统计: 0 | 阅读时长 ≈ 1

image-20220704010219043

Redisson分布式限流组件RRateLimiter使用

发表于 2021-02-23 | 分类于 Redis | 热度: ℃
字数统计: 2,051 | 阅读时长 ≈ 9

RRateLimiter的使用Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class RRateLimiterDemo {
static {
// 输出日志info级别
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
List<Logger> loggerList = loggerContext.getLoggerList();
loggerList.forEach(logger -> {
logger.setLevel(Level.INFO);
});
}
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer()
.setTimeout(1000000)
.setAddress("redis://127.0.0.1:6379");

RedissonClient redisson = Redisson.create(config);
RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");
// 初始化
// 最大流速 = 每1秒钟产生N个令牌
boolean b1 = rateLimiter.trySetRate(RateType.OVERALL, 5, 1, RateIntervalUnit.SECONDS);
if (b1) {
System.out.println("限流器初始化完成");
}

int allThreadNum = 20;

CountDownLatch latch = new CountDownLatch(allThreadNum);

long startTime = System.currentTimeMillis();
AtomicInteger fail = new AtomicInteger(0);
for (int i = 0; i < allThreadNum; i++) {
new Thread(() -> {
boolean b = rateLimiter.tryAcquire(1);
if (!b) {
System.out.println(Thread.currentThread().getName() + "被拒绝");
fail.incrementAndGet();
}else {
System.out.println(Thread.currentThread().getName());
}
latch.countDown();
}).start();
}
latch.await();
System.out.println("Elapsed " + (System.currentTimeMillis() - startTime) + ", 被限流fail: " + fail.get());
rateLimiter.delete();
redisson.shutdown();

}
}

RRateLimiter的基本原理

setRate设置速率

会向Redis中设计速率、时间间隔、和对应的模式。整个为hash结构,

1
2
3
redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); -- 速率 比如1s产生2个令牌 速率就是2
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); -- 时间间隔为1s
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]); -- 模式这里忽略 分布式环境下都是集群

尝试获取一个许可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
-- 速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")

-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]

-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]

-- 单机限流才会用到,集群模式不用关注
if type == "1" then
valueName = KEYS[3]
permitsName = KEYS[5]
end

-- 原版本有bug(https://github.com/redisson/redisson/issues/3197),最新版将这行代码提前了
-- rate为1 arg1这里是 请求的令牌数量(默认是1)。rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")

-- 第一次执行这里应该是null,会进到else分支
-- 第二次执行到这里由于else分支中已经放了valueName的值进去,所以第二次会进if分支
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息
-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("fI", v)
released = released + permits
end

-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进
if released > 0 then
-- 移除zset中所有元素,重置周期
redis.call("zrem", permitsName, unpack(expiredValues))
currentValue = tonumber(currentValue) + released
redis.call("set", valueName, currentValue)
end

-- 这里简单分析下上面这段代码:
-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。
-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3
-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。

-- 如果当前令牌数 < 请求的令牌数
if tonumber(currentValue) < tonumber(ARGV[1]) then
-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
local random, permits = struct.unpack("fI", nearest[1])
-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
else
-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- valueName存的是当前总令牌数,-1表示取走一个
redis.call("decrby", valueName, ARGV[1])
return nil
end
else
-- set一个key-value数据 记录当前限流器的令牌数
redis.call("set", valueName, rate)
-- 建了一个以当前限流器名称相关的zset,并存入 以score为当前时间戳,以lua格式化字符串{当前时间戳为种子的随机数、请求的令牌数}为value的值。
-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体。我的理解就是往zset里记录了最后一次请求的时间戳和请求的令牌数
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
-- 从总共的令牌数 减去 请求的令牌数。
redis.call("decrby", valueName, ARGV[1])
return nil
end

总结一下,redisson用了zset来记录请求的信息,这样可以非常巧妙的通过比较score,也就是请求的时间戳,来判断当前请求距离上一个请求有没有超过一个令牌生产周期。如果超过了,则说明令牌桶中的令牌需要生产,之前用掉了多少个就生产多少个,而之前用掉了多少个令牌的信息也在zset中保存了。

LUA脚本返回后的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private void tryAcquireAsync(long permits, RPromise<Boolean> promise, long timeoutInMillis) {
long s = System.currentTimeMillis();
RFuture<Long> future = tryAcquireAsync(RedisCommands.EVAL_LONG, permits);
future.onComplete((delay, e) -> {
if (e != null) {
promise.tryFailure(e);
return;
}

if (delay == null) {
//delay就是lua返回的 还需要多久才会有令牌
promise.trySuccess(true);
return;
}

//没有手动设置超时时间的逻辑
if (timeoutInMillis == -1) {
//延迟delay时间后重新执行一次拿令牌的动作
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
tryAcquireAsync(permits, promise, timeoutInMillis);
}, delay, TimeUnit.MILLISECONDS);
return;
}

//el 请求redis拿令牌的耗时
long el = System.currentTimeMillis() - s;
//如果设置了超时时间,那么应该减去拿令牌的耗时
long remains = timeoutInMillis - el;
if (remains <= 0) {
//如果那令牌的时间比设置的超时时间还要大的话直接就false了
promise.trySuccess(false);
return;
}
//比如设置的的超时时间为1s,delay为1500ms,那么1s后告知失败
if (remains < delay) {
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
promise.trySuccess(false);
}, remains, TimeUnit.MILLISECONDS);
} else {
long start = System.currentTimeMillis();
commandExecutor.getConnectionManager().getGroup().schedule(() -> {
//因为这里是异步的,所以真正再次拿令牌之前再检查一下过去了多久时间。如果过去的时间比设置的超时时∂间大的话,直接false
long elapsed = System.currentTimeMillis() - start;
if (remains <= elapsed) {
promise.trySuccess(false);
return;
}
//再次拿令牌
tryAcquireAsync(permits, promise, remains - elapsed);
}, delay, TimeUnit.MILLISECONDS);
}
});
}

再次总结一下,Java客户端拿到redis返回的下一个令牌生产完成还需要多少时间,也就是delay字段。如果这个delay为null,则表示成功获得令牌,如果没拿到,则过delay时间后通过异步线程再次发起拿令牌的动作。这里也可以看到,redisson的RateLimiter是非公平的,多个线程同时拿不到令牌的话并不保证先请求的会先拿到令牌。

实现LRU算法

发表于 2021-01-17 | 分类于 算法 | 热度: ℃
字数统计: 1,874 | 阅读时长 ≈ 9

LRU算法概述

LRU:Least Recently Read (最近最少访问),是一种常见的缓存淘汰算法,当容量不够时淘汰最近最少访问的元素。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* 缓存容量为 2 */
LRUCache cache = new LRUCache(2);
// 你可以把 cache 理解成一个队列
// 假设左边是队头,右边是队尾
// 最近使用的排在队头,久未使用的排在队尾
// 圆括号表示键值对 (key, val)
cache.put(1, 1);
// cache = [(1, 1)]
cache.put(2, 2);
// cache = [(2, 2), (1, 1)]
cache.get(1); // 返回 1
// cache = [(1, 1), (2, 2)]
// 解释:因为最近访问了键 1,所以提前至队头
// 返回键 1 对应的值 1
cache.put(3, 3);
// cache = [(3, 3), (1, 1)]
// 解释:缓存容量已满,需要删除内容空出位置
// 优先删除久未使用的数据,也就是队尾的数据
// 然后把新的数据插入队头
cache.get(2); // 返回 -1 (未找到)
// cache = [(3, 3), (1, 1)]
// 解释:cache 中不存在键为 2 的数据
cache.put(1, 4);
// cache = [(1, 4), (3, 3)]
// 解释:键 1 已存在,把原始值 1 覆盖为 4
// 不要忘了也要将键值对提前到队头

一些特点

  • 查找效率高。尽量O(1)的查找效率
  • 插入、删除要求效率高,容量满的时候要淘汰最近最少访问的元素。
  • 元素之间是有顺序的,因为区分最近使用和久未使用的数据。

实现LRU核心数据结构

  • hash表:快速查找,但是各个节点的数据不是有顺序的。
  • 链表:插入删除快,有序但是查找速度不快。

最终结合两种数据结构。哈希链表

image-20220717191857466

这里的两个问题:

  • 为什么链表是双向链表?
    因为在容量满足了之后需要对链表尾部的节点删除,这时候要断开上一个节点和其的链,这就需要找到上一个节点。

  • 为什么链表中也是key和value,不能直接放value吗?
    因为在删除尾部节点的时候需要同时删除hash表中的元素,这时候需要根据key来删除,索引链表里也要冗余key。

java代码实现

使用HashMap和LinkedList来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class LRUCache {
class EasyNode {
Integer key;
Integer value;
EasyNode(Integer key, Integer value) {
this.key = key;
this.value = value;
}
}
Map<Integer, EasyNode> map;
LinkedList<EasyNode> cache;
int cap;

public LRUCache(int capacity) {
map = new HashMap<>(capacity);
cache = new LinkedList<>();
this.cap = capacity;
}

public int get(int key) {
// 如果key在map中不存在 返回-1
if (!map.containsKey(key)) {
return -1;
}

// 存在 则维护LRU中双向链表的逻辑
EasyNode node = map.get(key);
// 删除再添加到队尾
cache.remove(node);
cache.addLast(node);
return node.value;

}

public void put(int key, int value) {
// map中存在 则map更新 且将该节点放入队尾
EasyNode node = new EasyNode(key, value);
if (map.containsKey(key)) {
cache.remove(map.get(key));
cache.addLast(node);
map.put(key, node);
} else{
// 判断是否需要淘汰
if (cache.size() == cap) {
// 进行缓存的淘汰
EasyNode removeNode = cache.removeFirst();
map.remove(removeNode.key);
}
// 再添加到map和队尾
map.put(key, node);
cache.addLast(node);
}
}
}

/**
* Your LRUCache object will be instantiated and called as such:
* LRUCache obj = new LRUCache(capacity);
* int param_1 = obj.get(key);
* obj.put(key,value);
*/

使用HashMap和自定义的双向链表实现

  • 自定义双向链表的节点和双向链表
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * 双向链表节点
    */
    @Setter
    @Getter
    public class DeListNode <Key, Value>{

    private Key key;
    private Value value;
    // 前驱节点
    private DeListNode<Key, Value> prev;
    // 后继节点
    private DeListNode<Key, Value> next;

    public DeListNode(Key key, Value value)
    {
    this.key = key;
    this.value = value;
    }
    public String toString() {
    return String.format(" (key:%s,value:%s) ", key, value);
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* 简单实现双端链表
*/
public class DeLinkedList<Key, Value> {
// 头和尾节点
private DeListNode<Key, Value> head, tail;

// 链表元素个数
private int size;

public int size() {return size;}

public DeLinkedList() {
// 初始化一个 头尾巴节点 两个节点不会在真正使用中用到
head = new DeListNode<>(null, null);
tail = new DeListNode<>(null, null);
// 首尾连接
head.setNext(tail);
tail.setPrev(head);
}

// 给尾巴加入新的节点
public void addLast(DeListNode<Key, Value> node) {

// 虚拟的尾巴
DeListNode<Key, Value> dummyTail = tail;
// 真实的尾节点
DeListNode<Key, Value> trueTail = dummyTail.getPrev();

// 加入的节点和真实尾巴节点链接
trueTail.setNext(node);
node.setPrev(trueTail);

// 再链接上虚拟尾巴节点
node.setNext(dummyTail);
dummyTail.setPrev(node);
size ++;
}

public String toString() {
if (head.getNext() == tail) {
return
"[]";
} else {
DeListNode current = head.getNext();
StringBuffer sb = new StringBuffer();
sb.append(" 旧 [");
while (!current.equals(tail)) {
sb.append(current.toString());
current = current.getNext();
}
sb.append("] 新");
return sb.toString();
}
}

public void remove(DeListNode<Key, Value> node) {
// 假设node 一定存在
node.getPrev().setNext(node.getNext());
node.getNext().setPrev(node.getPrev());
size -- ;
}

// 队头移除元素
public DeListNode<Key, Value> removeFirst() {
// 没有元素不操作
if (head.getNext() == tail) {
return null;
}

DeListNode<Key, Value> trueHead = head.getNext();
head.setNext(trueHead.getNext());
trueHead.getNext().setPrev(head);
size--;
return trueHead;
}
}
  • 实现LRUCache
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    public class 实现LRU {
    /**
    * 简单双端队列
    * @param <Key>
    * @param <Value>
    */

    public static class LRUCache <Key, Value>{
    // 容量
    private int cap;
    // hash表
    private HashMap<Key, DeListNode<Key, Value>> map;
    // 双向链表缓存
    private DeLinkedList<Key, Value> cache;

    // 初始化函数
    public LRUCache(int cap) {
    this.cap = cap;
    map = new HashMap<>(cap);
    cache = new DeLinkedList<>();
    }

    /**
    * 根据key获取value
    * 逻辑:
    * 1. 如果缓存中不存在 则返回null
    * 2. 如果存在,将命中的元素移动到队头(LRU的逻辑) 返回元素
    * @param key
    * @return
    */
    public synchronized Value get(Key key) {
    if (!map.containsKey(key)) {
    return null;
    } else {
    // 移动元素到队尾 采用先删除再插入的方式
    DeListNode<Key, Value> keyValueDeListNode = map.get(key);
    cache.remove(keyValueDeListNode);
    cache.addLast(keyValueDeListNode);
    return keyValueDeListNode.getValue();
    }
    }

    /**
    * 添加一个元素到LRU缓存中
    * 逻辑:
    * 1. LRUCache中是否存在key 如果存在替换对应的node 即map.put 然后维护双向链表(插入再删除)
    * 2. 如果不存在 需要根基cap判断是否需要淘汰队头元素。 队列尾部添加节点 map中添加然后删除淘汰的key
    *
    * @param key
    * @param value
    */
    public synchronized void put(Key key, Value value) {
    // 构造新节点
    DeListNode<Key, Value> node = new DeListNode<>(key, value);
    if (map.containsKey(key)){
    // 存在key 则去替换map中的节点 且维护双端队列(删除再添加)
    cache.remove(map.get(key));
    cache.addLast(node);
    map.put(key, node);
    } else {
    // 不存在要去判断是否触发淘汰
    if (cap == cache.size()) {
    // 双端队列队头淘汰(LRU逻辑)
    DeListNode<Key, Value> first = cache.removeFirst();
    // map中remove
    map.remove(first.getKey());
    }

    // 直接添加到尾部即可
    cache.addLast(node);
    // map中添加
    map.put(key, node);
    }
    }
    public void printf() {
    System.out.println("cachhe 队列" + cache.toString());
    }
    }

    public static void main(String[] args) {
    // 初始化为2的lruCache
    LRUCache<Integer, Integer> lruCache = new LRUCache<>(2);
    lruCache.put(1, 1);
    lruCache.put(2, 2); // 新 [(2,2), (1,1)] 旧
    lruCache.printf();

    Integer integer = lruCache.get(1); // 新 [(1,1), (2,2)] 旧
    lruCache.printf();

    lruCache.put(2,3); // 新 [(2,3), (1,1)] 旧
    lruCache.printf();

    // 再添加 淘汰的是1
    lruCache.put(4, 4); // 新 [(4,4), (2,3)] 旧
    lruCache.printf();
    }
    }

mybatis-spring-boot-starter自动装配实现

发表于 2020-12-18 | 分类于 spring boot | 热度: ℃
字数统计: 1,314 | 阅读时长 ≈ 6

Mybatis-spring-boot-starter的入口

image-20220718024228821

会在springboot启动的时候加载autoconfigure模块定义的自动装配类:MybatisAutoConfiguration
image-20220718024241136

MybatisAutoConfiguration中装配Mybatis的逻辑

SqlSessionFactory的构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
// 通过SqlSessionFactoryBean来实现 SqlSesionFactory的初始化
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();

factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
// 给factory设置Mybatis的全局配置对象Configuration
applyConfiguration(factory);
if (this.properties.getConfigurationProperties() != null) {
factory.setConfigurationProperties(this.properties.getConfigurationProperties());
}
if (!ObjectUtils.isEmpty(this.interceptors)) {
factory.setPlugins(this.interceptors);
}
if (this.databaseIdProvider != null) {
factory.setDatabaseIdProvider(this.databaseIdProvider);
}
if (StringUtils.hasLength(this.properties.getTypeAliasesPackage())) {
factory.setTypeAliasesPackage(this.properties.getTypeAliasesPackage());
}
if (this.properties.getTypeAliasesSuperType() != null) {
factory.setTypeAliasesSuperType(this.properties.getTypeAliasesSuperType());
}
if (StringUtils.hasLength(this.properties.getTypeHandlersPackage())) {
factory.setTypeHandlersPackage(this.properties.getTypeHandlersPackage());
}
if (!ObjectUtils.isEmpty(this.typeHandlers)) {
factory.setTypeHandlers(this.typeHandlers);
}
if (!ObjectUtils.isEmpty(this.properties.resolveMapperLocations())) {
factory.setMapperLocations(this.properties.resolveMapperLocations());
}
Set<String> factoryPropertyNames = Stream
.of(new BeanWrapperImpl(SqlSessionFactoryBean.class).getPropertyDescriptors()).map(PropertyDescriptor::getName)
.collect(Collectors.toSet());
Class<? extends LanguageDriver> defaultLanguageDriver = this.properties.getDefaultScriptingLanguageDriver();
if (factoryPropertyNames.contains("scriptingLanguageDrivers") && !ObjectUtils.isEmpty(this.languageDrivers)) {
// Need to mybatis-spring 2.0.2+
factory.setScriptingLanguageDrivers(this.languageDrivers);
if (defaultLanguageDriver == null && this.languageDrivers.length == 1) {
defaultLanguageDriver = this.languageDrivers[0].getClass();
}
}
if (factoryPropertyNames.contains("defaultScriptingLanguageDriver")) {
// Need to mybatis-spring 2.0.2+
factory.setDefaultScriptingLanguageDriver(defaultLanguageDriver);
}

return factory.getObject();
}

SqlSessionFactory的实例化是通过SqlSessionFactoryBean.getObject()实现的,该类会被注入DataSource对象(负责管理数据库连接池,Session指的是一次会话,而这个会话是在DataSource提供的Connection上进行的)。

在factory.getObject()中,最核心还是调用了buildSqlSessionFactory 去创建了sqlSessionFactory实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected SqlSessionFactory buildSqlSessionFactory() throws IOException {
Configuration configuration;
XMLConfigBuilder xmlConfigBuilder = null;
if (this.configLocation != null) {
// 创建XMLConfigBuilder对象,读取指定的配置文件
xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(),
null, this.configurationProperties);
configuration = xmlConfigBuilder.getConfiguration();
} else {
// 其他方式初始化Configuration全局配置对象
}
// 初始化MyBatis的相关配置和对象,其中包括:
// 扫描typeAliasesPackage配置指定的包,并为其中的类注册别名
// 注册plugins集合中指定的插件
// 扫描typeHandlersPackage指定的包,并注册其中的TypeHandler
// 配置缓存、配置数据源、设置Environment等一系列操作
// ...... 省略配置代码

if (this.transactionFactory == null) {
// 默认使用的事务工厂类
this.transactionFactory = new SpringManagedTransactionFactory();
}

// 根据mapperLocations配置,加载Mapper.xml映射配置文件以及对应的Mapper接口
for (Resource mapperLocation : this.mapperLocations) {
XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(...);
xmlMapperBuilder.parse();
}
// 最后根据前面创建的Configuration全局配置对象创建SqlSessionFactory对象
return this.sqlSessionFactoryBuilder.build(configuration);
}

SqlSessionFactory.getObject()方法里会根据我们mybatis相关配置(比如上面的mybatis.mapper-locations配置)找到并解析我们的mapper文件,解析出sql与dao方法里的映射、ResultMap与具体实体类的映射等,并放到SqlSessionFactory的Configuration中缓存下来,在后续调用过程中会通过这些信息来匹配jdbc操作。这个过程中会生成Mapper的代理类,执行sql时会走代理拦截方法执行,则不需要实现类就可以直接调用到具体的sql。

SqlSessionTemplate

如果使用手写java代码的方式去操作数据库,则使用的为sqlSessionTemplate,SqlSessionTemplate 是线程安全的,可以在多个线程之间共享使用。SqlSessionTemplate 内部持有一个 SqlSession 的代理对象(sqlSessionProxy 字段),这个代理对象是通过 JDK 动态代理方式生成的;使用的 InvocationHandler 接口是 SqlSessionInterceptor,其 invoke() 方法会拦截 SqlSession 的全部方法,并检测当前事务是否由 Spring 管理。

在MybatisAutoConfiguration自动配置中也去生成了SqlSessionTemplate这个bean对象。

1
2
3
4
5
6
7
8
9
10
11
@Bean
@ConditionalOnMissingBean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
ExecutorType executorType = this.properties.getExecutorType();
if (executorType != null) {
// 返回SqlSessionTemplate bean 用于手写数据库的CRUD操作。
return new SqlSessionTemplate(sqlSessionFactory, executorType);
} else {
return new SqlSessionTemplate(sqlSessionFactory);
}
}

MapperFactoryBean和MapperScannerConfigurer

在MybatisAutoConfiguration中注册了AutoConfiguredMapperScannerRegistrar,其实ImportBeanDefinitionRegistrar的实现类,Spring中可以通过 @Import 注解导入的 ImportBeanDefinitionRegistrar 实现类往 BeanDefinitionRegistry 注册 BeanDefinition。这里其实就是去注册了MapperScannerConfigurer 这个BeanDefinitionRegistryPostProcessor的实现。
image-20220718024301164

BeanDefinitionRegistryPostProcessor是一个特殊的接口,继承了BeanFactoryPostProcessor接口,提供了注册bean定义的能力,这里MapperScannerConfigurer扫描所有的@Mapper注解的Maapper类,在扩展点方法中去注册bean定义都为MapperFactoryBean接口,这些Bean在实例化时会因为是FactoryBean,而调用getObject方法寻找Mapper接口的动态代理缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
if (this.processPropertyPlaceHolders) {
processPropertyPlaceHolders();
}

ClassPathMapperScanner scanner = new ClassPathMapperScanner(registry);
scanner.setAddToConfig(this.addToConfig);
scanner.setAnnotationClass(this.annotationClass);
scanner.setMarkerInterface(this.markerInterface);
scanner.setSqlSessionFactory(this.sqlSessionFactory);
scanner.setSqlSessionTemplate(this.sqlSessionTemplate);
scanner.setSqlSessionFactoryBeanName(this.sqlSessionFactoryBeanName);
scanner.setSqlSessionTemplateBeanName(this.sqlSessionTemplateBeanName);
scanner.setResourceLoader(this.applicationContext);
scanner.setBeanNameGenerator(this.nameGenerator);
// 设置扫描的Bean都是MapperFactoryBean接口的子类
scanner.setMapperFactoryBeanClass(this.mapperFactoryBeanClass);
if (StringUtils.hasText(lazyInitialization)) {
scanner.setLazyInitialization(Boolean.valueOf(lazyInitialization));
}
if (StringUtils.hasText(defaultScope)) {
scanner.setDefaultScope(defaultScope);
}
scanner.registerFilters();
// scan扫描@Mapper注解的接口 注册bean定义。
scanner.scan(
StringUtils.tokenizeToStringArray(this.basePackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
}s

MapperFactoryBean的getObject()方法是从Configuration配置对象中寻找对应Mapper接口的代理类缓存,没有的话会创建。 这个SpringBean在注入的时候就是对应的代理类了。内部会转发到sqlSession、SqlExecutor去执行sql。

1
2
3
4
5
@Override
public T getObject() throws Exception {
// getSqlSession.getMapper 会从全局唯一的配置对象中寻找接口类对应的Mapper代理
return getSqlSession().getMapper(this.mapperInterface);
}s
1234…12
夸克

夸克

愿赌服输

114 日志
32 分类
121 标签
GitHub E-Mail csdn
© 2022 夸克 | Site words total count: 168.9k
|
主题 — NexT.Muse v5.1.4
博客全站共168.9k字

载入天数...载入时分秒...