1,使用及场景
CHM是J.U.C提供的一个线程安全且高效的HashMap,api基本和Hashmap类似,主要有get,put等方法。
2,源码分析
2.1 JDK1.7和1.8的变化
JDK1.7,简单来说chm是一个segment数组,它通过集成ReentrantLock进行加锁,通过每次锁住一个segment来保证每个segment内的操作是线程安全的。理论上可以同时支持16个线程的并发写入。因为有16个segment
JDK 1.8 相对于1.7有两个方面的改动,
1,取消了segment的设计,直接用Node数组来保存数据,对每个Node加锁来实现线程安全行。
2,将原来的数组+单向链表改为数据+单向链表+红黑树实现,为什么要用红黑树呢,因为会存在链表过长的节点,如果还采用单向列表的话,那么查询某个节点的时间复杂度为O(n),因此对于链表长度超过8的,jdk1.8采用了红黑树的结构将时间复杂度降为O(logN),提升查询性能。
2.2 put方法 initTable
如果当前table为空,则初始化一个长度为16的table,并且将扩容大小赋值给sizeCtl(0.75倍原数组大小)。
sizeCtl:(1)负数代表正在初始化(-1)或者扩容(-N代表有N-1个线程正在进行扩容)操作,
(2)0表示Node数组还没有被初始化,
(3)正数表示下一次扩容的大小。
if (tab == null || (n = tab.length) == 0) tab = initTable(); //n是2的n次方,所以减去1再和hash求与,结果还是hash,结果是取到位置为i的Node else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); }
上面 tabAt(tab, i = (n - 1) & hash)) 等价于tab[i],为什么不直接使用tab[i]呢,虽然数组本身是volatile修饰的,但是volatile的数组只针对它的引用,并不针对它里面的元素,所以它内部是用 getObjectVolatile 获取的。保证有其它线程对这个数组进行写操作,那么当前线程也一定可以读到最新的值。
2.3 put方法 addCount
putVal 执行完之后,会通过 addCount 来增加CHM中元素的个数,并且还会触发扩容操作。它里面运用了分而治制的思想。
如果在没有并发的情况下,chm采用 cas的方法修改 basecount 字段来统计集合的大小,否则使用CounterCell数组来计算。?为什么用这种方式呢,按照正常的想法为了保证安全性,肯定是用一个成员变量加锁来统计元素的个数,但是竞争比较激烈的情况下会影响性能。所以采用了分片的方法来记录大小。
addCount(1L, binCount); // 1L 表示这次需要在表中增加的元素个数,binCount是链表长度,如果大于等于0都需要进行扩容检查 private transient volatile int cellsBusy; // 标识当前cell数组是否在初始化或者扩容的cas标志 */ private transient volatile CounterCell[] counterCells; //集合大小分别存在每个cell中 if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; }
默认会初始化长度为2的CounterCell数组,然后随机得到指定的一个数组下标,将需要新增的值放到对应的下标处。
2.4 transfer扩容阶段
当 集合键值对总数 >= sizeCtl(扩容阀值)时,进行扩容操作。扩容的核心操作是数据的转移,最直观的方法是使用互斥锁,把转移过程锁住,但是持有锁的线程耗时越长,其它线程就一直被阻塞,从而导致吞吐量较低。
resizeStamp(n); 会生成一个int值,高16位代表扩容标记,低16位代表并发扩容线程数,n不同生成的值也不会相同。保证每次扩容都生成唯一的戳。
if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } }
chm并没有直接加锁,而是采用cas实现无锁的并发同步策略,最精华的部分是利用多线程来协同扩容。
简单的过程就是,我们把Node数组当做多个线程的共享队列,然后通过一个指针来划分每个线程负责的区间,每个线程通过逆向遍历实现扩容,已经遍历完的Node会被替换为一个 ForwardingNode 节点,表示当前Node已经被其它线程迁移完了。
1,fwd节点:标识类,用于指向新表
2,advance:提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
3,finishing:用于提示扩展是否结束用的。
while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } }
通过for自循环每个槽位中的链表元素,默认advance为真,通过CAS设置transferIndex属性值,并初始化 i 和bound的值,i 指当前处理的槽位号,bound指需要处理的槽位边界,先处理槽位31的节点,(boud, i) = (16,31) 从31的位置往前推动,如果15槽位处理完了,通过CAS插入初始化的fwd节点,用于告诉其他线程该槽位已经处理过了。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit }
在扩容操作中,每执行完一个扩容操作,就通过cas将sc减1,直到sc-2等于原来的resizeStamp(n)代表扩容已经完成。
2.5 数据迁移阶段的实现分析
当划分好迁移区间好后,针对每个节点的列表迁移,chm使用了高低位原理,将原来的链表的处于高位的元素放到高位节点下,低位的保持不变。
因为n总是2的次幂,所以 fh&n的结果只有两种结果, 第n位为0或者1,然后将0的放到低位链表,为1的放到高位链表。
synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } }
2.6 put方法 helpTransfer
在putval的时候,如果当前节点是fwd节点,既hash为moved,意味着有其它线程正在扩容,那么当前线程直接帮助它进行扩容,调用helpTransfer方法。
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);
2.7 put方法 添加节点到链表中
如果添加节点的位置已经存在节点,需要以链表的方式加入到节点中,如果当前节点已经是一颗红黑树,那么久按照红黑树的规则将当前节点加入到红黑树中。
//锁住当前节点链表 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } }
2.8 put方法 判断链表是否需要转换为红黑树
如果链表的长度 bincount 大于等于8,那么会根据当前链表的长度来决定是扩容还是转为红黑树,也就是说如果当前数组的长度小于64,那么优先扩容,否则转为红黑树。
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD)//TREEIFY_THRESHOLD=8 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY=64 tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }