ConcurrentHashMap与HashMap等的区别
1.HashMap
我们知道HashMap是线程不安全的,在多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
2.HashTable
HashTable和HashMap的实现原理几乎一样,差别无非是
HashTable不允许key和value为null
HashTable是线程安全的
但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁。
多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。
3.ConcurrentHashMap
主要就是为了应对hashmap在并发环境下不安全而诞生的,ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响。
我们都知道Map一般都是数组+链表结构(JDK1.8该为数组+红黑树)。
高并发编程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)
ConcurrentHashMap避免了对全局加锁改成了局部加锁操作,这样就极大地提高了并发环境下的操作速度,由于ConcurrentHashMap在JDK1.7和1.8中的实现非常不同,接下来我们谈谈JDK在1.7和1.8中的区别。
JDK1.7的实现
在JDK1.7版本中,ConcurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,如下图所示:
Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样
JDK1.8的实现
JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本
CAS是compare and swap的缩写,即我们所说的比较交换。cas是一种基于锁的操作,而且是乐观锁。在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。
CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存地址里面的值和A的值是一样的,那么就将内存里面的值更新成B。CAS是通过无限循环来获取数据的,若果在第一轮循环中,a线程获取地址里面的值被b线程修改了,那么a线程需要自旋,到下次循环才有可能机会执行。
源码解析
基本属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private static final int MAXIMUM_CAPACITY = 1 << 30 ;private static final int DEFAULT_CAPACITY = 16 ;static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8 ;private static final int DEFAULT_CONCURRENCY_LEVEL = 16 ;private static final float LOAD_FACTOR = 0.75f ;static final int TREEIFY_THRESHOLD = 8 ;static final int UNTREEIFY_THRESHOLD = 6 ;static final int MIN_TREEIFY_CAPACITY = 64 ;private static final int MIN_TRANSFER_STRIDE = 16 ;private static int RESIZE_STAMP_BITS = 16 ;private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 ;private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ; static final int NCPU = Runtime.getRuntime().availableProcessors();transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable;private transient volatile long baseCount;private transient volatile int sizeCtl;private transient volatile int cellsBusy;private transient volatile CounterCell[] counterCells;
ConcurrentHashMap?的构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 1 . public ConcurrentHashMap () { } 2 . public ConcurrentHashMap (int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); this .sizeCtl = cap; } 3 . public ConcurrentHashMap (Map<? extends K, ? extends V> m) { this .sizeCtl = DEFAULT_CAPACITY; putAll(m); } 4 . public ConcurrentHashMap (int initialCapacity, float loadFactor) { this (initialCapacity, loadFactor, 1 ); } 5 . public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + (long )initialCapacity / loadFactor); int cap = (size >= (long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; } private static final int tableSizeFor (int c) { int n = c - 1 ; n |= n >>> 1 ; n |= n >>> 2 ; n |= n >>> 4 ; n |= n >>> 8 ; n |= n >>> 16 ; return (n < 0 ) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1 ; }
ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作
ConcurrentHashMap中各种数据节点
jdk1.8 ConcurrentHashMap 中数据节点的种类比较多,比如 Node<K, V>、TreeNode<K, V>、TreeBin<K, V>、ReservationNode<K,V>。正式这种独具特色的设计,才拥有高性能的Map并发容器。
Node
Node是ConcurrentHashMap存储结构的基本单元,保存key,value及key的hash值的数据结构,其中value和next都用volatile修饰,保证并发的可见性,继承于HashMap中的Entry,源代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this .hash = hash; this .key = key; this .val = val; this .next = next; } public final K getKey () { return key; } public final V getValue () { return val; } public final String toString () { return key + "=" + val; } public final int hashCode () { return key.hashCode() ^ val.hashCode(); } public final V setValue (V value) { throw new UnsupportedOperationException(); } public final boolean equals (Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } Node<K,V> find (int h, Object k) { Node<K,V> e = this ; if (k != null ) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null ); } return null ; } }
Node数据结构很简单,从上可知,就是一个链表,但是只允许对数据进行查找,不允许进行修改
专用于红黑树的 TreeNode<K, V> 节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 static final class TreeNode <K ,V > extends Node <K ,V > { TreeNode<K,V> parent; TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super (hash, key, val, next); this .parent = parent; } Node<K,V> find (int h, Object k) { return findTreeNode(h, k, null ); } final TreeNode<K,V> findTreeNode (int h, Object k, Class<?> kc) { if (k != null ) { TreeNode<K,V> p = this ; do { int ph, dir; K pk; TreeNode<K,V> q; TreeNode<K,V> pl = p.left, pr = p.right; if ((ph = p.hash) > h) p = pl; else if (ph < h) p = pr; else if ((pk = p.key) == k || (pk != null && k.equals(pk))) return p; else if (pl == null ) p = pr; else if (pr == null ) p = pl; else if ((kc != null || (kc = comparableClassFor(k)) != null ) && (dir = compareComparables(kc, k, pk)) != 0 ) p = (dir < 0 ) ? pl : pr; else if ((q = pr.findTreeNode(h, k, kc)) != null ) return q; else p = pl; } while (p != null ); } return null ; } }
computeIfAbsent 和 compute 方法使用的位置 保留节点
1 2 3 4 5 6 7 8 9 static final class ReservationNode <K ,V > extends Node <K ,V > { ReservationNode() { super (RESERVED, null , null , null ); } Node<K,V> find (int h, Object k) { return null ; } }
持有红黑树根节点的容器
TreeBin 是保证 ConcurrentHashMap 线程安全的重要数据结构,它自身维护着读/写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static final class TreeBin <K ,V > extends Node <K ,V > { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; static final int WRITER = 1 ; static final int WAITER = 2 ; static final int READER = 4 ; ...其他方法跟 TreeNode 中的方法很相似 }
ConcurrentHashMap添加元素操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 public V put (K key, V value) { return putVal(key, value, false ); } final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比HashTable的synchronized要好得多。
putVal(K key, V value, boolean onlyIfAbsent)方法干的工作如下:
检查key/value是否为空,如果为空,则抛异常,否则进行2
进入for死循环,进行3
检查table是否初始化了,如果没有,则调用initTable()进行初始化然后进行 2,否则进行4
根据key的hash值计算出其应该在table中储存的位置i,取出table[i]的节点用f表示。
根据f的不同有如下三种情况:
(1) 如果table[i]==null(即该位置的节点为空,没有发生碰撞),
则利用CAS操作直接存储在该位置,如果CAS操作成功则退出死循环。
(2) 如果table[i]!=null(即该位置已经有其它节点,发生碰撞),碰撞处理也有两种情况
(2.1)检查table[i]的节点的hash是否等于MOVED,如果等于,则检测到正在扩容,则帮助其扩容
(2.2)说明table[i]的节点的hash值不等于MOVED,如果table[i]为链表节点,则将此节点插入链表中即可
(3) 如果table[i]为树节点,则将此节点插入树中即可。插入成功后,进行 5
如果table[i]的节点是链表节点,则检查table的第i个位置的链表是否需要转化为数,如果需要则调用treeifyBin函数进行转化
1、第一步根据给定的key的hash值找到其在table中的位置index。
2、找到位置index后,存储进行就好了。
只是这里的存储有三种情况罢了,第一种:table[index]中没有任何其他元素,即此元素没有发生碰撞,这种情况直接CAS存储就好了哈。第二种,table[i]存储的是一个链表,如果链表不存在key则直接加入到链表尾部即可,如果存在key则更新其对应的value。第三种,table[i]存储的是一个树,则按照树添加节点的方法添加就好。第二种和第三种需要对节点进行加锁。
相关问题
并发环境下为什么使用ConcurrentHashMap?
HashMap线程不安全
HashTable虽然是线程安全的,但是效率低下,当一个线程访问HashTable的同步方法时,其他线程如果也访问HashTable的同步方法,那么会进入阻塞或者轮训状态。
在jdk1.6中ConcurrentHashMap使用锁分段技术提高并发访问效率。首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。
总结
其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。
1.数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。
2.保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。
3.锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。
4.链表转化为红黑树:定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。5.查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。