不積跬步,無以至千里
不積小流,無以成江海

0%

ConcurrentHashMap源码解析

ConcurrentHashMap与HashMap等的区别

1.HashMap
我们知道HashMap是线程不安全的,在多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
2.HashTable
HashTable和HashMap的实现原理几乎一样,差别无非是
   HashTable不允许key和value为null
   HashTable是线程安全的
但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁。
多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。
3.ConcurrentHashMap
主要就是为了应对hashmap在并发环境下不安全而诞生的,ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响。
我们都知道Map一般都是数组+链表结构(JDK1.8该为数组+红黑树)。
高并发编程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)
ConcurrentHashMap避免了对全局加锁改成了局部加锁操作,这样就极大地提高了并发环境下的操作速度,由于ConcurrentHashMap在JDK1.7和1.8中的实现非常不同,接下来我们谈谈JDK在1.7和1.8中的区别。

JDK1.7的实现

在JDK1.7版本中,ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,如下图所示:

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样

JDK1.8的实现

JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本

CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。

源码解析

基本属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// node数组最大容量:2^30=1073741824  
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认初始值,必须是2的幕数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
//链表转红黑树的另一个阈值(条件):Map的容量至少为64
static final int MIN_TREEIFY_CAPACITY = 64;
//每次进行转移的最小值
private static final int MIN_TRANSFER_STRIDE = 16;
//生成sizeCtl所使用的bit位数
private static int RESIZE_STAMP_BITS = 16;
// 2^15-1,help resize扩容的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// forwarding nodes的hash值
/** 首先哈希值不小于0,下面的几个常量相当于标识位 */
/** 用于转换节点的哈希值 */
static final int MOVED = -1; // hash for forwarding nodes
// 用于红黑树根节点的哈希码的标识位
static final int TREEBIN = -2; // hash for roots of trees
// 容器中还有一个保留节点,此处也是有关的哈希码的标识位
static final int RESERVED = -3; // hash for transient reservations
// 可用cpu数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组,容器的数组(哈希表、散列表),在第一次插入元素时才初始化,大小总是2的次幂
transient volatile Node<K,V>[] table;
/** 扩容时使用的临时过度的数组(仅使用于扩容) */
private transient volatile Node<K,V>[] nextTable;
//记录容器的容量大小,通过CAS更新
private transient volatile long baseCount;
/**
* 这个sizeCtl是volatile的,那么他是线程可见的,一个思考:它是所有修改都在CAS中进行,但是sizeCtl为什么不设计成LongAdder(jdk8出现的)类型呢?
* 或者设计成AtomicLong(在高并发的情况下比LongAdder低效),这样就能减少自己操作CAS了。
* 默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。
* <0 标识容器正在初始化或扩容
* -1 代表table正在初始化
* -N 表示有N-1个线程正在进行扩容操作
* 其余情况:
*1、如果table未初始化,表示table需要初始化的大小。
*2、如果table初始化完成,表示table的容量,默认是table大小的0.75 倍,居然用这个公式算0.75(n - (n >>> 2))。
**/
private transient volatile int sizeCtl;
/**
* 自旋锁 (锁定通过 CAS) 在调整大小和/或创建 CounterCells 时使用。 在CounterCell类更新value中会使用,功能类似显示锁和内置锁,性能更好
* 在Striped64类也有应用
*/
private transient volatile int cellsBusy;
/** 计数器池,非空时,大小为2的次幂 */
private transient volatile CounterCell[] counterCells;

ConcurrentHashMap?的构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//默认无参构造函数
1. public ConcurrentHashMap() {
}
//initialCapacity 初始化容量
2. public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 创建与给定map具有相同映射的新map
3. public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//initialCapacity 初始化容量;loadFactor 负载因子,当容量达到initialCapacity*loadFactor时,执行扩容
4. public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/*initialCapacity 初始化容量;
* loadFactor 负载因子,当容量达到initialCapacity*loadFactor时,执行扩容
* concurrencyLevel 预估的并发更新线程数
*/
5. public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;//初始化sizeCtl
}

/**
*返回给定所需容量,table的大小总是2的幂次方
**/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作

ConcurrentHashMap中各种数据节点

jdk1.8 ConcurrentHashMap 中数据节点的种类比较多,比如 Node<K, V>、TreeNode<K, V>、TreeBin<K, V>、ReservationNode<K,V>。正式这种独具特色的设计,才拥有高性能的Map并发容器。

