Java集合框架整理4--基于写时复制机制实现的CopyOnWriteArrayList和CopyOnWriteArraySet实现原理

前言:

上篇文章通过源码解析了ArrayList和LinkedList的实现逻辑,但是在多线程的情况下,这两个List实现类都是无法保证线程安全的,所以在JUC中就提供了一种线程安全的List,也就是本文将要解析的CopyOnWriteArrayList

CopyOnWriteArrayList从名字上可以看出底层的数据结构应该是和ArrayList一样采用数组来实现的,但是既然是线程安全的,所以应该是使用了同步操作,而JUC中同步的方式无非就是使用ReentrantLock来实现。另外JUC中有很多线程安全的集合类名字的前缀

都是ConConcurrent* 表示是可以并发操作,而CopyOnWriteArrayList却是以CopyOnWrite来作为前缀的,这里就涉及到了一个概念--CopyOnWrite机制(写时复制机制)

一、CopyOnWrite机制

CopyOnWrite从单词上直译意思是 通过复制来写,专业的叫法就是写时复制。

当我们使用集合容器来存储数据时,如果要保证线程安全,可以在操作的时候加锁处理,保证同一时间只有一个人操作即可。还有一种办法是每次操作的时候,都先将容器复制一份副本,然后各个线程直接在各自的容器副本中操作数据,操作完成之后再将当前的副本

直接替换掉原容器即可,这样只需要保证在替换的时候是线程安全的,就可以保证容器的所有操作都是线程安全的了。这样的好处是只有写操作才会进行复制,而不会影响到读操作,所以读操作还是会直接读原容器,而写是容器副本,从而间接的达到了读写分离的效果。当然既然是读写分离的,就需要注意脏读读问题,需要保证读的是最新的数据,否则会出现读的数据还是旧的,但是实际上已经被更新过了。

1.1、CopyOnWrite的优点

读写分离,只对写做同步处理,不对读作同步处理,既保证了写的线程安全又保证了读的高性能。

如果没有CopyOnWrite机制,如果想要实现线程安全的ArrayList,无非有几种方式

Synchronzied:如果使用Synchronzied,相当于ArrayList同一时间只可以有一个线程可以访问,很显然效率较低;

ReentrantLock:和Synchronzied基本上一致同一时间只有一个线程访问

ReentrantReadWriteLock:读写锁一定程度上实现了读写分开加锁的操作,但是如果有一个线程占有了写锁,此时有大量的读请求时还是会同样被阻塞中,所以读写锁还是会有写锁阻塞读锁的情况

而CopyOnWrite思想是复制副本来进行修改,而读线程读的还是原数据,就不会出现写线程将读线程给阻塞的情况了。

二、CopyOnWriteArrayList 

2.1、CopyOnWriteArrayList的初始化

CopyOnWriteArrayList一共有三个构造方法,一个是无参,一个参数为对象数组,一个参数为Collection集合,源码分别如下:

 1 /**volatile修饰的对象数组,内部存储数据的结构*/
 2     private transient volatile Object[] array;
 3 
 4     /**无参构造函数*/
 5     public CopyOnWriteArrayList() {
 6         /**新建Object数组,调用setArray方法初始化*/
 7         setArray(new Object[0]);
 8     }
 9 
10     /**参数为对象数组*/
11     public CopyOnWriteArrayList(E[] toCopyIn) {
12         /**直接将参数通过Arrays.copyOf方法复制一个数组*/
13         setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class));
14     }
15 
16     /**参数为Collection集合*/
17     public CopyOnWriteArrayList(Collection<? extends E> c) {
18         /**将集合转化成数组,在执行和上一个构造器方法相同的实现逻辑*/
19         Object[] elements;
20         if (c.getClass() == CopyOnWriteArrayList.class)
21             elements = ((CopyOnWriteArrayList<?>)c).getArray();
22         else {
23             elements = c.toArray();
24             // c.toArray might (incorrectly) not return Object[] (see 6260652)
25             if (elements.getClass() != Object[].class)
26                 elements = Arrays.copyOf(elements, elements.length, Object[].class);
27         }
28         setArray(elements);
29     }
30 
31     /** 将参数中的数组赋值给内部对象数组属性array,该方法为final类型,不允许被重写 */
32     final void setArray(Object[] a) {
33         array = a;
34     }

