ConcurrentHashMap扩容分析

CHM的扩容方法是 transfer ,发现调用该方法的地方一共有三处

  tryPresize    在treeifyBin被调用,判断该数组节点相关元素是不是已经超过8个的时候,此时要判断桶的数量是不是超过了64个,如果还没超过就先扩容。如果超过则把链表转为树

  helpTransfer     一个线程要对table中元素进行操作的时候(可以是put也可以是remove),如果检测到节点的HASH值为MOVED的时候,就会调用helpTransfer方法

  addCount  对CHM进行了增加元素的操作,使得数组中存储的元素个数发生了变化的时候会调用的方法

  这里还有一点特殊的,那就是helpTransfer 是有返回值的,而其他两个方法返回值是void。这也好理解,毕竟helpTransfer就表示已经开始扩容了,那么nextTable就必然不是null了。

  开始分析transfer

  第一部分 准备阶段

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range 不用纠结 就当它是16好了
        if (nextTab == null) {            // initiating 如果是第一个线程那么nextTab就是null
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;//transferIndex很关键 是老表的表长度比如16
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//迁移后的桶位置上就放上ForwardingNode 注意下整个桶里方法的fwd都是同一个
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab

  第二部分开始转移

for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {           第一部分 决定扩容的上限和下限 甚至可能是下一次的扩容
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) { 第二部分决定是否退出
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)//第三部分 开始做真正的转移
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            转移普通节点
                        }
                        else if (f instanceof TreeBin) {
                            转移树节点
                        }
                    }
                }
            }
        }
    }

首先大的脉络这是一个自旋的过程,而出口只有两个地方,我们最后分析这两处。

if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;

  先简化流程,就假设初始容量是16这样就只会有一个线程来扩容

一 计算转移上限下限

        while (advance) {           第一部分 决定扩容的上限和下限 甚至可能是下一次的扩容
                int nextIndex, nextBound;
                if (--i >= bound || finishing)//i是不断自减得
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//transferIndex在上面被赋值,是源桶数组的长度,如果transferIndex <= 0是整个桶都处理完了
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {stride是16,nextIndex也是16,所以这里nextBound是0
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
i是
transferIndex - 1 ,正好是最高的下标
到这里我们知道了i是15 而bound是0,这就是一个转移段有16个桶

  然后就是对每个桶进行转移了,也就是第三部分,其实这部分的逻辑反倒是最简单的

 二 桶的转移

       else if ((f = tabAt(tab, i)) == null)//如果是null,直接通过cas把fwd放上去,fwd里面就有nextTable的引用
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)//这是已经处理过了 因为fwd节点的hash值就是-1
                advance = true; // already processed
            else {
                synchronized (f) {//加锁
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {//还记得特殊节点的特殊值嘛 forward是-1 treenode是-2 所以大于0表示这是个普通的节点
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //高位链,低位链和HashMap逻辑是一样的 只是加了点小技巧
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);//注意一个细节 新的table的里的Node都是new出来的,原桶根本没有变,也就是说在cas前老桶还是能够提供查询功能的
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {//红黑树的过程和普通节点没啥区别,因为红黑树里面也是有链表结构的,它也是按照高位链低位链划分的
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }

三 最重要的部分 退出

先看一种情况,就是处理完了16个桶之后会怎么样 

      while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) //处理完本次的16个桶那么肯定这里 --i >=bound就是false了
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//如果下标还不是0 就是还有要转移的
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {//这里还是会再次计算i和bound 再次处理这次的桶数组
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

  也就是说一个线程进入transfer,不论是他是第一个开始扩容的线程,还是后来进入帮忙的,不处理完是不能退出的,而不是处理完16个就算完了。

  那么如果

nextIndex = transferIndex) <= 0

  那就说明此时真的数据转移完了,那么就进入退出判断流程,这部分也是最难得

      if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

上一个代码段里说了,如果已经转移完了,那么 i = -1,满足了if条件

  finishing只有一个地方被赋值  finishing = advance = true; 所以第一次进来是不可能直接走

         if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

  所以重点要分析的就是,通过CAS对 sizeCtl 这很好理解,扩容的线程要-1。如果经过-1后这个值不是

  (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT 说明什么呢?说明它不是最后一个线程。

  这里其实挺有意思的,还记得扩容开始的 sizeCtl 怎么算的嘛? 就是扩容标识戳,然后左移16位,同时低16位赋值为2,所以啊 就是判断 当前线程-1之后等不等于这个值

  如果不等于这个值,那么他不是最后一个线程,那么你可以走了。

  而最后一个线程不能走,我们就来看看不能走的逻辑吧

        if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }

  最后一个线程会执行最后两句,i又被赋值为原桶的size

  代码自旋又走到while循环里

  然后又是 - -i,再把每个桶检查一遍

  都检查完了,还记得上面把finishing设置为true了吧

  

          if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

最后的线程收尾工作

  1 nextTable 为null

  2 table指向新的nextTable

  3 重新计算sizeCtl

  到此整个transfer分析就完了

四 收尾

  我们最开始说了三个方法会调transfer

  其实这三个方法都差不太多

  这里面还有一个比较好玩的地方

  拿出来分析分析

       int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {//sizeCtl为负数才说明在扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }

  sc >>> RESIZE_STAMP_SHIFT) != rs 其实我要说的就是这个,rs就是扩容标识戳,每个线程要进入扩容之前都要检查当前的扩容标识戳对不对。如果不对那说明上次的扩容结束了,这次是新的扩容。

  这里再次强调下 sizeCtl 高16位是扩容标识戳,低16位是扩容线程数 + 1

  

  

原文地址:https://www.cnblogs.com/juniorMa/p/13907580.html