并发容器之 ConcurrentHashMap源码剖析

本文总结自《Java并发编程的艺术》 和 https://mp.weixin.qq.com/s/My4P_BBXDnAGX1gh630ZKw

5.6.1 ConcurrentHashMap

JDK7中HashMap在多线程情况下会出现的问题

在多线程环境下,JDK7中进行扩容操作时可能会造成某个桶内形成环形链表或者数据丢失。

void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
            	e.hash = null == e.key ? 0 : hash(e.key);
        	}
        int i = indexFor(e.hash, newCapacity);
        e.next = newTable[i]; // 将目标桶中的元素接到当前元素下面
        newTable[i] = e; //将当前元素所在的桶整个接到目标桶下面
        e = next; // 继续遍历原始桶中的下一个元素
        }
    }
}

在对table进行扩容到newTable后,需要将原来数据转移到newTable中,注意10-12行代码,这里可以看出在转移元素的过程中,使用的是头插法,也就是会将目标桶中链表的顺序翻转,这里也是多线程下形成死循环的关键点。

JDK8中HashMap在多线程环境下会出现的问题

在多线程环境下,JDK8中进行put操作,可能会发生数据覆盖。
假如现在有两个线程都执行到了上图中的划线处。当线程一判断为空之后,CPU 时间片到了,被挂起。线程二也执行到此处判断为空,继续执行下一句,创建了一个新节点,插入到此下标位置。然后,线程一解挂,同样认为此下标的元素为空,因此也创建了一个新节点放在此下标处,因此造成了元素的覆盖。

Hashtable存在的问题
> 管是往 map 里边添加元素还是获取元素,都会用 synchronized 关键字加锁。当有多个元素之前存在资源竞争时,只能有一个线程可以获取到锁,操作资源。更不能忍的是,一个简单的读取操作,互相之间又不影响,为什么也不能同时进行呢?所以,hashtable 的缺点显而易见,它不管是 get 还是 put 操作,都是锁住了整个 table,效率低下,因此 并不适合高并发场景。
Collections.synchronizedMap(Map)

SynchronizedMap内部维护了一个普通对象Map和一个排斥锁mutex。我们在调用这个方法的时候就需要传进去一个map,类中有两个构造器,如果你传入了mutex参数,则将对象排斥锁赋值为你传入的对象。如果没有,则将对象排斥锁赋值为this,即调用synchronizedMap的对象,就是上面的Map。创建出synchronizedMap之后,再操作Map的时候,就会对方法上锁。
例如:
public int size() { synchronized (mutex) {return m.size();}}
public boolean isEmpty() {synchronized(mutex) {return m.isEmpty();}}

快速失败(fail-fast)

快速失败是Java集合的一种机制,当使用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改,则会抛出Concurrent Modification Exception。迭代器在遍历时会直接访问集合中的内容,并且在遍历过程中,每当集合内容发生变化,就会改变modCount的值。每当迭代器时候hasNext() / next()遍历下一个元素之前,都会检测modCount变量是否为expectedmodCount,是的话就返回继续遍历,否则抛出异常终止遍历。
这里异常抛出的条件是检测到modCount != expectedmodCount。如果集合发生变化时修改modCount,然后又刚好设置为expectmodCount值,则异常不会抛出,不能依赖这个异常是否抛出而进行并发操作的编程,这个异常只用于检测并发修改的bug。
应用场景:java.util包下的集合类都是快速失败的,不能在多线程下发生并发修改。算是一种安全机制。

安全失败(fail-safe)

java.util.concurrent包下的容器都是安全失败的。在遍历时不是直接在集合内容上访问的,而是先复制原有集合内容,在拷贝的集合上进行遍历。因此迭代时对拷贝集合的改变对原集合没有影响。因此可以在多线程下进行使用。

JDK 7中的ConcurrentHashMap
  1. 底层数据结构:在 JDK1.7中,本质上还是采用链表+数组的形式存储键值对的。但是,为了提高并发,把原来的整个 table 划分为 n 个 Segment 。所以,从整体来看,它是一个由 Segment 组成的数组。然后,每个 Segment 里边是由 HashEntry 组成的数组,每个 HashEntry之间又可以形成链表。我们可以把每个 Segment 看成是一个小的 HashMap,其内部结构和 HashMap 是一模一样的。当对某个 Segment 加锁时,并不会影响到其他 Segment 的读写。每个 Segment 内部自己操作自己的数据。这样一来,我们要做的就是尽可能的让元素均匀的分布在不同的 Segment中。最理想的状态是,所有执行的线程操作的元素都是不同的 Segment,这样就可以降低锁的竞争。
  2. 常用变量