Node

Node是ConcurrentHashMap存储结构的基本单元,保存key,value及key的hash值的数据结构,其中value和next都用volatile修饰,保证并发的可见性,继承于HashMap中的Entry,源代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/** 实现于 Map.Entry */
static class Node<K,V> implements Map.Entry<K,V> {
// // final 修饰的 哈希码、key,防止被重复赋值
final int hash;
final K key;
// 具有可见性的 val 和 next
volatile V val;
// 当前节点指向的下一个节点
volatile Node<K,V> next;

/**
* 构造方法用于注入 node节点 的属性值(或引用)
* 参数从左至右依次是:key的哈希码,key,value,指向的下一个节点next
*/
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
// getter & toString 方法
public final K getKey() { return key; }
public final V getValue() { return val; }
public final String toString(){ return key + "=" + val; }
// 返回节点的哈希码
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
// 设置 value 的方法,可能会抛出不支持的操作异常,不允许更新value
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

// 用于节点比较是否相等的方法
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
// 返回判断key、value是否相同的结果
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* 虚拟化地支持 map.get() 操作; 子类可以重写此方法.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
// 循环遍历链表
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

Node数据结构很简单,从上可知,就是一个链表,但是只允许对数据进行查找,不允许进行修改

专用于红黑树的 TreeNode<K, V> 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
static final class TreeNode<K,V> extends Node<K,V> {
// 父节点
TreeNode<K,V> parent;
// 左子节点
TreeNode<K,V> left;
// 右子节点
TreeNode<K,V> right;
// 指向上一个节点(一般是父节点),删除节点时会用到
TreeNode<K,V> prev;
// 红黑标识:true表示此节点为红色,false表示此节点为黑色
boolean red;
// 有参构造方法
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* 查找并返回红黑树中是存在的节点,不存在返回null
* 在上篇关于jdk1.8HashMap源码分析的文章中,分析过类似的(写)操作

*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
// 当前节点
TreeNode<K,V> p = this;
// 循环遍历平衡树
do {
// ph:当前节点的哈希码,
// dir:搜索的方向,左或右:-1表示左,1表示右,
// pk:当前节点的key
// q:当前节点
int ph, dir; K pk; TreeNode<K,V> q;
// pl:当前节点p的左子节点;pr:当前节点p的右子节点
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
// 当前节点的哈希值大于指定的哈希值,指向左子节点
p = pl;
else if (ph < h)
// 当前节点的哈希值小于指定的哈希值,指向右子节点
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
// 如果当前节点的key与指定的k相同,那么就直接返回此节点
return p;
else if (pl == null)
// 如果左子节点为空,就向右子节点查找
p = pr;
else if (pr == null)
// 如果右子节点为空,就向左子节点查找
p = pl;
// 判断 k 的类是否实现了比较器
else if ((kc != null ||
// 判断 k 的类是否实现了比较器
(kc = comparableClassFor(k)) != null) &&
// 这里实际是 pk == null || pk.getClass() != kc ? 0 :
// ((Comparable)pk).compareTo(pk)
// 下面是解读这个三目运算:
// pk == null 表示判断当前节点是否为null
// pk.getClass() != kc 表示当前节点对象的类和key的对象的类是否不同
// ((Comparable)k).compareTo(pk)表示将指定的key与当前节点的key比较
(dir = compareComparables(kc, k, pk)) != 0)
// dir小于0表示向左子节点搜索
p = (dir < 0) ? pl : pr;
// 循环查找
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}

computeIfAbsent 和 compute 方法使用的位置 保留节点

1
2
3
4
5
6
7
8
9
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}

Node<K,V> find(int h, Object k) {
return null;
}
}

持有红黑树根节点的容器

TreeBin 是保证 ConcurrentHashMap 线程安全的重要数据结构,它自身维护着读/写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static final class TreeBin<K,V> extends Node<K,V> {
// 红黑树的根节点
TreeNode<K,V> root;
// // 链表的头节点(桶顶的节点)
volatile TreeNode<K,V> first;
// 最近一个设置 waiter 标识位的线程
volatile Thread waiter;
// 锁状态标识位
volatile int lockState;
static final int WRITER = 1; // 持有写锁时的状态位
static final int WAITER = 2; // 正在等待写锁的状态位
static final int READER = 4; // 设置读锁时的增量值

...其他方法跟 TreeNode 中的方法很相似
}

ConcurrentHashMap添加元素操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允许key、value为null
// 将null值判断提前,符合异常处理的规则(这里也是较上一版做了优化)
if (key == null || value == null) throw new NullPointerException();
// 计算key的哈希码
int hash = spread(key.hashCode());
// binCount 用来记录链表中节点数量,进而判断是否达到转为红黑树的阈值
int binCount = 0;
// 遍历table数组
for (Node<K,V>[] tab = table;;) {
// 要插入元素所在位置的节点
Node<K,V> f;
// n 表示数组长度;i 表示索引;fh 表示要插入元素所在位置的节点的哈希码
int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 如果数组为null或者数组的大小为0,那么进行初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 通过指定key的哈希码找到对应的节点,
// 在节点为null的情况下,通过CAS自旋方式将这个元素放入其中
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
// 如果通过CAS自旋成功添加元素,就直接跳出循环
// 否则就进入下一个循环
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 标识着要迁移数据
tab = helpTransfer(tab, f);
// 通过上面过滤的条件,在应该能猜到下面不是关于链表就是关于红黑树
// 因为哈希槽的位置不为null
else {
//旧值
V oldVal = null;
//只给单个节点加锁
synchronized (f) {
//再次判断结点
if (tabAt(tab, i) == f) {
// 判断节点f的哈希码是否不小于0
if (fh >= 0) {
binCount = 1;
// 从桶顶(哈希槽)开始遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 判断key是否相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 如果相同的话,就准备获取value
oldVal = e.val;
// onlyIfAbsent为true标识可以覆盖value,false标识不允许覆盖
if (!onlyIfAbsent)
// 如果允许覆盖value,就覆盖value
e.val = value;
break;
}
// 记录下一个节点的上一个节点(目前以为着是当前节点e)
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 如果当前节点的下一个节点为null,就把元素添加到链表的尾部
pred.next = new Node<K,V>(hash, key,
value, null);
// 添加完成后就直接跳出循环
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 将元素添加到红黑树
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
// 是否可以覆盖已有的value
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 判断在链表不为空的情况下,是否达到转为红黑树的阈值
if (binCount >= TREEIFY_THRESHOLD)
// 如果链表元素达到转为红黑树的阈值,就转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//添加计数
addCount(1L, binCount);
return null;
}

如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比HashTable的synchronized要好得多。
putVal(K key, V value, boolean onlyIfAbsent)方法干的工作如下:

  1. 检查key/value是否为空,如果为空,则抛异常,否则进行2
  2. 进入for死循环,进行3
  3. 检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4
  4. 根据key的hash值计算出其应该在table中储存的位置i,取出table[i]的节点用f表示。
    根据f的不同有如下三种情况:
    (1) 如果table[i]==null(即该位置的节点为空,没有发生碰撞),
    则利用CAS操作直接存储在该位置,如果CAS操作成功则退出死循环。
    (2) 如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况
    (2.1)检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
    (2.2)说明table[i]的节点的hash值不等于MOVED,如果table[i]为链表节点,则将此节点插入链表中即可
    (3) 如果table[i]为树节点,则将此节点插入树中即可。插入成功后,进行 5
  5. 如果table[i]的节点是链表节点,则检查table的第i个位置的链表是否需要转化为数,如果需要则调用treeifyBin函数进行转化

1、第一步根据给定的key的hash值找到其在table中的位置index。
2、找到位置index后,存储进行就好了。

只是这里的存储有三种情况罢了,第一种:table[index]中没有任何其他元素,即此元素没有发生碰撞,这种情况直接CAS存储就好了哈。第二种,table[i]存储的是一个链表,如果链表不存在key则直接加入到链表尾部即可,如果存在key则更新其对应的value。第三种,table[i]存储的是一个树,则按照树添加节点的方法添加就好。第二种和第三种需要对节点进行加锁。

相关问题

并发环境下为什么使用ConcurrentHashMap?

  1. HashMap线程不安全
  2. HashTable虽然是线程安全的,但是效率低下,当一个线程访问HashTable的同步方法时,其他线程如果也访问HashTable的同步方法,那么会进入阻塞或者轮训状态。
  3. 在jdk1.6中ConcurrentHashMap使用锁分段技术提高并发访问效率。首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。

总结

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。
1.数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。
2.保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。
3.锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。
4.链表转化为红黑树:定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。5.查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。

欣赏此文?求鼓励,求支持!