0
点赞
收藏
分享

微信扫一扫

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!

阎小妍 2022-12-29 阅读 137

什么是HashMap?

在了解 ​​HashMap​​ 之前先了解一下什么是 ​​Map​​;

什么是Map?

定义

​Map​​ 是一个用于存储 Key-Value 键值对的集合类,也就是一组键值对的映射,在 Java 中 ​​Map​​ 是一个接口,是和 ​​Collection​​ 接口同一等级的集合根接口;

存储结构

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci

上图看起来像是数据库中的关系表,有类似的两个字段,KeySet(键的集合)和 Values(值的集合),每一个键值对都是一个 Entry

特点

  1. 没有重复的 key;
  • key 用 set 保存,所以 key 必须唯一;
  • ​Map​​ 基本上是通过 key 来获取 value,如果有两个相同的 key,计算机将不知道取哪个值,如果 put 了两个相同的 key,后一个则会覆盖前一个的 value 值;在源码的注释中已经说明:

大致翻译一下:

将该 map 中的指定值与指定键关联(可选操作)。如果映射先前包含键的映射,则旧值将被指定的值替换。(当且仅当 {@link #containsKey(Object) m.containsKey(k)} 返回 true 时,映射 m 被称为包含键k的映射。)

  1. 每个 key 只能对应一个 value,多个 key 可以对应一个 value(这就是映射的概念,最经典的例子就是射箭,一排的射手和一排的箭靶,每个射手只有一根箭,那么一个射手只能射中一个箭靶,而每个箭靶可能被不同射手射中,箭就是映射);
  2. key,value 都可以是任何引用类型的数据,包括 null,但只能是引用类型;
  3. Map 取代了古老的 Dictionary 抽象类(简单了解一下);

HashMap定义

把任意长度的输入(预映射),通过一种函数 ​​hashCode()​​,变换成固定长度的输出,该输出就是哈希值 ​​hashCode​​,这种函数就叫做哈希函数,而计算哈希值的过程就叫做哈希

哈希的主要应用是哈希表和分布式缓存,注意,哈希算法和哈希函数不是一个东西,哈希函数是哈希算法的一种实现;

HashMap 是用哈希表(数组(桶)加单链表)+ 红黑树实现的 map 类,但是不同版本的 JDK 实现 HashMap 的原理有所不同:

  • JDK 1.6 - 1.7 采用位桶 + 链表实现;
  • JDK 1.8 采用位桶 + 链表 + 红黑树实现,当链表长度超过阈值 “8” 时,将链表转换为红黑树;

下面以 JDK 1.8 为版本进行讲解;

HashMap底层原理

体系结构

HashMap 是一个用于存储 Key-Value 键值对的集合,每一个键值对也叫做 Entry。HashMap 新增一个元素时,会先计算 key 的 hash 值,找到存入数组(桶)的位置,如果该位置已经有节点(链表头),则存入该节点的最后一个位置(链表尾),所以 HashMap 就是一个数组(桶),数组上每一个元素都是一个节点(节点和所有下一个节点组成一个链表)或者为 ​​null​​(HashMap 数组每一个元素的初始值都是 ​​null​​),显然同一个链表上的节点 hash 值都一样。

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_02

源码解读

首先看到的是 HashMap 的构造器:

/**
* Constructs an empty <tt>HashMap</tt> with the specified initial
* capacity and load factor.
*
* @param initialCapacity the initial capacity
* @param loadFactor the load factor
* @throws IllegalArgumentException if the initial capacity is negative
* or the load factor is nonpositive
*/
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}

注释中已经说得很清楚了,​​Constructs an empty <tt>HashMap</tt> with the specified initial capacity and load factor.​​,这个构造器主要是用来初始化桶的数量和装载因子;

接下来往前看,看一下几个比较重要的常量,​​DEFAULT_INITIAL_CAPACITY​​,​​MAXIMUM_CAPACITY​​,​​DEFAULT_LOAD_FACTOR​​;

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_03

可以看到桶的初始容量默认为16,值得注意的是,桶的初始容量和扩容后的容量必须是 2n,所以桶的最大容量就是230,即 ​​1 << 30​​;

默认负载系数为 0.75,负载系数也称为负载因子,是哈希表在其容量自动增加之前可以达到多满的一种尺度,它衡量的是一个散列表的空间的使用程度,负载因子越大表示散列表的装填程度越高,反之愈小。如果负载因子越大,对空间的利用更充分,然而后果是查找效率的降低;如果负载因子太小,那么散列表的数据将过于稀疏,对空间造成严重浪费。经过大量的实验证明, HashMap 的默认负载因子为0.75最宜;

当表中超过75%的位置已经填入元素,这个表就会用双倍的桶数自动地进行再散列(rehashed),可以通过构造函数初始化;

这里进行扩展一下,便于理解:

由于 HashMap 特殊的存储结构,因此 HashMap 在获取指定元素前需要把 key 经过哈希运算,得到目标元素在哈希表中的位置,然后再进行少量比较即可得到元素,这使得 HashMap 的查找效率极高,说白了就是 HashMap 用了拉链法的哈希表,也有称之为桶数组的;

下面看到 JDK 1.8 中的源码部分:

public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}