//默认初始化容量,这个和 HashMap中的容量是一个概念,表示的是整个 Map的容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
v 
//默认的并发级别,这个参数决定了 Segment 数组的长度
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//最大的容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//每个Segment中table数组的最小长度为2,且必须是2的n次幂。
//由于每个Segment是懒加载的,用的时候才会初始化,因此为了避免使用时立即调整大小,设定了最小容量2
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用于限制Segment数量的最大值,必须是2的n次幂
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

//在size方法和containsValue方法,会优先采用乐观的方式不加锁,直到重试次数达到2,才会对所有Segment加锁
//这个值的设定,是为了避免无限次的重试。后边size方法会详讲怎么实现乐观机制的。
static final int RETRIES_BEFORE_LOCK = 2;

//segment掩码值,用于根据元素的hash值定位所在的 Segment 下标。后边会细讲
final int segmentMask;

//和 segmentMask 配合使用来定位 Segment 的数组下标,后边讲。
final int segmentShift;

// Segment 组成的数组,每一个 Segment 都可以看做是一个特殊的 HashMap
final Segment<K,V>[] segments;

//Segment 对象,继承自 ReentrantLock 可重入锁。
//其内部的属性和方法和 HashMap 神似,只是多了一些拓展功能。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
 
 //这是在 scanAndLockForPut 方法中用到的一个参数,用于计算最大重试次数
 //获取当前可用的处理器的数量,若大于1,则返回64,否则返回1。
 static final int MAX_SCAN_RETRIES =
  Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

 //用于表示每个Segment中的 table,是一个用HashEntry组成的数组。
 transient volatile HashEntry<K,V>[] table;

 //Segment中的元素个数,每个Segment单独计数(下边的几个参数同样的都是单独计数)
 transient int count;

 //每次 table 结构修改时,如put,remove等,此变量都会自增
 transient int modCount;

 //当前Segment扩容的阈值,同HashMap计算方法一样也是容量乘以加载因子
 //需要知道的是,每个Segment都是单独处理扩容的,互相之间不会产生影响
 transient int threshold;

 //加载因子
 final float loadFactor;

 //Segment构造函数
 Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
     this.loadFactor = lf;
     this.threshold = threshold;
     this.table = tab;
 }
 
 ...
 // put(),remove(),rehash() 方法都在此类定义
}

// HashEntry,存在于每个Segment中,它就类似于HashMap中的Node,用于存储键值对的具体数据和维护单向链表的关系
static final class HashEntry<K,V> {
    //每个key通过哈希运算后的结果,用的是 Wang/Jenkins hash 的变种算法,此处不细讲,感兴趣的可自行查阅相关资料
    final int hash;
    final K key;
    //value和next都用 volatile 修饰,用于保证内存可见性和禁止指令重排序
    volatile V value;
    //指向下一个节点
    volatile HashEntry<K,V> next;

     HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
         this.hash = hash;
         this.key = key;
         this.value = value;
         this.next = next;
	}
}
  1. 构造函数:ConcurrentHashMap 有五种构造函数,但是最终都会调用同一个构造函数,所以只需要搞明白这一个核心的构造函数就可以了。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    //检验参数是否合法。值得说的是,并发级别一定要大于0,否则就没办法实现分段锁了。
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    //并发级别不能超过最大值
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    //偏移量,是为了对hash值做位移操作,计算元素所在的Segment下标,put方法详讲
    int sshift = 0;
    //用于设定最终Segment数组的长度,必须是2的n次幂
    int ssize = 1;
    //这里就是计算 sshift 和 ssize 值的过程  (1)
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    //Segment的掩码
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    //c用于辅助计算cap的值   (2)
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // cap 用于确定某个Segment的容量,即Segment中HashEntry数组的长度
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //(3)
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    //这里用 loadFactor做为加载因子,cap乘以加载因子作为扩容阈值,创建长度为cap的HashEntry数组,
    //三个参数,创建一个Segment对象,保存到S0对象中。后边在 ensureSegment 方法会用到S0作为原型对象去创建对应的Segment。
    Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                    (HashEntry<K,V>[])new HashEntry[cap]);
    //创建出长度为 ssize 的一个 Segment数组
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    //把S0存到Segment数组中去。在这里,我们就可以发现,此时只是创建了一个Segment数组,
    //但是并没有把数组中的每个Segment对象创建出来,仅仅创建了一个Segment用来作为原型对象。
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
  1. put方法
    1. put方法的总体流程
      1. 通过哈希算法计算出当前 key 的 hash 值。
      2. 通过这个 hash 值找到它所对应的 Segment 数组的下标。
      3. 再通过 hash 值计算出它在对应 Segment 的 HashEntry数组的下标。
      4. 找到合适的位置插入元素 。