如上代码示,CopyOnWriteArrayList内部存储数据的也是一个Object数组,只不过是volatile修饰的保证了多线程修改时内存可见性,而三个构造方法实现逻辑都是创建一个Object数组,然后直接赋值给CopyOnWriteArrayList内部的Object数组

2.2、CopyOnWriteArrayList插入数据

在List尾部插入元素和在指定位置插入元素源码分别如下:

 1 /** final类型重入锁 */
 2     final transient ReentrantLock lock = new ReentrantLock();
 3 
 4     /** 返回存储数据的数组 */
 5     final Object[] getArray() {
 6         return array;
 7     }
 8 
 9     /** 尾部插入数据 */
10     public boolean add(E e) {
11         final ReentrantLock lock = this.lock;
12         lock.lock();
13         try {
14             Object[] elements = getArray();
15             int len = elements.length;
16             /** 复制一个新数组,容量+1 */
17             Object[] newElements = Arrays.copyOf(elements, len + 1);
18             newElements[len] = e;
19             /** 将新数组赋值给原数组,相当于替换 */
20             setArray(newElements);
21             return true;
22         } finally {
23             lock.unlock();
24         }
25     }
26 
27     /** 指定位置插入数据 */
28     public void add(int index, E element) {
29         final ReentrantLock lock = this.lock;
30         lock.lock();
31         try {
32             Object[] elements = getArray();
33             int len = elements.length;
34             if (index > len || index < 0)
35                 throw new IndexOutOfBoundsException("Index: "+index+
36                         ", Size: "+len);
37             Object[] newElements;
38             //需要移动的元素个数
39             int numMoved = len - index;
40             if (numMoved == 0)
41                 /**如果不需要移动表示尾部插入,逻辑相当于add(E e)的实现逻辑*/
42                 newElements = Arrays.copyOf(elements, len + 1);
43             else {
44                 /**如果需要移动,先创建新数组,将index前后两部分的数组分别通过复制来赋值给新数组*/
45                 newElements = new Object[len + 1];
46                 System.arraycopy(elements, 0, newElements, 0, index);
47                 System.arraycopy(elements, index, newElements, index + 1,
48                         numMoved);
49             }
50             /** index位置插入数据并替换原数组 */
51             newElements[index] = element;
52             setArray(newElements);
53         } finally {
54             lock.unlock();
55         }
56     }

从源码可以看出,保证add方法线程安全的方式是通过可重入锁ReentrantLock来实现的,执行插入之前进行加锁,插入完成之后解锁,所以插入的过程肯定是线程安全的。另外和ArrayList实现逻辑不同的是,ArrayList每次插入之前都会判断是否需要扩容,如果需要扩容的话会扩容1.5倍,而CopyOnWriteArrayList每次只扩容1位,扩容之后插入数据直接将复制的副本替换掉原数组,而不是直接在原数组上进行修改的。

正如CopyOnWrite机制所说的那样,写的时候先通过复制原数组,然后写入数据,最后再直接替换掉原数组。

2.3、CopyOnWriteArrayList修改数据

 1 /** 设置指定位置数据 */
 2     public E set(int index, E element) {
 3         final ReentrantLock lock = this.lock;
 4         lock.lock();
 5         try {
 6             Object[] elements = getArray();
 7             E oldValue = get(elements, index);
 8 
 9             /** 当旧数据不等于新数据时,直接替换 */
10             if (oldValue != element) {
11                 int len = elements.length;
12                 Object[] newElements = Arrays.copyOf(elements, len);
13                 newElements[index] = element;
14                 setArray(newElements);
15             } else {
16                 /** 当旧数据和新数据相同时,也进行替换*/
17                 // Not quite a no-op; ensures volatile write semantics
18                 setArray(elements);
19             }
20             return oldValue;
21         } finally {
22             lock.unlock();
23         }
24     }

