0
点赞
收藏
分享

微信扫一扫

Redis_20_Redis跳跃表


文章目录

  • ​​一、前言​​
  • ​​二、sortedset数据类型 + skiplist编码类型​​
  • ​​2.1 宏观上,sortedset 中的 skiplist​​
  • ​​2.2 微观上,skiplist​​
  • ​​2.2.1 skiplist概要​​
  • ​​2.2.2 跳跃表的引入:跳跃表代替红黑树​​
  • ​​2.3 跳跃表crud​​
  • ​​2.3.1 跳跃表:快速查找​​
  • ​​2.3.2 跳跃表的插入删除更新:skiplist不要求相邻层次之间维持2:1关系,插入和删除数据时间复杂度为 O(logN)​​
  • ​​三、跳跃表的实现​​
  • ​​3.1 跳跃表:数据结构定义 + 基本结构​​
  • ​​3.1.1 跳跃表:数据结构定义​​
  • ​​3.1.2 跳跃表:基本结构​​
  • ​​3.2 跳跃表:随机层数​​
  • ​​3.3 跳跃表:创建跳跃表​​
  • ​​3.4 跳跃表:crud​​
  • ​​3.4.1 跳跃表:查找节点​​
  • ​​3.4.2 跳跃表:插入节点​​
  • ​​3.4.3 跳跃表:节点删除​​
  • ​​3.4.4 跳跃表:节点更新实现​​
  • ​​3.5 跳跃表:元素排名的实现​​
  • ​​四、小结​​

一、前言

需求:
1)需要通过hash 结构来存储 value 和 score 的对应关系
2)要支持按照 score 来排序
3)还要按照指定 score 的范围来获取 value 列表的
回答:Redis 中的 zset的结构:通过一个hash字典加跳跃列表skiplist

二、sortedset数据类型 + skiplist编码类型

2.1 宏观上,sortedset 中的 skiplist

skiplist编码的有序集合对象使用 zset结构作为底层实现,zset结构同时包含一个字典和一个跳跃表。如下:

typedef struct zset{
dict *dict; // 字典dict
zskiplist *zsl; // 跳跃表zsl
}zset;

为什么有序集合zset(sorted set)要同时由字典dict和跳跃表实现?
跳跃表利于执行范围操作(跳跃表是排好序的),而字典有利于执行分值查找操作。同时由于Redis里的跳跃表和字典元素很多都是用指针实现的,所以不会浪费内存。

Redis_20_Redis跳跃表_跳跃表

对于这个图的解释:
redisObject:
分为三个 类型type 编码encoding 指针ptr
type为五种基本类型,这里为sorted set
encoding,redis任何一种基本类型至少有两种编码,这里是skiplist
ptr:指针,指向底层数据结构的指针
zset 表示sorted set 实体
dict 表示字典
zsl 表示sorted skiplist 跳跃表
…… 表示跳跃
(key,value)=(apple,8.5) 表示ziplist第一个元素
(key,value)=(banana,5.0) 表示ziplist第二个元素
(key,value)=(cherry,6.0) 表示ziplist第三个元素

2.2 微观上,skiplist

2.2.1 skiplist概要

跳跃表(skiplist)是一种随机化的数据结构,是一种可以与平衡树媲美的层次化链表结构——查找、删除、添加等操作都可以在对数期望时间下完成,以下是一个典型的跳跃表例子:

Redis_20_Redis跳跃表_链表_02

Redis 的五种基本结构中,有一个叫做 有序列表 zset 的数据结构,它类似于 Java 中的 SortedSet 和 HashMap 的结合体,一方面它是一个 set 保证了内部 value 的唯一性,另一方面又可以给每个 value 赋予一个排序的权重值 score,来达到 排序 的目的。

它的内部实现就依赖了一种叫做 「跳跃列表」 的数据结构。

2.2.2 跳跃表的引入:跳跃表代替红黑树

首先,因为 zset 要支持随机的插入和删除,所以它 不宜使用数组来实现,关于排序问题,我们也很容易就想到 红黑树/ 平衡树 这样的树形结构,为什么 Redis 不使用这样一些结构呢?

第一,性能考虑: 在高并发的情况下,树形结构需要执行一些类似于 rebalance(再平衡) 这样的可能涉及整棵树的操作,相对来说跳跃表的变化只涉及局部 (下面详细说);
第二,实现考虑: 在复杂度与红黑树相同的情况下,跳跃表实现起来更简单,看起来也更加直观;

基于以上的一些考虑,Redis 采用了 跳跃表 这样的结构,而不是红黑树。

2.3 跳跃表crud

2.3.1 跳跃表:快速查找

我们先来看一个普通的链表结构:

Redis_20_Redis跳跃表_java_03

