ConcurrentHashMap 源码浅析 1.7

      1. 简介

        (1) 背景
        HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
        HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
        (2) 简介
        HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问.

      2. 结构

        ConcurrentHashMap是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.

        ConcurrentHashMap 源码浅析 1.7

      3. 基本成员

/**
 * 默认的初始容量 16   map默认容量,必须是2的冥
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
 * 默认的负载因子
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 * 默认的并发数量,会影响segments数组的长度(初始化后不能修改)
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换
 * ConcurrentHashMap大小必须是2^n,且小于等于2^30
 */
static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * 每个segment中table数组的长度,必须是2^n,至少为2
 */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

/**
 * 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
 */
static final int MAX_SEGMENTS = 1 << 16;

/**
 * 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
 */
static final int RETRIES_BEFORE_LOCK = 2;
/**
 * 用于segment的掩码值,用于与hash的高位进行取&
 */
final int segmentMask;

/**
 * 用于算segment位置时,hash参与运算的位数
 */
final int segmentShift;

/**
 * segments数组
 */
final Segment<K,V>[] segments;

HashEntry存储数据的链式结构
static final class HashEntry<K,V> {
    // hash值
    final int hash;
    // key
    final K key;
    // 保证内存可见性,每次从内存中获取
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * 使用volatile语义写入next,保证可见性
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

Segment继承ReentrantLock锁,用于存放HashEntry[]

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;

    /**
     * 对segment加锁时,在阻塞之前自旋的次数
     *
     */
    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成
     * volatile保证可见性
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问
     */
    transient int count;

    /**
     * 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位
     * 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性.
     * 只能在锁中或其他volatile读保证可见性之间进行访问
     */
    transient int modCount;

    /**
     * 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
     */
    transient int threshold;

    /**
     * 负载因子
     */
    final float loadFactor;

    /**
     * 构造方法
     */
    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }

3、构造方法
有参构造

/**
 * ConcurrentHashMap 构造方法
 * @param initialCapacity 初始化容量
 * @param loadFactor 负载因子
 * @param concurrencyLevel 并发segment,segments数组的长度
 */
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 大于最大segments容量,取最大容量
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    // 2^sshift = ssize 例如:sshift = 4,ssize = 16
    // 根据concurrencyLevel计算出ssize为segments数组的长度
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) { // 第一次 满足
        ++sshift;  // 第一次 1
        ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
    }
    // segmentShift和segmentMask的定义
    this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数
    this.segmentMask = ssize - 1; // segments位置范围
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 计算每个segment中table的容量
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // HashEntry[]默认 容量
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 确保cap是2^n
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
    Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                    (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

无参构造使用默认参数

public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

5、基本方法

一些UNSAFE方法
setNext

/**
     * 使用volatile语义写入next,保证可见性
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

entryAt

/**
 * 获取给定table的第i个元素,使用volatile读语义
 */
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
    return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)i << TSHIFT) + TBASE);
}

setEntryAt