从源码上流程上没什么大的问题,加锁->复制->修改->替换

但是有一段代码比较特殊,也就是第18行的setArray(elements), 当set的数据和旧的数据一致时,那么数组是并没有被修改的,理论上执行或不执行setArray这行代码效果是一样的,为什么还需要保留这一行代码呢?

从第17行的注释可以得知一二,注释的意思是为了确保volatile写的语义。

因为CopyOnWriteArrayList内部的数组对象array是通过volatile修饰的,而这里的setArray并不是为了保证array对于其他线程的可见性,而是为了保证外部非volatile变量的happen-before原则。

比如以下案例:

 1 static volatile Object[] list = new Object[1];
 2     boolean flag = false;
 3 
 4     public void func1(){
 5         flag = true;//步骤1
 6         list[0] = new Object();//步骤2
 7     }
 8 
 9     public void func2(){
10         if(list[0] != null){//步骤3
11             boolean result = flag;//步骤4
12         }
13     }

定义volatile类型的Object数组和非volatile类型的变量flag,这里存在的happen-before关系为:

1 before 2 (程序先后关系)、 2 before 3 (volatile规则)、 3 before 4 (程序先后关系)、 1 before 4(根据happen-before传递规则)

但是如果没有这一行setArray操作,相当于就没有了写操作,也就会丢失 2 before 3的关系了,也就是无法保证volatile变量的写语义了。

2.4、CopyOnWriteArrayList读取数据

1 /** 获取数据 */
2     public E get(int index) {
3         return get(getArray(), index);
4     }
5 
6     private E get(Object[] a, int index) {
7         return (E) a[index];
8     }

可以看出CopyOnWriteArrayList读取数据的逻辑比较简单,就是从数组中获取指定index的数据,并且没有做同步处理,所以get操作和ArrayList的逻辑和效果是一模一样的。

但是由于get操作没有任何同步操作,所以理论上是会存在脏读的情况的,比如线程A写数据,在执行setArray之前,线程B读了数据,此时线程B读取的还是原数组中的数据,而实际上线程A已经对副本进行了修改操作了,只是还没有覆盖。同理的还有CopyOnWriteArrayList的size()、isEmpty()等读操作相关的方法都会存在类似的脏读问题。

注意:虽然CopyOnWriteArrayList底层的数组是volatile修饰的,但是这只能让array对象对其他线程可见,但是array内部存储的数据对于其他线程而言还是不可见的。

分析完CopyOnWriteArrayList的读写源码之后,实际上其他方法就可以不用再看了,删除数据的remove方法实际也是和add一样,先加锁在复制写数据然后再覆盖原数组。

2.5、CopyOnWriteArrayList总结

1、CopyOnWriteArrayList是线程安全的List,底层数据结构也是数组结构,不过通过volatile修饰,使得写操作之后立即刷新内存,使得其他线程读最新的数据。是基于CopyOnWrite机制实现的线程安全的List

2、CopyOnWriteArrayList每次插入数据都会进行一次扩容,容量加1,并且在写之前都需要通过ReentrantLock加锁处理,然后复制原数组,写完数据之后直接覆盖原数组

3、CopyOnWriteArrayList的读操作没有加锁处理,所以会存在脏读问题,可能会读到其他线程以及修改,但是还没有替换原数组的数据

4、CopyOnWriteArrayList每次插入数据都会涉及到数组的复制,所以不适合频繁写而导致频繁复制数组的场景,而读没有加锁,所以适合写少读多的场景。

5、CopyOnWriteArrayList通过迭代器循环时,只可以循环读,而不可以执行写操作,因为迭代的数据是副本数据。

6、CopyOnWriteArrayList的set方法当设置数据一直时也同样会复制数组,不是为了保证数组的可见性,而是为了保证外部非volatile变量的happen-before关系,从而实现volatile的语义。

三、CopyOnWriteArraySet

