Java同步数据结构之ConcurrentHashMap

前言

这是Java并发包最后一个集合框架的数据结构,其复杂程度也较以往任何数据结构复杂的多,顾名思义ConcurrentHashMap是线程安全版本的HashMap,总所周知HashMap是非线程安全的,若直接用于多线程并发环境将会出现很多问题,比如数据丢失,甚至某些操作陷入死循环导致CPU利用率100%等情况。除了ConcurrentHashMap能够保证线程安全之外,还可以通过两种方法获得线程安全的Map结构的实例,要么使用Collections.synchronizedMap(map)返回使用synchronized包装过的线程安全的map,要么直接使用Hashtable代替。这两种方法对于线程安全的支持本质上并没有太大区别,前者仅仅是在每一个synchronized方法内部直接使用传入的非线程安全的map实例本身,而后者hashtable也是在每一个有线程安全问题的方法上加synchronized锁,并采用和hashMap相同的实现原理重新实现。

关于JDK8以前的HashMap与ConcurrentHashMap

在JDK8以前的HashMap是通过数组 + 链表的数据结构实现的,即它首先维护一个数组table,数组中的每一个元素都是一个包含键值对以及一个next指针的Entry对象,通过对Key对象的hashCode做一系列运算的结果再与数组的长度取模得出一个映射到数组table的下标(即索引,或者称之为“槽”),从而将键值对映射到数组的不同槽位,然而通常具有相同hashCode值的不同Key很大程度上将会映射到数组的同一个槽位,这可以称之为“碰撞”,这时候为了能够存储这种键值,HashMap采用了以链表的方式来组织具有相同hash值的不同Key(具体来说,当插入发生碰撞时,新节点会重新占据数组的对应槽位,该槽位原来的Entry节点被挤下去成为新节点的后继节点,这就是所谓的“头插法”,即在链表的头部插入),这样一来,具有相同hash值的不同Key虽然对应到了数组中的同一个槽位,但是却位于链表上的不同节点上,下面是一个JDK8以前的HashMap的内部结构示意图:

JDK8以前的HashMap的这种设计和实现都很简单,但是当具体相同Hash值的Key较多的时候,链表的长度将会很长,导致查询效率极其低下,毕竟链表的查询只能从头部一个一个的往后遍历比较。 

HashMap内部有一个值为0.75的默认加载因子loadFactor,该加载因子也可通过构造函数传入指定的值,其作用就是当数组table的使用率超过75%时,对数组长度进行扩容,还有一个变量threshold则表示下一次触发扩容的临界数组长度,即threshold = table.length * loadFactor,而。每一次扩容之后数组的长度是原来的2倍。数组是一个创建后就长度不可变更的数据结构,要对数组进行扩容,只有创建新的数组,然后将旧数组中的元素一个个拷贝过去,扩容是一个比较耗时的过程,也是当HashMap运用到多线程并发环境下线程不安全的主要诱因,扩容过程中依然采用头插法组织链表元素,所以扩容之后链表中的节点顺序将会发生反转,并且由于将Key影射到不同数组槽位的时候需要与数组长度取模运算,当数组长度发生变化之后具有相同hash值的Key也有可能会被映射到不同的槽位,当多个线程同时进行扩容操作时,CPU时间片的分配与HashMap的扩容机制一结合,就产生了数据丢失甚至构成环形链表的可能。环形链就会造成某些操作陷入死循环导致CPU利用率100%等情况,关于HashMap线程不安全的详细分析过程可用参考JDK1.7和JDK1.8中HashMap为什么是线程不安全的?

ConcurrentHashMap

JDK8之前的ConcurrentHashMap是采用分段锁的方式实现了HashMap的线程安全版本,顾名思义数组table中的某一部分使用同一个锁保证线程安全,它内部定义了一个Segment数组,Segment继承了ReentrantLock,所以一个Segment就是一把锁,每一个Segment内部又持有一个table数组,这样就相当于将HashMap种的table数组拆分成若干个分段数组,每一个Segment管理table数组的一个区间,每一个table数组还是按照HashMap的实现方式实现即数组+链表,新节点的插入还是按头插法进入。这种方式是一种粗粒度的并发控制方案,当两个操作位于不同的两个段时可以不受线程安全的影响,但是位于同一个段的不同数组槽位的更新操作依然会受到并发控制的互斥访问限制,所以吞吐量并没有提高太多,但是任何读操作不存在竞争,即是读写分离的。下面的一个ConcurrentHashMap的内部结构示意图:

Segment数组的每一个Segment元素都对应一个table数组,同时也共享同一把互斥锁,table数组中的每一个元素都是一个HashEntry对象,HashEntry持有键值对,hash值以及指向链表结构的下一个节点next指针。

JDK8的HashMap

从JDK8开始,HashMap和ConcurrentHashMap的实现都做了大的调整,针对HashMap主要围绕解决长链表下查询缓慢的情况进行了改进,其主要变化就是将长链表换成了红黑树(一种平衡二叉树),因此JDK8的HashMap采用了数组+短链表+红黑树的数据结构实现,在链表的长度超过8个节点的时候,将会将链表通过旋转的方式直接转换成红黑树(称之为树化),红黑树的引入在查询效率上至少提升了2倍以上。以下是其内部结构示意图:

Java8HashMap的table数组元素是由一个个Node或TreeNode节点组成,对于红黑树对应的数组槽位中始终存储其根节点,对于链表结构,每一次新元素都在链表尾部插入,即“尾插法”;对于红黑树,每一次新插入节点可能都会引起红黑树的旋转从而导致结构变化,但其根节点始终存储在table数组的槽位中。

