【Java集合】--ConcurrentHashMap源码解析

    • ConcurrentHashMap是HashMap的线程安全版本,内部也是使用(数组 + 链表 + 红黑树)的结构来存储元素。
  • 相比于同样线程安全的HashTable来说,效率等各方面都有极大地提高。
  • 分段锁,是一种锁的设计思路它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。

继承结构

图片

源码解析

1.构造方法

ConcurrentHashMap()

public ConcurrentHashMap() {
}

ConcurrentHashMap(int initialCapacity)

public ConcurrentHashMap(int initialCapacity) {
    //
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    //sizeCtl为(initialCapacity + (initialCapacity >>> 1)<= 2^N
    //如果initialCapacity为7则sizeCtl为16,而在HashMap中是8
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

ConcurrentHashMap(Map<? extends K, ? extends V> m)

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

ConcurrentHashMap(int initialCapacity, float loadFactor)

//自定义初始容量和加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
}

ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    //越界判断
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    //
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}

从构造方法来看,ConcurrentHashMap同HashMap其内部数组的初始化都没有在构造方法中进行。构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面。

2.添加元素(put(K key, V value))

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //key和vlaue不能为null
    if (key == null || value == null) throw new NullPointerException();
    //获取key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    //自旋的方式存入数据,直到数据存入成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果数组不存在,初始化数组
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //要存入数据的位置没有数据,通过cas的方式存入数据
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //如果数据正在迁移,帮助迁移数据
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //要插入位置存在数据
        else {
            V oldVal = null;
            //加锁,确保线程安全
            //以桶中第一个元素作为锁对象
            synchronized (f) {
                //如果是一个链表,以链表的方式放入数据
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //把新加入元素放在链表结尾
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果是一个树节点,按树的方式存入元素
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //判断是否需要树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //键值对个数加一
    addCount(1L, binCount);
    return null;
}

计算哈希(spread(int h))

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

HashMap中的哈希值计算:

static final int hash(Object key) {
    int h;
    //对象的hashcode值^(异或)其hashcode的高16位的值
    //目的:提高hashcode的随机性,减少hash冲突
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

因为ConcurrentHashMap不能存储null,所以与HashMap不同,不需要null值的判断。而ConcurrentHashMap相比较HashMap多了 & HASH_BITS,这一步意义不明(我不知道为什么,因为0x7fffffff与任何值相与都为原数)

初始化数组(initTable())

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    //自旋直到数组创建成功
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl 小于0,则说明数组正在初始化或扩容,所以当前线程让出CPU让其他线程完成初始化工作
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //通过cas把sizeCtl 的值改为-1,表示当前线程正在进行扩容,
        // 这样其他线程获得CPU的执行权时,知道有其他线程在进行扩容,就可以让出CPU执行权了    
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //再一次判断,防止其他线程创建数组
                if ((tab = table) == null || tab.length == 0) {
                    // 如果sc为0则使用默认值16
                    // 如果在构造方法中,指定过参数,那么sc为大于等于2的整数次幂的值
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    //新建数组
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    //把新数组赋值给table
                    table = tab = nt;
                    // 设置sc为数组长度的0.75倍
                    // n - (n >>> 2) = n - n/4 = 0.75
                    //加载因子
                    sc = n - (n >>> 2);
                }
            } finally {
            // 初始化后,sizeCtl变为数组长度的3/4,这时存储的是扩容门槛
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.获取元素(get(Object key))

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算哈希后的值
    int h = spread(key.hashCode());
    //判断数组存在,且要获取的元素存在
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //桶的第一个元素为所查询数
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
         // 第一个节点的 hash 小于0,说明是树节点或者正在扩容
        // 使用find寻找元素,find的寻找方式依据 Node 的不同子类有不同的实现方式
        // 如果 Node 是树节点,则以树的方式去获取
        // 如果 Node 是 ForwardingNode ,则去扩容后的数组中查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //是链表,在链表中获取值
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

4.删除元素

public V remove(Object key) {
    return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    //自旋,直到操作成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果要查询的key在table中不存在,不进行操作
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        //如果数组正在扩容,当前线程帮助扩容    
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //要查询key在table上存在
        else {
            V oldVal = null;
            boolean validated = false;
            //加锁,确保线程安全
            synchronized (f) {
                //是链表节点,以删除链表元素的方式删除
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        //自旋直到元素删除成功
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    //如果是数节点,以数的方式删除节点
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        //键值对个数-1
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

5.扩容

HashMap中如果个数超过阈值就会发生扩容,而在ConcurrentHashMap中是通过addCount()方法来增加元素个数,故扩容·也在该方法中进行判断。

addCount(long x, int check)

private final void addCount(long x, int check) {
    // 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想)
  	// 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段
  	// 这样可以保证尽量小的减少冲突
    CounterCell[] as; long b, s;
     // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
         // 通过 cas 的方式更改 CounterCell 中的值
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        //计算键值对总数
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        //如果达到扩容门槛,进行扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            //sc<0说明正在扩容
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 扩容未完成,则当前线程加入迁移元素中
          	    // 并把扩容线程数加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                //把旧节点的数据移动到新节点上
                transfer(tab, null);
            s = sumCount();
        }
    }
}

transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 将 length / 8 然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
    // 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    //初始化新桶
    if (nextTab == null) {            // initiating
        try {
            //扩容为原来的二倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            //更新
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            //扩容失败,把sizeCtl使用Integer的最大值
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        //更新成员变量
        nextTable = nextTab;
        //更新转移下标就是老的table的length
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 创建一个 fwd 节点,用于占位。当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 首次推进为 true,如果等于 true,说明需要再次推进一个下标(i--),反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
    boolean advance = true;
     // 完成状态,如果是 true,就结束此方法
    boolean finishing = false; // to ensure sweep before committing nextTab
      // 自旋,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            // 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务) 
            // 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。 
            // 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。如果 i 对应的桶处理成功了,改成可以推进。 if (--i >= bound || finishing)
            if (--i >= bound || finishing)
                // 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
                // 这里的目的是:1. 当一个线程进入时,会选取最新的转移下标。2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束) 
        // 如果 i >= tab.length(不知道为什么这么判断) 
        // 如果 i + tab.length >= nextTable.length (不知道为什么这么判断)
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            //扩容完成
            if (finishing) {
            //删除临时变量
                nextTable = null;
                table = nextTab;
                // 更新阈值,为原来的1.5倍
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 如果相等,扩容结束了,更新 finising 变量
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
         // 获取老 tab i 下标位置的变量,如果是 null,就使用 fwd 占位。
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        //
        else if ((fh = f.hash) == MOVED)
            //已经倍别的线程处理过,处理下一个值
            advance = true; // already processed
        else {
        // 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
            //加锁确保安全性
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                // 把一个链表分化成两个链表
           		// 规则是桶中各元素的hash与桶大小n进行与操作
            	// 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
            	// 其中低位链表迁移到新桶中的位置相对旧桶不变
            	// 高位链表迁移到新桶中位置正好是其在旧桶的位置加n
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        // 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //如果是树节点
                    // 如果第一个元素是树节点
                    // 也是一样,分化成两颗树
                    // 也是根据hash&n为0放在低位树中
                    // 不为0放在高位树中
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        // 低位树的位置不变 
                        setTabAt(nextTab, i, ln);
                        // 高位树的位置是原位置加n
                        setTabAt(nextTab, i + n, hn);
                        // 标记该桶已迁移
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

transfer 方法内部多线程扩容性能很厉害,通过给每个线程分配桶区间,避免线程间的争用,通过为每个桶节点加锁,避免 putVal 方法导致数据不一致。同时,在扩容的时候,也会将链表拆成两份,这点和 HashMap 的 resize 方法类似。而如果有新的线程想 put 数据时,也会帮助其扩容。

总结

ConcurrentHashMap 的本质就是利用分段锁的原理,将锁的粒度给细化然后再做一系列操作,提高并发度。除了 Synchronized 锁,其还利用了乐观锁,即 CAS + 自旋,降低上下文切换的代价。

而ConcurrentHashMap使用sychronized加锁,而不用lock是因为经过几代jdk的版本更替,sychronized锁的效率不必lock低。

相较于JDK1.7,JDK1.8的ConcurrentHashMap在扩容时,对锁的粒度进行了再一次的细化,使其效率更加提高。

原文地址:https://www.cnblogs.com/wf614/p/12397236.html