CopyOnWriteArraySet是一个线程安全的无序集合,相当于线程安全的HashSet,但是实现和HashSet完全不同,HashSet底层是通过HashMap来实现的,而CopyOnWriteArraySet底层则是通过CopyOnWriteArrayList来实现的,

CopyOnWriteArraySet 内部基本上所有的方法实现都是通过CopyOnWriteArrayList来实现的。

3.1、CopyOnWriteArraySet初始化

 1 /** 内部存储数据的结果为CopyOnWriteArrayList */
 2     private final CopyOnWriteArrayList<E> al;
 3 
 4     /**
 5      * 无参构造函数实际就是初始化一个CopyOnWriteArrayList
 6      */
 7     public CopyOnWriteArraySet() {
 8         al = new CopyOnWriteArrayList<E>();
 9     }
10 
11     /**
12      * 初始化CopyOnWriteArrayList,并将集合中的数据初始化到List中
13      */
14     public CopyOnWriteArraySet(Collection<? extends E> c) {
15         if (c.getClass() == CopyOnWriteArraySet.class) {
16             @SuppressWarnings("unchecked") CopyOnWriteArraySet<E> cc =
17                     (CopyOnWriteArraySet<E>)c;
18             al = new CopyOnWriteArrayList<E>(cc.al);
19         }
20         else {
21             al = new CopyOnWriteArrayList<E>();
22             al.addAllAbsent(c);
23         }
24     }

CopyOnWriteArraySet的初始化过程实际就是初始化了内部的CopyOnWriteArrayList

3.2、CopyOnWriteArraySet增删数据

1 public boolean add(E e) {
2         return al.addIfAbsent(e);
3     }
1 public boolean remove(Object o) {
2         return al.remove(o);
3     }

插入数据是调用CopyOnWriteArrayList的addIfAbsent方法,该方法的作用是如果插入的数据不存在就插入,否则就插入失败,这样的操作就实现了set中不会出现重复数据的要求。源码如下:

 1 /** 如果不存在就插入数据 */
 2     public boolean addIfAbsent(E e) {
 3         /** 1.获取数组的快照 */
 4         Object[] snapshot = getArray();
 5         /** 2.判断元素在数组中的位置是否大于0,如果大于0表示已经存在,则直接返回false;
 6          *    如果位置小于0则表示元素不存在,则调用addIfAbsent()方法插入数据
 7          *    */
 8         return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
 9                 addIfAbsent(e, snapshot);
10     }
11 
12     /** 如果不存在就插入数据 */
13     private boolean addIfAbsent(E e, Object[] snapshot) {
14         final ReentrantLock lock = this.lock;
15         lock.lock();
16         try {
17             Object[] current = getArray();
18             int len = current.length;
19             //如果传入数组和当前数组不相等,表示数组已经被修改过
20             if (snapshot != current) {
21                 /**
22                  * 取两个数组长度的最小值common
23                  * 从0开始到common判断两个数组每一位数据是否相等,并且是否和插入的元素相等,如果数组某一位不一致或者和插入数据一直就返回false */
24                 int common = Math.min(snapshot.length, len);
25                 for (int i = 0; i < common; i++)
26                     if (current[i] != snapshot[i] && eq(e, current[i]))
27                         return false;
28                 /**判断当前数组中common到最后直接是否存在插入的数据
29                  * (因为此时快照snapshot已经判断过了不存在了,且0-common之间也已经不存在了,所以只需要判断current中
30                  * 的common-len之间是否包含插入数据)  */
31                 if (indexOf(e, current, common, len) >= 0)
32                     return false;
33             }
34             /** 如果数组没有被修改过,或者修改之后的数组中也不包含待插入的数据,则之间在尾部插入 */
35             Object[] newElements = Arrays.copyOf(current, len + 1);
36             newElements[len] = e;
37             setArray(newElements);
38             return true;
39         } finally {
40             lock.unlock();
41         }
42     }
43 
44     /** 查找指定数据在数组中的位置index值*/
45     private static int indexOf(Object o, Object[] elements,
46                                int index, int fence) {
47         if (o == null) {
48             for (int i = index; i < fence; i++)
49                 if (elements[i] == null)
50                     return i;
51         } else {
52             for (int i = index; i < fence; i++)
53                 if (o.equals(elements[i]))
54                     return i;
55         }
56         return -1;
57     }