除了内部数据结构的变化,HashMap其它特性例如加载因子,扩容等都与JDK8以前的版本差不多,但JDK8的HashMap对以前版本扩容可能造成环形链的问题进行了修复,因此当再次用于多线程并发环境,JDK8的HashMap将不会导致CPU%100的情况,但依然可能存在数据覆盖的问题出现,因此依然不是线程安全的。多线程环境下依然需要使用ConcurrentHashMap,它的JDK8版本下也做了大的调整。

JDK8的ConcurrentHashMap

分析它才是本文的最终目的,首先JDK8的ConcurrentHashMap内部数据结构基本与JDK8的HashMap一致,也是基于数组 + 短链表 + 红黑树的方式设计实现的。因为JDK8以前的分段锁思想是一种粗粒度的线程安全实现,而JDK8的ConcurrentHashMap则将分段锁的概念细划到单个的数组槽位上,即一个table数组槽位一个锁,因此只有更新操作具有相同hash值得线程之间才会存在竞争,任何读取操作依然不涉及竞争问题,仍然是读写分离的。JDK8抛弃分段锁不但节省了不必要的空间消耗,而且用回了传统的synchronized关键字的重量级锁,毕竟现在的JDK对其优化已经比较好了。

ConcurrentHashMap中table数组在存储红黑树的根节点时,使用了一个TreeBin的类封装TreeNode,因此不再像JDK8那样直接在数组的槽位中存放红黑树的根节点,而是一个携带根节点的TreeBin实例。另外,该类为了保证与以前的版本兼容,保留了拥有加载因子loadFactor和concurrencyLevel参数的构造函数,以及用于兼容序列化的Segment类,继承AbstractMap抽象类也是对兼容性的支持,除此之外并无其它目的。在构造函数传入的加载因子仅仅只是用于针对初始化给定容量的内部数组从而满足可以在放入给定数量的元素之前不触发扩容操作。其后,内部的加载因子还是默认的0.75,因为扩容操作是一个开销很大的过程,因此若能够在创建Map实例的时候确定大概需要的空间,将减少甚至消除扩容造成的开销。

ConcurrentHashMap在JDK8中对扩容操作进行了精妙的设计实现,任何读写线程在发现需要扩容或正在扩容时都会奉献雷锋精神,加入到辅助扩容的行列中,毕竟人多力量大,从而缩短扩容过程的时间开销,而且其内部的巧妙设计会在扩容过程中尽可能少的拷贝节点,根据Java Doc的描述,当table数组长度扩张一倍,只有大约六分之一的元素需要克隆。对于ConcurrentHashMap的理解我感觉比较有难度,特别是红黑树的转换脑袋感觉都不够用了,只能做粗略的分析。

 ConcurrentHashMap对size()方法也进行了精心的设计,它采用了类似高并发统计工具LongAdder的原理,使用baseCount + CounterCell数组的形式解决高并发更新同一个变量的线程争用问题,对产生竞争的线程将计数分散到哈希数组中的不同单元格中,而不需要在调用size方法时才遍历统计。关于LongAdder的原理可用查阅高并发原子累加器Striped64及其实现类LongAdder&LongAccumulator

ConcurrentHashMap中定义了一些特殊的节点,例如链表节点(hash值 > 0 ),红黑树节点(hash值为-1),转移节点ForwardingNodes(hash值为-1)标记该数组槽位已经被迁移到扩容后的table数组中,舜态节点ReservationNodes(hash值为-3)标记是一种用于lumdba表达式计算的临时节点,记住这些定义对于源码理解事半功倍。

通过构造函数,可见不仅没有持有加载引子,也没有持有threshold扩容阈值了,多了一个sizeCtl成员变量,该变量不同的值代表不同的含义: 默认值0表示数组table还未初始化,-1表示正在初始化table数组,-(1+n)表示有n个线程正在辅助一起扩容,> 0表示初始化数组完成,并且表示下一次触发扩容的数组占用阈值,例如现在数组长度是128,则sizeCtl 就是96,刚好是0.75的加载因子。sizeCtl还保存有扩容标记,确保调整大小不会重复执行。

ConcurrentHashMap扩容支持最多MAX_RESIZERS个线程并行进行以缩短时间,并且每一个扩容线程都按一个步长(默认是MIN_TRANSFER_STRIDE,即16)从数组末尾往头部分配一段还没被扩容的多个槽位,而不是一个线程一个槽位, transferIndex就用于指示现在扩容线程已经占据从数组末尾往头部的第几个槽位了,后来加入的扩容线程只能从该位置往前分配一段未完成扩容的槽位进行。

