多线程与并发6 并发容器

  • 容器架构

  • Hashtable  HashMap  SynchronizedHashMap  CurrentHashMap  CurrentSkipListMap(弥补同步的TreeMap)

  Hashtable所以方法默认加sychronized,

  HashMap默认没有加锁,

  而SynchronizedHashMap默认是Collections.synchronizedMap(Map<k,v> map)返回一个同步map。

  CurrentHashMap是多线程真正用的,本来是拉链表,JDK1.8后变成红黑树.

ConcurrentHashMap相比HashMap而言,是多线程安全的,其底层数据与HashMap的数据结构相同,数据结构如下:

  

ConcurrentHashMap的数据结构(数组+链表+红黑树),桶中的结构可能是链表,也可能是红黑树,红黑树是为了提高查找效率。

ConcurrentHashMap为什么高效?

JDK1.5中的实现

ConcurrentHashMap使用的是分段锁技术,将ConcurrentHashMap将锁一段一段的存储,然后给每一段数据配一把锁(segment),当一个线程占用一把锁(segment)访问其中一段数据的时候,其他段的数据也能被其它的线程访问,默认分配16个segment。默认比Hashtable效率提高16倍。

ConcurrentHashMap的结构图如下(网友贡献的图,哈):


 
Paste_Image.png

JDK1.8中的实现

ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,数组+链表/红黑二叉树
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

JDK1.8的ConcurrentHashMap的结构图如下:

 
Paste_Image.png

TreeBin: 红黑二叉树节点
Node: 链表节点

ConcurrentHashMap 源码分析

ConcurrentHashMap 类结构参照HashMap,这里列出HashMap没有的几个属性。

/**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     hash表初始化或扩容时的一个控制位标识量。
     负数代表正在进行初始化或扩容操作
     -1代表正在初始化
     -N 表示有N-1个线程正在进行扩容操作
     正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
     */
    private transient volatile int sizeCtl; 
    // 以下两个是用来控制扩容的时候 单线程进入的变量
    /**
     * The number of bits used for generation stamp in sizeCtl.
     * Must be at least 6 for 32bit arrays.
     */
    private static int RESIZE_STAMP_BITS = 16;
    /**
     * The bit shift for recording size stamp in sizeCtl.
     */
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    
    
    /*
     * Encodings for Node hash fields. See above for explanation.
     */
    static final int MOVED     = -1; // hash值是-1,表示这是一个forwardNode节点
    static final int TREEBIN   = -2; // hash值是-2  表示这时一个TreeBin节点

分析代码主要目的:分析是如果利用CAS和Synchronized进行高效的同步更新数据。
下面插入数据源码:

public V put(K key, V value) {
    return putVal(key, value, false);
}

    /** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //ConcurrentHashMap 不允许插入null键,HashMap允许插入一个null键
    if (key == null || value == null) throw new NullPointerException();
    //计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    //for循环的作用:因为更新元素是使用CAS机制更新,需要不断的失败重试,直到成功为止。
    for (Node<K,V>[] tab = table;;) {
        // f:链表或红黑二叉树头结点,向链表中添加元素时,需要synchronized获取f的锁。
        Node<K,V> f; int n, i, fh;
        //判断Node[]数组是否初始化,没有则进行初始化操作
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //通过hash定位Node[]数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头结点),添加失败则进入下次循环。
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //检查到内部正在移动元素(Node[] 数组扩容)
        else if ((fh = f.hash) == MOVED)
            //帮助它扩容
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //锁住链表或红黑二叉树的头结点
            synchronized (f) {
                //判断f是否是链表的头结点
                if (tabAt(tab, i) == f) {
                    //如果fh>=0 是链表节点
                    if (fh >= 0) {
                        binCount = 1;
                        //遍历链表所有节点
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //如果节点存在,则更新value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            //不存在则在链表尾部添加新节点。
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //TreeBin是红黑二叉树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //添加树节点
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                      value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            
            if (binCount != 0) {
                //如果链表长度已经达到临界值8 就需要把链表转换为树结构
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //将当前ConcurrentHashMap的size数量+1
    addCount(1L, binCount);
    return null;
}
  1. 判断Node[]数组是否初始化,没有则进行初始化操作
  2. 通过hash定位Node[]数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头结点),添加失败则进入下次循环。
  3. 检查到内部正在扩容,如果正在扩容,就帮助它一块扩容。
  4. 如果f!=null,则使用synchronized锁住f元素(链表/红黑二叉树的头元素)
    4.1 如果是Node(链表结构)则执行链表的添加操作。
    4.2 如果是TreeNode(树型结果)则执行树添加操作。
  5. 判断链表长度已经达到临界值8 就需要把链表转换为树结构。

总结:
    JDK8中的实现也是锁分离的思想,它把锁分的比segment(JDK1.5)更细一些,只要hash不冲突,就不会出现并发获得锁的情况。它首先使用无锁操作CAS插入头结点,如果插入失败,说明已经有别的线程插入头结点了,再次循环进行操作。如果头结点已经存在,则通过synchronized获得头结点锁,进行后续的操作。性能比segment分段锁又再次提升。

 

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。

 

  •  CopyOnWrite

在juc(java.util.concurrent)包下有着这么两个类,CopyOnWriteArrayList 和 CopyOnWriteArraySet。直译过来就是在写操作的时候复制。这体现了读写分离的思想。

在写操作的线程,会将数组复制出来一份进行操作。而原本的数组不会做改变。
读线程则不会受到影响,但是可能读到的是一个过期的数据。
只能保证最终的一致性,不能保证实时的一致性。 

CopyOnWriteArrayList  --写时加锁 读时不加锁
public boolean add(E e) {
    // 添加的时候,上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 原本的数组
        Object[] elements = getArray();
        // 原本数组的长度
        int len = elements.length;
        // 调用native方法进行复制
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        // 新的元素
        newElements[len] = e;
        // 替换数组
        setArray(newElements);
        // 成功
        return true;
    } finally {
        // 解锁
        lock.unlock();
    }
}
  • BlockingQueue

package day06;

import org.junit.Test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * @author: zdc
 * @date: 2020-04-03
 */
