Java分布式应用学习笔记04JDK的并发包的集合总结

1.  前言

平时咱们使用的HashMapArrayList等等容器集合包都存在线程安全的问题,看过JDK源码的各位朋友们知道这些实现类底层,为了性能,都没有对这些集合的操作方法做加锁或者副本传递机制,只有VectorStack是线程安全的,大家可以看它们的源码,底层方法是以在方法上加上synchronized作为代价的,换句话说是用时间换取空间的方式。Sun JDK对多线程并发环境下做了很多并发的解决方案,其类大都在java.util.concurrent.*下面,此包下的类和java.util.*包下面的集合类,在使用上几乎没什么太大分别,想想也是啊!他们都是实现接口规范:ListSetMap的。只要接口规范不变,那么在使用上也不应该有何变化,实现机制是一个侧重低概率并发或者就是单线程环境下,并发包则侧重高并发情况的系统。大家可以看看Tomcat的源代码,其中org.apache.catalina.core.ApplicationContext里面就使用到了并发包,因为Tomcat作为Web容器一定要接受来自各个客户端的request,进而分配Web应用上下文信息,应用参数key-value值等等。又得满足并发的请求、又得满足性能所需,所以它使用JDK的并发包。在使用层面上,笔者并不作过多介绍,可以参考非并发包的使用。至于这些非并发包的底层实现方式可以参考笔者的blog关于Java基础数据结构的基础知识,而是介绍一下这些并发包的底层机制和性能对比,在多线程环境下,用并发包和不用并发包的时间效率对比,空间资源效率不用比了,肯定单线程那些包消耗的比多线程消耗的小得多,毕竟做任何事都是要付出代价的。

http://suhuanzheng7784877.iteye.com/blog/1004128

2.  Map的并发包

Map接口在并发包下的实现叫做java.util.concurrent.ConcurrentHashMap。它实现了ConcurrentMap接口,而ConcurrentMap接口又是继承自Map接口的扩展。

先看看它是如何实现put操作的。

Java代码  收藏代码
  1. <span style="font-size: small;">    public V put(K key, V value) {  
  2.         if (value == null)  
  3.             throw new NullPointerException();  
  4.         int hash = hash(key.hashCode());  
  5.         return segmentFor(hash).put(key, hash, value, false);  
  6.     }</span>  

 首先判断值是否为空,空值不必要存储,之后根据key的哈希值计算一个hash值。根据计算出的hash值去获取segment对象。找到了segment对象后调用该对象的put方法完成操作。SegmentConcurrentHashMap的内部类其底层原理使用一个transient volatile HashEntry<K,V>[] table;进行存取。现在再看segment内的put源码

Java代码  收藏代码
  1. <span style="font-size: small;">        V put(K key, int hash, V value, boolean onlyIfAbsent) {  
  2.             lock();  
  3.             try {  
  4.                 int c = count;  
  5.                 if (c++ > threshold) // ensure capacity  
  6.                     rehash();  
  7.                 HashEntry<K,V>[] tab = table;  
  8.                 int index = hash & (tab.length - 1);  
  9.                 HashEntry<K,V> first = tab[index];  
  10.                 HashEntry<K,V> e = first;  
  11.                 while (e != null && (e.hash != hash || !key.equals(e.key)))  
  12.                     e = e.next;  
  13.   
  14.                 V oldValue;  
  15.                 if (e != null) {  
  16.                     oldValue = e.value;  
  17.                     if (!onlyIfAbsent)  
  18.                         e.value = value;  
  19.                 }  
  20.                 else {  
  21.                     oldValue = null;  
  22.                     ++modCount;  
  23.                     tab[index] = new HashEntry<K,V>(key, hash, first, value);  
  24.                     count = c; // write-volatile  
  25.                 }  
  26.                 return oldValue;  
  27.             } finally {  
  28.                 unlock();  
  29.             }  
  30.         }</span>  

 首先是进行加锁操作,之后就是进行数组大小的判断,如果容量不够,则需要扩充。之后再通过对hash值的按位与的操作后,得到了这个key所要放置的位置。有了位置了,再看HashEntry数组组成的对象链,是否已经有key,如果有了,覆盖value操作,如果没有,创建一个新的HashEntry对象,重新组成HashEntry链表,最后进行解锁操作。