部分源码分析

  1  ------------------------常量---------------------------
  2  
  3  //最大的表容量。
  4  private static final int MAXIMUM_CAPACITY = 1 << 30;
  5  
  6  //默认初始表容量。
  7  private static final int DEFAULT_CAPACITY = 16;
  8  
  9  //最大的数组大小(非2次幂)。被toArray和toArray(T[] a)方法需要。
 10  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 11  
 12  //默认并发级别。未使用,仅仅用于与该类的以前版本兼容而定义。
 13  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 14  
 15  //负载因子,貌似也没使用,在构造函数中指定此值只影响初始表容量。
 16  private static final float LOAD_FACTOR = 0.75f;
 17  
 18  //链表转换成红黑树(树化)的size阈值
 19  static final int TREEIFY_THRESHOLD = 8;
 20  
 21  //红黑色转换成链表(反树化)的size阈值,用在调整大小的时候。
 22  static final int UNTREEIFY_THRESHOLD = 6;
 23  
 24  //最小的表容量,该值至少是 4倍 TREEIFY_THRESHOLD以避免调整大小和树化阈值的冲突
 25  static final int MIN_TREEIFY_CAPACITY = 64;
 26  
 27  //扩容线程的最小跨度,即每一个线程至少分配16个槽位进行扩容
 28  private static final int MIN_TRANSFER_STRIDE = 16;
 29  
 30  //用于在sizeCtl中生成扩容标记的比特位数
 31  private static int RESIZE_STAMP_BITS = 16;
 32 
 33  //用于辅助调整大小的最大线程数
 34  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 35  
 36  //在sizeCtl中标记位的偏移
 37  private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 38  
 39  /*
 40  * Encodings for Node hash fields.节点哈希字段的编码
 41  */
 42  static final int MOVED     = -1; // 转发节点的哈希值
 43  static final int TREEBIN   = -2; // 红黑树的根节点的哈希值
 44  static final int RESERVED  = -3; // 临时保留节点的哈希
 45  static final int HASH_BITS = 0x7fffffff; // 普通链表节点哈希的可用位
 46  
 47  /* ---------------- 节点 -------------- */
 48 
 49  //键值对的条目,只用于遍历时自读操作,存在hash值为负值和键值为null的特殊节点。
 50  static class Node<K,V> implements Map.Entry<K,V> {
 51         final int hash;  //哈希值
 52         final K key;     //
 53         volatile V val;    //
 54         volatile Node<K,V> next; //下一个节点指针
 55  
 56  .....
 57  }
 58  
 59  //红黑树节点
 60  static final class TreeNode<K,V> extends Node<K,V> {
 61         TreeNode<K,V> parent;  // red-black tree links
 62         TreeNode<K,V> left;
 63         TreeNode<K,V> right;
 64         TreeNode<K,V> prev;    // needed to unlink next upon deletion
 65         boolean red;
 66  
 67   .....
 68  }
 69  
 70  //存放在tale数组槽位种的红黑树根节点包装类
 71  static final class TreeBin<K,V> extends Node<K,V> {
 72         TreeNode<K,V> root;
 73         volatile TreeNode<K,V> first;
 74         volatile Thread waiter;
 75         volatile int lockState;
 76         // values for lockState
 77         static final int WRITER = 1; // set while holding write lock
 78         static final int WAITER = 2; // set when waiting for write lock
 79         static final int READER = 4; // increment value for setting read lock
 80  
 81    .....
 82  }
 83  
 84  //标记已经被扩容转移的槽位,持有新数组的引用,可通过它的find方法让get操作可用从该节点过渡到新数组中去搜索
 85  static final class ForwardingNode<K,V> extends Node<K,V> {
 86         final Node<K,V>[] nextTable;
 87         ForwardingNode(Node<K,V>[] tab) {
 88             super(MOVED, null, null, null);
 89             this.nextTable = tab;
 90         }
 91         
 92       .....
 93  }    
 94  /* ---------------- 静态工具 -------------- */
 95  
 96  //通过哈希值h计算对应的数组索引
 97  static final int spread(int h) {
 98     return (h ^ (h >>> 16)) & HASH_BITS;
 99  }
100  //若x实现了Comparable接口则返回其类对象,否则返回null
101  static Class<?> comparableClassFor(Object x) 
102  
103  //若x与kc类型匹配,返回k.compareTo(x) ,否则返回0
104  static int compareComparables(Class<?> kc, Object k, Object x) {
105  
106  /* ---------------- table元素访问 -------------- */
107 
108  //volatile读取指定数组索引位置的元素
109  static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
110     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
111  }
112  
113  //CAS更新指定数组索引位置的元素
114  static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
115                                     Node<K,V> c, Node<K,V> v) {
116     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
117  }
118  
119  //volatile设置指定数组索引位置的元素
120  static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
121     U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
122  }
123  
124  
125  /* ---------------- 字段 -------------- */
126  
127  //table数组,在第一次插入元素时才初始化,大小总是2的幂。由迭代器直接访问。
128  transient volatile Node<K,V>[] table;
129  
130  //调整大小时才不为空的临时新数组。
131  private transient volatile Node<K,V>[] nextTable;
132  
133  //基础计数器,主要用于没有竞争时,也可用用作初始化表产生竞争的回退,通过CAS更新
134  private transient volatile long baseCount;
135  
136  //用于表初始化和调整大小的控制,当为负值表正在初始化或者调整大小:-1 表示正在初始化,-(1 + 辅助调整大小的线程数)表示正在调整大小。
137  //当table等于null时该值等于用于初始创建表时的初始大小,默认情况为0.初始化完成之后,保留下一个元素的count值,用于调整大小。
138  private transient volatile int sizeCtl;
139  
140  //指示扩容时已经处理到的槽位位置(从数组末尾往前处理,所以最开始transferIndex等于数组的长度)
141  private transient volatile int transferIndex;
142  
143  //自旋锁,用于计数元素个数对CounterCells槽位的锁定或扩容
144  private transient volatile int cellsBusy;
145  
146  //用于计数元素个数的单元格计数器,非空时大小为2的幂
147  private transient volatile CounterCell[] counterCells;
View Code

首先是一些常量,Node、TreeNode、TreeBin、ForwardingNode类定义,一些静态工具和字段的定义,这部分只是为实现真正的逻辑做准备,看看即可。

部分构造方法

 1 //创建一个新的初始容量为16的空表
 2  public ConcurrentHashMap() {
 3  }
 4  
 5  //创建一个新的初始大小可容纳指定数量元素而不需要动态调整大小的空表。
 6  public ConcurrentHashMap(int initialCapacity) {
 7     this.sizeCtl = ....
 8  }
 9  