// ConcurrentHashMap类中的put方法
public V put(K key, V value) {
    Segment<K,V> s;
    //不支持value为空
    if (value == null)
        throw new NullPointerException();
    //通过 Wang/Jenkins 算法的一个变种算法,计算出当前key对应的hash值
    int hash = hash(key);
    //上边我们计算出的 segmentShift为28,因此hash值右移28位,说明此时用的是hash的高4位,
    //然后把它和掩码15进行与运算,得到的值一定是一个 0000 ~ 1111 范围内的值,即 0~15 。
    int j = (hash >>> segmentShift) & segmentMask;
    //这里是用Unsafe类的原子操作找到Segment数组中j下标的 Segment 对象
    if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        //初始化j下标的Segment
        s = ensureSegment(j);
    //在此Segment中添加元素
    return s.put(key, hash, value, false);
}

// ConcurrentHashMap类中的ensureSegment方法
private ConcurrentHashMap.Segment<K,V> ensureSegment(int k) {
    final ConcurrentHashMap.Segment<K,V>[] ss = this.segments; // 克隆当前整个Map
    // u代表 k 的偏移量,用于通过 UNSAFE 获取主内存最新的实际 K 值
    long u = (k << SSHIFT) + SBASE;
    ConcurrentHashMap.Segment<K,V> seg;
    if ((seg =(ConcurrentHashMap.Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // //从内存中取到最新的下标位置的 Segment 对象,对象为空则创建对象
        ConcurrentHashMap.Segment<K,V> proto = ss[0]; // 使用之前构造函数中初始化的S0作为原型对象,创建新的Segment对象
        int cap = proto.table.length; // 容量
        float lf = proto.loadFactor; // 加载因子
        int threshold = (int)(cap * lf); // 阈值
        ConcurrentHashMap.HashEntry<K,V>[] tab = (ConcurrentHashMap.HashEntry<K,V>[])new ConcurrentHashMap.HashEntry[cap]; // 先把Segment中的桶数组创建好
        if ((seg = (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // 再次从主存读取判断Map中k位置上Segment对象是否为空
            ConcurrentHashMap.Segment<K,V> s = new ConcurrentHashMap.Segment<K,V>(lf, threshold, tab); // 此处把Segment对象创建出来并赋值位s
            while ((seg =  (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // 循环检查 K 下标位置的 Segment 是否为空,若不为空,则说明其他线程抢先创建成功,并且已经成功同步到主存中,如果已经被创建成功则直接返回
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break; // 使用UNSAFE中的原子操作将ss中偏移量为u的元素替换为刚创建的Segment变量s,若成功就跳出循环,否则就一直自旋直到成功,或者seg不为空(其他线程导致)。
            }
        }
    }
    return seg;
    /* 上面代码中三次判断分段锁中第k段是否有元素,原因如下。
    在多线程环境下,因为不确定是什么时候会有其它线程 CAS 成功,有可能发生在以上的任意时刻。所以,只要发现一旦内存中的对象已经存在了,则说明已经有其它线程把Segment对象创建好,并CAS成功同步到主内存了。此时,就可以直接返回,而不需要往下执行了。这样做,是为了代码执行效率考虑。
    */
}

// 该方法是Segment类中的put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value); 
    /*
    这里通过tryLock尝试加锁,如果加锁成功,返回null,否则执行 scanAndLockForPut方法自旋。
    这里说明一下,tryLock 和 lock 是 ReentrantLock 中的方法,
    区别是 tryLock 不会阻塞,抢锁成功就返回true,失败就立马返回false,
    而 lock 方法是,抢锁成功则返回,失败则会进入同步队列,阻塞等待获取锁。
    */
    V oldValue;
    try {
    	// 获取当前Segment的桶数组
        ConcurrentHashMap.HashEntry<K,V>[] tab = table;
        // 和hashmap中类似,通过 hash % (tab.length - 1)来获取元素在指定segment中的桶下标
        int index = (tab.length - 1) & hash;
        // 取出index桶的第一个元素
        ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index);
        for (ConcurrentHashMap.HashEntry<K,V> e = first;;) { // 根据第一个元素,遍历该桶
            if (e != null) {
                K k; // 遍历桶中的每一个元素,如果发现桶中有和当前插入元素key相同的元素,则替换
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) { // onlyIfAbsent是一个boolean类型的变量,用来记录如果当前map中插入key重复的元素是否进行更新
                        e.value = value;
                        ++modCount; // 改变了桶中的元素,要将modCount++,该值是在size操作时用来记录是否在统计期间map中是否有值更新
                    }
                    break;
                }
                e = e.next;
            }
            else { // 如果找到最后都没有key重复的元素
                if (node != null) // 如果node不为空则直接头插
                    node.setNext(first);
                else // 如果node为空则先new一个node,然后头插
                    node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 如果插入后Segment中的元素个数大于阈值,并且没有超过容量最大值,则进行扩容并插入
                else
                    setEntryAt(tab, index, node); // 否则就在当前桶数组的第index桶中通过头插的方式插入node
                ++modCount; 
                count = c; // 并更新count
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock(); // 更新之后释放锁
    }
    return oldValue;
}

// Segment类中的scanAndLockForPut方法
// 该方法可以确保返回时,当前线程一定是获取到锁的状态
private ConcurrentHashMap.HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
	// 根据hash值定位到它对应的HashEntry数组的下标位置,并找到链表的第一个节点
	// 注意,这个操作会从主内存中获取到最新的状态,以确保获取到的first是最新值
    ConcurrentHashMap.HashEntry<K,V> first = entryForHash(this, hash);
    ConcurrentHashMap.HashEntry<K,V> e = first;
    ConcurrentHashMap.HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) { // 通过循环的方式尝试获取锁,若获取成功就返回
        ConcurrentHashMap.HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) { // 如果该桶中没有元素并且node节点为空,就先试探性地创建一个节点
                if (node == null)
                    node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, null); // 为什么说是试探性地,由于在循环过程中还没有获取到锁,因此可以预先做一些其他事,比如创建新节点,这样在返回put方法后就不需要再去创建这个节点,而可以直接进行头插。
                retries = 0; 
            }
            else if (key.equals(e.key)) // 如果说在该桶中存在与要插入节点key相同的节点,就不会继续往下遍历,而是直接统计获取次数
                retries = 0;
            else // 判断该桶中的下一个元素的key是否与要插入的key相同
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock(); // 若获取次数超过最大尝试次数,则直接对lock方法加锁,通过lock方法,线程只有成功获取到锁才会返回,否则就会在同步队列中等待。
            break;
        }
        else if ((retries & 1) == 0 && // 每第偶数次获取锁都会判断当前桶第一个节点是否发生变化了,如果其他线程有新值插入了该桶,那就将计数器重置,并重新遍历当前链表
                (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

  1. rehash方法
    1. 当put方法时发现元素个数超过了阈值,则会扩容。需要注意的是,每个Segment只会检查自己的容量是否查过阈值,互相之间并不影响。
// node节点为新插入的节点
private void rehash(ConcurrentHashMap.HashEntry<K,V> node) {
	// 该分段中的旧哈希表
    ConcurrentHashMap.HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 创建一个新的哈希表
    ConcurrentHashMap.HashEntry<K,V>[] newTable =
            (ConcurrentHashMap.HashEntry<K,V>[]) new ConcurrentHashMap.HashEntry[newCapacity];
    // 计算出新的掩码,用来计算节点在哈希表中的位置
    int sizeMask = newCapacity - 1;
    // 接下来的for循环会将旧桶中的元素移动到新桶中
    for (int i = 0; i < oldCapacity ; i++) {
    	// 遍历老表中的每个元素
        ConcurrentHashMap.HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            ConcurrentHashMap.HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask; // 计算该元素在新哈希表中的位置,要么在原位置,要么为原下标 + 原数组长度
            if (next == null) // 如果原表中某个桶中只有一个元素,则直接将它插入新表的新桶中
                newTable[idx] = e;
            else {
            	/ *
            	接下来的这个for循环中采用了一种优化,他找出了在旧桶桶底那些在新桶中位置相同的连续的数个节点(注意,是在桶底部往上连续的,也就是说如果遍历过程中遇到一个新元素与之前记录的lastRun不同,那么要同时更新lastRun和lastIdx为当前元素的,这样才能确保找到一个元素lastRun,使得他后面的元素都跟他在新桶中是属于同一个桶中的,才可以使用这种优化),后续直接将lastRun以及它后面的元素一次性加入新桶,剩余的元素不能使用优化,只能挨个去放
            	*/
                ConcurrentHashMap.HashEntry<K,V> lastRun = e;
                int lastIdx = idx; // 初始情况下假设首节点以下的节点在新桶中的位置都一样
                for (ConcurrentHashMap.HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) { 
                     // 元素在新桶中的位置
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                    	// 如果记录的lastRun与当前元素在新桶中位置不同,则替换lastRun为当前元素,这样我们就可以确保,找到的那个lastRun和他所在旧桶中后面的所有元素在新桶中都在同一个位置
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun; // 这行代码就是优化的关键,新桶中lastIdx绝对现在是没有东西的,所以直接将lastRun为首的后续节点全放在这个桶中
                for (ConcurrentHashMap.HashEntry<K,V> p = e; p != lastRun; p = p.next) { // 后续的这些节点将不能使用上述优化,因此只能挨个计算新位置,重新插入
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    ConcurrentHashMap.HashEntry<K,V> n = newTable[k]; // 这一步将原来桶中的元素都提在了新一个临时桶中
                    newTable[k] = new ConcurrentHashMap.HashEntry<K,V>(h, p.key, v, n);
                    // 然后建立一个新节点,并将刚刚记录的临时桶作为当前节点的next节点插入,因此可以看出来1.7中ConcurrentHashMap使用的是头插法插入新节点
                }
            }
        }
    }
    // 将新节点插入我们的新表中
    int nodeIndex = node.hash & sizeMask; 
    // 也是头插法,先将目标桶中的元素放到该节点下面
    node.setNext(newTable[nodeIndex]);
    // 然后将该节点整个放到哈希表中目标桶的位置
    newTable[nodeIndex] = node;
    // 最后将此Segment中的table替换为上述已经完成了表转移和插入新节点后的newTable
    table = newTable;
}
  1. get方法
    1. 先定位到Segment
    2. 再定位到HashEntry
public V get(Object key) {
    ConcurrentHashMap.Segment<K,V> s; // manually integrate access methods to reduce overhead
    ConcurrentHashMap.HashEntry<K,V>[] tab;
    int h = hash(key); // 计算节点hash值
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 对应segment在segments中的偏移量
    if ((s = (ConcurrentHashMap.Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 通过hash定位在HashTable中的偏移量找到对应的桶,并遍历
        for (ConcurrentHashMap.HashEntry<K,V> e = (ConcurrentHashMap.HashEntry<K,V>) UNSAFE.getObjectVolatile
                (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k; // 选择该桶中key对应的value
            if ((k = e.key) == key /*key是值可能直接用 == 判断是否相等 */ || (e.hash == h && key.equals(k) /* 或者说哈希值相等且key为对象时,对象equals */))
                return e.value;
        }
    }
    return null; // 找了一圈没找到返回null
}
  1. remove方法

// ConcurrentHashMap中的remove方法
public V remove(Object key) {
	int hash = hash(key);
	// 定位到segment
	Segment<K, V> s = segmentForHash(hash);
	// 不为空则执行remove
	return s == null ? null : s.remove(key, hash, null);
}

// ConcurrentHashMap中的remove方法
public boolean remove(Object key, Object value) {
 	int hash = hash(key);
 	Segment<K,V> s;
 	return value != null && (s = segmentForHash(hash)) != null &&
  	s.remove(key, hash, value) != null;
}

// Segment中的remove方法
final V remove(Object key, int hash, Object value) {
    if (!tryLock()) // 尝试直接加锁,若失败,则执行scanAndLock,此方法和scanAndLockForPut方法类似
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        ConcurrentHashMap.HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        // 从主存中获取key对应table的最新的桶的头结点
        ConcurrentHashMap.HashEntry<K,V> e = entryAt(tab, index);
        ConcurrentHashMap.HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            ConcurrentHashMap.HashEntry<K,V> next = e.next;
            // 通过key和hash进行元素匹配
            if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) { // 当value为空或者 value也匹配上时,我们直接删除
                    if (pred == null) // 如果该元素是该桶中的第一个元素,则直接将next设置为该桶的头结点
                        setEntryAt(tab, index, next);
                    else // 否则,将上一个节点的next设置为下一个节点
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e; // 记录上一个元素
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
  1. size()方法
    1. 并发情况下,如果将每个segment的元素统计并求和可能会导致元素统计不准确,因为有可能表是实时在变化的,因此JDK 1.7采用了一种乐观方法,对每个segment设置一个modCount,也就是说只要整个ConcurrnetHashMap有一点变化,那modCount的值就会发生改变。那么我们统计前可以记录modCount,统计得出结果后如果modCount没有变化,那就说明统计结果是正确的,就可以使用。JDK 1.7中最多会进行三次乐观方法(第一次用来计算一个modCount的总和,后续共2次重试,并会在第一次重试失败后更新modCount总和)的统计,如果三次统计modCount变化,则直接使用加锁的方式进行统计。
public int size() {
    final ConcurrentHashMap.Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) { // 对每个段进行加锁
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                ConcurrentHashMap.Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) { // 对每个段进行modCount的统计
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true; // 如果大小溢出,直接记录overflow
                }
            }
            // 第一次尝试只是为了计算last,用于后续比较
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock(); // 如果使用了锁就要释放锁
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
JDK 8中的ConcurrentHashMap
  1. 底层数据结构:JDK1.8中的ConcurrentHashMap完全重构了1.7.不再有分段锁的概念,只是为了向下兼容才申明了。JDK1.8中的ConcurrentHashMap使用了Synchronized锁,他给每一个桶都加锁,锁的粒度降低了。底层使用数组 + 链表 + 红黑树的结构,相比HashMap多了一些并发处理。
  2. put()方法
    1. 根据key计算出hashcode。
    2. 判断是否需要进行初始化。
    3. 当前桶空则可以使用CAS尝试写入,失败则自选保证成功。
    4. 如果当前桶的hash为MOVED(-1),则表示数组正在进行扩容,需要当前线程帮忙迁移数据。
    5. 如果都不满足,则利用synchronized锁写入数据。
    6. 如果数量大于TREEIFY_THRESHOLD则要转换为红黑树。
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
	// 在并发情况下,key和value都不支持为空。
    if (key == null || value == null) throw new NullPointerException();
    // Object中的本地方法hashCode()可以为每一个对象返回一个不同的值,再使用spread对返回值的高低位做异或运算(让键的高位也参与了运算,试想,如果两个对象hash的高位大不相同,低位全都相同,那他们很有可能在同一个桶中,但是如果将其hash值的低位与高位进行位运算处理,就可以让hash值的低位保留一些高位的特征,可以降低hash冲突,为什么非要采用异或运算?因为&运算会将计算出来的值向0靠拢,|运算会将计算出来的值向1靠拢,而异或运算较平均的使得低位获得高位的特征),最后将结果 & HASH_BITS(0x7fffffff),保证最高位的1个bit位总是0。
    int hash = spread(key.hashCode());
    // 用来计算当前链表上的元素个数
    int binCount = 0;
    for (ConcurrentHashMap.Node<K,V>[] tab = table;;) {
        ConcurrentHashMap.Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // hash表为空则初始化(只有一个线程可以成功)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        	// 若当前桶为空,则通过CAS操作,把新节点插入到桶中
            if (casTabAt(tab, i, null,
                    new ConcurrentHashMap.Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
        	// 如果当前桶不为空,当前桶的hash值为MOVED(值为-1),表示当前数组正在扩容,则需要当前线程帮忙迁移数据
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 这里采用加同步锁的方式,来保证线程安全,给桶中的第一个节点对象加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                    	// 如果hash值大于等于0则说明是正常的链表结构
                        binCount = 1;
                        for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 如果发现要插入元素的key已经存在,且该表支持重复键插入,则更新旧值为新value
                                oldVal = e.val; 
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            ConcurrentHashMap.Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 如果遍历到最后一个元素都没有发现重复值,则新建一个节点,为当前末尾节点的next节点
                            	pred.next = new ConcurrentHashMap.Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof ConcurrentHashMap.TreeBin) { // 判断桶内部是不是已经转换为红黑树
                        ConcurrentHashMap.Node<K,V> p;
                        binCount = 2;
                        // 将节点插入红黑树,如果存在key相同的节点则使用新值替换旧值
                        if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key,
                                value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
            	// 如果节点个数大于等于8,则转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 给元素个数加1,并有可能出发扩容
    addCount(1L, binCount);
    return null;
}
  1. initTable()方法
private final ConcurrentHashMap.Node<K,V>[] initTable() {
    ConcurrentHashMap.Node<K,V>[] tab; int sc;
    // 循环保证当前表初始化成功
    while ((tab = table) == null || tab.length == 0) {
    	/*
    		sizeCtl 这个值有很多情况,默认值为0,
    		当为 -1 时,说明有其它线程正在对表进行初始化操作,
    		当表初始化成功后,又会把它设置为扩容阈值,
    		当为一个小于 -1 的负数,用来表示当前有几个线程正在帮助扩容
    	*/
        if ((sc = sizeCtl) < 0)
        	// 小于0,其实此时就是-1(说明有其它线程正在对表进行初始化操作),因为表是空的,因此,当前线程放弃CPU时间片,只是自旋
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
        	// 通过 CAS 把sizeCtl的值设置为-1,表明当前线程正在进行表的初始化,其它失败的线程就会自旋
            try {
            	// 再次确认表是否没有被初始化
                if ((tab = table) == null || tab.length == 0) {
                	// 表示容量,当调用有参数构造时sizeCtl的值是大于零的
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n];
                    table = tab = nt;
                    // n减去 1/4 n,即为0.75n,表示扩容阈值
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            // 当前线程如果初始化成功,其他线程因为数组不为空,也会停止自旋
            break;
        }
    }
    return tab;
}
  1. addCount()方法
    • 若元素插入成功之后,则会调用此方法,传入参数addCount(1L, binCount)。这个方法的目的是把整个table的元素个数加1。实现比较困难。考虑使用volatile保证多线程之间的内存可见性,然后通过CAS乐观锁实现自增,但如果在特别高并发的情况下,将会造成特别严重的竞争。所以,基于此,这里做了更好的优化。让这些竞争的线程,分散到不同的对象里边,单独操作它自己的数据(计数变量),用这样的方式尽量降低竞争。到最后需要统计 size 的时候,再把所有对象里边的计数相加就可以了。上边提到的 size ,在此用 baseCount 表示。分散到的对象用 CounterCell 表示,对象里边的计数变量用 value 表示。注意这里的变量都是 volatile 修饰的。当需要修改元素数量时,线程会先去 CAS 修改 baseCount 加1,若成功即返回。若失败,则线程被分配到某个 CounterCell(格子),然后操作 value 加1。若成功,则返回。否则,给当前线程重新分配一个 CounterCell,再尝试给 value 加1。(这里简略的说,实际更复杂)。CounterCell 会组成一个数组,也会涉及到扩容问题。

@sun.misc.Contended static final class CounterCell {
	// 此格子内记录的value值
	volatile long value;
	CounterCell(long x) { value = x; }
}

// 用来存储线程和线程生成的随机数的对应关系
static final int getProbe() {
	return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// x为1,check代表链上的元素个数
private final void addCount(long x, int check) {
    ConcurrentHashMap.CounterCell[] as; long b, s;
    /* 
    	此处有两种情况要进入if
    	1. 数组不为空,则说明数组已经被创建好。
    	2. 数组为空,说明数组还未创建,很有可能竞争的线程非常少,因此就直接通过CAS尝试更新baseCount,失败则进入if
    */
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        ConcurrentHashMap.CounterCell a; long v; int m;
        // 字面意思,是无竞争,这里先标记为 true,表示还没有产生线程竞争
        boolean uncontended = true;
        /* 
        	这里有三种情况会进入fullAddCount方法
        	1. 若数组为空,进入fullAddCount。
        	2. ThreadLocalRandom.getProbe()方法会给当前线程生成一个随机数(可以简单认为是hash值),然后用随机数与格子的长度取模,计算该线程所在的格子。若当前线程所分配到的格子为空,进入fullAddCount。
        	3. 若数组不为空,且当前线程所在的格子也不为空, 则尝试通过CAS修改此格子对应的value值加1,若修改失败,则说明有线程竞争,则设置uncontended为false,并进入fullAddCount。
        */
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount(); // 计算总共元素的个数
    }
    // 这里检查是否需要扩容
    if (check >= 0) {
        ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc;
        // 若元素个数达到扩容阈值,且tab不为空,且tab数组长度小于最大容量
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

//计算表中的元素总个数
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    //baseCount,以这个值作为累加基准
    long sum = baseCount;
    if (as != null) {
        //遍历 counterCells 数组,得到每个对象中的value值
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                //累加 value 值
                sum += a.value;
        }
    }
    //此时得到的就是元素总个数
    return sum;
}

//扩容时的校验标识
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

/*  该方法为addCount方法中某些情况下的跳转,通过该方法全力增加元素个数,一定会成功。     参数分别为 1, false
	该方法中有三种情况,当counterCells数组不为空时,则选择当前线程的随机数所对应的格子。进行CAS加锁(类似于AQS中的state),获取到volatile变量后,然后在格子中实现自增(通过CAS),然后返回。如果数组为空,则通过CAS获取锁,如果获取到则创建一个长度为2的counterCells数组,并实现自增。否则,通过CAS直接对数组长度强行增加1(自旋直至成功)。
*/
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    // 如果当前线程的随机数为0,则强制初始化一个值
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        // 此时认为无竞争
        wasUncontended = true;
    }
    // 用来表示比 contend(竞争)更严重的碰撞,若为true,表示可能需要扩容,以减少碰撞冲突
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        ConcurrentHashMap.CounterCell[] as; ConcurrentHashMap.CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
        	// 若当前线程所在的格子(CounterCell对象)为空
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {          
                // 若无锁,则乐观的创建一个 CounterCell 对象。
                    ConcurrentHashMap.CounterCell r = new ConcurrentHashMap.CounterCell(x);
                    // 尝试加锁
                    if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        // 加锁成功后需要再检查前置条件是否成立
                        try {               // Recheck under lock
                            ConcurrentHashMap.CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                // 在新的格子上创建对象,表示已经实现自增
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                    U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        ConcurrentHashMap.CounterCell[] rs = new ConcurrentHashMap.CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
		//2.这里的 cellsBusy 参数非常有意思,是一个volatile的 int值,用来表示自旋锁的标志,
		//可以类比 AQS 中的 state 参数,用来控制锁之间的竞争,并且是独占模式。简化版的AQS。
		//cellsBusy 若为0,说明无锁,线程都可以抢锁,若为1,表示已经有线程拿到了锁,则其它线程不能抢锁。
        else if (cellsBusy == 0 && counterCells == as &&
                U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    ConcurrentHashMap.CounterCell[] rs = new ConcurrentHashMap.CounterCell[2];
                    rs[h & 1] = new ConcurrentHashMap.CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 直接通过CAS将baseCount值 + 1
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

  1. transfer方法
    • 需要注意的一点,前面虽然一直在说帮助扩容,其实更准确的说应该是帮助迁移元素。因为扩容的第一次初始化新表的动作只能由一个线程完成。其他线程仅仅能帮助迁移元素到新数组。 需要设定一个固定值stride作为步长,一个transferIndex(table.baseCount - 1)来表示所有线程协助迁移总共推进到的元素的下标位置。每个线程迁移时都会从后往前分配一定步长的数据作为迁移对象。当每个线程前已完成它的范围内的数据的迁移时,都会继续帮助transferIndex向前推进,直到transferIndex为0,表示数组迁移结束。
private final void transfer(ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    if (nextTab == null) {          
        try {
            @SuppressWarnings("unchecked")
            ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {     
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    for (int i = 0, bound = 0;;) {
        ConcurrentHashMap.Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n;
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true;
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    ConcurrentHashMap.Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        ConcurrentHashMap.Node<K,V> lastRun = f;
                        for (ConcurrentHashMap.Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof ConcurrentHashMap.TreeBin) {
                        ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f;
                        ConcurrentHashMap.TreeNode<K,V> lo = null, loTail = null;
                        ConcurrentHashMap.TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (ConcurrentHashMap.Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
时间并不会因为你的迷茫和迟疑而停留,就在你看这篇文章的同时,不知道有多少人在冥思苦想,在为算法废寝忘食,不知道有多少人在狂热地拍着代码,不知道又有多少提交一遍又一遍地刷新着OJ的status页面…… 没有谁生来就是神牛,而千里之行,始于足下!
原文地址:https://www.cnblogs.com/bianjunting/p/14590225.html