Condition

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

condition对象是依赖于lock对象的,意思就是说condition对象需要通过lock对象进行创建出来(调用Lock对象的newCondition()方法)

三个线程依次打印abc

  1 public class Demo {
  2 
  3     private int signal;
  4     Lock lock = new ReentrantLock();
  5     Condition a = lock.newCondition();
  6     Condition b = lock.newCondition();
  7     Condition c = lock.newCondition();
  8 
  9     public  void a() {
 10         lock.lock();
 11         while (signal != 0){
 12             try {
 13                 a.await();
 14             } catch (InterruptedException e) {
 15                 e.printStackTrace();
 16             }
 17         }
 18 
 19         System.out.println("a");
 20         signal ++;
 21         b.signal();
 22         lock.unlock();
 23     }
 24 
 25     public  void b() {
 26         lock.lock();
 27         while (signal != 1){
 28             try {
 29                b.await();
 30             } catch (InterruptedException e) {
 31                 e.printStackTrace();
 32             }
 33         }
 34         System.out.println("b");
 35         signal++;
 36         c.signal();
 37         lock.unlock();
 38 
 39     }
 40 
 41     public  void c() {
 42         lock.lock();
 43         while (signal !=2){
 44             try {
 45                c.await();
 46             } catch (InterruptedException e) {
 47                 e.printStackTrace();
 48             }
 49         }
 50         System.out.println("c");
 51         signal=0;
 52         a.signal();
 53         lock.unlock();
 54     }
 55 
 56     public static void main(String[] args) {
 57 
 58         Demo d = new Demo();
 59         A a = new A(d);
 60         B b = new B(d);
 61         C c = new C(d);
 62 
 63         new Thread(a).start();
 64         new Thread(b).start();
 65         new Thread(c).start();
 66 
 67 
 68 
 69     }
 70 
 71 
 72 }
 73 
 74 class A implements Runnable {
 75 
 76     private Demo demo;
 77 
 78     public A(Demo demo) {
 79         this.demo = demo;
 80     }
 81 
 82     @Override
 83     public void run() {
 84         while (true) {
 85             demo.a();
 86             try {
 87                 Thread.sleep(1000);
 88             } catch (InterruptedException e) {
 89                 e.printStackTrace();
 90             }
 91         }
 92     }
 93 }
 94 
 95 class B implements Runnable {
 96 
 97     private Demo demo;
 98 
 99     public B(Demo demo) {
100         this.demo = demo;
101     }
102 
103     @Override
104     public void run() {
105         while (true) {
106             demo.b();
107             try {
108                 Thread.sleep(1000);
109             } catch (InterruptedException e) {
110                 e.printStackTrace();
111             }
112         }
113     }
114 }
115 
116 class C implements Runnable {
117 
118     private Demo demo;
119 
120     public C(Demo demo) {
121         this.demo = demo;
122     }
123 
124     @Override
125     public void run() {
126         while (true) {
127             demo.c();
128             try {
129                 Thread.sleep(1000);
130             } catch (InterruptedException e) {
131                 e.printStackTrace();
132             }
133         }
134     }
135 }

Condition构造有界缓存队列

 1 public class MyQueue<E>  {
 2 
 3     private Object [] obj ;
 4 
 5     private int addIndex;
 6     private int removeIndex;
 7     private int queueSize;
 8 
 9     private Lock lock = new ReentrantLock();
10 
11     Condition addCondition = lock.newCondition();
12     Condition removeCondition = lock.newCondition();
13 
14 
15 
16     public void add(E e){
17         lock.lock();
18         while (queueSize == obj.length){
19             try {
20                 addCondition.await();
21             } catch (InterruptedException e1) {
22                 e1.printStackTrace();
23             }
24         }
25 
26         obj[addIndex++] = e;
27         if (++ addIndex == obj.length){
28             addIndex = 0;
29         }
30 
31         queueSize++;
32         removeCondition.signal();
33         lock.unlock();
34 
35     }
36 
37     public void remove(){
38         lock.lock();
39         while (queueSize == 0){
40             try {
41                 removeCondition.await();
42             } catch (InterruptedException e) {
43                 e.printStackTrace();
44             }
45         }
46 
47         obj[removeIndex] = null;
48         if (++removeIndex == obj.length){
49             removeIndex = 0;
50 
51         }
52 
53         queueSize --;
54         addCondition.signal();
55         lock.unlock();
56     }
57 
58 }

Condition源码解读

在使用Condition时都是使用锁的new Condition接口实现的

lock.newCondition()

1     public Condition newCondition() {
2         return sync.newCondition();
3     }

找到同步器Sync