10  //创建给定初始容量和负载因子的空表
11  public ConcurrentHashMap(int initialCapacity,float loadFactor) {
12     this.sizeCtl = ....
13  }
View Code

ConcurrentHashMap提供了5种构造方法,除了无参构造方法什么也没做,其它方法都涉及到初始化数组容量和根据加载因子确定下一次的扩容阈值sizeCtl,loadFactor和concurrencyLevel仅仅用于初始化table数组的长度以及对老版本的兼容,ConcurrentHashMap内部并不会持有它们,其内部加载因子还是默认值0.75.

添加元素---put

 1 //外部接口方法 
 2 public V put(K key, V value) {
 3     return putVal(key, value, false);
 4 }
 5 //内部实现方法
 6 final V putVal(K key, V value, boolean onlyIfAbsent) {
 7 // key和value都不能为null
 8 if (key == null || value == null) throw new NullPointerException();
 9 // 计算hash值
10 int hash = spread(key.hashCode());
11 // 要插入的元素所在桶的元素个数
12 int binCount = 0;
13 
14 for (Node<K,V>[] tab = table;;) {// 自旋
15     Node<K,V> f; int n, i, fh;
16     if (tab == null || (n = tab.length) == 0)
17         // 如果table数组未初始化或者长度为0,则初始化
18         tab = initTable();
19     else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
20         // 如果要插入的元素所在的槽位还没有被占据,则把这个元素直接放入到这个槽位中
21         if (casTabAt(tab, i, null,
22                 new Node<K,V>(hash, key, value, null)))
23             // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作
24             // 如果使用CAS插入元素成功,则break跳出循环,流程结束
25             break;                   // no lock when adding to empty bin
26     }
27     else if ((fh = f.hash) == MOVED)
28         // 如果要插入的元素所在的槽位的第一个元素的hash是MOVED,表示当前正在进行扩容迁移元素,则当前线程帮忙一起迁移元素
29         tab = helpTransfer(tab, f);
30     else {
31         // 如果这个槽位不为空且没有进行扩容,则锁住这个槽位(分段锁概念)
32         // 并查找要插入的元素是否在这个槽位中
33         // 存在,则替换值(onlyIfAbsent=false)
34         // 不存在,则插入到链表结尾或红黑树中
35         V oldVal = null;
36         synchronized (f) {
37             // 再次确认第一个元素是否有变化,如果有变化则进入下一次循环,重试
38             if (tabAt(tab, i) == f) {
39                 
40                 if (fh >= 0) { //表示链表结构
41                     binCount = 1;
42                     // 遍历链表
43                     for (Node<K,V> e = f;; ++binCount) {
44                         K ek;
45                         if (e.hash == hash &&
46                                 ((ek = e.key) == key ||
47                                         (ek != null && key.equals(ek)))) {
48                             // 如果找到了这个元素,则更新value(onlyIfAbsent=false)
49                             oldVal = e.val;
50                             if (!onlyIfAbsent)
51                                 e.val = value;
52                             break; //退出
53                         }
54                         Node<K,V> pred = e;
55                         if ((e = e.next) == null) {
56                             // 如果到链表尾部还没有找到该Key对应的元素
57                             // 就把它插入到链表结尾并退出循环
58                             pred.next = new Node<K,V>(hash, key,
59                                     value, null);
60                             break;
61                         }
62                     }
63                 }
64                 else if (f instanceof TreeBin) {//表示是红黑树结构
65                     Node<K,V> p;
66                     binCount = 2;
67                     // 调用红黑树的插入方法插入元素
68                     // 如果成功插入则返回null
69                     // 否则返回寻找到的节点
70                     if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
71                             value)) != null) {
72                         // 如果找到了这个元素,则更新value(onlyIfAbsent=false)
73                         oldVal = p.val;
74                         if (!onlyIfAbsent)
75                             p.val = value;
76                     }
77                 }
78             }
79         }
80         // 如果binCount不为0,说明成功插入了元素或者寻找到了元素
81         if (binCount != 0) {
82             // 如果链表元素个数达到了8,则尝试树化
83             // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数
84             // 所以不会重复树化
85             if (binCount >= TREEIFY_THRESHOLD)
86                 treeifyBin(tab, i);
87             // 如果要插入的元素已经存在,则返回旧值
88             if (oldVal != null)
89                 return oldVal;
90             // 退出外层大循环,流程结束
91             break;
92         }
93     }
94     }
95     // 成功插入元素,元素个数加1(是否要扩容在这个里面)
96     addCount(1L, binCount);
97     // 成功插入元素返回null
98     return null;
99 } 
View Code

单纯就这一段逻辑如果不考虑红黑树的插入方法putTreeVal,以及树化的方法treeifyBin的话,则该段逻辑其实很简单,大致分四个部分:1,计算出数组槽位索引,确定该位置的情况,2,该位置为空直接占据即可,3,该位置是链表结构,使用尾插法链接到链表结尾,若该位置是红黑树结构,则调用红黑树的插入方法插入该元素,4,若是链表结构并且链表长度达到了阈值8,则将链表树化,即转换成红黑树结构。

考虑到红黑树的插入逻辑比较复杂,我们这里暂时不去探究其过程,仅做简单的逻辑梳理。但要明白其中大致的关键,若是链表采用的尾插法,若是红黑树,插入节点之后红黑树为了维持平衡将采用旋转和重新作色的方式调整红黑树的结构(根节点可能会变化),但无论怎么调整,最终该hash值对应的数组槽位中的TreeBin实例都将持有该红黑树的根节点。

