并发之lock的condition接口

13.死磕Java并发-----J.U.C之Condition

12.Condition使用总结

11.Java并发编程系列之十七:Condition接口

===

13.死磕Java并发-----J.U.C之Condition

此篇博客所有源码均来自JDK 1.8

在没有Lock之前,我们使用synchronized来控制同步,配合Object的wait()、notify()系列方法可以实现等待/通知模式。在Java SE5后,Java提供了Lock接口,相对于Synchronized而言,Lock提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活。下图是Condition与Object的监视器方法的对比(摘自《Java并发编程的艺术》):

这里写图片描述

Condition提供了一系列的方法来对阻塞和唤醒线程:

  1. await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
  2. await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
  3. awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  4. awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
  5. awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
  6. signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  7. signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

Condition是一种广义上的条件队列。他为线程提供了一种更为灵活的等待/通知模式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。

Condtion的实现

获取一个Condition必须要通过Lock的newCondition()方法。该方法定义在接口Lock下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition为一个接口,其下仅有一个实现类ConditionObject,由于Condition的操作需要获取相关的锁,而AQS则是同步锁的实现基础,所以ConditionObject则定义为AQS的内部类。定义如下:

public class ConditionObject implements Condition, java.io.Serializable {
}

等待队列

每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。我们看Condition的定义就明白了:

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;

    //头节点
    private transient Node firstWaiter;
    //尾节点
    private transient Node lastWaiter;

    public ConditionObject() {
    }

    /** 省略方法 **/
}

从上面代码可以看出Condition拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用await()方法,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:

这里写图片描述

Node里面包含了当前线程的引用。Node定义与AQS的CLH同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node静态内部类)。

Condition的队列结构比CLH同步队列的结构简单些,新增过程较为简单只需要将原尾节点的nextWaiter指向新增节点,然后更新lastWaiter即可。

等待

