condition实现原理

  condition是对线程进行控制管理的接口,具体实现是AQS的一个内部类ConditionObject,主要功能是控制线程的启/停(这么说并不严格,还要有锁的竞争排队)。
condition主要方法:
 
void await() throws InterruptedException
进入等待,直到被通知或中断
void awaitUninterruptibly()
进入等待,直到被通知,不响应中断
long awaitNanos(long nanosTimeout) throws InterruptedException
等待xxx纳秒
boolean awaitUntil(Date deadline) throws InterruptedException
等待,直到某个时间点
void signal()
唤醒
void signalAll()
唤醒所有等待在condition上的线程,会从等待队列挨个signal()
使用示例:
  通过实现一个有界队列来深入理解Condition的使用方式。有界队列:一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,
,直到队列出现空位。
public class BoundedQueue<T> {
    private Object[] items;
    //添加的下标,删除的下标和数组当前数量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition full = lock.newCondition();
    //构造方法
    public BoundedQueue(int size){
        items = new Object[size];
    }
    //添加元素,如果数组满,则添加线程进入等待,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try {
            while (count == items.length)  //改成if会如何
                full.await();
            items[addIndex] = t;
            if(++addIndex == items.length)
                addIndex = 0;
            ++count;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    //从头部删除一个元素,如果数组空,则删除线程进入等待状态,直到添加新元素
    public T remove() throws InterruptedException{
        lock.lock();
        try{
            while (count == 0)
                empty.await();
            Object x = items[removeIndex];
            if(++removeIndex == items.length)
                removeIndex = 0;
            --count;
            full.signal();
            return (T)x;
        }finally {
            lock.unlock();
        }
    }
}
  在这里,阻塞队列的数据存放是在items数组中,注意几个下标的赋值操作,当addIndex到头的时候,因为之前可能有remove操作,故items数组的头部或者中间位置可能是空的,如果继续添加数据,数据应添加在头部,相当于形成了一个“环”,这就是19-20行的含义。至于16行代码中的while替换成if,书中给出的解释是:使用while目的是为了防止过早或意外的通知,只有条件符合才能退出循环。这个地方没有想出相应的场景,仅从目前的代码逻辑来说,换成if也是可以的,但如果考虑异常导致等待线程被唤醒,那阻塞队列就无法正常工作了。
原理分析:
  ConditionObject是同步器AQS的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(等待队列),该队列是condition对象实现等待/通知的功能的关键。
  等待队列:
  等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node

  如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
  在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock实现类拥有一个同步队列和多个等待队列:

  如上图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
等待:
  调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
  如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。
  Condition的await()方法:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //当前线程加入等待队列
    int savedState = fullyRelease(node); //释放同步状态,也就是释放锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,( 因为已经获取了同步状态,所以无需通过cas,在队列尾部添加等待节点 )然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
  当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
  如果从队列的角度去看,当前线程加入Condition的等待队列,如图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中:
通知:
  调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列末尾。
  signal方法:
public final void signal() {
    if (!isHeldExclusively()) //是否获取了锁(重入锁中是直接 return  独占锁线程==当前线程)
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; //等待队列头节点
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null) //等待队列首节点的后继节点为空,说明只有一个节点,那么尾节点也置空
            lastWaiter = null; 
        first.nextWaiter = null;
    } while (!transferForSignal(first)  && (first = firstWaiter) != null); // 等待队列首节点唤醒失败,则唤醒下个节点
}
final boolean transferForSignal(Node node) {// 将节点加入同步队列
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 设置节点状态为0----(等待队列状态只能为-2,-3,用-2进行cas设置失败,说明是-3)
        return false;
    Node p = enq(node); // 放入同步队列尾部
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  调用signal方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
  节点从等待队列移动到同步队列的过程如下图所示:
  被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。成功获取同步状态之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。
  Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
-----------------------------------------------------------------------------------
想到的问题:
  1、wait,sleep,park都有啥区别,这个让出cpu资源的操作底层实现是一样的么?
  wait是Object的方法,会释放锁,原理是monitor机制(抢占对象头信息的锁标志位),是个native方法,也就是是java的一套机制,底层是用c编写的,具体如何挂起线程的逻辑未知。
  sleep是Thread的方法,不会释放锁,也是个native方法,具体底层逻辑未知。
  park是unsafe类的方法,这个unsafe类提供了很多直接操作内存的方法,这个park也是个native方法,openjdk源码显示底层调用了操作系统的函数,停掉了线程。wait跟sleep应该也是类似原理。
  2、线程的唤醒是怎么实现的,在线程非常多的情况下,唤醒了就能马上执行么?
  唤醒肯定也是最终调用os的函数来实现的,线程非常多的情况下,硬件线程数固定,操作系统或者jvm肯定也要有相应机制来调用线程,这个唤醒只是让线程“醒”了而已,醒了!=执行,所以,唤醒了可能并不能马上执行,而是要等待别的线程执行完后进行抢占,成功后才能执行。
原文地址:https://www.cnblogs.com/nevermorewang/p/9905939.html