public class BlockingQueueDemo {
    BlockingQueue<String> queue = new LinkedBlockingDeque<String>();
    private static int count;
    public void testLinkdeBlockingQueue() {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    queue.put("" + count);
                    System.out.println(Thread.currentThread().getName() + " produce " + count++);
                    TimeUnit.SECONDS.sleep(1);  //等待一下 这样生产者和消费者可以同时进行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "").start();
        }

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (; ; ) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " consumer " + queue.take());//消费完就会进行阻塞
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, i + "").start();
        }
    }

    public static void main(String[] args) {
        new BlockingQueueDemo().testLinkdeBlockingQueue();
    }
}
  1. LinkedBlockingQueue

  2. ArrayBlockingQueue

  3. DelayQueue  必须实现Delayed接口,需要做一个CompareTo比较 时间等待的越短,会有优先执行权。并不按照先进先出执行。

    delayQueue.put(task1)...... dekayQueue.take() 

    task1  task2  task3 ......必须实现Delayed接口 里面有compare方法 进行比较

   SychronousQUeue  容量为0  先take 再put

  

package day06;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @author: zdc
 * @date: 2020-04-03
 */
public class SychronousQueueDemo {
    private static BlockingQueue<String> queue = new SynchronousQueue<>();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            while (true) {
                try {
                    System.out.println(queue.take());//queue里不存入 take不到则阻塞
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        queue.put("zzzz");

        TimeUnit.SECONDS.sleep(5);

        queue.put("ddddd");
    }
}

 TransferQueue

TransferQueue继承了BlockingQueueBlockingQueue又继承了Queue)并扩展了一些新方法。BlockingQueue(和Queue)是Java 5中加入的接口,它是指这样的一个队列:当生产者向队列添加元素但队列已满时,生产者会被阻塞;当消费者从队列移除元素但队列为空时,消费者会被阻塞。

TransferQueue则更进一步,生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费(不仅仅是添加到队列里就完事)。新添加的transfer方法用来实现这种约束。顾名思义,阻塞就是发生在元素从一个线程transfer到另一个线程的过程中,它有效地实现了元素在线程之间的传递(以建立Java内存模型中的happens-before关系的方式)。

TransferQueue还包括了其他的一些方法:两个tryTransfer方法,一个是非阻塞的,另一个带有timeout参数设置超时时间的。还有两个辅助方法hasWaitingConsumer()和getWaitingConsumerCount()。

当我第一次看到TransferQueue时,首先想到了已有的实现类SynchronousQueue。SynchronousQueue的队列长度为0,最初我认为这好像没多大用处,但后来我发现它是整个Java Collection Framework中最有用的队列实现类之一,特别是对于两个线程之间传递元素这种用例。

TransferQueue相比SynchronousQueue用处更广、更好用,因为你可以决定是使用BlockingQueue的方法(译者注:例如put方法)还是确保一次传递完成(译者注:即transfer方法)。在队列中已有元素的情况下,调用transfer方法,可以确保队列中被传递元素之前的所有元素都能被处理。Doug Lea说从功能角度来讲,LinkedTransferQueue实际上是ConcurrentLinkedQueue、SynchronousQueue(公平模式)和LinkedBlockingQueue的超集。而且LinkedTransferQueue更好用,因为它不仅仅综合了这几个类的功能,同时也提供了更高效的实现。

Joe Bowbeer提供了一篇William Scherer, Doug Lea, and Michael Scott的论文,在这篇论文中展示了LinkedTransferQueue的算法,性能测试的结果表明它优于Java 5的那些类(译者注:ConcurrentLinkedQueue、SynchronousQueue和LinkedBlockingQueue)。LinkedTransferQueue的性能分别是SynchronousQueue的3倍(非公平模式)和14倍(公平模式)。因为像ThreadPoolExecutor这样的类在任务传递时都是使用SynchronousQueue,所以使用LinkedTransferQueue来代替SynchronousQueue也会使得ThreadPoolExecutor得到相应的性能提升。考虑到executor在并发编程中的重要性,你就会理解添加这个实现类的重要性了。

Java 5中的SynchronousQueue使用两个队列(一个用于正在等待的生产者、另一个用于正在等待的消费者)和一个用来保护两个队列的锁。而LinkedTransferQueue使用CAS操作(译者注:参考wiki)实现一个非阻塞的方法,这是避免序列化处理任务的关键。这篇论文还罗列了很多的细节和数据,如果你感兴趣,非常值得一读。

 

原文地址:https://www.cnblogs.com/zdcsmart/p/12607897.html