ConcurrentHashMap源码分析

1,使用及场景

    CHM是J.U.C提供的一个线程安全且高效的HashMap,api基本和Hashmap类似,主要有get,put等方法。

2,源码分析

    2.1 JDK1.7和1.8的变化

        JDK1.7,简单来说chm是一个segment数组,它通过集成ReentrantLock进行加锁,通过每次锁住一个segment来保证每个segment内的操作是线程安全的。理论上可以同时支持16个线程的并发写入。因为有16个segment
        JDK 1.8 相对于1.7有两个方面的改动,
            1,取消了segment的设计,直接用Node数组来保存数据,对每个Node加锁来实现线程安全行。   
            2,将原来的数组+单向链表改为数据+单向链表+红黑树实现,为什么要用红黑树呢,因为会存在链表过长的节点,如果还采用单向列表的话,那么查询某个节点的时间复杂度为O(n),因此对于链表长度超过8的,jdk1.8采用了红黑树的结构将时间复杂度降为O(logN),提升查询性能。

    2.2  put方法 initTable

    如果当前table为空,则初始化一个长度为16的table,并且将扩容大小赋值给sizeCtl(0.75倍原数组大小)。
    sizeCtl:(1)负数代表正在初始化(-1)或者扩容(-N代表有N-1个线程正在进行扩容)操作,
                  (2)0表示Node数组还没有被初始化,
                  (3)正数表示下一次扩容的大小。
    if (tab == null || (n = tab.length) == 0)
        tab = initTable();
    //n是2的n次方,所以减去1再和hash求与,结果还是hash,结果是取到位置为i的Node
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        if (casTabAt(tab, i, null,
                     new Node<K,V>(hash, key, value, null)))
            break;                   // no lock when adding to empty bin
    }
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    上面 tabAt(tab, i = (n - 1) & hash)) 等价于tab[i],为什么不直接使用tab[i]呢,虽然数组本身是volatile修饰的,但是volatile的数组只针对它的引用,并不针对它里面的元素,所以它内部是用 getObjectVolatile 获取的。保证有其它线程对这个数组进行写操作,那么当前线程也一定可以读到最新的值。

    2.3 put方法 addCount

    putVal 执行完之后,会通过 addCount 来增加CHM中元素的个数,并且还会触发扩容操作。它里面运用了分而治制的思想
    如果在没有并发的情况下,chm采用 cas的方法修改 basecount 字段来统计集合的大小,否则使用CounterCell数组来计算。?为什么用这种方式呢,按照正常的想法为了保证安全性,肯定是用一个成员变量加锁来统计元素的个数,但是竞争比较激烈的情况下会影响性能。所以采用了分片的方法来记录大小。
    addCount(1L, binCount);     // 1L 表示这次需要在表中增加的元素个数,binCount是链表长度,如果大于等于0都需要进行扩容检查

    private transient volatile int cellsBusy;    // 标识当前cell数组是否在初始化或者扩容的cas标志
     */
    private transient volatile CounterCell[] counterCells;    //集合大小分别存在每个cell中

    if (counterCells == as) {
        CounterCell[] rs = new CounterCell[2];
        rs[h & 1] = new CounterCell(x);
        counterCells = rs;
        init = true;
    }

        默认会初始化长度为2的CounterCell数组,然后随机得到指定的一个数组下标,将需要新增的值放到对应的下标处。

    2.4 transfer扩容阶段

    当 集合键值对总数 >= sizeCtl(扩容阀值)时,进行扩容操作。扩容的核心操作是数据的转移,最直观的方法是使用互斥锁,把转移过程锁住,但是持有锁的线程耗时越长,其它线程就一直被阻塞,从而导致吞吐量较低。
    resizeStamp(n); 会生成一个int值,高16位代表扩容标记,低16位代表并发扩容线程数,n不同生成的值也不会相同。保证每次扩容都生成唯一的戳。
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);    //
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }

    chm并没有直接加锁,而是采用cas实现无锁的并发同步策略,最精华的部分是利用多线程来协同扩容。

    简单的过程就是,我们把Node数组当做多个线程的共享队列,然后通过一个指针来划分每个线程负责的区间,每个线程通过逆向遍历实现扩容,已经遍历完的Node会被替换为一个 ForwardingNode 节点,表示当前Node已经被其它线程迁移完了。
    1,fwd节点:标识类,用于指向新表
    2,advance:提示代码是否进行推进处理,也就是当前桶处理完,处理下一个桶的标识。
    3,finishing:用于提示扩展是否结束用的。
    while (advance) {
        int nextIndex, nextBound;
        if (--i >= bound || finishing)
            advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        else if (U.compareAndSwapInt
                 (this, TRANSFERINDEX, nextIndex,
                  nextBound = (nextIndex > stride ?
                               nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
    }
    通过for自循环每个槽位中的链表元素,默认advance为真,通过CAS设置transferIndex属性值,并初始化 i 和bound的值,i 指当前处理的槽位号,bound指需要处理的槽位边界,先处理槽位31的节点,(boud, i) = (16,31) 从31的位置往前推动,如果15槽位处理完了,通过CAS插入初始化的fwd节点,用于告诉其他线程该槽位已经处理过了。
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
    在扩容操作中,每执行完一个扩容操作,就通过cas将sc减1,直到sc-2等于原来的resizeStamp(n)代表扩容已经完成。

    2.5 数据迁移阶段的实现分析    

    当划分好迁移区间好后,针对每个节点的列表迁移,chm使用了高低位原理,将原来的链表的处于高位的元素放到高位节点下,低位的保持不变。
    因为n总是2的次幂,所以 fh&n的结果只有两种结果, 第n位为0或者1,然后将0的放到低位链表,为1的放到高位链表。
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            if (fh >= 0) {
                int runBit = fh & n;
                Node<K,V> lastRun = f;
                for (Node<K,V> p = f.next; p != null; p = p.next) {
                    int b = p.hash & n;
                    if (b != runBit) {
                        runBit = b;
                        lastRun = p;
                    }
                }
                if (runBit == 0) {
                    ln = lastRun;
                    hn = null;
                }
                else {
                    hn = lastRun;
                    ln = null;
                }
                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    if ((ph & n) == 0)
                        ln = new Node<K,V>(ph, pk, pv, ln);
                    else
                        hn = new Node<K,V>(ph, pk, pv, hn);
                }
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd);
                advance = true;
            }
            else if (f instanceof TreeBin) {
                TreeBin<K,V> t = (TreeBin<K,V>)f;
                TreeNode<K,V> lo = null, loTail = null;
                TreeNode<K,V> hi = null, hiTail = null;
                int lc = 0, hc = 0;
                for (Node<K,V> e = t.first; e != null; e = e.next) {
                    int h = e.hash;
                    TreeNode<K,V> p = new TreeNode<K,V>
                        (h, e.key, e.val, null, null);
                    if ((h & n) == 0) {
                        if ((p.prev = loTail) == null)
                            lo = p;
                        else
                            loTail.next = p;
                        loTail = p;
                        ++lc;
                    }
                    else {
                        if ((p.prev = hiTail) == null)
                            hi = p;
                        else
                            hiTail.next = p;
                        hiTail = p;
                        ++hc;
                    }
                }
                ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd);
                advance = true;
            }
        }
    }
 

    2.6 put方法 helpTransfer

    在putval的时候,如果当前节点是fwd节点,既hash为moved,意味着有其它线程正在扩容,那么当前线程直接帮助它进行扩容,调用helpTransfer方法。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);

    2.7 put方法 添加节点到链表中

    如果添加节点的位置已经存在节点,需要以链表的方式加入到节点中,如果当前节点已经是一颗红黑树,那么久按照红黑树的规则将当前节点加入到红黑树中。  
    //锁住当前节点链表
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            if (fh >= 0) {
                binCount = 1;
                for (Node<K,V> e = f;; ++binCount) {
                    K ek;
                    if (e.hash == hash &&
                        ((ek = e.key) == key ||
                         (ek != null && key.equals(ek)))) {
                        oldVal = e.val;
                        if (!onlyIfAbsent)
                            e.val = value;
                        break;
                    }
                    Node<K,V> pred = e;
                    if ((e = e.next) == null) {
                        pred.next = new Node<K,V>(hash, key,
                                                  value, null);
                        break;
                    }
                }
            }
            else if (f instanceof TreeBin) {
                Node<K,V> p;
                binCount = 2;
                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                      value)) != null) {
                    oldVal = p.val;
                    if (!onlyIfAbsent)
                        p.val = value;
                }
            }
        }
    }

    2.8 put方法 判断链表是否需要转换为红黑树

    如果链表的长度 bincount 大于等于8,那么会根据当前链表的长度来决定是扩容还是转为红黑树,也就是说如果当前数组的长度小于64,那么优先扩容,否则转为红黑树。
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)//TREEIFY_THRESHOLD=8
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY=64
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }



原文地址:https://www.cnblogs.com/gaojf/p/12836639.html