/**
* Implements Map.put and related methods.
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) {}

/**
* Computes key.hashCode() and spreads (XORs) higher bits of hash
* to lower. Because the table uses power-of-two masking, sets of
* hashes that vary only in bits above the current mask will
* always collide. (Among known examples are sets of Float keys
* holding consecutive whole numbers in small tables.) So we
* apply a transform that spreads the impact of higher bits
* downward. There is a tradeoff between speed, utility, and
* quality of bit-spreading. Because many common sets of hashes
* are already reasonably distributed (so don't benefit from
* spreading), and because we use trees to handle large sets of
* collisions in bins, we just XOR some shifted bits in the
* cheapest possible way to reduce systematic lossage, as well as
* to incorporate impact of the highest bits that would otherwise
* never be used in index calculations because of table bounds.
*/
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

当我们通过 ​​put()​​ 方法输入键值对后,虽然我们只输入了键值对,但他却传递了五个参数,源码注释已经很清楚了,就不多解释了,接下来看到 hash 算法,将键 key 传入,如果 key 为 ​​null​​,则返回值值为0;否则返回 key 的哈希值与 key 无符号右移16位(​​h >>> 16​​)后进行异或的结果;源码注释解释了为什么要进行这样的操作,主要是为了减少冲突,较低系统消耗;

哈希函数计算结果越分散均匀,哈希碰撞的概率就越小,map 的存取效率就会越高,即时间复杂度越小;

哈希表长度越长,空间成本越大,哈希函数计算结果越分散均匀;

扩容机制(实际上就是负载因子)和哈希函数越合理,空间成本越小,哈希函数计算结果越分散均匀;

从 HashMap 的默认构造函数源码可知,构造函数就是对下面几个字段进行初始化。

int threshold;             // 最大node结点(键值对)容量,threshold = CAPACITY * LoadFactor,超过这个数目就重新resize(扩容),扩容后的threshold是之前的两倍。
final float loadFactor; // 加载因子(HashMap默认值是0.75,建议不要修改)
int modCount; // 记录HashMap内部结构发生变化的次数,强调一点,内部结构发生变化指的是结构发生变化,例如put新键值对,但是某个key对应的value值被覆盖不属于结构变化。
int size,CAPACITY; // CAPACITY是桶数组的容量(桶的多少)(默认值是16),扩容后也是之前的两倍,size是HashMap中实际存在的键值对数量

  • 负载因子越大(长度一定),最大结点容量越大,resize 次数越少,空间成本越小,map 的存取效率就会越高。
  • 桶数组初始容量(长度)越大(加载因子一定),最大结点容量越大,resize 次数越少,空间成本越大,map的 存取效率就会越高。

这里存在一个问题,即使负载因子和哈希函数设计的再合理,也难免会出现拉链过长的情况,即桶内结点过多;

一旦出现拉链过长,则会严重影响 HashMap 的性能。于是在 JDK1.8 版本中,对数据结构做了进一步的优化,引入了红黑树,而当链表长度太长(默认超过8)时,链表就转换为红黑树,利用红黑树快速增删改查的特点提高 HashMap 的性能;

对于 HashMap,我们最常使用的是两个方法:​​Get()​​ 和 ​​Put()​​;

Put方法的原理

调用 ​​put()​​ 方法会发生了什么呢?其实上面已经提到过来,那这里就简单的介绍一下:

首先是输入键值对,​​hashmap.put("idiot",1)​​,这是会调用 ​​hash()​​ 函数来计算这个键值对(Entry)插入的位置,​​index = hash("idiot")​​,假定最后计算出的 ​​index​​ 是2,那么结果如下:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_04

