ConcurrentHashMap

ConcurrentHashMap高并发实现机制:http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/

散列表应用场景:大多数是读取操作。
    读操作不需要加锁。完全并发的读取。
    通过减小请求同一个锁的频率和尽量减少持有锁的时间。

一、分离锁实现多个线程间的更深层次的共享访问。
    减小了请求同一个锁的频率。实现多个线程间的并发写操作。
    

二、HashEntry对象的不变性降低执行读操作的线程在遍历链表期间对加锁的需求。

三、通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性。




 
Segment 中执行具体的 put 操作
 V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();  // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap
            try {
                int c = count;
 
                if (c++ > threshold)     // 如果超过再散列的阈值
                    rehash();              // 执行再散列,table 数组的长度将扩充一倍
 
                HashEntry<K,V>[] tab = table;
                // 把散列码值与 table 数组的长度减 1 的值相“与”
                // 得到该散列码对应的 table 数组的下标值
                int index = hash & (tab.length - 1);
                // 找到散列码对应的具体的那个桶
                HashEntry<K,V> first = tab[index];
 
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;
 
                V oldValue;
                if (e != null) {            // 如果键 / 值对以经存在
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;    // 设置 value 值
                }
                else {                        // 键 / 值对不存在
                    oldValue = null;
                    ++modCount;         // 要添加新节点到链表中,所以 modCont 要加 1  
                    // 创建新节点,并添加到链表的头部
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c;               // 写 count 变量
                }
                return oldValue;
            } finally {
                unlock();                     // 解锁
            }
        }
        注意:这里的加锁操作是针对(键的 hash 值对应的)某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。因为插入键 / 值对操作只是在这个 Segment 包含的某个桶中完成,不需要锁定整个ConcurrentHashMap。此时,其他写线程对另外 15 个Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。
        相比较于 HashTable 和由同步包装器包装的 HashMap每次只能有一个线程执行读或写操作,ConcurrentHashMap 在并发访问性能上有了质的提高。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。

二、用 HashEntery 对象的不变性来降低读操作对加锁的需求
 
        在代码清单“HashEntry 类的定义”中我们可以看到,HashEntry 中的 key,hash,next 都声明为 final 型。这意味着,不能把节点添加到链接的中间和尾部,也不能在链接的中间和尾部删除节点。这个特性可以保证:在访问某个节点时,这个节点之后的链接不会被改变。这个特性可以大大降低处理链表时的复杂性。
        同时,HashEntry 类的 value 域被声明为 Volatile 型,Java 的内存模型可以保证:某个写线程对 value 域的写入马上可以被后续的某个读线程“看”到。在 ConcurrentHashMap 中,不允许用 unll 作为键和值,当读线程读到某个 HashEntry 的 value 域的值为 null 时,便知道产生了冲突——发生了重排序现象,需要加锁后重新读入这个 value 值。这些特性互相配合,使得读线程即使在不加锁状态下,也能正确访问 ConcurrentHashMap。
 
        非结构性修改操作只是更改某个 HashEntry 的 value 域的值。由于对 Volatile 变量的写入操作将与随后对这个变量的读操作进行同步。当一个写线程修改了某个 HashEntry 的 value 域后,另一个读线程读这个值域,Java 内存模型能够保证读线程读取的一定是更新后的值。所以,写线程对链表的非结构性修改能够被后续不加锁的读线程“看到”。
对 ConcurrentHashMap 做结构性修改,实质上是对某个桶指向的链表做结构性修改。如果能够确保:在读线程遍历一个链表期间,写线程对这个链表所做的结构性修改不影响读线程继续正常遍历这个链表。那么读 / 写线程之间就可以安全并发访问这个 ConcurrentHashMap。
        结构性修改操作包括 put,remove,clear。下面我们分别分析这三个操作。
        clear 操作只是把 ConcurrentHashMap 中所有的桶“置空”,每个桶之前引用的链表依然存在,只是桶不再引用到这些链表(所有链表的结构并没有被修改)。正在遍历某个链表的读线程依然可以正常执行对该链表的遍历。
        从上面的代码清单“在 Segment 中执行具体的 put 操作”中,我们可以看出:put 操作如果需要插入一个新节点到链表中时 , 会在链表头部插入这个新节点。此时,链表中的原有节点的链接并没有被修改。也就是说:插入新健 / 值对到链表中的操作不会影响读线程正常遍历这个链表。
 

        删除节点 C 之后的所有节点原样保留到新链表中;删除节点 C 之前的每个节点被克隆到新链表中,注意:它们在新链表中的链接顺序被反转了。
 
 
 
