ConcurrentHashMap:源码分析到面试题

在多线程情况下,我们的HashMap在JDK1.8之前最大的问题就是会造成环链,在JDK1.8开始之后虽然解决了环链,但是还是会因为并发的情况下,导致数据覆盖而丢失。虽然我们有HashTable和Collections下的同步器可以解决这个问题,但是这两种方案都不能算是一个优秀的解决方案,所以就有了我们要介绍的ConcurrentHashMap。本文主要是针对JDK1.8的源码进行分析,但是在介绍之前也会简单提一下,1.8之前是如何设计的!

在了解ConcurrentHashMap不妨先了解一下HashMap

JDK1.7

我们在JDK1.8之前采用的是SegmentHashEntry的方式实现的。结构如下:

我们是采用分段锁来实现并发的更新。Segment是继承自我们的ReentrantLock来充当锁的角色,每一个Segment都对应一个锁。从图中我们也可以看到,我们的每一个Segment对象都对应了哈希表的若干个哈希桶,相当于一小段哈希表!

这样我们在实现并发更新的时候,就不会锁住这个哈希表,而是锁住Segment对应的那一个对象那一部分,就会提高了我们的性能和效率。具体的源码这里就不分析了,因为我们主要是介绍1.8的ConcurrentHashMap。

JDK1.8

我们的ConcurrentHashMap在1.8之后就放弃了分段锁的解决方案,而是采用了CAS+Synchronized来保证并发更新的安全。底层和我们的HashMap一样,采用的是数组+链表+红黑树的存储结构!

我们在上面说到了1.8是采用CAS+Synchronized来保证并发安全,所以在如果对CAS还不了解的话,可以先看我的关于CAS的博客。(点击跳转

好了接下来我们就开始对源码进行分析了。

基本属性

ConcurrentHashMap很多基本属性都和我们的HashMap一样,所以这里我只介绍几个不一样的,而且后面我们分析源码会用到的。

//我们的哈希表,可是使用迭代器来进行迭代
transient volatile Node<K,V>[] table;
//默认为null,扩容的时候新生成的数组,其大小为原数组的两倍。
private transient volatile Node<K,V>[] nextTable;
//基础计数器,通过CAS来进行更新
private transient volatile long baseCount;
/*
*默认为0,用来控制table的初始化和扩容操作的
*当为负数时,它正在进行初始化或者在扩容:
*-1,表示正在进行初始化;-N表示N-1个线程在进行扩容
*当为正数的时候:
*如果table未初始化,表示需要初始化的大小;
*如果table初始化完成,表示table的容量,默认是table的0.75倍,
*/
private transient volatile int sizeCtl;

还有就是对比我们的HashMap,我们的Node也进行了重写,将我们的值和下一个结点都用了Volatile来修饰,线程修改后立刻刷回主存,增加了内存的可见性。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
...部分代码省略...
}

构造方法

ConcurrentHashMap有五个构造方法,其中四个与HashMap类似,所以我们主要介绍这个多了一个参数的构造方法

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

我们第一个参数是容量大小,可以指定;第二个参数是我们的负载装载因子;第三个是指定我们的更新的并发线程数量;然后进行一些边界处理和赋值处理。最后就将我们的要扩容的大小赋值给了sizeCtl(上面介绍了,我们下次要扩容的大小),注意这里我们并没有进行初始化table,而是在第一次put的时候才会进行初始化,下面会讲到。

我们同样会在上面的构造方法里面看到一个方法tableSizeFor,我们点进去看,原来和我们的HashMap的那个设计容量为2的整数次幂方法一样,至于为什么要设置成2的整数次幂,我在HashMap方法里面也提到了。

    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

put方法

我们知道在进行第一次put的时候会进行扩容,那么如果有多个线程同时进来,我们是如何保证只有一个线程成功的进行了扩容呢?我们在第一次put的时候putVal方法里面有这么一行代码

 if (tab == null || (n = tab.length) == 0)
      tab = initTable();

我们调用了initTable方法,在下面注释上给出解析

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //如果sizectl(sc)小于0,说明已经有线程进行在初始化了,我们的其他进来的线程作罢
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //使用cas操作,将我们的sc更新为-1,代表在进行初始化了
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //右移两位再操作,相当于0.75*n,设置了一个扩容的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

回来我们继续看一下我们的完整的putVal方法

 final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
     //key的散列,获取哈希值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果表没有进行初始化,则进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //如果可以直接将我们的哈希值插入数组,则直接存进去,不用加锁
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果插入的点是我们的table的连接点,说明在扩容,我们就帮助当前线程扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //然后在进行具体的增加操作的时候,加锁
                synchronized (f) {
                    //确定f在tab中是链表的头结点
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果哈希桶是树,按照树的方式插入新值
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                //如果节点大于等于8,进行变换红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        //调用生成树的方法
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
     //能执行到这一步,说明节点不是被替换的,是被插入的,所以要将map的元素数量加1
        addCount(1L, binCount);
        return null;
    }

当table容量不足的时候,即table的元素数量达到容量阈值sizeCtl,需要对table进行扩容。
整个扩容分为两部分:

  1. 构建一个nextTable,大小为table的两倍。
  2. 把table的数据复制到nextTable中。

这两个过程在单线程下实现比较简单,但是在多线程下比较复杂。我们的ConcurrentHashMap是支持并发插入的,这里用图文简单分析一下:

多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。

(这里具体的addCount方法和transfer方法暂时看的不是大懂,后面会补上!)

get方法

get方法比较简单,就是如果是在桶第一个就返回;如果是树的结构调用树的方法去遍历查找;如果是链表就遍历下去查找;如果都没找到就返回null;

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法这么简单贴上来只是为了说明,我们的get方法是没有加锁的,无阻塞的。之所以能够正确的读取值是因为我们在上面也说到了,重写了node,里面的变量都用了volatile关键字来进行修饰。而且通过代码可以得出ConcurrentHashMap的key和Value都不能为null。

面试题分析

同样的,再进行了稍微的源码分析,我们试着来解决一些面试题。

1、ConcurrentHashMap使用什么技术来保证线程安全?

我们在上面分析过了,1.7的时候采用的Segment分段锁来实现,1.8采用的是CAS+Synchronized来实现的。具体实现细节,balabala简单描述一下。

2、ConcurrentHashMap的get方法是否要加锁,为什么?

不用,我们说过了,get方法是无阻塞不加锁的。因为我们重写了node类,里面的变量都用了volatile关键字来进行修饰,可以保证最新值的获取!

3、ConcurrentHashMap1.7和1.8的区别?

数据结构

  • 1.7:SegmentHashEntry
  • 1.8:数组+链表+红黑树

并发安全实现

  • 1.7:分段式锁(锁的对象是一个Segment)
  • 1.8:CAS+Synchronized(降低了锁的粒度,对象是一个Node)

其他的面试题,无非与HashMap大径相似,可以看看我的HashMap分析,里面也有面试题详解。(点击跳转)

总结

关于源码其实还有很多都没有分析,因为这比HashMap要复杂也难。所以挑一些高频考点来进行分析。感谢下面的参考资料!

参考资料

https://www.jianshu.com/p/e694f1e868ec

公众号《Java3y》多线程系列文章

https://blog.csdn.net/u010723709/article/details/48007881

原文地址:https://www.cnblogs.com/CryFace/p/13436635.html