ConcurrentHashMap源码

  推荐文章:
  网上的文章,jdk7版本的比较多,由于自己本地是jdk8,所以还是整理jdk8的逻辑吧。
------------------------------------------------------------------------------------------
一、结构跟思路说明,主要字段解释
  HashMap的源码我们比较清楚,大概结构就是数组加链表,数组默认长度为16,负载因子为0.75,超过后扩容,长度为当前数组长度*2,数组下的链表元素超过8个时转换为红黑树结构,优化查找性能。jdk8中ConcurrentHashMap的结构跟HashMap一样,也是直接数组加链表,每个链表的首节点作为synchronized的同步对象使用,对该链表数据的访问都要经过同步块synchronized,跟jdk7不一样,这点最后再说。jdk8的难点在于多线程扩容部分。
  主要字段说明:
  int DEFAULT_CAPACITY = 16; // 默认容量,哈希表数组的初始长度
  int DEFAULT_CONCURRENCY_LEVEL = 16; // 默认并发级别,也就是允许的最大并发线程数
  float LOAD_FACTOR = 0.75f; // 负载因子
  int TREEIFY_THRESHOLD = 8; // 转化为红黑树的阈值
  volatile Node<K,V>[] table; // 哈希表数组
  volatile int sizeCtl; // 用来控制初始化跟扩容的字段。-1表示初始化,-(1+扩容线程数)表示在扩容或者缩容,默认值0,未初始化且大于0则表示初始容量,扩容后值为下一次应该扩容的阈值(当前容量*0.75)