1         final ConditionObject newCondition() {
2             return new ConditionObject();
3         }
public class ConditionObject implements Condition, java.io.Serializable

ASQ的内部类,实现了Condition接口

public ConditionObject() { } 空的构造方法

Condition 常用方法,,signal(),await()
await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
signal() :唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
 1         public final void await() throws InterruptedException {
 2             if (Thread.interrupted())
 3                 throw new InterruptedException();
 4             Node node = addConditionWaiter();
 5             int savedState = fullyRelease(node);
 6             int interruptMode = 0;
 7             while (!isOnSyncQueue(node)) {
 8                 LockSupport.park(this);
 9                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
10                     break;
11             }
12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13                 interruptMode = REINTERRUPT;
14             if (node.nextWaiter != null) // clean up if cancelled
15                 unlinkCancelledWaiters();
16             if (interruptMode != 0)
17                 reportInterruptAfterWait(interruptMode);
18         }

线程中断,扔出异常

addConditionWaiter() 加入等待队列,

 1         private Node addConditionWaiter() {
 2             Node t = lastWaiter;
 3             // If lastWaiter is cancelled, clean out.
 4             if (t != null && t.waitStatus != Node.CONDITION) {
 5                 unlinkCancelledWaiters();
 6                 t = lastWaiter;
 7             }
 8             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 9             if (t == null)
10                 firstWaiter = node;
11             else
12                 t.nextWaiter = node;
13             lastWaiter = node;
14             return node;
15         }

头指针 firstWaiter 尾指针 lastWaiter

unlinkCancelledWaiters() 进行过滤,删除有些节点

t 指向最后一个节点

t为空,firstWaiter = node;

t不为空, t.nextWaiter = node;

t的下一个节点指向新增节点

lastWaiter = node; 下一个节点指向新增节点,

return node; 添加成功,返回,

int savedState = fullyRelease(node);

 1     final int fullyRelease(Node node) {
 2         boolean failed = true;
 3         try {
 4             int savedState = getState();
 5             if (release(savedState)) {
 6                 failed = false;
 7                 return savedState;
 8             } else {
 9                 throw new IllegalMonitorStateException();
10             }
11         } finally {
12             if (failed)
13                 node.waitStatus = Node.CANCELLED;
14         }
15     }

getState()拿到状态,

释放  fullyRelease(node)

isOnlySyncQueue(node) 判断是否在同步队列中,不在同步队列中等待

唤醒后把节点放入同步队列中,进入同步队列后

12             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
13                 interruptMode = REINTERRUPT;
14             if (node.nextWaiter != null) // clean up if cancelled
15                 unlinkCancelledWaiters();
16             if (interruptMode != 0)
17                 reportInterruptAfterWait(interruptMode);

signal()

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

判断是否是独占节点,不是 ,抛出异常 

如果第一个节点不为空,释放

1         private void doSignal(Node first) {
2             do {
3                 if ( (firstWaiter = first.nextWaiter) == null)
4                     lastWaiter = null;
5                 first.nextWaiter = null;
6             } while (!transferForSignal(first) &&
7                      (first = firstWaiter) != null);
8         }
  
first.nextWaiter = null,首节点的下一个节点为空,去掉首节点
transferForSignal(first)
 1     final boolean transferForSignal(Node node) {
 2         /*
 3          * If cannot change waitStatus, the node has been cancelled.
 4          */
 5         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 6             return false;
 7 
 8         /*
 9          * Splice onto queue and try to set waitStatus of predecessor to
10          * indicate that thread is (probably) waiting. If cancelled or
11          * attempt to set waitStatus fails, wake up to resync (in which
12          * case the waitStatus can be transiently and harmlessly wrong).
13          */
14         Node p = enq(node);
15         int ws = p.waitStatus;
16         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
17             LockSupport.unpark(node.thread);
18         return true;
19     }

while (!transferForSignal(first) && (first = firstWaiter) != null)

如果成功,返回false,第一个节点唤醒,do while 方法结束
同步节点失败,返回true
firstWaiter指向下一个,
保证一直有叫醒的节点放到同步队列中。

Condition 其实用到AQS中的Node,通过node构建一个单向链表,await()方法向队列尾部插入一个节点

signal 就从头部移除一个节点,移除的节点放到同步队列中,

调用await方法后,将当前线程加入Condition等待队列中。当前线程释放锁。否则别的线程就无法拿到锁而发生死锁。自旋(while)挂起,不断检测节点是否在同步队列中了,如果是则尝试获取锁,否则挂起。当线程被signal方法唤醒,被唤醒的线程将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

wait 和 notify 只能有一个等待队列。

Condition 能够实现多个等待队列。


















原文地址:https://www.cnblogs.com/quyangyang/p/11187876.html