从上面的逻辑我们还可用发现,ConcurrentHashMap不允许Key和Value为null。并且在实现的过程中,采用了synchronized锁定了该hash值对应的table数组的单个槽位,这同样也可以看作是一种分段锁,只是比起JDK8以前的分段锁一次锁定多个数组槽位,JDK8这种实现分段锁更精细化,仅仅只需要锁定一个真正操作的槽位。

获取元素---get

 1 //外部接口方法 
 2 public V get(Object key) {
 3     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 4     // 计算hash
 5     int h = spread(key.hashCode());
 6     // 如果元素所在的槽位存在且里面有元素
 7     if ((tab = table) != null && (n = tab.length) > 0 &&
 8             (e = tabAt(tab, (n - 1) & h)) != null) {
 9         // 如果第一个元素就是要找的元素,直接返回
10         if ((eh = e.hash) == h) {
11             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
12                 return e.val;
13         }
14         else if (eh < 0)
15             // hash小于0,说明是红黑树树或者正在扩容迁移
16             // 调用红黑树的查找逻辑
17             return (p = e.find(h, key)) != null ? p.val : null;
18 
19         //否则是链表结构,则遍历整个链表寻找元素
20         while ((e = e.next) != null) {
21             if (e.hash == h &&
22                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
23                 return e.val; //找到返回其value
24         }
25     }
26     return null;
27 }
View Code

同样,若不考虑红黑树的查找逻辑,该段代码的逻辑非常简单,就是根据hash值计算数组的槽位索引,根据该索引对应的槽位的数据结构,按不同的查找实现,是链表则遍历链表查找,是红黑树则调用红黑树的查找逻辑。唯一需要注意的是,它并没有区分现在是否处于扩容迁移的状态,这是因为ConcurrentHashMap的扩容在迁移元素的过程中依然会确保各个数据结构的查找逻辑能够通过ForwardingNodes转到新数组中去继续进行查找。

删除元素---remove

  1 //外部接口方法 
  2 public V remove(Object key) {
  3     // 调用替换节点方法
  4     return replaceNode(key, null, null);
  5 }
  6 
  7 //实现方法
  8 final V replaceNode(Object key, V value, Object cv) {
  9     // 计算hash
 10     int hash = spread(key.hashCode());
 11     // 自旋
 12     for (Node<K,V>[] tab = table;;) {
 13         Node<K,V> f; int n, i, fh;
 14         if (tab == null || (n = tab.length) == 0 ||
 15                 (f = tabAt(tab, i = (n - 1) & hash)) == null)
 16             // 如果目标key所在的槽位不存在,直接跳出循环返回null
 17             break;
 18         else if ((fh = f.hash) == MOVED)
 19             // 如果正在扩容中,协助扩容
 20             tab = helpTransfer(tab, f);
 21         else {
 22             V oldVal = null;
 23             // 标记是否处理过
 24             boolean validated = false;
 25             synchronized (f) {
 26                 // 再次验证当前锁定的节点是槽位中第一个元素
 27                 if (tabAt(tab, i) == f) {
 28                     if (fh >= 0) {
 29                         // 是链表结构
 30                         validated = true;
 31                         // 遍历链表寻找目标节点
 32                         for (Node<K,V> e = f, pred = null;;) {
 33                             K ek;
 34                             if (e.hash == hash &&
 35                                     ((ek = e.key) == key ||
 36                                             (ek != null && key.equals(ek)))) {
 37                                 // 找到了目标节点
 38                                 V ev = e.val;
 39                                 // 检查目标节点旧value是否等于cv
 40                                 if (cv == null || cv == ev ||
 41                                         (ev != null && cv.equals(ev))) {
 42                                     oldVal = ev;
 43                                     if (value != null)
 44                                         // 如果value不为空则替换旧值
 45                                         e.val = value;
 46                                     else if (pred != null)
 47                                         // 如果前置节点不为空
 48                                         // 修改后继指针以删除当前节点
 49                                         pred.next = e.next;
 50                                     else
 51                                         // 如果前置节点为空
 52                                         // 说明是槽位中第一个元素,直接将后继节点提升上来占据该槽位
 53                                         setTabAt(tab, i, e.next);
 54                                 }
 55                                 break;
 56                             }
 57                             pred = e;
 58                             // 遍历到链表尾部还没找到元素,跳出循环
 59                             if ((e = e.next) == null)
 60                                 break;
 61                         }
 62                     }
 63                     else if (f instanceof TreeBin) {
 64                         // 是红黑树结构
 65                         validated = true;
 66                         TreeBin<K,V> t = (TreeBin<K,V>)f;
 67                         TreeNode<K,V> r, p;
 68                         // 遍历树找到了目标节点
 69                         if ((r = t.root) != null &&
 70                                 (p = r.findTreeNode(hash, key, null)) != null) {
 71                             V pv = p.val;
 72                             // 检查目标节点旧value是否等于cv
 73                             if (cv == null || cv == pv ||
 74                                     (pv != null && cv.equals(pv))) {
 75                                 oldVal = pv;
 76                                 if (value != null)
 77                                     // 如果value不为空则替换旧值
 78                                     p.val = value;
 79                                 else if (t.removeTreeNode(p))
 80                                     // 如果value为空则删除元素
 81                                     // 如果删除后树的元素个数较少则退化成链表
 82                                     // t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少
 83                                     setTabAt(tab, i, untreeify(t.first));
 84                             }
 85                         }
 86                     }
 87                 }
 88             }
 89             // 如果处理过,不管有没有找到元素都返回
 90             if (validated) {
 91                 // 如果找到了元素,返回其旧值
 92                 if (oldVal != null) {
 93                     // 如果要替换的值为空,元素个数减1
 94                     if (value == null)
 95                         addCount(-1L, -1);
 96                     return oldVal;
 97                 }
 98                 break;
 99             }
100         }
101     }
102     // 没找到元素返回空
103     return null;
104 }
View Code

同样,若不考虑红黑树的查找、删除、反树化逻辑,该段代码的逻辑非常简单,根据索引找到数组槽位的节点,若是链表就通过修改next指针删除节点,如果删除的是占据槽位的链表头节点则将其后继节点提升上来占据数组该槽位;若是红黑树就使用红黑树的方法删除节点,删除节点之后若树的节点足够少则需要反树化,即重新转换成链表。

从实现可用看出,删除元素的实现是先将节点的value置为null,然后在进行节点移除。其实现方法同样的替换节点值的实现。

扩容

首先,是否需要扩容一般是在累计元素个数的时候进行确定的,即addCount方法,我们这里就不具体分析该方法了,总之当table数组的使用率达到sizeCtl指示的扩容阈值时就会触发扩容,扩容时sizeCtl高16位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads),其它线程在累计元素之后通过检查sizeCtl的扩容标记发现正在进行扩容的话也会加入的扩容行列中来,当然扩容的线程个数也是有控制的。

