并发编程学习笔记(二十七、ConcurrentHashMap,Java7 ConcurrentHashMap)

目录:

  • 什么是ConcurrentHashMap
  • 为什么要有ConcurrentHashMap
  • ConcurrentHashMap源码解析

什么是ConcurrentHashMap

ConcurrentHashMap,从名字上就可以看出它是一个处理并发情况的哈希表,和HashMap非常像,你可以把它当成是HashMap的扩展类。

为什么要有ConcurrentHashMap

我们知道HashMap是一个非线程安全的哈希表,其内部的函数都是没有加锁的,而实际的开发过程中需要处理并发情况下的Map也不少。

即使JDK已经提供了Hashtable这种线程安全的哈希表,但它的实现方式过于暴力了,它把所有的函数都加上了synchronize,这样的方式虽然可以保证线程安全,但并发度非常的低(所有的读写线程使用一把锁)。

就是在这种情况下ConcurrentHashMap应运而生,它可以使得读不加锁也仅仅只是锁住将要写的那个槽

——————————————————————————————————————————————————————————————————————

小贴士:

还有一种同步map的方法,就是使用Collections工具类中synchronizedMap;它的原理和Hashtable差不多,也是使用synchronize对其内部的mutex对象加锁

我截取了其部分代码粘出来,你可以看看。

 1 private static class SynchronizedMap<K,V>
 2     implements Map<K,V>, Serializable {
 3     private static final long serialVersionUID = 1978198479659022715L;
 4 
 5     private final Map<K,V> m;     // Backing Map
 6     final Object      mutex;        // Object on which to synchronize
 7 
 8     SynchronizedMap(Map<K,V> m) {
 9         if (m==null)
10             throw new NullPointerException();
11         this.m = m;
12         mutex = this;
13     }
14 
15     SynchronizedMap(Map<K,V> m, Object mutex) {
16         this.m = m;
17         this.mutex = mutex;
18     }
19 
20     public int size() {
21         synchronized (mutex) {return m.size();}
22     }
23     public boolean isEmpty() {
24         synchronized (mutex) {return m.isEmpty();}
25     }
26     public boolean containsKey(Object key) {
27         synchronized (mutex) {return m.containsKey(key);}
28     }
29     public boolean containsValue(Object value) {
30         synchronized (mutex) {return m.containsValue(value);}
31     }
32     public V get(Object key) {
33         synchronized (mutex) {return m.get(key);}
34     }
35 
36     public V put(K key, V value) {
37         synchronized (mutex) {return m.put(key, value);}
38     }
39     public V remove(Object key) {
40         synchronized (mutex) {return m.remove(key);}
41     }
42     public void putAll(Map<? extends K, ? extends V> map) {
43         synchronized (mutex) {m.putAll(map);}
44     }
45     public void clear() {
46         synchronized (mutex) {m.clear();}
47     }
48 }

ConcurrentHashMap源码解析

我还是和往常一样从内部类、构造函数、核心方法这三个方面来介绍。

在介绍前,我们先看下ConcurrentHashMap的核心数据结构:

内部类:

1、Segment:

其最主要的结构就是Segment,它里面包含了HashMap中常用的属性及操作,其实啊总得来说它就是一个继承与ReentrantLock对HashMap进行包装的一个可重入锁对象

 1 static final class Segment<K,V> extends ReentrantLock implements Serializable {
 2     transient volatile HashEntry<K,V>[] table;
 3     transient int count;
 4     transient int modCount;
 5     transient int threshold;
 6     final float loadFactor;
 7     
 8     Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
 9         this.loadFactor = lf;
10         this.threshold = threshold;
11         this.table = tab;
12     }
13 
14     final V put(K key, int hash, V value, boolean onlyIfAbsent) {
15         // 省略实现
16     }
17 
18     private void rehash(HashEntry<K,V> node) {
19         // 省略实现
20     }
21 }

与HashMap不同的是HashMap只需要进行一次hash就可以计算出槽的位置,但ConcurrentHashMap需要先hash定位到对象的Segment再hash定位到实际存储数据的槽

——————————————————————————————————————————————————————————————————————

2、HashEntry:

1 static final class HashEntry<K,V> {
2     final int hash;
3     final K key;
4     volatile V value;
5     volatile HashEntry<K,V> next;
6 }