我们需要这个链表按照 score 值进行排序,这也就意味着,当我们需要添加新的元素时,我们需要定位到插入点,这样才可以继续保证链表是有序的,通常我们会使用 二分查找法,但二分查找是有序数组的,链表没办法进行位置定位,我们除了遍历整个找到第一个比给定数据大的节点为止 (时间复杂度 O(n)) 似乎没有更好的办法。

二分查询时间复杂度为O(logN),时间复杂度最好,二分查找之所以可以时间复杂度最好,是因为跳跃比较,但是,跳跃比较必须满足两个条件,第一,有序,第二,数组。
有序是因为:有序是因为跳跃取元素比较,所以必须是有序
数组是因为:跳跃取出元素比较,必须按位查找,按下标定位到元素,
链表虽然可以有序,但是不可以按位查找,所以无法二分查找。
skiplist就是想要把二分查找这种思想复制到链表上来,使skiplist查找达到O(logN)

但假如我们每相邻两个节点之间就增加一个指针,让指针指向下一个节点,如下图:

Redis_20_Redis跳跃表_数据库_04

这样所有新增的指针连成了一个新的链表,但它包含的数据却只有原来的一半 (图中的为 3,11)。

现在假设我们想要查找数据时,可以根据这条新的链表查找,如果碰到比待查找数据大的节点时,再回到原来的链表中进行查找,比如,我们想要查找 7,查找的路径则是沿着下图中标注出的红色指针所指向的方向进行的:

Redis_20_Redis跳跃表_java_05

这是一个略微极端的例子,但我们仍然可以看到,通过新增加的指针查找,我们不再需要与链表上的每一个节点逐一进行比较,这样改进之后需要比较的节点数大概只有原来的一半。

利用同样的方式,我们可以在新产生的链表上,继续为每两个相邻的节点增加一个指针,从而产生第三层链表:

Redis_20_Redis跳跃表_数据库_06

在这个新的三层链表结构中,我们试着 查找 13,那么沿着最上层链表首先比较的是 11,发现 11 比 13 小,于是我们就知道只需要到 11 后面继续查找,从而一下子跳过了 11 前面的所有节点。

可以想象,当链表足够长,这样的多层链表结构可以帮助我们跳过很多下层节点,从而加快查找的效率。

跳跃表就是把数组二分查找的思想复制过来
跳跃表 skiplist 就是受到这种多层链表结构的启发而设计出来的。按照上面生成链表的方式,上面每一层链表的节点个数,是下面一层的节点个数的一半,这样查找过程就非常类似于一个二分查找,使得查找的时间复杂度可以降低到 O(logn)。

skiplist:维持相邻层次之间2:1关系,插入和删除数据时间复杂度为 O(n)
但是,这种方法在插入数据的时候有很大的问题。新插入一个节点之后,就会打乱上下相邻两层链表上节点个数严格的 2:1 的对应关系。如果要维持这种对应关系,就必须把新插入的节点后面的所有节点 (也包括新插入的节点) 重新进行调整,这会让时间复杂度重新蜕化成 O(n)。删除数据也有同样的问题。

2.3.2 跳跃表的插入删除更新:skiplist不要求相邻层次之间维持2:1关系,插入和删除数据时间复杂度为 O(logN)

金手指:
既然,skiplist不要求相邻层次之间维持2:1关系,插入和删除数据只要找到位置,然后修改指针就好,找到位置的时间复杂度为 O(logN),所以,插入和删除数据的时间复杂度为 O(logN)
所以,skiplist 查找、插入、删除的时间复制度为 O(logN)
还是一个更新,对于skiplist来说,更新就是先删除源节点,然后插入新节点,根据上面插入和删除就只要找到位置,然后修改指针就好,所以,更新的时间复杂度为 2 * O(logN),也可以看做是 O(logN),
所以,skiplist 查找、插入、删除、更新的时间复制度为 O(logN),好了,面试这么吹就好了,至于skiplist crud的源码,能看懂大致就行。
附:回到原来的问题,redis中为什么使用skiplist而不是红黑树/平衡树?
第一,性能考虑: 在高并发的情况下,树形结构需要执行一些类似于 rebalance(再平衡) 这样的可能涉及整棵树的操作,相对来说跳跃表的变化只涉及局部 (下面详细说);理解了,对于插入删除,树形结构涉及到rebalance和节点合并和节点分裂,会影响整个结构,skiplist找到位置,修改相邻指针就好,仅修改局部的。
第二,实现考虑: 在复杂度与红黑树相同的情况下,跳跃表实现起来更简单,看起来也更加直观;理解了,skiplist是链表结构,crud操作比树要简单,特别是增加节点,删除节点操作比树要简单。

