CAS无锁算法与ConcurrentLinkedQueue

CAS:Compare and Swap 比较并交换

java.util.concurrent包完全建立在CAS之上的,没有CAS就没有并发包。并发包借助了CAS无锁算法实现了区别于synchronized同步锁的乐观锁。因为对于CAS算法来说,就是在不加锁的前提下而假设没有冲突去完成某个操作,如果因为冲突而导致操作失败,那么就进行重试,直到成功为止。

CAS有三个操作数:真实的内存值V、预期的内存值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为新值B,否则什么都不做。

我们通过原子操作类AtomicInteger来研究下在没有加锁的前提下是如何做到数据正确性的:

private volatile int value;

通过关键字volatile保证value值在线程间是可见的,这样在获取value值的时候可以直接获取:

public final int get() {
        return value;
    }

我们来看看++i是怎么做到的:

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

1、其中expect是预期的内存值A,而update是要修改的值B,this就是真实的内存值V

2、这里采用了CAS操作,每次从内存中读取数据然后将此数据+1后的结果进行CAS操作,如果成功就返回结果,否则重试,直到成功。

3、compareAndSet利用JNI来完成CPU指令的操作,该方法的过程类似如下:

if(this==expect)
    {
        this=update;
        return true;
    }
    else
    {
        return false;
    }

这里成功的过程也不是原子操作,有比较this==expect与this=update这两步操作,这两步的原子性的保证是由底层硬件支持的。

CAS的缺点

虽然CAS有效的解决了原子操作的问题,但是其仍然有三个劣势:

1、ABA问题:因为CAS需要在操作前检查下值有没有发生变化,如果没有则更新。但是如果一个值开始的时候是A,变成了B,又变成了A,那么使用CAS进行检查的时候会发现它的值没有发生变化,但是事实却不是如此。

ABA问题的解决思路是使用版本号,如A-B-A变成1A-2B-3A

2、循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

3、只能保证一个共享变量的原子操作:对一个共享变成可以使用CAS进行原子操作,但是多个共享变量的原子操作就无法使用CAS,这个时候只能使用锁。 

 ConcurrentLinkedQueue

  在JAVA多线程应用中,队列的使用率很高,多数生产者和消费者的首选数据结构就是队列(先进先出)。JAVA提供的线程安全队列分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子就是BlockingQueue,而非阻塞队列的典型例子就是ConcurrentLinkedQueue,在实际应用中要根据实际需要来选取。

  使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现;非阻塞的实现方式则可以使用循环CAS的方式来实现。

ConcurrentLinkedQueue是一个不限制大小的非阻塞队列,保存了当前链表的头指针head和尾指针tail。每个节点Node由节点元素item和指向下一个节点的引用next组成。节点之间通过next关联起来,从而组成一张链表结构的队列。链表中最后加入的节点称为尾节点。

private transient volatile Node<E> head = new Node<E>(null, null);

 /** Pointer to last node on list **/
 private transient volatile Node<E> tail = head;

1、头指针head不允许为空,数据内容永远是null。链表的第一个有效元素是最早入队的元素,即head.next。

2、尾指针tail并不一定指向尾指针,所以两者之间还是有区别的。

入队列

入队列就是将入队节点添加到队列的尾部

第一步:添加元素1,队列更新head的next节点为元素1节点,因为tail节点默认情况下等于head节点,所以tail的next节点也指向元素1节点。

第二步:添加元素2,队列更新元素1节点的next节点为元素2节点,然后tail指向元素2节点。

第三步:添加元素3,然后tail的next节点指向元素3节点。

第四步:添加元素4,队列更新元素3节点的next节点为元素4节点,然后tail节点指向元素4节点。

通过快照观察,入队其实只是做了两件事情:一是将入队节点设置成当前队尾节点的下一个节点。而是更新tail节点,如果tail节点的next节点为null,则将入队节点设置成tail的next节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点。

 入队源码:

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
     //入队前,创建入队节点 Node<E> n = new Node<E>(e, null);
//死循环,入队不成功则反复入队 for (;;) { Node<E> t = tail;
//tail的next节点 Node<E> s = t.getNext(); if (t == tail) {
         //tail的next节点为空 if (s == null) {
            //表示t是尾节点,将t的next节点指向入队节点 if (t.casNext(s, n)) {
              更新tail节点,允许失败 casTail(t, n); return true; } } else { casTail(t, s); } } } }

从源码的角度来看:入队过程主要就是定位出尾节点,然后使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

设置tail节点所使用的CAS算法:

private boolean casTail(Node<E> cmp, Node<E> val) {
        return tailUpdater.compareAndSet(this, cmp, val);
    }

concurrent包的实现示意图如下:

原文地址:https://www.cnblogs.com/dongguacai/p/6003078.html