三、用 Volatile 变量协调读写线程间的内存可见性
 
 
        上面的代码中,一开始就对volatile变量count进行了读取比较,这个还是java5对volatile语义增强的作用,这样就可以获取变量的可见性。所以count != 0之后,我们可以认为对应的hashtable是最新的,当然由于读取的时候没有加锁,在get的过程中,可能会有更新。当发现根据key去找元素的时候,但发现找得的key对应的value为null,这个时候可能会有其他线程正在对这个元素进行写操作,所以需要在使用锁的情况下在读取一下value,以确保最终的值。
 

【两个静态内部类 HashEntry 和 Segment】
        HashEntry 用来封装映射表的键 / 值对;
        Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。

 

 

【HashEntry 类】

key,hash 和 next 域都被声明为 final 型。

value 域被声明为 volatile 型。

 

static final class HashEntry<K,V>{

 final K key;

 final int hash;

 volatile V value;

 final HashEntry<K,V> next;

 HashEntry(K key, int hash, HashEntry<K,V> next, V value) {

            this.key = key;

            this.hash = hash;

            this.next = next;

            this.value = value;

        }

}

 

 

【Segment 类】

        Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。

        每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。

        table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列映射表的一个桶。

        count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。注意,之所以在每个 Segment 对象中包含一个计数器,而不是在 ConcurrentHashMap 中使用全局的计数器,是为了避免出现“热点域”而影响 ConcurrentHashMap 的并发性。

        这样当需要更新计数器时,不用锁定整个 ConcurrentHashMap。

 

static final class Segment<K,V> extends ReentrantLock implements Serializable{

 //本 segment 范围内,包含的 HashEntry 元素的个数

 transient volatile int count;

 

 //table 被更新的次数

 transient int modCount;

 

 //HashEntry个数,触发 table 的再散列

 transient int threshold;

 

 //如果并发级别16, ConcurrentHashMap 包含的桶总数的 1/16

 transient volatile HashEntry<K,V>[] table;

 

 final float loadFactor;

 

 Segment(int initialCapacity, float lf){

  loadFactor = lf;

  setTable(HashEntry.<K,V>newArray(initialCapacity));

 }

 

 void setTable(HashEntry<K,V>[] newTable){

  threshold = (int)(newTable.length * loadFactor);

  table = newTable;

 }

 

 //根据 key 的散列值,找到 table 中对应的那个桶

 HashEntry<K,V> getFirst(int hash){

  HashEntry<K,V>[] tab = table;

  return tab[hash & (tab.length-1)];

 }

}

 

【ConcurrentHashMap 类】

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>  

        implements ConcurrentMap<K, V>, Serializable {

    static final int DEFAULT_INITIAL_CAPACITY= 16;

    static final float DEFAULT_LOAD_FACTOR= 0.75f;

    static final int DEFAULT_CONCURRENCY_LEVEL= 16;

    final int segmentMask;

    final int segmentShift;

    final Segment<K,V>[] segments;

    public ConcurrentHashMap(int initialCapacity,

                             float loadFactor, int concurrencyLevel) {

        if(!(loadFactor > 0) || initialCapacity < 0 ||

 concurrencyLevel <= 0)

            throw new IllegalArgumentException();

 

        if(concurrencyLevel > MAX_SEGMENTS)

            concurrencyLevel = MAX_SEGMENTS;

 

        // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂)

        int sshift = 0;

        int ssize = 1;

        while(ssize < concurrencyLevel) {

            ++sshift;

            ssize <<= 1;

        }

        segmentShift = 32 - sshift;       // 偏移量值

        segmentMask = ssize - 1;           // 掩码值

        this.segments = Segment.newArray(ssize);   // 创建数组

 

        if (initialCapacity > MAXIMUM_CAPACITY)

            initialCapacity = MAXIMUM_CAPACITY;

        int c = initialCapacity / ssize;

        if(c * ssize < initialCapacity)

            ++c;

        int cap = 1;

        while(cap < c)

            cap <<= 1;

 

        // 依次遍历每个数组元素

        for(int i = 0; i < this.segments.length; ++i)

            // 初始化每个数组元素引用的 Segment 对象

 this.segments[i] = new Segment<K,V>(cap, loadFactor);

    }

    public ConcurrentHashMap() {

        // 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表

 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);

 }

}

 
《Concurrent包中的锁机制》http://www.iteye.com/topic/333669

