1. 先来了解ConcurrentHashMap中的几个成员,当然大多数与HashMap中的相似,我们只看独有的成员
/** * The default concurrency level for this table, used when not * otherwise specified in a constructor. */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认的并发级别
/**
* The maximum capacity, used if a higher value is implicitly
* specified by either of the constructors with arguments. MUST
* be a power of two <= 1<<30 to ensure that entries are indexable
* using ints.
*/
static final int MAXIMUM_CAPACITY = 1 << 30; //最大容量
/**
* The minimum capacity for per-segment tables. Must be a power
* of two, at least two to avoid immediate resizing on next use
* after lazy construction.
*/
static final int MIN_SEGMENT_TABLE_CAPACITY = 2; //每个Segement中的桶的数量
/**
* The maximum number of segments to allow; used to bound
* constructor arguments. Must be power of two less than 1 << 24.
*/
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative //允许的最大的Segement的数量
/**
* Mask value for indexing into segments. The upper bits of a
* key's hash code are used to choose the segment.
*/
final int segmentMask; //掩码,用来定位segements数组的位置
/**
* Shift value for indexing within segments.
*/
final int segmentShift; //偏移量,用来确认hash值的有效位
/**
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments; //相当于多个HashMap组成的数组
static final class Segment<K,V> extends ReentrantLock implements Serializable { //内部类Segment,继承了ReentrantLock,有锁的功能 /** * The maximum number of times to tryLock in a prescan before * possibly blocking on acquire in preparation for a locked * segment operation. On multiprocessors, using a bounded * number of retries maintains cache acquired while locating * nodes. */ static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */ transient volatile HashEntry<K,V>[] table; //每个Segement内部都有一个table数组,相当于每个Segement都是一个HashMap transient int count; //这些参数与HashMap中的参数功能相同 transient int modCount; transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { //向Segement中添加一个元素 }
2. 构造函数
@SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) //参数校验 throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; //计算segement数组的大小,并且为2的倍数,默认情况下concurrentyLevel为16,那么ssize也为16 while (ssize < concurrencyLevel) { ++sshift; //ssize每次进行左移运算,因此sshift可以看做是ssize参数左移的位数 ssize <<= 1; }
this.segmentShift = 32 - sshift; //segement偏移量 this.segmentMask = ssize - 1; //由于ssize为2的倍数,所以sengemnt为全1的,用来定位segement数组的下标 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize; //计算每个Segement中桶的数量 if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = //初始化第一个segement new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //新建segements数组,并将s0赋值 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
3 . 我们来看put()方法
@SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; if (value == null) //ConcurrentHashMap中value不能为空 throw new NullPointerException(); int hash = hash(key); //获取到key的hash值 int j = (hash >>> segmentShift) & segmentMask; //定位到某个segement位置 if ((s = (Segment<K,V>)UNSAFE.getObject //从上面的构造方法中我们知道,segments数组只有0位置的segment被初始化了,因此这里需要去检测计算出的位置的segment是否被初始化
由于是并发容器,所以使用UNSAFE中的方法
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); //将元素插入到定位的Segement中 }
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //segement中的put()方法 HashEntry<K,V> node = tryLock() ? null : //获取锁,若获取不到锁,则县创建节点并返回 scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; //之后的算法就与HashMap中相似了 int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); //释放锁 } return oldValue; }
4. 来具体看一下SegementMask与SegmentShift这两个变量时怎么使用的?
int sshift = 0; int ssize = 1; //计算segement数组的大小,并且为2的倍数,默认情况下concurrentyLevel为16,那么ssize也为16 while (ssize < concurrencyLevel) { ++sshift; //ssize每次进行左移运算,因此sshift可以看做是ssize参数左移的位数 ssize <<= 1; } this.segmentShift = 32 - sshift; //segement偏移量 this.segmentMask = ssize - 1; //由于ssize为2的倍数,所以sengemnt为全1的,用来定位segement数组的下标
上面是构造函数中计算这两个变量的代码。
我们假设concurrencyLevel为默认值16,那么经过计算得到,ssize = 16,sshift = 4,segmentShift = 28, segementMask = 15
由于ssize为segements数组的大小,我们可以发现,当 n 与 segmentMask按位与时候正好可以得到<=15的数组,正是segements数组的下标。
int j = (hash >>> segmentShift) & segmentMask; //定位到某个segement位置
而segementShift的作用在于缩小hash值的范围,我们并不需要使用hash值所有的位,通过上面的数据,当hash值右移28位后正好可以得到有效计算的位数(4位),因此上面构造函数中的sshift也
可以表示计算segements数组时的有效位数。