但是 HashMap 的长度是有限的,当插入的 Entry 越来越多时,再完美的 Hash 函数也难免会出 现index 冲突的情况。比如下面这样:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_05

那该如何解决?利用链表来解决,将哈希值相同的键组成一个链表,每一个 Entry 对象通过 Next 指针指向它的下一个 Entry 节点,桶中装着每个链表的头结点。当新来的 Entry 映射到冲突的数组位置时,只需要插入到对应的链表即可:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_06

需要注意的是,上图使用的是“头插法”,但 JDK1.8 的源码中使用的是“尾插法”;

源码解读

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_07

​put()​​ 方法返回了 ​​putVal()​​ 方法的值,那么接下来就探究 ​​putVal()​​ 方法:

/**
* Implements Map.put and related methods.
*
* @param hash hash for key
* @param key the key
* @param value the value to put
* @param onlyIfAbsent if true, don't change existing value
* @param evict if false, the table is in creation mode.
* @return previous value, or null if none
*/
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}

注释已经解释了参数的意义,直接开始探究代码:

1、这里判断哈希表是否为空,然后进行一个扩容;

if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;

​resize()​​ 就是初始化或者加倍哈希表的大小;

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_08

2、判断桶是否有该元素,没有的话实例化一个;

if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);

​(n - 1) & hash​​ 相当于对将 ​​hash % n​​,根据 hash 得到桶的索引,

3、桶内存在元素,需要解决 hash 冲突;

3.1、桶内第一个元素的 key 值与新加入的键值对的 key 相同的时候,e 指向 p(仅仅指向);

if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;

3.2、如果元素已经树化,使用 ​​putTreeVal()​​ 方法加入元素,若存在相同的 key 的元素,则将引用返回;

else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);

3.3、遍历链表,使用尾插法插入元素,如果链表长度超过8(默认长度),则链表转换为红黑树;

else {
//如果桶内是链表,则插入链表,这里使用尾插法
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
//bitCount大于树化的阈值,转化为红黑树
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}

4、​​e != null​​ 的时候,将 ​​e.value​​ 作为旧值进行返回;

if (e != null) { // existing mapping for key
V oldValue = e.value;
//putVal中的参数。若onlyIfAbsent为null或者oldValue为空时才替换,
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}

如果 ​​e == null​​,则表示插入了新节点,在上面​​3.3​​的代码中表示过了;

5、当桶的容积不够时,使用 ​​resize()​​ 进行扩容;

if (++size > threshold)
resize();

​put()​​ 方法主要就是上面的一些解读,接下来来探究 ​​get()​​ 方法;

Get方法的原理

使用 ​​get()​​ 方法根据 Key 来查找 Value 是怎么实现的呢?下面就简单介绍一下:

首先会把输入的 Key 做一次 Hash 映射,得到对应的index:​​index = hash("idiot")​​;

由于存在 ​​Hash 冲突​​,因此同一个位置有可能匹配到多个 Entry,这时候就需要顺着对应链表的头节点,一个一个向下来查找。假设我们要查找的 Key 是 “idiot”:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_09

第一步,我们查看的是头节点 Entry6,Entry6 的 Key 是 Syyyy,显然不是我们要找的结果。

第二步,我们查看的是 Next 节点 Entry1,Entry1 的 Key 是 idiot,正是我们要找的结果。

这里使用“头插法”是因为部分人认为后插入的 Entry 被查找的可能性更大,以此来提高查找效率。

源码解读

先看到的是 ​​get()​​ 方法,只有短短的两行代码:

/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code (key==null ? k==null :
* key.equals(k))}, then this method returns {@code v}; otherwise
* it returns {@code null}. (There can be at most one such mapping.)
*
* <p>A return value of {@code null} does not <i>necessarily</i>
* indicate that the map contains no mapping for the key; it's also
* possible that the map explicitly maps the key to {@code null}.
* The {@link #containsKey containsKey} operation may be used to
* distinguish these two cases.
*
* @see #put(Object, Object)
*/
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}

其实注释已经很清楚的进行了解释,该方法要么返回节点的值 ​​e.value​​,要么返回 ​​null​​;但要注意的是,返回值为 ​​null​​ 并不一定表明映射不包含键的映射;也有可能映射显式地将键映射为 ​​null​​ 。可以使用 ​​containsKey​​ 操作来区分这两种情况;

