深入ConcurrentHashMap一

ConcurrentHashMap可以做到比較高性能的并发訪问。原因是锁分段,及get不用加锁,就算堵塞时用的是ReentrantLock。


ConcurrentHashMap内部有一个Segment数组,每一个Segment有一个lock。


Segment相当于是一个子map,拥有一个HashEntity数组。
这样能够将并发压力分摊到多个Segment上。



ConcurrentHashMap组成图:


这里的Segment个数必须为2的n次方,为了之后高效计算要存放的key存放在哪个Segment上(用  << 实现)。

Segment内部拥有一个类型为volatile的HashEntry数组,这是为了可以让其他线程看到最新的值。

ConcurrentHashMap拥有put,get,remove等操作。而原有的扩容操作。为了性能则仅仅针对单个Segment来进行。


关键的Segment相当于ConcurrentHashMap中的子map。它包括了一个HashEntry数组。它的类图例如以下:

这里关键的Segment类图例如以下:



HashEntry内部存储了hash值。final的key值,及volatile的value及next值。

这里value及next值是volatile是为实现get操作的不须要加锁提供了基础。

HashEntry类图例如以下:



以下分别从ConcurrentHashMap创建,往ConcurrentHashMap放入元素。从ConcurrentHashMap取出元素来进行分析。

一.ConcurrentHashMap创建

在用ConcurrentHashMap时,能够在构造函数中传入你想要的map容量,loadFactor装载因子,并发数(大致决定了有几个Segment)。


假设不传入,使用空构造函数时,默认的map容量大小为16,loadFactor为0.75,并发数为16。
我们以默认的參数開始分析。
首先须要确定总容量大小(这里指全部Segment中存放数组元素的数组大小总和),Segment总数,以及每一个Segment中HashEntry数组的大小。


我们先附上代码:

 public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
 }


能够看到,对于Segment个数ssize取的是以concurrencyLevel为上界的2的n次方,这里因为concurrencyLevel为16。所以ssize也为16。


然后计算segmentShift,这个值用于兴许在存放元素到ConcurrentHashMap时。用于决定有多少位hash高位參与计算存放元素的Segment数组下标。
这里segmentShift为32-sshift。sshift取的是Segment个数ssize的2的n次数的n值。即在计算ssize时左移的次数。这里为4。
所以segmentShift为28。


接着计算出segmentMask,这个值用于在计算得到存放元素的Segment所在数组的下标,这里为了效率使用 & 操作来替代%模操作。
这里因为Segment个数为16,所以segmentMask为15。


之后会计算出每一个Segment的数组容量。这里先计算出每一个Segment的大致大小,即用int c = initialCapacity / ssize; 来得出c,这里c为1。
之后会初始化一个cap作为真正的segment中HashEntry数组大小,将它初始化为2。
然后为了保证得出的segMent中HashEntry的大小为2的n次方,所以兴许会对cap做下面操作:
while (cap < c)
  cap <<= 1;
即cap的值要不是2(c值小于等于2时),或者是大于2的2的n次方。


得到终于每一个Segment中HashEntry数组大小后,创建一个Segment数组。并在Segment数组0处初始化创建一个Segment。


最后将ConcurrentHashMap的segments设置为新创建的Segments数组。


这里能够看到最后将创建的Segment s0是调用UNSAFE.putOrderedObject(ss,SBASE,s0)来将其放入到ss数组中。

这里的UNSAFE.putOrderedObject是JAVA提供的用于直接操作内存的方法。当中參数ss是数组。

UNSAFE.putOrderedObject会延迟更新到内存中,可是因为兴许在获取segment数组中的segment时,採用的是UNSAFE.getObjectVolatile,所以可以保证对于segment放入数组的操作对于兴许的线程是可见的。

SBASE是ConcurrentHashMap的一个静态final long的值,相当于是Segment数组的首地址。

s0则是创建的须要放入ss数组中的segment实例。这里是将s0放入到ss数组位置0中。

假设提供类似:UNSAFE.putOrderedObject(ss,SBASE+offset,s)表明将元素s放入到数组ss的SBASE+offset位置。

当中这里的offset:

offset=步长*要放在数组第几位

步长通常是在编译期已经确定,这里測试过对于:Double或者Integer等。步长都是4。


原文地址:https://www.cnblogs.com/wgwyanfs/p/7060859.html