ConcurrentHashMap jdk1.7

java.util.concurrent.ConcurrentHashMap<K, V>

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable

锁分段+数组+链表

Segment[] + HashEntry[] + HashEntry链

锁分段:

  每一把锁只锁住哈希表中的一部分数据,这样,在多线程环境中,如果多个线程要锁住的数据属于不同的段,多个线程就可以分别锁不同的段,线程之间不存在锁竞争,从而达到多线程同时操作

  默认,16个段Segment,给每一个Segment分配一把锁,当某一个线程占用锁访问其中一个Segment时,其它线程可以访问其它的Segment。

  但是,对于某些需要锁定整张哈希表(遍历整张哈希表)的方法,就需要按顺序锁定所有的段,操作完成之后,按顺序释放锁。按顺序,否则可能产生死锁。

静态final内部类Segment

static final class Segment<K,V> extends ReentrantLock implements Serializable

Segment的实现和HashMap有点像

代表键值对的HashEntry存放在一个HashEntry[]中,数组的每一个元素都是一个HashEntry链,HashEntry[]存放在Segment中

Segment相当于当前段的锁,对一个Segment中HashEntry[]的写操作,必须获得对应的Segment锁

对于同一个Segment的操作,才需要考虑线程同步;对于不同Segment的操作,互不影响。

transient volatile HashEntry<K,V>[] table;

静态内部类HashEntry

static final class HashEntry<K,V> {

final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;

……

}

对于ConcurrentHashMap,多个读操作不需要加锁,多个线程可以同时读

对于HashMap,如果在一个线程读取一个键值对的过程中,其它线程对当前键值对进行了修改,或者在Entry链上增加或删除了Entry,在不加锁的情况下,可能得不到一致的结果

但是,ConcurrentHashMap在多个线程读的时候,不加锁也可以读到一致的结果,原因:HashEntry中存放着真正的键值对,其value是volatile的,next也是volatile的,保证了内存可见性(任意线程对一个键值对的value值和HashEntry链的修改,对其它线程来说都是立即可见的)

成员变量

static final int DEFAULT_INITIAL_CAPACITY = 16;  //ConcurrentHashMap的初始容量

static final float DEFAULT_LOAD_FACTOR = 0.75f;  //加载因子,加载因子是针对每一个Segment而言的

static final int DEFAULT_CONCURRENCY_LEVEL = 16;   //默认并发级别,segment的个数

static final int MAXIMUM_CAPACITY = 1 << 30;  //ConcurrentHashMap的最大容量,最多存储这么多个键值对(HashEntry对象)

static final int MIN_SEGMENT_TABLE_CAPACITY = 2;  //每个Segment中都有一个HashEntry[],指定HashEntry数组的最小长度

static final int MAX_SEGMENTS = 1 << 16;   //Segment的最大数量,最大并发数

static final int RETRIES_BEFORE_LOCK = 2;  

final Segment<K,V>[] segments;

final int segmentMask;   //段掩码,值为Segment[].length-1    this.segmentMask = ssize - 1; 

final int segmentShift;  //2的sshift等于ssize,segmentShift=32-sshift       this.segmentShift = 32 - sshift;

    int j = (hash >>> segmentShift) & segmentMask; //定位Segment

构造器

 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)

在构造器中,判断传入的参数(初始容量、加载因子、并发级别)是否合法

根据参数concurrencyLevel确定Segment[]的长度(并发级别),调整为2的n次方

根据初始容量和Segment的个数确定每个Segment中HashEntry[]的长度,也是2的n次方

创建一个Segment对象作为Segment[]数组中下标为0的元素

创建一个Segment[],赋值给ConcurrentHashMap的segments成员变量

~从构造函数中可以看出,Segment[]的长度和Segment中HashEntry[]的长度都是2的n次方,都是为了能够快速定位(位运算)

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
    //以上是判断传入的参数 初始容量 加载因子 并发级别是否合法
// Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift;   ssize <<= 1;  //2的sshift次方 等于 ssize }  //concurrencyLevel有可能不是2的幂次方,通过while循环之后,ssize中保存的是大于concurrencyLevel的最小的那个2的幂次方的值,ssize即为Segment的个数 this.segmentShift = 32 - sshift;  //Segment的个数是2的sshift个 this.segmentMask = ssize - 1;  //Segment的个数 - 1 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize;      if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1;  // cap最终存储的是Segment中HashEntry[]的大小,cap也一定是2的幂次方 // create segments and segments[0]
    
    //创建一个Segment对象,作为Segment[]的第一个元素 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }

元素的个数、判断是否为空

size()

遍历所有的Segment,先按顺序将一个个Segment加锁,然后将一个个Segment中HashEntry的个数加起来,最后按顺序释放加在一个个Segment上的锁。

必须按顺序,否则可能产生死锁

put操作

 

put操作需要在定位Segment之后,对Segment加锁

如果value==null,空指针异常

如果value不是null

  首先,计算hash(key),最终的hash值是hashCode(key) ^ hashSeed之后,再经过各种位操作之后的值

  然后,通过hash值的各种位移和与运算,得到Segment[]的下标

    构造器中只初始化了Segment[]下标0处的Segment对象,在put时,确定了Segment的位置,先判断该位置上的Segment有没有初始化,没有,先初始化

  最后,调用Segment对象的put方法存入键值对。

    加锁 tryLock()

    确定HashEntry[]的下标。int index = (tab.length - 1) & hash;

    如果当前HashEntry没有初始化,先初始化,并将键值对插入

    已经初始化,遍历HashEntry链,有重复的,新值覆盖旧值,否则,插入

    插入之前,判断是否需要扩容,扩容:Segment[]是不能扩容的,只能扩容一个个Segment中的HashEntry[]的大小为原来的两倍

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();  //ConcurrentHashMap不允许null键和null值
        int hash = hash(key);  //计算hash值
        int j = (hash >>> segmentShift) & segmentMask;  //定位segment    Segment[]
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);  //segments[j]没有初始化,初始化该Segment
        return s.put(key, hash, value, false);  //在当前Segment中存入键值对
    }

get操作

get操作不需要加锁,因为涉及的共享变量都是volatile的,可以保证内存可见性

先得到key的hash值

根据hash值定位Segment,再定位HashEntry,遍历HashEntry链,如果存在那个key,返回key对应的value,否则返回null

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

扩容

  Segment的个数一旦确定不能改变,当一个Segment中元素的个数超过阈值并且即将发生冲突,扩容,只是扩容当前Segment中的HashEntry[]

        private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }
原文地址:https://www.cnblogs.com/duanjiapingjy/p/9539310.html