接下来就来看看 ​​getNode()​​ 方法,跟 ​​putVal()​​ 方法其实是比较对称的,那就直接在代码中写注释了:

/**
* Implements Map.get and related methods.
*
* @param hash hash for key
* @param key the key
* @return the node, or null if none
*/
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;

//判断哈希表是否为空,头节点是否存在;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {

//对比头节点,如果hash值相同且key相同(地址或内容),则返回该节点;
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;

//判断下一个节点是否存在
if ((e = first.next) != null) {

//判断是否链表是否树化,如果树化则遍历树节点来进行查找;
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);

//不是树化则按照链表进行遍历;
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}

//没有相符合的key,返回null值;
return null;
}

顺带提一下刚刚注释里出现的 ​​containsKey()​​ 方法:

/**
* Returns <tt>true</tt> if this map contains a mapping for the
* specified key.
*
* @param key The key whose presence in this map is to be tested
* @return <tt>true</tt> if this map contains a mapping for the specified
* key.
*/
public boolean containsKey(Object key) {
return getNode(hash(key), key) != null;
}

只要就是调用 ​​getNode()​​ 方法来获取键值对,如果没有找到返回 false,找到了就返回 ture;

补充

思考:为什么 HashMap 的初始长度默认为16,之后扩展也要是2的幂?

这主要是为了服务于从 KEY 映射到 index 的 Hash 算法,使其尽可能的均匀分布;

那是不是吧 KEY 的 HashCode 值和 HashMap 长度做取模运算?​​index = Key.hashCode() % Length​​;

错!取模运算的方式固然简单,但是效率太低,因此采用了位运算的方式,​​index = Key.hashCode() % (Length-1)​​;

下面以 “idiot” 为 KEY 演示整个过程:

  1. 显示计算 idiot 的 hashCode,这里是 JDK1.8 版本的:

String key = "idiot";
System.out.println(key.hashCode());

结果为100053267,转换成二进制就是101111101101011000100010011;

  1. HashMap 长度是默认的16,计算 Length-1 的结果为15,转成二进制就是1111;
  2. 把以上两个结果做与运算,101111101101011000100010011 & 1111 = 0011,十进制就是3,所以 ​​index=3​​;

可以说 Hash 算法最终得到的 index 结果,完全取决于 Key 的 Hashcode 值的最后几位;

那这样子有什么好处呢?为什么长度必须是2的幂,如果长度是10会怎么样?

这样子做不但效果同等于取模,而且性能上还有大大的提升,接下来我们尝试一下长度为10会出现什么情况;

101111101101011000100010011 & 1001 = 0001,十进制就是1,所以 ​​index=1​​;

这样咋一看好像没什么问题,但如果说现在把 hashCode 101111101101011000100010011 的最后四位从0011改成0110,结果还是一样的,还是 ​​index=1​​,这说明当 HashMap 长度为10的时候,有些 index 结果的出现几率会更大,而有些 index 结果永远不会出现(比如0110,0111)!

反观长度16或者其他2的幂,Length-1 的值是所有二进制位全为1,这种情况下,index 的结果等同于 HashCode 后几位的值。只要输入的 HashCode 本身分布均匀,Hash 算法的结果就是均匀的。

高并发下的HashMap

先简单讲讲单线程下的 HashMap;

HashMap 的容量是有限的,当经过多次元素插入,使得 HashMap 达到一定饱和度时,Key 映射位置发生冲突的几率会逐渐提高。

这时候,HashMap 需要扩展它的长度,也就是进行 ​​Resize()​​;

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_10

影响因素

影响发生 ​​Resize()​​ 的因素有两个:

  1. ​Capacity​​HashMap 的当前长度,2的幂,​​static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;​​;
  2. ​LoadFactor​​HashMap 负载因子,默认值为 ​​0.75f​​,​​static final float DEFAULT_LOAD_FACTOR = 0.75f;​​;

衡量 HashMap 是否进行 ​​Resize()​​ 的条件如下:

HashMap.Size >= Capacity * LoadFactor;

当然 ​​Resize()​​ 并不是简单地把长度扩大了,而是经过了以下两个步骤:

  1. 扩容
    创建一个新的 Entry 空数组,长度是原数组的2倍;
  2. ReHash
    遍历原 Entry 数组,把所有的 Entry 重新 Hash 到新数组。