在上面添加元素的实现中,若hash指示的数组槽位中的节点标志正在进行扩容,也会调用辅助扩容方法帮助一起扩容,即helpTransfer方法,在该线程加入到扩容行列中之前,会通过标记的ForwardingNode节点拿到新数组的引用,然后对sizeCtl中记录的扩容线程数加+,最后再调用真正的扩容迁移元素的实现方法transfer(Node<K,V>[] tab, Node<K,V>[] nextTab):

  1  
  2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3     int n = tab.length, stride;
  4     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5         stride = MIN_TRANSFER_STRIDE; // subdivide range
  6     if (nextTab == null) {            // initiating
  7         // 如果nextTab为空,说明是第一个开始扩容迁移元素的线程
  8         // 就创建一个新数组
  9         try {
 10             // 新数组是原来的两倍
 11             @SuppressWarnings("unchecked")
 12             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 13             nextTab = nt;
 14         } catch (Throwable ex) {      
 15             // 数组扩容的时候有可能出现OOME,这时需要将sizeCtl设置为Integer.MAX_VALUE,用以表示这个异常
 16             sizeCtl = Integer.MAX_VALUE;
 17             return;
 18         }
 19         nextTable = nextTab;
 20         //因为扩容时候的元素迁移是从数组最末端的元素开始的,所以迁移的时候下标是递减的,从下面的`--i`就能看出来了
 21         transferIndex = n;
 22     }
 23     // 新数组大小
 24     int nextn = nextTab.length;
 25     // 新建一个ForwardingNode类型的节点,并把新数组的引用存储在里面,用于标记该被迁移的槽位
 26     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 27     boolean advance = true; //指示该线程负责的槽位段中每一个槽位是否完成迁移,初始为true仅仅是为了能进入while
 28     boolean finishing = false; // 表示整个table里面的所有元素是否迁移完毕
 29     for (int i = 0, bound = 0;;) {//自旋
 30         Node<K,V> f; int fh;
 31         //确定当前线程负责的槽位段,并更新transferIndex指示已经迁移到了数组的哪个位置
 32         while (advance) {
 33             int nextIndex, nextBound;
 34             // 倒序迁移旧table数组元素的下标已达到槽的边界,或者整个table已经迁移完毕,说明迁移完成了
 35             if (--i >= bound || finishing)
 36                 advance = false;
 37             //扩容的脚本已经从最末端走到起始位置了,说明迁移完成了
 38             else if ((nextIndex = transferIndex) <= 0) {
 39                 i = -1;
 40                 advance = false;
 41             }
 42             //根据步长设置本次线程迁移的槽位段的边界transferIndex
 43             else if (U.compareAndSwapInt
 44                     (this, TRANSFERINDEX, nextIndex,
 45                             nextBound = (nextIndex > stride ?
 46                                     nextIndex - stride : 0))) {
 47                 bound = nextBound;
 48                 i = nextIndex - 1;
 49                 advance = false;
 50             }
 51         }
 52         
 53         //下面开始一个槽位一个槽位的迁移
 54         if (i < 0 || i >= n || i + n >= nextn) {
 55             int sc;
 56             if (finishing) { // 整个map所有槽位的元素都迁移完成了,由最后一个完成的线程执行
 57                
 58                 // 更新table数组指向新数组,设置下一次扩容的阈值
 59                 nextTable = null;
 60                 table = nextTab;
 61                 sizeCtl = (n << 1) - (n >>> 1);
 62                 return;
 63             }
 64             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 65                 // 当前线程负责的槽位段全部完成,把扩容线程数-1
 66                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 67                     return;
 68 
 69                 // finishing为true才会走到上面的if条件
 70                 finishing = advance = true;
 71                 // i重新赋值为n
 72                 // 这样会再重新遍历一次table数组,看看是不是都迁移完成了
 73                 // 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
 74                 i = n; // recheck before commit
 75             }
 76         }
 77         else if ((f = tabAt(tab, i)) == null)
 78             // 如果对应的槽位为空,直接放入ForwardingNode标记该槽位已迁移
 79             advance = casTabAt(tab, i, null, fwd);
 80         else if ((fh = f.hash) == MOVED)
 81             // 说明它是ForwardingNode节点,也就是该槽位已迁移
 82             advance = true; // already processed
 83         else {
 84             // 锁定该槽位并迁移元素
 85             synchronized (f) {
 86                 // 再次判断当前槽位第一个元素是否有修改
 87                 // 也就是可能其它线程先一步迁移了元素
 88                 if (tabAt(tab, i) == f) {
 89                     // 把一个链表分化成两个链表
 90                     // 规则是槽位中各元素的hash与数组长度n进行与操作
 91                     // 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
 92                     // 其中低位链表迁移到新数组中的位置相对旧数组不变
 93                     // 高位链表迁移到新数组中的位置正好是其在旧数组的位置加旧数组的长度n
 94                     // 这也正是为什么扩容时容量要变成两倍的原因
 95                     Node<K,V> ln, hn;
 96                     if (fh >= 0) {
 97                         // 第一个元素的hash值大于等于0
 98                         // 说明该槽位中元素是以链表形式存储的
 99                         // 这里与HashMap迁移算法基本类似
100                         // 唯一不同的是多了一步寻找lastRun
101                         // 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表
102                         // 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0
103                         // 则最后后面三个0对应的元素肯定还是在同一个槽位中
104                         // 这时lastRun对应的就是倒数第三个节点
105                         // 至于为啥要这样处理,我也没太搞明白
106                         int runBit = fh & n;
107                         Node<K,V> lastRun = f;
108                         for (Node<K,V> p = f.next; p != null; p = p.next) {
109                             int b = p.hash & n;
110                             if (b != runBit) {
111                                 runBit = b;
112                                 lastRun = p;
113                             }
114                         }
115                         // 看看最后这几个元素归属于低位链表还是高位链表
116                         if (runBit == 0) {
117                             ln = lastRun;
118                             hn = null;
119                         }
120                         else {
121                             hn = lastRun;
122                             ln = null;
123                         }
124                         // 遍历链表,把hash&n为0的放在低位链表中
125                         // 不为0的放在高位链表中
126                         for (Node<K,V> p = f; p != lastRun; p = p.next) {
127                             int ph = p.hash; K pk = p.key; V pv = p.val;
128                             if ((ph & n) == 0)
129                                 ln = new Node<K,V>(ph, pk, pv, ln);
130                             else
131                                 hn = new Node<K,V>(ph, pk, pv, hn);
132                         }
133                         // 低位链表的位置不变
134                         setTabAt(nextTab, i, ln);
135                         // 高位链表的位置是原位置加n
136                         setTabAt(nextTab, i + n, hn);
137                         // 标记当前槽位已迁移
138                         setTabAt(tab, i, fwd);
139                         // advance为true,返回上面进行--i操作
140                         advance = true;
141                     }
142                     else if (f instanceof TreeBin) {
143                         // 如果第一个元素是树节点
144                         // 也是一样,分化成两颗树
145                         // 也是根据hash&n为0放在低位树中
146                         // 不为0放在高位树中
147                         TreeBin<K,V> t = (TreeBin<K,V>)f;
148                         TreeNode<K,V> lo = null, loTail = null;
149                         TreeNode<K,V> hi = null, hiTail = null;
150                         int lc = 0, hc = 0;
151                         // 遍历整颗树,根据hash&n是否为0分化成两颗树
152                         for (Node<K,V> e = t.first; e != null; e = e.next) {
153                             int h = e.hash;
154                             TreeNode<K,V> p = new TreeNode<K,V>
155                                     (h, e.key, e.val, null, null);
156                             if ((h & n) == 0) {
157                                 if ((p.prev = loTail) == null)
158                                     lo = p;
159                                 else
160                                     loTail.next = p;
161                                 loTail = p;
162                                 ++lc;
163                             }
164                             else {
165                                 if ((p.prev = hiTail) == null)
166                                     hi = p;
167                                 else
168                                     hiTail.next = p;
169                                 hiTail = p;
170                                 ++hc;
171                             }
172                         }
173                         // 如果分化的树中元素个数小于等于6,则退化成链表
174                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
175                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
176                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
177                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
178                         // 低位树的位置不变
179                         setTabAt(nextTab, i, ln);
180                         // 高位树的位置是原位置加n
181                         setTabAt(nextTab, i + n, hn);
182                         // 标记该槽位已迁移
183                         setTabAt(tab, i, fwd);
184                         // advance为true,返回上面进行--i操作
185                         advance = true;
186                     }
187                 }
188             }
189         }
190     }
191 }
View Code