skiplist 为了避免这一问题,它不要求上下相邻两层链表之间的节点个数有严格的对应关系,而是 为每个节点随机出一个层数(level)。比如,一个节点随机出的层数是 3,那么就把它链入到第 1 层到第 3 层这三层链表中。为了表达清楚,下图展示了如何通过一步步的插入操作从而形成一个 skiplist 的过程:

Redis_20_Redis跳跃表_链表_07

从上面的创建和插入的过程中可以看出,每一个节点的层数(level)是随机出来的,而且新插入一个节点并不会影响到其他节点的层数,因此,插入操作只需要修改节点前后的指针,而不需要对多个节点都进行调整,这就降低了插入操作的复杂度。

现在我们假设从我们刚才创建的这个结构中查找 23 这个不存在的数,那么查找路径会如下图:

Redis_20_Redis跳跃表_链表_08

三、跳跃表的实现

3.1 跳跃表:数据结构定义 + 基本结构

3.1.1 跳跃表:数据结构定义

Redis 中的跳跃表由 server.h/zskiplistNode 和 server.h/zskiplist 两个结构定义,前者为跳跃表节点,后者则保存了跳跃节点的相关信息,同之前的 集合 list 结构类似,其实只有 zskiplistNode 就可以实现了,但是引入后者是为了更加方便的操作:

/* ZSETs use a specialized version of Skiplists */
typedefstruct zskiplistNode {
// value
sds ele;
// 分值
double score;
// 后退指针
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsignedlong span;
} level[];
} zskiplistNode;

typedefstruct zskiplist {
// 跳跃表头指针
struct zskiplistNode *header, *tail;
// 表中节点的数量
unsignedlong length;
// 表中层数最大的节点的层数
int level;
} zskiplist;

3.1.2 跳跃表:基本结构

Redis_20_Redis跳跃表_跳跃表_09

解释:Redis的跳跃列表总共有64层,上图中每个kv对应的结构如下图中的zslnode结构,kv header也是zslnode结构,只不过value字段是null,score值是Double.MIN_VALUE,相邻来个kv之间用指针窜接起来,形成双向列表,而且是有序的,从小到大,每一层元素的遍历是从kv header开始

3.2 跳跃表:随机层数

对于每一个新插入的节点,都需要调用一个随机算法给它分配一个合理的层数,源码在 t_zset.c/zslRandomLevel(void) 中被定义:

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

直观上期望的目标是 50% 的概率被分配到 Level 1,25% 的概率被分配到 Level 2,12.5% 的概率被分配到 Level 3,以此类推…有 2-63 的概率被分配到最顶层,因为这里每一层的晋升率都是 50%。

Redis 跳跃表默认允许最大的层数是 32,被源码中 ZSKIPLIST_MAXLEVEL 定义,当 Level[0] 有 264 个元素时,才能达到 32 层,所以定义 32 完全够用了。

金手指:random() 在 0-1之间,ZSKIPLIST_P表示概率,也是0-1之间。
0xFFFF=1111 1111 1111 1111
任何数和0xFFFF与运算其实就是暗含高位清零,低位与 结果就是一个0和65535之间的数
ZSKIPLIST_P * 0xFFFF=0.25*65535 肯定是比65535小 ,
所以系数ZSKIPLIST_P越小 ,((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))是1的可能就越大,就会继续while
(random()&0xFFFF) 本身就有50%的概率是0 0肯定小于ZSKIPLIST_P * 0xFFFF

金手指:最理想的是每一层的晋升概率都是50%,但是Redis的标准源码的晋升概率只有25%。

3.3 跳跃表:创建跳跃表

这个过程比较简单,在源码中的 t_zset.c/zslCreate 中被定义:

zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

// 申请内存空间
zsl = zmalloc(sizeof(*zsl));
// 初始化层数为 1
zsl->level = 1;
// 初始化长度为 0
zsl->length = 0;
// 创建一个层数为 32,分数为 0,没有 value 值的跳跃表头节点
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);

// 跳跃表头节点初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 将跳跃表头节点的所有前进指针 forward 设置为 NULL
zsl->header->level[j].forward = NULL;
// 将跳跃表头节点的所有跨度 span 设置为 0
zsl->header->level[j].span = 0;
}
// 跳跃表头节点的后退指针 backward 置为 NULL
zsl->header->backward = NULL;
// 表头指向跳跃表尾节点的指针置为 NULL
zsl->tail = NULL;
return zsl;
}

即执行完之后创建了如下结构的初始化跳跃表:

Redis_20_Redis跳跃表_redis_10

3.4 跳跃表:crud

3.4.1 跳跃表:查找节点

Redis_20_Redis跳跃表_链表_11