为什么要重新 Hash 呢?因为长度扩大以后,Hash 的规则也随之改变;

回顾一下 Hash 公式: ​​index = Key.hashCode() & (Length - 1)​​;

当原数组长度为8时,Hash 运算是和111B做与运算;新数组长度为16,Hash 运算是和1111B做与运算;Hash结果显然不同;

Resize 前的 HashMap:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_11

Resize 后的 HashMap:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_12

这是一个扩容机制:

/**
* Transfers all entries from current table to newTable.
*/
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);
e.next = newTable[i];
newTable[i] = e;
e = next;
}
}
}

理论推导

在单线程下执行时是毫无问题的,但如果是多线程,就会出现问题:

  1. 假设一个 HashMap 已经到了 Resize 的临界点。此时有两个线程A和B,在同一时刻对 HashMap 进行 Put 操作:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_13

  1. 此时达到 Resize 条件,两个线程各自进行 Rezie 的第一步,也就是扩容:
  2. 这时候,两个线程都走到了 ReHash 的步骤。让我们回顾一下 ReHash 的代码:
  3. 假如此时线程B遍历到 Entry3 对象,刚执行完红框里的这行代码,线程就被挂起。对于线程B来说:

e = Entry3
next = Entry2

  1. 这时候线程A畅通无阻地进行着 Rehash,当 ReHash 完成后,结果如下(图中的 e 和 next,代表线程B的两个引用):
  2. 直到这一步,看起来没什么毛病。接下来线程B恢复,继续执行属于它自己的 ReHash。线程B刚才的状态是:

e = Entry3
next = Entry2

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_14

当执行到上面这一行时,显然 `i = 3`,因为刚才线程A对于 Entry3 的 hash 结果也是3。

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_15

7.我们继续执行到这两行,Entry3 放入了线程B的数组下标为3的位置,并且 e 指向了 Entry2。此时 e 和 next 的指向如下:

```
e = Entry2
next = Entry2

```

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_16

  1. 接着是新一轮循环,又执行到红框内的代码行:

```
e = Entry2
next = Entry3

```

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_17

  1. 接下来执行下面的三行,用头插法把 Entry2 插入到了线程B的数组的头结点:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_18

  1. 第三次循环开始,又执行到红框的代码:

```
e = Entry3
next = Entry3.next = null

```

  1. 最后一步,当我们执行下面这一行的时候,见证奇迹的时刻来临了:

```
newTable[i] = Entry2
e = Entry3
Entry2.next = Entry3
Entry3.next = Entry2

```

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_19

此时,问题还没有直接产生。当调用 Ge t查找一个不存在的 Key,而这个 Key 的 Hash 结果恰好等于3的时候,由于位置3带有环形链表,所以程序将会进入**死循环**!

那该如何避免这种问题?

在高并发的场景下,通常采用另一个集合类 ​​ConcurrentHashMap​​​,这个集合类兼顾了线程安全和性能,接下来就讲讲 ​​ConcurrentHashMap​​;

什么是ConcurrentHashMap?

上阶段​​高并发下的HashMap​​提到看似完美的 HashMap 在高并发的情况下并不理想,会出现环形链表,换句话说就是进入死循环,那该如何避免 HashMap 的线程安全问题?

可以考虑改用 ​​HashTable​​​ 或者 ​​Collections.synchronizedMap​​​,看到 ​​synchronized​​ 是不是非常熟悉,没错这两者就是用了锁来确保安全

但是这样子确保安全的话,就会影响性能,无论读操作还是写操作,它们都会给整个集合加锁,导致同一时间的其他操作阻塞,如下图所示:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_20


【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_21

在并发环境下,如何能够兼顾线程安全和运行效率呢?这时候 ​​ConcurrentHashMap​​ 就应运而生了;

理论介绍

​ConcurrentHashMap​​​ 最关键的就是理解一个概念 ​​Segment​​;

​Segment​​​ 是什么呢?​​Segment​​​ 本身就相当于一个 HashMap 对象。流行于 JDK 1.6-1.7,在 JDK1.8 中被弃用了,改换成 ​​CAS+Synchronized​​;

同 HashMap 一样,​​Segment​​ 包含一个 HashEntry 数组,数组中的每一个 HashEntry 既是一个键值对,也是一个链表的头节点。