二、put源码
  put整体逻辑就是根据key哈希值计算在哈希桶的位置,然后将数据放入链表,具体如下:
  1、如果未初始化则先进行初始化,完成后再次进入循环;
  2、若找到位置,该处元素为null,则用cas方式进行设置,成功则退出,否则进入下一次循环;
  3、若发现对应位置的链表首节点hash值为-1,说明在进行扩容,则当前线程也帮助进行扩容,扩容完成后继续进行put操作;
  4、以上判断都结束后,synchronized锁定链表首节点,进入链表遍历,进行设置。
  相关代码:
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) { // onlyIfAbsent,是否保留原来的值,默认false,也就是覆盖旧值
    if (key == null || value == null) throw new NullPointerException();  // 不允许null的key跟value,因为有可能会作为对象锁使用,synchronized是基于monitor机制的,null没有对象头
    int hash = spread(key.hashCode()); // 对hashcode进行散列,获取一个相对分布更加均匀的hash值。这个散列是用hashcode的高16位跟低16位进行异或运算得到的一个结果。
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { // 哈希数组赋值给tab----为啥要
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0) // 哈希数组默认是空的,要初始化
            tab = initTable(); // 见下文,new了一个node数组
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 根据hash值取table中元素,该元素为空
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas设置该元素的值,失败则说明有线程竞争,break进入下次循环
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // 从名字来看,正在扩容,MOVED 值为-1,这个扩容部分再看
            tab = helpTransfer(tab, f); // 
        else {  // 找到哈希表中的位置,而且该位置元素不为null,hash值也不是-1,进入正常的遍历链表赋值
            V oldVal = null;
            synchronized (f) { // 就是这个操作,把链表首节点作为锁对象进行同步(也可能是红黑树头节点)
                if (tabAt(tab, i) == f) { // 普通Node节点
                    if (fh >= 0) { // 首节点的hash值
                        binCount = 1; 
                        for (Node<K,V> e = f;; ++binCount) { 
                            K ek;
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // key 的hash值相等且equals比较也相同,则覆盖
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 在链表追加
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 红黑树节点
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD) // 链表长度达到8个,转换为红黑树节点--这个并不完全是
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
  对于链表过长而转换为红黑树,实际是并不是,还要看哈希表容量,容量小于64的话,哈希表扩容,一般来说就解决了单节点过长问题。若单链表过长,且哈希表长度超过64,这才会转换为红黑树。
  哈希桶(数组)初始化过程,也是整个map的初始化过程:
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) { // 数组为空
        if ((sc = sizeCtl) < 0)  // 初始化的"功劳"被别人抢走了
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  // 抢夺初始化"功劳"的操作,cas设置sizeCtl = -1
            try {
                if ((tab = table) == null || tab.length == 0) {
                      int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 数组大小默认为16
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //建了个node数组,长度为sizeCtl大小(默认16)
                    table = tab = nt;  // 数组赋给哈希表
                    sc = n - (n >>> 2); // sc 变为原来的3/4,为啥是这个数,因为负载因子0.75,正好用这个数做记录
                }
            } finally {
                sizeCtl = sc;  // 第12行的数据给了sizeCtl
            }
            break;
        }
    }
    return tab; 
}
  这里的这个spread(),就是把高位拿下来也参与了hash。因为如果仅仅取余的话,其实看的只是最低的几位,即使高位不同,最后也会被分到同一个位置,这会导致数据在hash表中分布不均匀。将高位跟低位一起参与运算,可以适当的减轻这种情况。
三、get源码
  get的主要逻辑也比较清晰,就是根据哈希值进行取余来确定位置,然后根据equals来比较是否相等,是则取出。hash值为负数,则认为是在进行扩容导致的数据迁移。数据迁移的整体逻辑最后整理。逻辑如下:
  1、判断是否已初始化,若未初始化,则直接返回null;hash值对应的链表首节点为null,也直接返回null;
  2、hash值跟链表首节点相等,且key地址相同或者equals相等,则认为链表首节点就是要找的节点,返回其值;
  3、若链表首节点的hash值为负数,说明map在扩容,链表可能数据不全;通过节点Node的find方法进行查找;(这个方法需要分析一下)
  4、正常的遍历链表进行查找;
  相关代码:
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());  // 跟put一样的散列操作,结果最后会被修正为正数
    if ((tab = table) != null && (n = tab.length) > 0 &&  (e = tabAt(tab, (n - 1) & h)) != null) { //table已经初始化,而且hash值对应的位置,首节点有值,未初始化直接返回null
        if ((eh = e.hash) == h) { // 首节点hash值跟要查找的key的hash值一样
            if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 地址或者equals结果相同
                return e.val;
        }
        else if (eh < 0)  // hash值不一样的情况(元素能被分到这个地方,按道理hash应该是取余相等):hash值  < 0,说明在扩容或者为红黑树
            return (p = e.find(h, key)) != null ? p.val : null; //查找元素,支持红黑树节点
        while ((e = e.next) != null) { // hash值不一样,挨个查找
            if (e.hash == h &&  ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
四、扩容源码
  先理清楚方法中几个变量的含义:
  int size ---- 本次想要扩充的数量(一般是数组长度的一半(右移1位),putAll的时候是新map的长度)
  int c ---- 当前数组扩容后的大小
private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);  // 计算本次扩容的目标值
    int sc;
    while ((sc = sizeCtl) >= 0) { // 目标值不一定足够大,要跟sizeCtl进行比较
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) { //没有初始化
            n = (sc > c) ? sc : c;  //扩容的阈值跟计算的目标值比较,较大者为目标值(这里相当于用一个map来初始化concurrentHashMap了)
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 抢夺初始化,Unsafe的这个方法说明见下文
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY) // 没到达扩容阈值或者表已经太大,无法再扩容
            break;
        else if (tab == table) { // 
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
  这里是扩容的主要方法,我们先理清楚扩容的容量是怎么变化的:
  直接调用这个tryPresize(int size)的有两个地方putAll(Map m)跟treeifyBin(Node[] tab, int index),前者传进来的参数是m.size(),后者是tab.length << 1。结合initTable,初始化后的数组长度为16,如果触发了treeifyBin,由于数组长度小于64,会进行数组扩容而不是转换为红黑树,扩容时传值过来的是16 << 1也就是32,然后这个tableSizeFor(32+(32 >>> 1) +1)也就是tableSizeFor(49),tableSizeFor()是个对入参往上取2的最小幂的过程,也就是64。看到没,因为初始数组长度是16,如果在数组长度不是很长的情况下触发了红黑树转换机制,那么数组长度直接成了64,而不是32!!这是为了省事儿吧,因为这种情况下,hash表元素不多,而单链表的长度居然到了8个,可以说是元素分布的非常不均匀了,极端一点一下扩容多点,相对来说能使元素分布的分散一点。以后再有链表过长的,也不会触发这个tryPresize了,因为第一次扩容后就已经达到红黑树要求的数组长度最低值了。
然后看putAll(Map m)这个方法,这个是将另外的map添加到ConcurrentHashMap中来的方法,扩容的目标数值是size + (size >>> 1) + 1,size为新来的map的元素个数,这个具体最终要扩展到多少要跟sizeCtl进行比较,如果sizeCtl大则说明不用扩容,直接退出,否则进行扩容。
  U.compareAndSwapInt(this, SIZECTL, sc, -1)这一行是调用的jdk的Unsafe类的一个方法,看名字是cas操作修改一个int值。Unsafe类是jdk用于操作内存等一些精细操作的类,仅限于jdk使用,我们调用会报错(原理是检查类加载对象,我们可以有方法绕过去),有四个参数compareAndSwapInt(this, offset, expect, update),第一个参数是被修改的对象,offset是被修改字段在对象中的内存偏移量(从对象在内存中的起始位置算到该字段的偏移量),第三个是期望值,是被修改字段的初始值,相当于乐观锁的版本号,update是如果cas判断符合后字段要新后的新值。这一部分的相关介绍,请参考:https://blog.csdn.net/sherld/article/details/42492259
到这里,整体思路相对来说比较明确。复杂的是扩容的具体执行transfer的代码;这里要考虑多个线程的并发扩容问题。读了几遍还是有的地方没太明白:
transfer的思路:
  为了便于多线程并行处理,不会引起冲突,这里对哈希桶进行了分段,每个线程处理的数据为一个“步长”--也就是一个stride的长度,数据迁移完成后再继续处理下一个分段。需要注意的是对于整个哈希桶来说,数据的迁移是从末尾开始倒着往前进行的。比如哈希桶数组的长度是64,则先进行迁移的是49-64这16个链表对应的数据,这也正是一个步长的长度。对于旧数组,扩容线程每完成一个链表的数据迁移,就会将该链表首节点元素修改为ForwardingNode,查找元素可以通过该对象的find方法进行,remove,pudate,put则要优先执行扩容,然后再进行相关操作。
  int stride :步长,每个扩容线程要处理的哈希桶的位数;
  int transferIndex :迁移下标,下一个要进行扩容的线程应该获取步长的起始位置;注意是倒着来的;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //每次扩容结束,nextTab 会被置空,因此第一个扩容的线程的nextTab应该是null
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    // 步长的计算,默认最小值是16,so,数组不是很大的情况下,一般是16。从计算过程看出,对8线程cpu来说,这个值要比16大,n最小应该是64*16=1024
        stride = MIN_TRANSFER_STRIDE; // subdivide range  
    if (nextTab == null) {            // initiating    //新数组的初始化,直接容量翻倍
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;  //因为是倒着来的,注意这里是n而不是0
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;   // hash桶完成的标志位,true已经处理过了,false未处理
    boolean finishing = false; // to ensure sweep before committing nextTab   //整个扩容完成的标志位
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) { // 更新待迁移的hash桶索引
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {  //本线程任务抢占成功,将下个线程要开始的位置设置好
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 所有节点完成
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 本线程迁移完成,
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 最后一个线程
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)  //该位置为空,不需要迁移,直接放一个fwd对象,表示迁移过了
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)   //已经是fwd对象,说明迁移完了,不用处理
            advance = true; // already processed
        else {
            synchronized (f) { // 正常的迁移逻辑,跟hashmap类似,利用按位与操作将链表数据巧妙的分到两个链表中,而且位置也已经计算好了
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        //普通节点迁移,略
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        //树节点迁移,略
                        advance = true;
                    }
                }
            }
        }
    }
}
  第31、32行代码的解释:第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),后续帮其扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1,每一个退出transfer的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1,那么最后一个线程退出时:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT。
  本以为ConcurrentHashMap的结构了解了,翻看源码应该会很容易,结果实际花费的时间比预期多的多。主要原因是对于扩容的transfer思路以及其中各种标志位的处理以及这么处理的原因不甚了解,需要进行反复猜测推理。也幸而有各路大神提前研究过,这才跟在大神身后亦步亦趋,终于理清楚。

原文地址:https://www.cnblogs.com/nevermorewang/p/10093796.html