调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁。

    public final void await() throws InterruptedException {
        // 当前线程中断
        if (Thread.interrupted())
            throw new InterruptedException();
        //当前线程加入等待队列
        Node node = addConditionWaiter();
        //释放锁
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        /**
         * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
         * 直到检测到此节点在同步队列上
         */
        while (!isOnSyncQueue(node)) {
            //线程挂起
            LockSupport.park(this);
            //如果已经中断了,则退出
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        //竞争同步状态
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //清理下条件队列中的不是在等待条件的节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

此段代码的逻辑是:首先将当前线程新建一个节点同时加入到条件队列中,然后释放当前线程持有的同步状态。然后则是不断检测该节点代表的线程释放出现在CLH同步队列中(收到signal信号之后就会在AQS队列中检测到),如果不存在则一直挂起,否则参与竞争同步状态。

加入条件队列(addConditionWaiter())源码如下:

    private Node addConditionWaiter() {
        Node t = lastWaiter;    //尾节点
        //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            //清除条件队列中所有状态不为Condition的节点
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //当前线程新建节点,状态CONDITION
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        /**
         * 将该节点加入到条件队列中最后一个位置
         */
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

该方法主要是将当前线程加入到Condition条件队列中。当然在加入到尾节点之前会清楚所有状态不为Condition的节点。

fullyRelease(Node node),负责释放该线程持有的锁。

    final long fullyRelease(Node node) {
        boolean failed = true;
        try {
            //节点状态--其实就是持有锁的数量
            long savedState = getState();
            //释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

isOnSyncQueue(Node node):如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true

    final boolean isOnSyncQueue(Node node) {
        //状态为Condition,获取前驱节点为null,返回false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //后继节点不为null,肯定在CLH同步队列中
        if (node.next != null)
            return true;

        return findNodeFromTail(node);
    }

unlinkCancelledWaiters():负责将条件队列中状态不为Condition的节点删除

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

通知

调用Condition的signal()方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

    public final void signal() {
        //检测当前线程是否为拥有锁的独
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //头节点,唤醒条件队列中的第一个节点
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);    //唤醒
    }

该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后唤醒条件队列中的头节点。

doSignal(Node first):唤醒头节点

    private void doSignal(Node first) {
        do {
            //修改头结点,完成旧头结点的移出工作
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

doSignal(Node first)主要是做两件事:1.修改头节点,2.调用transferForSignal(Node first) 方法将节点移动到CLH同步队列中。transferForSignal(Node first)源码如下:

     final boolean transferForSignal(Node node) {
        //将该节点从状态CONDITION改变为初始状态0,
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

整个通知的流程如下:

  1. 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点
  3. 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
  4. 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。

总结

一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,如果是则尝试获取锁,否则一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

Condition的应用

只知道原理,如果不知道使用那就坑爹了,下面是用Condition实现的生产者消费者问题:

public class ConditionTest {
    private LinkedList<String> buffer;    //容器
    private int maxSize ;           //容器最大
    private Lock lock;
    private Condition fullCondition;
    private Condition notFullCondition;

    ConditionTest(int maxSize){
        this.maxSize = maxSize;
        buffer = new LinkedList<String>();
        lock = new ReentrantLock();
        fullCondition = lock.newCondition();
        notFullCondition = lock.newCondition();
    }

    public void set(String string) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            while (maxSize == buffer.size()){
                notFullCondition.await();       //满了,添加的线程进入等待状态
            }

            buffer.add(string);
            fullCondition.signal();
        } finally {
            lock.unlock();      //记得释放锁
        }
    }

    public String get() throws InterruptedException {
        String string;
        lock.lock();
        try {
            while (buffer.size() == 0){
                fullCondition.await();
            }
            string = buffer.poll();
            notFullCondition.signal();
        } finally {
            lock.unlock();
        }
        return string;
    }
}

12.Condition使用总结

一、介绍

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。简单说,他的作用是使得某些线程一起等待某个条件(Condition),只有当该条件具备(signal 或者 signalAll方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。

二、使用

Condition是个接口,基本的方法就是await()和signal()方法;

Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 
调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。

三、示例代码 

 
  1. package com.meituan.hyt.test4;  
  2.   
  3. import java.util.concurrent.locks.Condition;  
  4. import java.util.concurrent.locks.ReentrantLock;  
  5.   
  6.   
  7. public class Main {  
  8.     public static void main(String[] args) {  
  9.         final ReentrantLock reentrantLock = new ReentrantLock();  
  10.         final Condition condition = reentrantLock.newCondition();  
  11.   
  12.         new Thread(new Runnable() {  
  13.             @Override  
  14.             public void run() {  
  15.                 reentrantLock.lock();  
  16.                 System.out.println(Thread.currentThread().getName() + "拿到锁了");  
  17.                 System.out.println(Thread.currentThread().getName() + "等待信号");  
  18.                 try {  
  19.                     condition.await();  
  20.                 } catch (InterruptedException e) {  
  21.                     e.printStackTrace();  
  22.                 }  
  23.   
  24.                 System.out.println(Thread.currentThread().getName() + "拿到信号");  
  25.   
  26.                 reentrantLock.unlock();  
  27.             }  
  28.         }, "线程1").start();  
  29.   
  30.         new Thread(new Runnable() {  
  31.             @Override  
  32.             public void run() {  
  33.                 reentrantLock.lock();  
  34.                 System.out.println(Thread.currentThread().getName() + "拿到锁了");  
  35.   
  36.                 try {  
  37.                     Thread.sleep(3000);  
  38.                 } catch (InterruptedException e) {  
  39.                     e.printStackTrace();  
  40.                 }  
  41.   
  42.                 System.out.println(Thread.currentThread().getName() + "发出信号");  
  43.                 condition.signalAll();  
  44.   
  45.                 reentrantLock.unlock();  
  46.             }  
  47.         }, "线程2").start();  
  48.     }  
  49. }  

运行结果:

线程1拿到锁了
线程1等待信号
线程2拿到锁了
线程2发出信号
线程1拿到信号

四、原理

我们知道Lock的本质是AQS,AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列。但是,两个队列的作用不同的,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

1. 线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。
2. 线程1调用await方法被调用时,对应操作是被加入到Condition的等待队列中,等待signal信号;同时释放锁。
3. 锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
4. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒,只是被加入AQS等待队列。
5. signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,线程1被唤醒,线程1恢复执行。
所以:
发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。

可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

await源码: 

 
  1. public final void await() throws InterruptedException {  
  2.     // 1.如果当前线程被中断,则抛出中断异常  
  3.     if (Thread.interrupted())  
  4.         throw new InterruptedException();  
  5.     // 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。  
  6.     Node node = addConditionWaiter();  
  7.     // 3.调用tryRelease,释放当前线程的锁  
  8.     long savedState = fullyRelease(node);  
  9.     int interruptMode = 0;  
  10.     // 4.为什么会有在AQS的等待队列的判断?  
  11.     // 解答:signal操作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了  
  12.     // 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环  
  13.     while (!isOnSyncQueue(node)) {  
  14.         LockSupport.park(this);  
  15.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  
  16.             break;  
  17.     }  
  18.     // 5.这个时候线程已经被signal()或者signalAll()操作给唤醒了,退出了4中的while循环  
  19.     // 自旋等待尝试再次获取锁,调用acquireQueued方法  
  20.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  
  21.         interruptMode = REINTERRUPT;  
  22.     if (node.nextWaiter != null)  
  23.         unlinkCancelledWaiters();  
  24.     if (interruptMode != 0)  
  25.         reportInterruptAfterWait(interruptMode);  
  26. }  
整个await的过程如下:  

1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。 

2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。 

3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。 

4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程,本质是将节点从Condition队列中取出来一个还是所有节点放到AQS的等待队列。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。 

11.Java并发编程系列之十七:Condition接口 

通过前面的文章,我们知道任何一个Java对象,都拥有一组监视器方法,主要包括wait()、notify()、notifyAll()方法,这些方法与synchronized关键字配合使用可以实现等待/通知机制。而且前面我们已经使用这种方式实现了生产者-消费者模式。类似地,Condition接口也提供类似的Object的监视器的方法,主要包括await()、signal()、signalAll()方法,这些方法与Lock锁配合使用也可以实现等待/通知机制。

相比Object实现的监视器方法,Condition接口的监视器方法具有一些Object所没有的特性:

  1. Condition接口可以支持多个等待队列,在前面已经提到一个Lock实例可以绑定多个Condition,所以自然可以支持多个等待队列了
  2. Condition接口支持响应中断,前面已经提到过
  3. Condition接口支持当前线程释放锁并进入等待状态到将来的某个时间,也就是支持定时功能

使用Condition接口配合Lock锁的使用实例如下:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            //....
            condition.await();
        }finally {
            lock.unlock();
        }
    }

    public void conditionSignal(){
        lock.lock();
        try {
            //...
            condition.signal();
        }finally {
            lock.unlock();
        }
    }

一般而言,都会将Condition变量作为成员变量。当调用await方法后,当前线程会释放锁并进入Condition变量的等待队列,而其他线程调用signal方法后,通知正在Condition变量等待队列的线程从await方法返回,并且在返回前已经获得了锁。

与使用Object的监视器方法达到了同样的效果,也许看不出Condition配合Lock锁的优势何在。但是在复杂多线程的编程中,这种方式可以体现出其优势。所以一般使用的时候仍然是Object的监视器方法居多。

现在我们已经知道了如何配合Condition和Lock锁实现等待/通知机制,那么我们使用这种方式实现生产者-消费者模式:

package com.rhwayfun.concurrency.r0405;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by rhwayfun on 16-4-5.
 */
public class ConditionProducerConsumerDemo {

    //日期格式器
    private static DateFormat format = new SimpleDateFormat("HH:mm:ss");

    static class Info{
        //作者
        private String author;
        //标题
        private String title;
        //是否开始生产的标志
        private boolean produce = true;
        //Lock锁
        private Lock lock = new ReentrantLock();
        //Condition变量
        private Condition condition = lock.newCondition();

        public Info(){}

        public Info(String author, String title) {
            this.author = author;
            this.title = title;
        }

        public String getAuthor() {
            return author;
        }

        public void setAuthor(String author) {
            this.author = author;
        }

        public String getTitle() {
            return title;
        }

        public void setTitle(String title) {
            this.title = title;
        }

        /**
         * 生产者执行的生产方法
         * @param author
         * @param title
         * @throws InterruptedException
         */
        public void set(String author,String title) throws InterruptedException {
            lock.lock();
            try {
                //没有开始生产就等待
                while (!produce){
                    condition.await();
                }
                //如果已经开始生产
                this.setAuthor(author);
                TimeUnit.SECONDS.sleep(1);
                this.setTitle(title);
                //表示已经停止了生产可以取数据了
                produce = false;
                //通知消费者
                condition.signal();
            }finally {
                lock.unlock();
            }
        }

        /**
         * 消费者执行的消费方法
         * @throws InterruptedException
         */
        public void get() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                //如果已经开始生产就等待
                while (produce){
                    condition.await();
                }
                //如果没有在生产就就可以取数据
                System.out.println(Thread.currentThread().getName() + ":" + this.getAuthor()
                        + "=" + this.getTitle() + " at "
                        + format.format(new Date()));
                //表示我已经取了数据,生产者可以继续生产
                produce = true;
                //通知生产者
                condition.signal();
            }finally {
                lock.unlock();
            }
        }
    }

    static class Producer implements Runnable{

        private Info info;

        public Producer(Info info) {
            this.info = info;
        }

        public void run() {
            boolean flag = true;
            for (int i = 0; i < 5; i++){
                if (flag){
                    try {
                        info.set("authorA","titleA");
                        System.out.println(Thread.currentThread().getName() + ":" + info.getAuthor() + "="
                                + info.getTitle() + " at " + format.format(new Date()));
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    flag = false;
                }else {
                    try {
                        info.set("authorB","titleB");
                        System.out.println(Thread.currentThread().getName() + ":" + info.getAuthor() + "="
                                + info.getTitle() + " at " + format.format(new Date()));
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    flag = true;
                }
            }
        }
    }

    static class Consumer implements Runnable{

        private Info info;

        public Consumer(Info info) {
            this.info = info;
        }

        public void run() {
            for (int i = 0; i < 5; i++){
                try {
                    info.get();
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Info info = new Info();

        Thread producer = new Thread(new Producer(info),"Producer");
        Thread consumer = new Thread(new Consumer(info),"Consumer");

        producer.start();
        TimeUnit.SECONDS.sleep(1);
        consumer.start();
    }
}

运行结果如下:

这里写图片描述

原文地址:https://www.cnblogs.com/awkflf11/p/9225389.html