单一的 ​​Segment​​ 结构如下:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_22

像这样的 ​​Segment​​​ 对象,在 ​​ConcurrentHashMap​​ 集合中有多少个呢?有2n个,共同保存在一个名为 ​​segments​​ 的数组当中。

因此整个 ConcurrentHashMap 的结构如下:

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_23

可以说,​​ConcurrentHashMap​​ 是一个二级哈希表。在一个总的哈希表下面,有若干个子哈希表。这样的二级结构,和数据库的水平拆分有些相似。

那 ​​ConcurrentHashMap​​ 这样设计有什么好处呢?

​ConcurrentHashMap​​​ 优势就是采用了锁分段技术,每一个 ​​Segment​​​ 就好比一个自治区,读写操作高度自治,​​Segment​​ 之间互不影响;

情景复现

Case1:不同 ​​Segment​​ 的并发写入;

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_24

Case2:同一 ​​Segment​​ 的一写一读;

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_25

同一 ​​Segment​​ 的写和读是可以并发执行的。

Case3:同一 ​​Segment​​ 的并发写入

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_键值对_26

​Segment​​​ 的写入是需要上锁的,因此对同一 ​​Segment​​ 的并发写入会被阻塞。

由此可见,​​ConcurrentHashMap​​​ 当中每个 ​​Segment​​ 各自持有一把锁。在保证线程安全的同时降低了锁的粒度,让并发操作效率更高。

源码解读

下述两方法的源码为 JDK1.7 版本;

Get方法

1、为输入的 Key 做 Hash 运算,得到 hash 值。; 2、通过 hash 值,定位到对应的 Segment 对象; 3、再次通过 hash 值,定位到 Segment 当中数组的具体位置;

//获取某个具体的value值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return

Put方法

1、为输入的 Key 做 Hash 运算,得到 hash 值; 2、通过 hash 值,定位到对应的 ​​Segment​​​ 对象; 3、获取可重入锁; 4、再次通过 hash 值,定位到 ​​Segment​​ 当中数组的具体位置; 5、插入或覆盖 HashEntry 对象; 6、释放锁;

//向map中添加一个key-value键值对
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

从上述两个方法的源码中可以看出,​​ConcurrentHashMap​​​ 在读写时需要二次定位,首先定位到 ​​Segment​​​,之后定位到 ​​Segment​​内的具体数组下标;

Size方法

既然每一个 ​​Segment​​​ 都在各自加锁,那么在调用 ​​Size​​ 方法的时候,怎么解决一致性问题呢?

​Size​​​ 方法的目的是统计 ​​ConcurrentHashMap​​​ 的总元素数量, 自然需要把各个 ​​Segment​​ 内部的元素数量汇总起来。

但是,如果在统计 ​​Segment​​​ 元素数量的过程中,已统计过的 ​​Segment​​ 瞬间插入新的元素,这时候该怎么办呢?

【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_27


【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_ci_28


【数据结构】超详细!从HashMap到ConcurrentMap,我是如何一步步实现线程安全的!_链表_29

接下来探究一下 ​​ConcurrentHashMap​​​ 的 ​​size​​ 工作原理:

  1. 遍历所有的 ​​Segment​​。
  2. 把 ​​Segment​​ 的元素数量累加起来。
  3. 把 ​​Segment​​ 的修改次数累加起来。
  4. 判断所有 ​​Segment​​​ 的总修改次数是否大于上一次的总修改次数。如果大于,说明统计过程中有修改,重新统计,尝试次数 ​​+1​​;如果不是。说明没有修改,统计结束。
  5. 如果尝试次数超过阈值,则对每一个 ​​Segment​​ 加锁,再重新统计。
  6. 再次判断所有 ​​Segment​​ 的总修改次数是否大于上一次的总修改次数。由于已经加锁,次数一定和上次相等。
  7. 释放锁,统计结束。

//获取map的大小
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}

为什么这样设计呢?这种思想和乐观锁悲观锁的思想如出一辙。

为了尽量不锁住所有 ​​Segment​​​,首先乐观地假设 ​​Size​​​ 过程中不会有修改。当尝试一定次数,才无奈转为悲观锁,锁住所有 ​​Segment​​ 保证强一致性。

后记

这些就是全部内容了,以上内容较为深奥,建议收藏,反复观看!

举报

相关推荐

0 条评论