1)如果要定位到紫色的模块,首先从header往下定位,比header降一层,在定位比上层降一层的,通过3次定位,最后定位到紫色的那个元素,我们将中间一系列节点称为搜索路径

2)当插入一个新节点时,我们通过随机算法来定位新插入的节点有多少层

3.4.2 跳跃表:插入节点

这几乎是最重要的一段代码了,但总体思路也比较清晰简单,如果理解了上面所说的跳跃表的原理,那么很容易理清楚插入节点时发生的几个动作 (几乎跟链表类似):

找到当前我需要插入的位置 (其中包括相同 score 时的处理);
创建新节点,调整前后的指针指向,完成插入;
为了方便阅读,我把源码 t_zset.c/zslInsert 定义的插入函数拆成了几个部分

第一部分:声明需要存储的变量

// 存储搜索路径
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 存储经过的节点跨度
unsignedint rank[ZSKIPLIST_MAXLEVEL];
int i, level;

第二部分:搜索当前节点插入位置

serverAssert(!isnan(score));
x = zsl->header;
// 逐步降级寻找目标节点,得到 "搜索路径"
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果 score 相等,还需要比较 value 值
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 记录 "搜索路径"
update[i] = x;
}

讨论: 有一种极端的情况,就是跳跃表中的所有 score 值都是一样,zset 的查找性能会不会退化为 O(n) 呢?
从上面的源码中我们可以发现 zset 的排序元素不只是看 score 值,也会比较 value 值 (字符串比较)

第三部分:生成插入节点

/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
// 如果随机生成的 level 超过了当前最大 level 需要更新跳跃表的信息
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 创建新节点
x = zslCreateNode(level,score,ele);

第四部分:重排前向指针

for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

第五部分:重排后向指针并返回

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;

金手指:首先我们在搜索合适插入点的过程中将「搜索路径」摸出来了,然后就创建新节点,创建的时候需要给这个节点随机分配一个层数,再将搜索路径上的节点和这个新节点通过前向后向指针串起来。如果分配的新节点的高度高于当前跳跃列表的最大高度,就需要更新一下跳跃列表的最大高度。

3.4.3 跳跃表:节点删除

删除过程由源码中的 t_zset.c/zslDeleteNode 定义,和插入过程类似,都需要先把这个 “搜索路径” 找出来,然后对于每个层的相关节点重排一下前向后向指针,同时还要注意更新一下最高层数 maxLevel,直接放源码 (如果理解了插入这里还是很容易理解的):

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return1;
}
return0; /* not found */
}

金手指-删除节点:
首页找到搜索路径,然后对每一层相关节点排下向前向后指针,最后更新最高层maxLevel

3.4.4 跳跃表:节点更新实现

当我们调用 ZADD 方法时,如果对应的 value 不存在,那就是插入过程,如果这个 value 已经存在,只是调整一下 score 的值,那就需要走一个更新流程。

假设这个新的 score 值并不会带来排序上的变化,那么就不需要调整位置,直接修改元素的 score 值就可以了,但是如果排序位置改变了,那就需要调整位置,该如何调整呢?

从源码 t_zset.c/zsetAdd 函数 1350 行左右可以看到,Redis 采用了一个非常简单的策略:

/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}

把这个元素删除再插入这个,需要经过两次路径搜索,从这一点上来看,Redis 的 ZADD 代码似乎还有进一步优化的空间。

金手指-更新节点
1)当我们调用zadd方法时,如果value不存在,则就是新增
2)如果value存在就是更新
3)如果该score值没有改变排序,那么位置就不需要调整,直接修改元素的score值
4)如果排序位置改变了,那就要调整位置了

小结:更新时候,先找到value对应的score,然后删除,删除后在插入这个新的节点。

3.5 跳跃表:元素排名的实现

跳跃表本身是有序的,Redis 在 skiplist 的 forward 指针上进行了优化,给每一个 forward 指针都增加了 span 属性,用来 表示从前一个节点沿着当前层的 forward 指针跳到当前这个节点中间会跳过多少个节点。在上面的源码中我们也可以看到 Redis 在插入、删除操作时都会小心翼翼地更新 span 值的大小。

所以,沿着 “搜索路径”,把所有经过节点的跨度 span 值进行累加就可以算出当前元素的最终 rank 值了:

/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsignedlong rank = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
// span 累加
rank += x->level[i].span;
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return0;
}

金手指:
Redis的skiplist的forward指针上增加了span属性,表示从前一个节点沿着当前层的forward指针跳到当前这个节点中间会跳过多少的节点,Redis在插入删除操作时会更新span的大小,当我们要计算一个元素的排名时,只需要将span进行相加就能算出元素的最终的rank值

四、小结

跳跃表源码解析,完成了。

天天打码,天天进步!!


举报

相关推荐

0 条评论