Condition的使用与源码进阶

 
一、Condition作用是什么
  主要功能和Object中的wait、notify功能相对应,使某个线程 在某种情况下等待和唤醒的功能。
 
 
二、使用实例
1)实例1,单一生产者和消费者使用,并且只用一个condition对象控制生产者和消费者
注意:单一condition对象,生产者和消费者都在一个队列中排队,那么这个时候下一个被唤醒的可能是消费者也可能是生产者,那么如果生产者需要唤醒消费者但是被唤醒的确实生产者,则需要生产者继续阻塞排队(使用循环判断模式)
 
 
package com.test.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest4 {

    static Logger logger = LoggerFactory.getLogger(ConditionTest4.class ) ;

    public static int getRandom(){
        Random random = new Random() ;
        int rid = random.nextInt(6);
        int val =  rid+2;
        logger.info(String.format("生成随机数:%d", val ));
        return  val;
    }


    //睡眠打印日志更易于观察
    public static void waitN(Integer n ){
        try {
            TimeUnit.SECONDS.sleep(n);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return   ;
    }

    public static void main(String[] args) throws InterruptedException {

        Lock lock = new ReentrantLock(true) ;
        Condition condition = lock.newCondition();

        //队列允许的最大值
        Integer count=10;
       //数据等待的队列
        LinkedList<Integer> list = new LinkedList<>() ;


        Thread producter = new Thread(()->{
            while (true){//保持生产者一直可以生产
                lock.lock();
                try {
                    while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                        try {
                            logger.info("1111111111生产者进入等待队列");
                            waitN(2);//睡眠,使测试更容易被观看结果
                            condition.await();
                            logger.info("111111111生产者被唤醒");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    logger.info("111111111111生产者可以生产了");
                    waitN(2);
                    list.add( getRandom());
                    condition.signal();
                }finally {
                    lock.unlock();
                }
            }
        },"producter") ;
        producter.start();

        Thread consumer = new Thread(()->{
            while (true){
                lock.lock();

                while(list.size()==0){//每次唤醒判断如果是空的则继续排队
                    try {
                        logger.info("22222222消费者进入等待队列");
                        waitN(5);
                        condition.await();
                        logger.info("222222222消费者被唤醒");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                logger.info("2222222222消费者可以消费了");
                waitN(2);
                int val =list.pop();
                logger.info(String.format("2222222消费者可以消费了 %d ", val ));
                condition.signal();

                //解锁一般放到finnaly中
                lock.unlock();
            }
        },"consumer") ;
        consumer.start();
    }
}
View Code
2)实例2,多个生产者和多个消费者使用,生产者和消费者分别使用自己的condition对象
 当生产者的线程大于消费者的线程,则最终队列会在 满的地方徘徊,
package com.test.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest5 {

    static Logger logger = LoggerFactory.getLogger(ConditionTest5.class ) ;

    public static int getRandom(){
        Random random = new Random() ;
        int rid = random.nextInt(6);
        int val =  rid+2;
        logger.info(String.format("生成随机数:%d", val ));
        return  val;
    }

    //睡眠打印日志更易于观察
    public static void waitN(Integer n ){
        try {
            TimeUnit.SECONDS.sleep(n);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return   ;
    }

    public static void main(String[] args) throws InterruptedException {
        //true 公平锁,这样生产者和消费者才能交替执行
        Lock lock = new ReentrantLock(true) ;
        //2个condition对象,一个在生产者中等待,一个在消费者中等待
        Condition produCondition = lock.newCondition();
        Condition consuCondition = lock.newCondition();

        Integer count=10;
        LinkedList<Integer> list = new LinkedList<>() ;

        for (int i = 0; i < 4 ; i++) {
            Thread producter = new Thread(()->{
                while (true){//保持生产者一直可以生产
                    lock.lock();

                    while(list.size()==count){//保证 condition被叫醒以后,集合中不是满的
                        try {
                            logger.info("1111111111生产者进入等待队列");
                            waitN(2);
                            produCondition.await();
                            logger.info("111111111生产者被唤醒");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    logger.info("111111111111生产者可以生产了, {}" , list.toString());
                    waitN(2);
                    list.add( getRandom());
                    consuCondition.signal();

                    lock.unlock();
                }
            },"producter"+i) ;
            producter.start();
        }

        for (int i = 0; i < 2; i++) {
            Thread consumer = new Thread(()->{
                while (true){
                    lock.lock();
                    while(list.size()==0){
                        try {
                            logger.info("22222222消费者进入等待队列");
                            waitN(5);
                            consuCondition.await();
                            logger.info("222222222消费者被唤醒");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    logger.info("2222222222消费者可以消费了");
                    waitN(2);
                    int val =list.pop();
                    logger.info(String.format("2222222消费者可以消费了 %d ,  %s ", val ,  list.toString()  ));
                    produCondition.signal();
                    lock.unlock();
                }
            },"consumer"+i ) ;
            consumer.start();
        }
    }
}
View Code
三、源码主要方法解读
 
Condition这个类是AQS的内部类,通过lock进行实例化的,当我们使用condition对象内部方法的时候,是可以使用lock对象的属性的。
 
1)await方法
 
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程进入等待队列
    Node node = addConditionWaiter();
    long savedState = fullyRelease(node);
    int interruptMode = 0;
   //判断节点是否在lock的等待队列中,如果已经不在了则这个线程等待
    while (!isOnSyncQueue(node)) {
        //如果没在lock的等待队列中,则当前线程睡眠
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

 2)当前线程入队方法

  

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //如果队列中的状态不是CONDITION,则清除掉这个节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 形成一个节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
   
    if (t == null)
        //如果第一个线程进入等待队列,则当前节点为第一个节点 
        firstWaiter = node;
    else
        //如果不是第一个进入等待,则最后一个节点的next指向当前节点
        t.nextWaiter = node;
    //当前线程的节点变成最后的节点
    lastWaiter = node;
   //返回当前线程的节点
    return node;
}

3)线程等待了,则会释放当前线程持有的锁

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //调用外部类对象lock的方法进行锁释放
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        //如果释放锁失败则当前线程的节点状态为取消 
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

 4) 当前节点是否在lock的等待队列中

final boolean isOnSyncQueue(Node node) {
    // 当前节点状态为CONDITION ,并且是head节点(节点为head节点: node.prev == null)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    return findNodeFromTail(node);
}

5)从lock的队列的尾部 向前查找是否可以找到这个节点,找到则返回true,找到head节点还没找到

  

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
         //找到head节点还是没有,则返回false,head节点的prev是null
        if (t == null)
            return false;
        t = t.prev;
    }
}

 6)唤醒睡眠的线程

  

public final void signal() {
    //必须是当前获取锁的线程来执行
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //唤醒队列中第一个线程
        doSignal(first);
}

 7)唤醒第一个线程,清除第一个node对应的next信息

    

private void doSignal(Node first) {
    do {
        //如果只有一个节点,则清除lastWaiter ,如果first唤醒失败,则把first的next 赋值给firstWaiter ,在此执行唤醒
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //清除nextWaiter信息
        first.nextWaiter = null;
    } while (!transferForSignal(first) && //去唤醒第一个节点 
             (first = firstWaiter) != null);  // 判断first!= null 
}

 8  唤醒第一个节点去获取锁

  

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //设置节点状态 从 CONDITION 变为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        //如果失败,则直接返回
        return false;


    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //节点放到lock队列中去抢锁
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0  //节点状态为1 ,被取消了 
          || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //设置线程节点状态为SIGNAL,并判断成功与失败
        //线程在上一个步骤已经放到了lock队列中,这里只是设置一下node状态,实际上在lock.unlock步骤会从队列中唤醒的
        LockSupport.unpark(node.thread);
    return true;
}
 
 
 
原文地址:https://www.cnblogs.com/lean-blog/p/13736049.html