/**
 * 设置给定的table的第i个元素,使用volatile写语义
 */
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                   HashEntry<K,V> e) {
    UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

put 插入元素
执行流程分析
(1) map的put方法就做了三件事情,找出segments的位置;判断当前位置有没有初始化,没有就调用ensureSegment()方法初始化;然后调用segment的put方法.
(2) segment的put方法.,获取当前segment的锁,成功接着执行,失败调用scanAndLockForPut方法自旋获取锁,成功后也是接着往下执行.
(3) 通过hash计算出位置,获取节点,找出相同的key和hash替换value,返回.没有找到相同的,设置找出的节点为当前创建节点的next节点,设置创建节点前,判断是否需要扩容,需要调用扩容方法rehash();不需要,设置节点,返回,释放锁.

/**
 * map的put方法,定位segment
 */
public V put(K key, V value) {
    Segment<K,V> s;
    // value不能为空
    if (value == null)
        throw new NullPointerException();
    // 获取hash
    int hash = hash(key);
    // 定位segments 数组的位置
    int j = (hash >>> segmentShift) & segmentMask;
    // 获取这个segment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 为null 初始化当前位置的segment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
    /**
     * put到table方法
     */
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        // 是否获取锁,失败自旋获取锁(直到成功)
        HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            // 定义位置
            int index = (tab.length - 1) & hash;
            // 获取第一个桶的第一个元素
            // entryAt 底层调用getObjectVolatile 具有volatile读语义
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) { // 证明链式结构有数据 遍历节点数据替换,直到e=null
                    K k;
                    if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) { //  找到了相同的key
                        oldValue = e.value;
                        if (!onlyIfAbsent) { // 默认值false
                            e.value = value; // 替换value
                            ++modCount;
                        }
                        break; // 结束循环
                    }
                    e = e.next;
                }
                else { // e=null (1) 之前没有数据 (2) 没有找到替换的元素
                    // node是否为空,这个获取锁的是有关系的
                    // (1) node不为null,设置node的next为first
                    // (2) node为null,创建头节点,指定next为first
                    if (node != null)
                        // 底层使用 putOrderedObject 方法 具有volatile写语义
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    // 扩容条件 (1)entry数量大于阈值 (2) 当前table的数量小于最大容量  满足以上条件就扩容
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        // 扩容方法,方法里面具体讲
                        rehash(node);
                    else
                        // 给table的index位置设置为node,
                        // node为头结点,原来的头结点first为node的next节点
                        // 底层也是调用的 putOrderedObject 方法 具有volatile写语义
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

解释下(hash >>> segmentShift) & segmentMask定位segment位置(个人理解)
ConcurrentHashMap 源码浅析 1.7
ensureSegment初始化segment方法
执行流程
(1) 计算位置,使用UNSAFE的方法判断当前位置有没有初始化,然后使用segmets[0]的模板创建一个新的HashEntry[],再次判断当前位置有没有初始化,可能存在多线程同时初始化,然后创建一个新的segment,最后使用自旋cas设置新的segment的位置,保证只有一个线程初始化成功.

/**
 *
 * @param k 位置
 * @return segments
 */
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;  // 当前的segments数组
    long u = (k << SSHIFT) + SBASE; // raw offset // 计算原始偏移量,在segments数组的位置
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 判断没有被初始化
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype // 获取第一个segment ss[0]
        // 这就是为什么要在初始化化map时要初始化一个segment,需要用cap和loadFactoe 为模板
        int cap = proto.table.length; // 容量
        float lf = proto.loadFactor; // 负载因子
        int threshold = (int)(cap * lf); // 阈值
        // 初始化ss[k] 内部的tab数组 // recheck
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查这个ss[k]  有没有被初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
            // 创建一个Segment
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 这里用自旋CAS来保证把segments数组的u位置设置为s
            // 万一有多线程执行到这一步,只有一个成功,break
            // getObjectVolatile 保证了读的可见性,所以一旦有一个线程初始化了,那么就结束自旋
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

scanAndLockForPut自旋获取锁方法
具体流程看代码注释,解释下这个方法的优化(看的某位大佬的博客)
(1) 我们在put方法获取锁失败,才会进入这个方法,这个方法采用自旋获取锁,直到成功才返回,但是使用了自旋次数的限制,这么做的好处是什么了,就是竞争太激烈的话,这个线程可能一直获取不到锁,自旋也是消耗cpu性能的,所以当达到自旋次数时,就阻塞当前线程,直到有线程释放了锁,通知这些线程.在等待过程中是不消耗cpu的.
(2) 当我们进入这个方法时,说明获取锁失败,那么可别是别的线程在对这个segment进行修改操作,所以说如果别的线程在操作之后,我们自己的工作内存中的数据可能已经不是最新的了,这个时候我们使用具有volatile语义的方法重新读了数据,在自旋过程中遍历这些数据,把最新的数据缓存在工作内存中,当前线程再次获取锁时,我们的数据是最新的,就不用重新去住内存中获取,这样在自旋获取的锁的过程中就预热了这些数据,在获取锁之后的执行中就提升了效率.

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        HashEntry<K,V> first = entryForHash(this, hash); // 根据hash获取头结点
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // 是为了找到对应hash桶,遍历链表时找到就停止
        while (!tryLock()) { // 尝试获取锁,成功就返回,失败就开始自旋
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {  // 结束遍历节点
                    if (node == null) // 创造新的节点
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0; // 结束遍历
                }
                else if (key.equals(e.key)) // 找到节点 停止遍历
                    retries = 0;
                else
                    e = e.next; // 下一个节点 直到为null
            }
            else if (++retries > MAX_SCAN_RETRIES) { // 达到自旋的最大次数
                lock(); // 进入加锁方法,失败进入队列,阻塞当前线程
                break;
            }
            else if ((retries & 1) == 0 &&
                    (f = entryForHash(this, hash)) != first) {
                e = first = f; // 头结点变化,需要重新遍历,说明有新的节点加入或者移除
                retries = -1;
            }
        }
        return node;
    }