说实话,扩容的逻辑应该是ConcurrentHashMap中除了红黑树相关的逻辑之外最难懂的了,我也只是理解了一部分,大致逻辑就是每一个参与扩容的线程都会分得一段槽位完成迁移,分派槽位区间是从table数组得尾部往头部进行的,完成迁移得槽位会放置一个ForwardingNode节点标记该槽位已经被迁移过了,链表节点与红黑树节点都各自实现了不同的迁移逻辑,但都会将原链表/红黑树拆分成两个链表/红黑树,然后分别把这两部分放置于新数组的原位置和原位置+n的位置,n为旧数组的长度,这里为什么要将原来的链表或红黑树拆成两个我就不是很明白了,大概是为了充分利用扩容出来的空间,并且将长链表和红黑树拆分的小一点可用加快查询搜索速度吧。

完成整个数组的所有槽位的迁移之后,再将新数组的引用指向table数组,整个扩容扩充即结束。

获取元素个数---size()/mappingCount

 1 public int size() {
 2     // 调用sumCount()计算元素个数
 3     long n = sumCount();
 4     return ((n < 0L) ? 0 :
 5             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
 6                     (int)n);
 7 }
 8 
 9 final long sumCount() {
10     // 计算CounterCell所有段及baseCount的数量之和
11     CounterCell[] as = counterCells; CounterCell a;
12     long sum = baseCount;
13     if (as != null) {
14         for (int i = 0; i < as.length; ++i) {
15             if ((a = as[i]) != null)
16                 sum += a.value;
17         }
18     }
19     return sum;
20 }
View Code