可以看出实际就是遍历数组,是否包含待插入的数据,如果不包含则可以插入,否则直接返回false不允许插入。

类似的CopyOnWriteArraySet的所有方法基本上全是通过CopyOnWriteArrayList来实现的。相当于CopyOnWriteArraySet就是去重版本的CopyOnWriteArrayList。

Extra

1、CopyOnWrite思想是一种写时加锁,而读是不加锁,且写操作不会阻塞读操作,而读操作可以读取到最新的数据的一种机制。

2、除了JUC中实现的CopyOnWriteArrayList和CopyOnWriteArraySet,Kafka的源码中也基于CopyOnWrite思想实现了CopyOnWriteMap,源码如下,有兴趣的可自行分析:

  1 public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
  2 
  3 private volatile Map<K, V> map;
  4 
  5 public CopyOnWriteMap() {
  6     this.map = Collections.emptyMap();
  7 }
  8 
  9 public CopyOnWriteMap(Map<K, V> map) {
 10     this.map = Collections.unmodifiableMap(map);
 11 }
 12 
 13 @Override
 14 public boolean containsKey(Object k) {
 15     return map.containsKey(k);
 16 }
 17 
 18 @Override
 19 public boolean containsValue(Object v) {
 20     return map.containsValue(v);
 21 }
 22 
 23 @Override
 24 public Set<java.util.Map.Entry<K, V>> entrySet() {
 25     return map.entrySet();
 26 }
 27 
 28 @Override
 29 public V get(Object k) {
 30     return map.get(k);
 31 }
 32 
 33 @Override
 34 public boolean isEmpty() {
 35     return map.isEmpty();
 36 }
 37 
 38 @Override
 39 public Set<K> keySet() {
 40     return map.keySet();
 41 }
 42 
 43 @Override
 44 public int size() {
 45     return map.size();
 46 }
 47 
 48 @Override
 49 public Collection<V> values() {
 50     return map.values();
 51 }
 52 
 53 @Override
 54 public synchronized void clear() {
 55     this.map = Collections.emptyMap();
 56 }
 57 
 58 @Override
 59 public synchronized V put(K k, V v) {
 60     Map<K, V> copy = new HashMap<K, V>(this.map);
 61     V prev = copy.put(k, v);
 62     this.map = Collections.unmodifiableMap(copy);
 63     return prev;
 64 }
 65 
 66 @Override
 67 public synchronized void putAll(Map<? extends K, ? extends V> entries) {
 68     Map<K, V> copy = new HashMap<K, V>(this.map);
 69     copy.putAll(entries);
 70     this.map = Collections.unmodifiableMap(copy);
 71 }
 72 
 73 @Override
 74 public synchronized V remove(Object key) {
 75     Map<K, V> copy = new HashMap<K, V>(this.map);
 76     V prev = copy.remove(key);
 77     this.map = Collections.unmodifiableMap(copy);
 78     return prev;
 79 }
 80 
 81 @Override
 82 public synchronized V putIfAbsent(K k, V v) {
 83     if (!containsKey(k))
 84         return put(k, v);
 85     else
 86         return get(k);
 87 }
 88 
 89 @Override
 90 public synchronized boolean remove(Object k, Object v) {
 91     if (containsKey(k) && get(k).equals(v)) {
 92         remove(k);
 93         return true;
 94     } else {
 95         return false;
 96     }
 97 }
 98 
 99 @Override
100 public synchronized boolean replace(K k, V original, V replacement) {
101     if (containsKey(k) && get(k).equals(original)) {
102         put(k, replacement);
103         return true;
104     } else {
105         return false;
106     }
107 }
108 
109 @Override
110 public synchronized V replace(K k, V v) {
111     if (containsKey(k)) {
112         return put(k, v);
113     } else {
114         return null;
115     }
116 }
117 }
原文地址:https://www.cnblogs.com/jackion5/p/13033766.html