rehash扩容方法
解释下节点位置变化这一块的处理,如下图所示.

/**
     *扩容方法
     */
    private void rehash(HashEntry<K,V> node) {

        // 旧的table
        HashEntry<K,V>[] oldTable = table;
        // 旧的table的长度
        int oldCapacity = oldTable.length;
        // 扩容原来capacity的一倍
        int newCapacity = oldCapacity << 1;
        // 新的阈值
        threshold = (int)(newCapacity * loadFactor);
        // 新的table
        HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
        // 新的掩码
        int sizeMask = newCapacity - 1;
        // 遍历旧的table
        for (int i = 0; i < oldCapacity ; i++) {
            // table中的每一个链表元素
            HashEntry<K,V> e = oldTable[i];
            if (e != null) { // e不等于null
                HashEntry<K,V> next = e.next; // 下一个元素
                int idx = e.hash & sizeMask;  // 重新计算位置,计算在新的table的位置
                if (next == null)   //  Single node on list 证明只有一个元素
                    newTable[idx] = e; // 把当前的e设置给新的table
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e; // 当前e
                    int lastIdx = idx;          // 在新table的位置
                    for (HashEntry<K,V> last = next;
                         last != null;
                         last = last.next) { // 遍历链表
                        int k = last.hash & sizeMask; // 确定在新table的位置
                        if (k != lastIdx) { // 头结点和头结点的next元素的节点发生了变化
                            lastIdx = k;    // 记录变化位置
                            lastRun = last; // 记录变化节点
                        }
                    }
                    // 以下把链表设置到新table分为两种情况
                    // (1) lastRun 和 lastIdx 没有发生变化,也就是整个链表的每个元素位置和一样,都没有发生变化
                    // (2) lastRun 和 lastIdx 发生了变化,记录变化位置和变化节点,然后把变化的这个节点设置到新table
                    //     ,但是整个链表的位置只有变化节点和它后面关联的节点是对的
                    //      下面的这个遍历就是处理这个问题,遍历当前头节点e,找出不等于变化节点(lastRun)的节点重新处理
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // 处理扩容时那个添加的节点

        // 计算位置
        int nodeIndex = node.hash & sizeMask; // add the new node
        // 设置next节点,此时已经扩容完成,要从新table里面去当前位置的头结点为next节点
        node.setNext(newTable[nodeIndex]);
        // 设置位置
        newTable[nodeIndex] = node;
        // 新table替换旧的table
        table = newTable;
    }

get 没有加锁,效率高
注意:get方法使用了getObjectVolatile方法读取segment和hashentry,保证是最新的,具有锁的语义,可见性
分析:为什么get不加锁可以保证线程安全
(1) 首先获取value,我们要先定位到segment,使用了UNSAFE的getObjectVolatile具有读的volatile语义,也就表示在多线程情况下,我们依旧能获取最新的segment.
(2) 获取hashentry[],由于table是每个segment内部的成员变量,使用volatile修饰的,所以我们也能获取最新的table.
(3) 然后我们获取具体的hashentry,也时使用了UNSAFE的getObjectVolatile具有读的volatile语义,然后遍历查找返回.
(4) 总结我们发现怎个get过程中使用了大量的volatile关键字,其实就是保证了可见性(加锁也可以,但是降低了性能),get只是读取操作,所以我们只需要保证读取的是最新的数据即可.

/**
 * get 方法
 */
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 获取segment的位置
    // getObjectVolatile getObjectVolatile语义读取最新的segment,获取table
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
        // getObjectVolatile getObjectVolatile语义读取最新的hashEntry,并遍历
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 找到相同的key 返回
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

size 尝试3次不加锁获取sum,如果发生变化就全部加锁,size和containsValue方法的思想也是基本类似.
执行流程
(1) 第一次,retries++=0,不满足全部加锁条件,遍历所有的segment,sum就是所有segment的容量,last等于0,第一次不相等,last=sum.
(2) 第二次,retries++=1,不满足加锁条件,计算所有的segment,sum就是所有的segment的容量,last是上一次的sum,相等结束循环,不相等下次循环.
(3) 第三次,retries++=2,先运算后赋值,所以此时还是不满足加锁条件和上面一样统计sum,判断这一次的sum和last(上一次的sum)是否相等,相等结束,不相等,下一次循环.
(4) 第四次,retries++=2,满足加锁条件,给segment全部加锁,这样所有线程就没有办法进行修改操作,统计每个segment的数量求和,然后返回size.(ps:全部加锁提高了size的准确率,但是降低了吞吐量,统计size的过程中如果其它线程进行修改操作这些线程全部自旋或者阻塞).

/**
 * size
 * @return
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // 为true表示size溢出32位
    long sum;         // modCounts的总和
    long last = 0L;   // previous sum
    int retries = -1; // 第一次不计算次数,所以会重试三次
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) { // 重试次数达到3次 对所有segment加锁
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) { // seg不等于空
                    sum += seg.modCount; // 不变化和size一样
                    int c = seg.count; // seg 的size
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last) // 没有变化
                break;
            last = sum; // 变化,记录这一次的变化值,下次循环时对比.
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

remove replace和remove都是用了scanAndLock这个方法
解释下scanAndLock这个方法,和put方法的scanAndLockForPut方法思想类似,都采用了同样的优化手段.

/**
     * 删除方法
     */
    final V remove(Object key, int hash, Object value) {
        if (!tryLock()) // 获取锁
            scanAndLock(key, hash); // 自旋获取锁
        V oldValue = null;
        try {
            HashEntry<K,V>[] tab = table; // 当前table
            int index = (tab.length - 1) & hash; // 获取位置
            HashEntry<K,V> e = entryAt(tab, index);// 找到元素
            HashEntry<K,V> pred = null;
            while (e != null) {
                K k;
                HashEntry<K,V> next = e.next; // 下一个元素
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) { // e的key=传入的key
                    V v = e.value; // 获取value
                    if (value == null || value == v || value.equals(v)) { // 如果value相等
                        if (pred == null) // 说明是头结点,让next节点为头结点即可
                            setEntryAt(tab, index, next);
                        else //  说明不是头结点,就把当前节点e的上一个节点pred的next节点设置为当前e的next节点
                            pred.setNext(next);
                        ++modCount;
                        --count;
                        oldValue = v;
                    }
                    break;
                }
                pred = e;
                e = next;
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

isEmpty 其实也和size的思想类似,不过这个始终没有加锁,提高了性能
执行流程
(1) 第一次遍历就干了一件事,确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,都为0,就累加modCount为sum.
(2) 判断sum是否为0,不为0,就代表了map之前是有数据的,被remove和clean了,modCount指的是操作次数,再次确定map的每个segment是否为0,其中任何一个segment的count不为0,就返回,并且累减sum,最后判断sum是否为0,为0就代表没有任何变化,不为0,就代表在第一次统计过程中有线程又添加了元素,所以返回false.但是如果在第二次统计时又发生了变化了,所以这个不是那么的准确,但是不加锁提高了性能,也是一个可以接受的方案.

`public boolean isEmpty() {

long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
        if (seg.count != 0)
            return false; // 某一个不为null,立即返回
        sum += seg.modCount;
    }
}
// 上面执行完 说明不为空,并且过程可能发生了变化
// 发生变化
if (sum != 0L) { // recheck unless no modifications
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false;
            sum -= seg.modCount;
        }
    }
    if (sum != 0L) // 变化值没有为0,说明不为空
        return false;
}

// 没有发生变化
return true;

}

总结

1.7 ConcurrentHashMap 使用了分段锁的思想提高了并发的的访问量,就是使用很多把锁,每一个segment代表了一把锁,每一段只能有一个线程获取锁;但是segment的数量初始化了,就不能修改,所以这也代表了并发的不能修改,这也是1.7的一个局限性.从get方法可以看出使用了UNSAFE的一些方法和volatile关键字来代替锁,提高了并发性.在size和containsValue这些方法提供一种尝试思想,先不加锁尝试统计,如果其中没有变化就返回,有变化接着尝试,达到尝试次数再加锁,这样也避免了立即加锁对并发的影响.


原文地址:https://www.cnblogs.com/zouhong/p/13554112.html