所以直线我们关心的在put中会出现的线程安全问题,看了源码后是不是就解决了。想想除了put操作会出现线程不安全的隐患外,我们来看看remove操作。

删除操作代码原理与put操作类似,也是通过hash值找到那个segment对象,之后调用segmentremove方法去完成真正的操作。真正的操作也是先加锁,之后迭代HashEntry,直到找到了传入的hash值相同的。找到了删之,重新建立链表!找不到,over,然后释放对象锁。

ConcurrentHashMapgetcontainsKey等等这种不破坏原子性的读取(read)操作可以说大部分情况下没有进行加锁操作,即便像get加了锁操作,也是极其轻量的,仅仅是加锁了一行很简单的读取操作代码,如下

Java代码  收藏代码
  1. <span style="font-size: small;">        V readValueUnderLock(HashEntry<K,V> e) {  
  2.             lock();  
  3.             try {  
  4.                 return e.value;  
  5.             } finally {  
  6.                 unlock();  
  7.             }  
  8.         }</span>  

 下面我们来看看性能,在此所说的性能仅仅指时间执行效率。

使用一般HashMap包的程序如下

Java代码  收藏代码
  1. <span style="font-size: small;">package threadConcurrent.hashMap;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.   
  8. /** 
  9. * @author liuyan 
  10.  * 
  11.  */  
  12. public class PubHashMap implements Runnable {  
  13.   
  14.     final static int ThreadSIZE = 2500;  
  15.       
  16.     final static int elSize = 500;  
  17.   
  18.     int threadNum;  
  19.   
  20.     public PubHashMap(int threadNum) {  
  21.         this.threadNum = threadNum;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void run() {  
  26.         Map<String, String> hashMap = new HashMap<String, String>();  
  27.         for (int i = 0; i < elSize; i++) {  
  28.             hashMap.put(i + "" + threadNum, i + "" + threadNum);  
  29.         }  
  30.     }  
  31.   
  32.     /** 
  33.      * @param args 
  34.      */  
  35.     public static void main(String[] args) {  
  36.   
  37.         // 启用线程池  
  38.         ExecutorService exec = Executors.newFixedThreadPool(ThreadSIZE);  
  39.   
  40.         long startTime = System.currentTimeMillis();  
  41.         for (int index = 0; index <= ThreadSIZE; index++) {  
  42.             exec.execute(new PubHashMap(index));  
  43.         }  
  44.         long endTime = System.currentTimeMillis();  
  45.         exec.shutdown();  
  46.         System.out.println("消耗时间:" + (endTime - startTime) + "ms");  
  47.   
  48.     }  
  49.   
  50. }</span>  

 启动2500个线程,每个线程往HashMap中添加500个字符串元素。执行多次后给出一个平均时间吧

Java代码  收藏代码
  1. <span style="font-size: small;">消耗时间:1753ms</span>  

 使用并发包程序如下

Java代码  收藏代码
  1. <span style="font-size: small;">package threadConcurrent.hashMap;  
  2.   
  3. import java.util.Map;  
  4. import java.util.concurrent.ConcurrentHashMap;  
  5. import java.util.concurrent.ExecutorService;  
  6. import java.util.concurrent.Executors;  
  7.   
  8. /** 
  9. * @author liuyan 
  10.  * 
  11.  */  
  12. public class PutConcurrentHashMap implements Runnable {  
  13.   
  14.     final static int ThreadSIZE = 2500;  
  15.   
  16.     final static int elSize = 500;  
  17.   
  18.     int threadNum;  
  19.   
  20.     public PutConcurrentHashMap(int threadNum) {  
  21.         this.threadNum = threadNum;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void run() {  
  26.         Map<String, String> concurrentHashMap = new ConcurrentHashMap<String, String>();  
  27.         for (int i = 0; i < elSize; i++) {  
  28.             concurrentHashMap.put(i + "" + threadNum, i + "" + threadNum);  
  29.         }  
  30.     }  
  31.   
  32.     /** 
  33.      * @param args 
  34.      */  
  35.     public static void main(String[] args) {  
  36.   
  37.         // 启用线程池  
  38.         ExecutorService exec = Executors.newFixedThreadPool(ThreadSIZE);  
  39.   
  40.         long startTime = System.currentTimeMillis();  
  41.         for (int index = 0; index <= ThreadSIZE; index++) {  
  42.             exec.execute(new PutConcurrentHashMap(index));  
  43.         }  
  44.   
  45.         long endTime = System.currentTimeMillis();  
  46.         exec.shutdown();  
  47.         System.out.println("消耗时间:" + (endTime - startTime) + "ms");  
  48.     }  
  49.   
  50. }</span>  

 也是多次执行后,得出一个平均时间吧

Java代码  收藏代码
  1. <span style="font-size: small;">消耗时间:1869ms</span>  

 时间消耗上差不多哈。在集合元素越来越多的情况下,在解决线程安全的同时保证了时间执行熬费上几乎和非线程安全的Map持平。所以在并发条件下不必自己解决Map的线程安全问题,直接放心使用JDK自己的并发Map包即可,时间性能上还能保证。

3.  List的并发包

可以在高并发环境下使用java.util.concurrent.CopyOnWriteArrayList代替java.util.ArrayList。对于添加元素的操作,底层并不像Map那么复杂,就是利用了数组的copy功能和加锁机制

Java代码  收藏代码
  1. <span style="font-size: small;">    public boolean add(E e) {  
  2.         final ReentrantLock lock = this.lock;  
  3.         lock.lock();  
  4.         try {  
  5.             Object[] elements = getArray();  
  6.             int len = elements.length;  
  7.             Object[] newElements = Arrays.copyOf(elements, len + 1);  
  8.             newElements[len] = e;  
  9.             setArray(newElements);  
  10.             return true;  
  11.         } finally {  
  12.             lock.unlock();  
  13.         }  
  14.     }</span>  

 它是使用ReentrantLock进行的加锁,之后获得数组进行copy操作,之后数组个数加一。将新元素填充,之后再对局部变量进行一下set操作,最后就是解锁操作。

至于remove操作,和add的原理一样

Java代码  收藏代码
  1. <span style="font-size: small;">    public E remove(int index) {  
  2.         final ReentrantLock lock = this.lock;  
  3.         lock.lock();  
  4.         try {  
  5.             Object[] elements = getArray();  
  6.             int len = elements.length;  
  7.             Object oldValue = elements[index];  
  8.             int numMoved = len - index - 1;  
  9.             if (numMoved == 0)  
  10.                 setArray(Arrays.copyOf(elements, len - 1));  
  11.             else {  
  12.                 Object[] newElements = new Object[len - 1];  
  13.                 System.arraycopy(elements, 0, newElements, 0, index);  
  14.                 System.arraycopy(elements, index + 1, newElements, index,  
  15.                         numMoved);  
  16.                 setArray(newElements);  
  17.             }  
  18.             return (E) oldValue;  
  19.         } finally {  
  20.             lock.unlock();  
  21.         }  
  22.     }</span>  

 在枷锁对儿中间,先找到标记下的数组元素,之后创建一个新的临时数组,进行copy,将要删除的对象元素剔除出去!返回被删除元素对象。

add操作性能与ArrayList进行对比,线程运行400个,添加元素数是2000个。对比平均之后发现运行的时间也相差不是很多。并发情况下,CopyOnWriteArrayListArrayList略快了那么一点点。get几乎和ArrayList没差别,直接从数组中找第index个元素。

 

4.  Set的并发

CopyOnWriteArraySetCopyOnWriteArrayList底层实现差不多,就是在添加元素的时候需要对对象进行唯一性判断,如果对象数组已经含有重复的元素,不进行增加处理。在此不再赘述。

5.  Queue的并发

队列的并发类是java.util.concurrent.ArrayBlockingQueue,从类名字上大家估计就能猜出来了,底层使用的依然是数组。这个ArrayBlockingQueue是继承自原始的java.util.AbstractQueue,所以很多方法在父类里面已经有了,只是对于关键方法入队列、出队列操作加入了锁对儿机制。

入队列元素操作源码如下

Java代码  收藏代码
  1. public boolean offer(E e, long timeout, TimeUnit unit)  
  2.     throws InterruptedException {  
  3.   
  4.     if (e == nullthrow new NullPointerException();  
  5. g nanos = unit.toNanos(timeout);  
  6.     final ReentrantLock lock = this.lock;  
  7.     lock.lockInterruptibly();  
  8.     try {  
  9.         for (;;) {  
  10.             if (count != items.length) {  
  11.                 insert(e);  
  12.                 return true;  
  13.             }  
  14.             if (nanos <= 0)  
  15.                 return false;  
  16.             try {  
  17.                 nanos = notFull.awaitNanos(nanos);  
  18.             } catch (InterruptedException ie) {  
  19.                 notFull.signal(); // propagate to non-interrupted thread  
  20.                 throw ie;  
  21.             }  
  22.         }  
  23.     } finally {  
  24.         lock.unlock();  
  25.     }  
  26. }  

 

数组未满情况下,执行insert操作的时候,如果数组满了,则进行等待,单位是纳秒。如果超时或者被唤醒了,那么再次判断是否数组已满,如果线程被打断直接抛出异常。出队列方法和入队列差不多,不再赘述。

6.  AtomicXXXX的原子类

并发包还提供了支持原子操作的Atomic系列类,我们举一个具有代表性的类——AtomicInteger类,通常我们使用计数器操作的时候,一般为了避免线程安全的问题,在方法上加锁操作。有了并发包下的原子系列类,我们直接使用即可。关键使用代码片段如下

 

 

Java代码  收藏代码
  1. public static int getSum() {  
  2.     return sum.incrementAndGet();  
  3. }  

 

其自增方法底层片段最关键是这么一句

Java代码  收藏代码
  1. if (compareAndSet(current, next))  
  2.     return next;  

 

此方法具体如下

Java代码  收藏代码
  1. public final boolean compareAndSet(int expect, int update) {  
  2. urn unsafe.compareAndSwapInt(this, valueOffset, expect, update);  
  3. }  
 

unsafe.compareAndSwapInt调用了本地native方法直接与底层硬件也就是CPU打交道。大家有兴趣的话可以将sun的这段代码反编译看看。底层会比较内存上的地址上的值是否为当前值,是就next,不是则反复循环,直到找到当前值。这个和Hibernate的那个乐观锁有异曲同工的意思。其他的一些原子类AtomicBooleanAtomicLong等等在此不再赘述,使用以及底层原理都差不多。

7.  总结与反思

关于并发包的集合类就先总结到这里,这次没有将集合的读取元素进行性能对比,实际应用中高并发的读取比集合元素改变(addremovereplace)更为常见,不过代码很简单,所以就不给出了,有兴趣的朋友认识了这些类后可以自己做实验。至于反思,应该就是这些并发包的资源性能,是否很占用内存空间,加入在一个高并发环境下而且硬件环境又不允许分配给应用系统十分宽容的硬件资源,那么高并发情况下是否玩不转(比如云计算的虚拟化,一台实机可能启动多个虚拟机,作为实机的扩充)。这个问题我们可以使用工具测试jconsole进行监控,也有可能用户的应用自身代码也存在着一系列的问题,还是得具体问题具体分析,总的来说并发包要想实现线程安全,而且时间效率在一般环境下又和非并发包的差不多,需要消耗的内存资源比以前多是一定的,这个是避免不了的,世界是物质的,做任何事情都需要付出代价,只是看这个代价和收益相比值不值得。



原文地址:https://www.cnblogs.com/daichangya/p/12960013.html