size的实现也很讲究啊,使用了LongAdder的实现原理,由一个基础计数器 和一个散列数组构成,当计数器在基础计数器或散列数组的同一个索引槽位产生竞争时都将会把数据记录到其它CounterCell数组的不同槽位中,以消除竞争。但是其作者推荐使用mappingCount(),抛弃使用size方法,因为size方法返回值是一个int类型的,在元素个数超出int的最大范围时将无法正常工作,而mappingCount方法返回的是long类型的,而且它们的实现都是一样的。

其它一些部分方法

putIfAbsent(Key, Value) 只在不存在相应的键使才插入该键值对。

remove(Key, Value) 仅在存在键Key,并且值等于给定的value时才删除该键值对。

replace(Key, oldValue, newValue) 仅在存在键Key,并且值等于给定的value时才替换其value为新值。

replace(Key, Value) 只在不存在相应的键才替换该键值为新的值。

computeIfAbsent(key, Function)仅在不存在相应的键Key时才通过给定的函数用该指定的key计算出一个值,若非null则将其与key作为键值对插入Map

computeIfPresent(K key, BiFunction)若存在相应的键Key时通过给定的函数用该指定的key及其value计算出一个值,若非null则将其与key作为键值对插入Map,若为nul则删除原键值对。

static <K> KeySetView<K,Boolean> newKeySet()  返回一个用ConcurrentHashMap实现的值为布尔型的KetSet视图。

forEach(parallelismThreshold,BiFunction<? super K, ? super V, ? extends U> transformer,Consumer<? super U> action) 对每个(键、值)按指定的transformer转换的非空结果执行给定的操作action。

search(parallelismThreshold,BiFunction)通过对每个(键、值)或空(如果没有)应用给定的搜索函数返回非空结果。

reduce(long parallelismThreshold,BiFunction<? super K, ? super V, ? extends U> transformer, BiFunction<? super U, ? super U, ? extends U> reducer)返回使用给定的转换器transformer转换每一个键值对并使用给定的reducer进行累计的结果。

reduceToXXX(long parallelismThreshold, ToLongBiFunction<? super K, ? super V> transformer,X basis,LongBinaryOperator reducer)返回使用给定的转换器transformer转换每一个键值对并使用给定的reducer进行累计的到指定的基准值basis上的结果。

forEach/reduce/searchKey(...)则是上面forEach, search, reduce方法仅仅作用于Key时的重载。

forEach/reduce/searchValue(...)则是上面forEach, search, reduce方法仅仅作用于Value时的重载。

forEach/reduce/searchEntries(...)则是上面forEach, search, reduce方法作用于Entry条目时的重载。

迭代器/可拆分迭代器

ConcurrentHashMap作为Map无法直接迭代,只能对各自视图进行迭代,例如KeySet,ValueSet,entrySet()等,它们相应的内部类迭代器KeyIterator,ValueIterator,EntryIterator都继承了内部类BaseIterator,BaseIterator又继承自内部类Traverser,它们的迭代器都是弱一致性的,不会抛出ConcurrentModificationException。但是,迭代器被设计成一次只能被一个线程使用。迭代器反映了ConcurrentHashMap在迭代器创建时或创建后的状态的元素,即扩容之后的元素状态也会反映到迭代器。

同样的,对于可拆分迭代器spliterator,也仅仅只能对各自视图进行迭代,例如KeySet,ValueSet,entrySet()等,它们相应的内部类可拆分迭代器KeySpliterator,EntrySpliterator,ValueSpliterator都继承了内部类Traverser,它们的拆分按每一次拆分一半的方式进行。

总结

ConcurrentHashMap是线程安全的HashMap实现,它的内部数据结构与对应JDK的HashMap的数据一致,在JDK8以前HashMap是采用数组+链表的方式实现,而ConcurrentHashMap在其基础上采用分段锁实现了线程安全,而JDK8的HashMap采用了数组+短链表+红黑树的数据结构实现,这改善了之前版本的长链表的查询低效问题,而对应的ConcurrentHashMap在此基础上也摒弃了粗粒度的分段锁实现,采用了每一个数组槽位一个锁这种更细粒度的分段锁,并且抛弃了ReentrantLock改用synchronized + CAS锁,这也改善了上一个版本ReentrantLock的空间浪费。

ConcurrentHashMap的内部实现非常精妙,但红黑树的部门确实有点难度,我并没有对红黑树的转换过程做深入的探究,本文只对ConcurrentHashMap作了粗略的了解,若想了解红黑树转换过程,可参考【死磕Java并发】—–J.U.C之ConcurrentHashMap红黑树转换分析 一文。除了初始化的时候,会按照指定的加载因子创建可容纳指定数量的元素而不触发数组扩容之外,ConcurrentHashMap内部维护了一个0.75的加载因子,也就是每当内部的数组占用率达到75%的时候就会将原来的数组扩容至原来的2倍大小,并将原来的所有元素拷贝到新数组中,拷贝的时候为了充分里面多出来的空间,和提高查询搜索速度,会将一些长链表或红黑树拆分成两个体积更小的链表或红黑树分别存放与新数组的原位置和原位置+原数组长度的位置,由于扩容操作是一个非常耗时的过程,ConcurrentHashMap对这一块做了精妙的设计使扩容可以由多个参与线程一起辅助完成,从而减小时间消耗,但扩容本身还是是开销比较大操作,能够在使用ConcurrentHashMap之前就确定其大概需要的容量将有效减少扩容的消耗。

原文地址:https://www.cnblogs.com/txmfz/p/11168884.html