《java.util.concurrent 之ConcurrentHashMap 源码分析》http://www.iteye.com/topic/977348

《ConcurrentHashMap之实现细节》http://www.iteye.com/topic/344876
【锁机制:锁分离技术】
多个锁控制Hash表不同部分的修改。
段Segment表示不同部分。每一段:小Hash Table。
size(),containsValue()跨段,按顺序锁定所有段 。避免死锁。获得锁的顺序是固定的。不变性。
final Segment<K,V>[] segments;
 
【读不加锁:get()】
实现技术: 保证hashEntry几乎是不可变 的,只有value不是final。
static final class HashEntry<K,V>{
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}
 
【 final next值 】 不能修改next,不能从hash链的中间或者尾部添加或者删除节点。所有的节点修改只能从头部开始。
【删除】 持有段锁。需要将删除节点前的所有节点复制一遍。
【volatile】value保证读操作总能看到最新值。
 
 

 

通过位运算就可以定位段和段中hash槽的位置。

当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。但是我们也不要忘记《算法导论》给我们的教训:hash槽的的个数不应该是2^n,这可能导致hash槽分配不均,这需要对hash值重新再hash一次。(这段似乎有点多余了 )

 

这是重新hash的算法,还比较复杂,我也懒得去理解了。

 

privatestatic int hash(int h) {  

   // Spread bits to regularize both segment and index locations,  

   // using variant of single-word Wang/Jenkins hash.  

   h += (h <<  15) ^ 0xffffcd7d;  

   h ^= (h >>> 10);  

   h += (h <<   3);  

   h ^= (h >>>  6);  

   h += (h <<   2) + (h << 14);  

   return h ^ (h >>> 16);  

}

 

这是定位段的方法:

finalSegment<K,V> segmentFor(int hash) {  return segments[(hash >>> segmentShift) & segmentMask];

 

数据结构

Hash表,解决hash冲突,ConcurrentHashMap和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不同的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的数据成员:

 

publicclass ConcurrentHashMap<K, V> extends AbstractMap<K, V>  implements ConcurrentMap<K, V>, Serializable {

   /**

    * Mask value for indexing into segments. The upper bits of a

    * key's hash code are used to choose the segment.

    */  

   final int segmentMask;  

 

   /**

    * Shift value for indexing within segments.

    */  

   final int segmentShift;  

 

   /**

    * The segments, each of which is a specialized hash table

    */  

   final Segment<K,V>[] segments;  

}

 

所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。

 

 

每个Segment相当于一个子Hash表,它的数据成员如下:

   static final class Segment<K,V> extends ReentrantLock implements Serializable {  

private static final long serialVersionUID = 2249069246763182397L;  

       /**

        * The number of elements in this segment's region.

        */  

       transient volatile int count;  

//如果用transient声明一个实例变量,当对象存储时,它的值不需要维持。

//java的transient关键字为我们提供了便利,你只需要实现Serilizable接口,将不需要序列化的属性前添加关键字transient,序列化对象的时候,这个属性就不会被序列化。

 

       /**

        * Number of updates that alter the size of thetable. This is

        * used during bulk-read methods to make sure theysee a

        * consistent snapshot: If modCounts change during atraversal

        * of segments computing size or checkingcontainsValue, then

        * we might have an inconsistent view of state so(usually)

        * must retry.

        */  

       transient int modCount;  

 

       /**

        * The table is rehashed when its size exceeds thisthreshold.

        * (The value of this field is always<tt>(int)(capacity *

        * loadFactor)</tt>.)

        */  

       transient int threshold;  

 

       /**

        * The per-segment table.

        */  

       transient volatile HashEntry<K,V>[] table;  

 

       /**

        * The load factor for the hash table.  Eventhough this value

        * is same for all segments, it is replicated toavoid needing

        * links to outer object.

        * @serial

        */  

       final float loadFactor;  

}

count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。

 

协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。

 

modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。

threashold用来表示需要进行rehash的界限值。

table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步。loadFactor表示负载因子。

 

 

实现细节-修改操作

删除操作remove(key)

1.    public V remove(Object key) {  

2.     hash = hash(key.hashCode());  

3.        return segmentFor(hash).remove(key, hash, null);  

4.    }

 

整个操作是先定位到段,然后委托给段的remove操作。

当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。

 

下面是Segment的remove方法实现:

1.    V remove(Object key, int hash, Object value) {  

2.        lock();  //持有段锁

3.        try { //定位到要删除的节点e

4.            int c = count - 1;  

5.            HashEntry<K,V>[] tab = table;  

6.            int index = hash & (tab.length - 1);  

7.            HashEntry<K,V> first = tab[index];  

8.            HashEntry<K,V> e = first;  

9.            while (e != null && (e.hash != hash || !key.equals(e.key)))  

10.              e = e.next;  

11.    

12.  //【关键原理】将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用

13.          V oldValue = null;  

14.          if (e != null) {  

15.              V v = e.value;  

16.              if (value == null || value.equals(v)) {  

17.                  oldValue = v;  

18.                  // All entries following removed node can stay  

19.                  // in list, but all preceding ones need to be  

20.                  // cloned.  

21.                  ++modCount;  

22.                  HashEntry<K,V> newFirst = e.next;  

23.                  for (HashEntry<K,V> p = first; p != e; p = p.next)  

24.                      newFirst = new HashEntry<K,V>(p.key, p.hash,  

25.                                                    newFirst, p.value);  

26.                  tab[index] = newFirst;  

27.                  count = c; // write-volatile  

28.              }  

29.          }  

30.          return oldValue;  

31.      } finally {  

32.          unlock();  

33.      }  

34.  }  

整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。

接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。

 

整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。

第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

 

put操作

put操作也是委托给段的put方法。下面是段的put方法:

1.    V put(K key, int hash, V value, boolean onlyIfAbsent) {  

2.        lock();  

3.        try {  

4.            int c = count;  

5.            if (c++ > threshold) // ensure capacity  

6.                rehash();  

7.            HashEntry<K,V>[] tab = table;  

8.            int index = hash & (tab.length - 1);  

9.            HashEntry<K,V> first = tab[index];  

10.          HashEntry<K,V> e = first;  

11.          while (e != null && (e.hash != hash || !key.equals(e.key)))  

12.              e = e.next;  

13.    

14.          V oldValue;  

15.          if (e != null) {  

16.              oldValue = e.value;  

17.              if (!onlyIfAbsent)  

18.                  e.value = value;  

19.          }  

20.          else {  

21.              oldValue = null;  

22.              ++modCount;  

23.              tab[index] = new HashEntry<K,V>(key, hash, first, value);  

24.              count = c; // write-volatile  

25.          }  

26.          return oldValue;  

27.      } finally {  

28.          unlock();  

29.      }  

30.  }  

 

该方法也是在持有段锁的情况下执行的,首先判断是否需要rehash,需要就先rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n,这里就不介绍了。

 

修改操作还有putAll和replace。putAll就是多次调用put方法,没什么好说的。replace甚至不用做结构上的更改,实现要比put和delete要简单得多,理解了put和delete,理解replace就不在话下了,这里也不介绍了。

 

 

获取操作

get操作,同样ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:

1.    V get(Object key, int hash) {  

2.        if (count != 0) { // read-volatile  

3.            HashEntry<K,V> e = getFirst(hash);  

4.            while (e != null) {  

5.                if (e.hash == hash && key.equals(e.key)) {  

6.                    V v = e.value;  

7.                    if (v != null)  

8.                        return v;  

9.                    return readValueUnderLock(e); // recheck  

10.              }  

11.              e = e.next;  

12.          }  

13.      }  

14.      return null;  

15.  }  

 

get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。

对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。

接下来就是对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。

对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的, 这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执 行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

 

最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时 候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中 的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。这种情况应当很罕见,一旦发生这种情况,ConcurrentHashMap采取的方式是在持有锁的情况下再读一遍,这能够保证读到最新的值,并且一定不会为空值。

 

1.    V readValueUnderLock(HashEntry<K,V> e) {  

2.        lock();  

3.        try {  

4.            return e.value;  

5.        } finally {  

6.            unlock();  

7.        }  

8.    }  

 

containsKey

1.    boolean containsKey(Object key, int hash) {  

2.        if (count != 0) { // read-volatile  

3.            HashEntry<K,V> e = getFirst(hash);  

4.            while (e != null) {  

5.                if (e.hash == hash && key.equals(e.key))  

6.                    return true;  

7.                e = e.next;  

8.            }  

9.        }  

10.      return false;  

11.  }

 

原文地址:https://www.cnblogs.com/lsx1993/p/4619956.html