与HashMap的结构基本一致,不同的是其内部的value、next使用volatile修饰,以保证内存的可见性,即读写操作对其它线程可见。

构造函数:

 1 public ConcurrentHashMap(int initialCapacity,
 2                          float loadFactor, int concurrencyLevel) {
 3     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 4         throw new IllegalArgumentException();
 5     if (concurrencyLevel > MAX_SEGMENTS)
 6         concurrencyLevel = MAX_SEGMENTS;
 7     // 求解concurrencyLevel与2的几次方最近,如concurrencyLevel=5,则与2^3=8最近,则sshift=3,sszie=8
 8     int sshift = 0;
 9     int ssize = 1;
10     while (ssize < concurrencyLevel) {
11         ++sshift;
12         ssize <<= 1;
13     }
14     // segmentShift和segmentMask主要用于元素的hash
15     this.segmentShift = 32 - sshift;
16     this.segmentMask = ssize - 1;
17     if (initialCapacity > MAXIMUM_CAPACITY)
18         initialCapacity = MAXIMUM_CAPACITY;
19     // 得到分段锁segment中HashEntry[]大小
20     int c = initialCapacity / ssize;
21     if (c * ssize < initialCapacity)
22         ++c;
23     int cap = MIN_SEGMENT_TABLE_CAPACITY;
24     while (cap < c)
25         cap <<= 1;
26     // 创建segment
27     Segment<K,V> s0 =
28         new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
29                          (HashEntry<K,V>[])new HashEntry[cap]);
30     Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
31     UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
32     this.segments = ss;
33 }

核心方法:

1、java.util.concurrent.ConcurrentHashMap#put:

 1 public V put(K key, V value) {
 2     Segment<K,V> s;
 3     if (value == null)
 4         throw new NullPointerException();
 5     // 计算key的hash值
 6     int hash = hash(key);
 7     // 无符号右移segmentShift位(默认16),然后&segmentMask(默认15),最后得到segment在内存中的地址
 8     int j = (hash >>> segmentShift) & segmentMask;
 9     // 如果拿到的segment为null
10     if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
11          (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
12         // 则初始化segment
13         s = ensureSegment(j);
14     return s.put(key, hash, value, false);
15 }

其实这段逻辑主要就是找到segment的下标然后再存到HashEntry中。

2、java.util.concurrent.ConcurrentHashMap.Segment#put:

 1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 2     // 加锁,没有获取到会重试(scanAndLockForPut)
 3     HashEntry<K,V> node = tryLock() ? null :
 4         scanAndLockForPut(key, hash, value);
 5     V oldValue;
 6     try {
 7         HashEntry<K,V>[] tab = table;
 8         // 计算传入hash应在哪个槽
 9         int index = (tab.length - 1) & hash;
10         // 拿到对应的槽的表头
11         HashEntry<K,V> first = entryAt(tab, index);
12         // 遍历整个链表
13         for (HashEntry<K,V> e = first;;) {
14             // 若果这个key要存入的槽已经有了节点(链表指针非空)
15             if (e != null) {
16                 K k;
17                 // 如果key相同则覆盖value值
18                 if ((k = e.key) == key ||
19                     (e.hash == hash && key.equals(k))) {
20                     oldValue = e.value;
21                     if (!onlyIfAbsent) {
22                         e.value = value;
23                         ++modCount;
24                     }
25                     break;
26                 }
27                 // 指向下一个槽,能一直循环的条件
28                 e = e.next;
29             }
30             // 若果这个key要存入的槽无节点(链表为空)
31             else {
32                 if (node != null)
33                     node.setNext(first);
34                 else
35                     // 创建一个新的槽
36                     node = new HashEntry<K,V>(hash, key, value, first);
37                 int c = count + 1;
38                 // 超过阀值进行扩容
39                 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
40                     rehash(node);
41                 else
42                     // 未超过阀值将值加入到表头
43                     setEntryAt(tab, index, node);
44                 ++modCount;
45                 count = c;
46                 oldValue = null;
47                 break;
48             }
49         }
50     } finally {
51         // 释放锁
52         unlock();
53     }
54     return oldValue;
55 }

——————————————————————————————————————————————————————————————————————

原文地址:https://www.cnblogs.com/bzfsdr/p/13287686.html