Java线程间通信

前言

最近在看《Java多线程核心编程技术》这本书,没几天,但是看了很多章节。书中内容大都以代码为主,作者的想法大概是以代码的形式给读者增加多线程的印象,很多知识点也都以代码的形式展现,所以代码的篇幅占据了很大一部分,重点介绍的知识点也较少,整体偏简单了,我也很快的浏览了几个章节,做了大部分的笔记,其实还是有收获吧,就是感觉偏重基础,过多的代码量减少了总结,而且更深层次的原理性内容没有写出来,有点遗憾,所以后续我打算换一本书继续看。

本篇文章也是根据书中章节的介绍做的一次读书笔记,功能点都较简单,希望后续有机会能够再深入的了解一下这方面的知识点,本次就算抛砖引玉吧。

目标

本篇读书笔记大概介绍了以下几方面的内容:

  • 使用wait/notify实现线程间的通信
  • 生产者/消费者模式的实现
  • 方法join的使用

线程与线程之间不是相互独立的个体,它们彼此之间可以相互通信和协作。

等待 / 通知机制的实现

方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,知道接到通知或被中断为止。在调用wait()方法之前,线程必须要获得该对象锁即只能在同步方法或者同步块中调用wait()方法。在执行wait()方法后,当前线程释放锁。在从wait()方法返回前,线程与其他线程竞争重新获得锁。如果调用wait()时没有持有适当的锁对象,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类,因此不需要try-catch语句进行捕获异常。

方法notify()也要在同步方法或者同步块中进行调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用notify()方法时没有持有适当的锁,也会抛出IllegalMonitorStateException异常。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选出其中一个呈wait状态的线程,对其发出通知notify,并使它等待获取该对象的对象锁。需要说明的是,在执行notify()方法后,当前线程不会马上释放该对象锁,呈wait状态的线程并不能马上获取该对象锁,要等到执行notify()方法的线程将程序执行完,也就是退出synchronized代码块后,当前线程才会释放锁,而呈wait状态所在的线程才可以获取该对象锁。当第一个获得了该对象锁的wait线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,还会继续阻塞在wait状态,知道这个对象发出一个notify或notifyAll。

用一句话来总结wait和notify就是:wait使线程停止运行,而notify使停止的线程继续运行。

wait和notify用法的简单举例:

 1 public class MyThread10{
 2 
 3     static class ThreadA extends Thread {
 4         private Object key = null;
 5         public ThreadA(Object key) {
 6             this.key = key;
 7         }
 8 
 9         @Override
10         public void run() {
11             try{
12                 synchronized (key) {
13                     System.out.println("Thread A wait before.");
14                     key.wait();
15                     System.out.println("Thread A wait after.");
16                 }
17             }catch (Exception e){
18                e.printStackTrace();
19             }
20         }
21     }
22 
23     static class ThreadB extends Thread {
24         private Object key = null;
25         public ThreadB(Object key) {
26             this.key = key;
27         }
28 
29         @Override
30         public void run() {
31             try{
32                 synchronized (key) {
33                     System.out.println("Thread B notify before.");
34                     key.notify();
35                     System.out.println("Thread B notify after.");
36                 }
37             }catch (Exception e){
38                 e.printStackTrace();
39             }
40         }
41     }
42 
43     public static void main(String[] args) {
44         try{
45             // 必须是相同的锁对象
46             Object lock = new Object();
47             Thread threadA = new ThreadA(lock);
48             Thread threadB = new ThreadB(lock);
49             // 线程A先执行,然后执行了wait方法,线程不继续往下走,等待notify
50             threadA.start();
51             // 主线程休眠2秒后调用线程B执行
52             Thread.sleep(2000L);
53             // 线程B执行,执行后通知持有相同锁的等待线程可以执行了
54             threadB.start();
55         }catch (Exception e){
56            e.printStackTrace();
57         }
58     }
59 }
60 运行结果:
61 Thread A wait before.
62 Thread B notify before.
63 Thread B notify after.
64 Thread A wait after.

从上面的线程可以看到,在执行了notify方法之后,wait()所在线程并没有立马执行,必须要等到notify()方法所在的线程执行到退出synchronized语句块之后,方可获得锁继续执行。

关键字synchronized可以将任何一个Object对象作为同步对象来看待,而Java为每个Object都实现了wait()和notify()方法,他们必须用在被synchronized同步的Object的临界区内。

通过调用wait()方法可以使处于临界区内的线程进入等待状态,同时释放被同步对象的锁。而notify操作可以唤醒一个因调用了wait操作而处于阻塞状态中的线程,使其进入就绪状态。被重新唤醒的线程会试图重新获得临界区的控制权,也就是获得锁,并继续执行临界区内wait之后的代码。如果发出notify操作时没有处于阻塞状态中的线程,那么该命令则会被忽略。

wait()方法可以使调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,知道再次被唤醒。

notify()方法可以随机唤醒等待队列中等待同一共享资源(同一个锁)的一个线程,并使该线程退出等待队列,进入可运行状态,也即notify()方法仅通知一个线程

notifyAll()方法可以使所有正在等待队列中等待同一共享资源的全部线程从等待队列中退出,进入可运行状态。此时,优先级最高的那个线程最先执行,但也有可能是随机执行,因为这要取决于JVM虚拟机的实现。

对此,我们画一张线程状态切换的示意图:

  1. 新创建一个新的线程对象后,再调用它的start()方法,系统会为此线程分配CPU资源,使其处于Runnable(可运行)状态,这是一个准备运行的阶段。如果线程抢占到CPU资源,此线程就处于Running(运行)状态。
  2. Runnable状态和Running状态可相互切换,因为有可能线程运行一段时间后,有其他高优先级的线程抢占了CPU资源,这时此线程就从Running状态切换到了Runnable状态。
  3. Blocked是阻塞的意思,例如遇到了一次IO操作,此时CPU处于空闲状态,可能会转而把CPU时间片分配给其他线程,这时也可以称之为“暂停”状态。Blocked状态结束后,进入Runnable状态,等待系统重新分配资源。
  4. run()方法运行结束后进入销毁阶段,整个线程执行完毕。

其中,线程进入Runnable状态大体分为以下5种情况:

  • 调用sleep()方法后经过的时间超过了指定的休眠时间。
  • 线程调用的阻塞IO已经返回,阻塞方法执行完毕。
  • 线程成功地获得了试图同步的监视器。
  • 线程正在等待某个通知,其他线程发出了通知。
  • 处于挂起状态的线程调用了resume恢复方法。

出现阻塞的情况也大体分为以下5种:

  • 线程调用sleep()方法,主动放弃占用的处理器资源。
  • 线程调用了阻塞式IO方法,在该方法返回前,该线程被阻塞。
  • 线程试图获得一个同步监视器,当该同步监视器正在被其他线程所持有。
  • 线程等待某个通知。
  • 程序调用了suspend方法将该线程挂起。此方法容易导致死锁,尽量避免使用。

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待CPU的调度;反之,一个线程被wait后,就会进入阻塞队列,等待下一次被唤醒。

方法wait()锁释放与notify()锁不释放

当方法wait()被执行后,锁被自动释放,但执行完notify()方法后,锁是不会自动释放的,必须要等到出了当前监视器(锁)的控制范围,即出了synchronized同步块。

另外,sleep()方法执行后,在没有到达指定休眠期限之前,都是不会释放锁的。

当interrupt方法遇到wait方法

当线程处于wait状态时,即调用了wait操作之后,如果再调用该线程对象的interrupt()方法则会出现InterruptException异常。

那么出现了异常而导致线程终止,线程终止锁也就释放了

唤醒所有线程

上面讲到了,notify操作只能随机唤醒等待队列中的一个线程,如果想要唤醒全部线程,可以使用notifyAll()方法。

方法wait(long)的使用

带一个参数的wait(long)方法的功能是,设定当前线程的最长等待期限,如果在这个等待期限中没有其他线程对当前线程进行唤醒,过了等待的最大期限之后该线程将自动唤醒

 1 public class MyThread11 extends Thread {
 2 
 3     private Object key;
 4     public MyThread11(Object key) {
 5         this.key = key;
 6     }
 7 
 8     @Override
 9     public void run() {
10         try{
11             synchronized (key) {
12                 System.out.println("Thread wait a moment before." + System.currentTimeMillis());
13                 // 3秒之内如果没有被唤醒,时间一到则主动唤醒
14                 key.wait(3000L);
15                 System.out.println("Thread wait a moment wait after." + System.currentTimeMillis());
16             }
17         }catch (Exception e){
18             e.printStackTrace();
19         }
20     }
21 
22     public static void main(String[] args) {
23         Object lock = new Object();
24         Thread thread = new MyThread11(lock);
25         // 线程执行后调用了wait方法,但是没有执行notify
26         thread.start();
27     }
28 }
29 运行结果:
30 Thread wait a moment before.1565252610726
31 Thread wait a moment wait after.1565252613726
32 (语句打印前后中间相隔3秒)

生产者 / 消费者模式实现

等待 / 通知最经典的案例就是“生产者 / 消费者”模式,这里基于wait / notify来实现。

  1 public class MyThread12 {
  2 
  3     /**
  4      * 赋值对象
  5      */
  6     static class ValObj {
  7         public static String value = "";
  8     }
  9 
 10     /**
 11      * 生产者
 12      */
 13     static class P {
 14         private String lock;
 15         public P(String lock) {
 16             this.lock = lock;
 17         }
 18         public void setValue() {
 19             try{
 20                 synchronized (lock) {
 21                     if(!ValObj.value.equals("")) {
 22                         lock.wait();
 23                     }
 24                     String val = System.currentTimeMillis() + "";
 25                     System.out.println("Set: " + val);
 26                     ValObj.value = val;
 27                     lock.notify();
 28                 }
 29             }catch (Exception e){
 30                e.printStackTrace();
 31             }
 32         }
 33     }
 34 
 35     /**
 36      * 消费者
 37      */
 38     static class C {
 39         private String lock;
 40         public C(String lock) {
 41             this.lock = lock;
 42         }
 43         public void getValue() {
 44             try{
 45                 synchronized (lock) {
 46                     if(ValObj.value.equals("")) {
 47                         lock.wait();
 48                     }
 49                     System.out.println("Get: " + ValObj.value);
 50                     ValObj.value = "";
 51                     lock.notify();
 52                 }
 53             }catch (Exception e){
 54                 e.printStackTrace();
 55             }
 56         }
 57     }
 58 
 59     /**
 60      * 生产者线程
 61      */
 62     static class ThreadA extends Thread{
 63         private P p;
 64         public ThreadA(P p) {
 65             this.p = p;
 66         }
 67         @Override
 68         public void run() {
 69             while (true) {
 70                 p.setValue();
 71             }
 72         }
 73     }
 74 
 75     /**
 76      * 消费者线程
 77      */
 78     static class ThreadB extends Thread{
 79         private C c;
 80         public ThreadB(C c) {
 81             this.c = c;
 82         }
 83         @Override
 84         public void run() {
 85             while (true) {
 86                 c.getValue();
 87             }
 88         }
 89     }
 90 
 91     public static void main(String[] args) {
 92        String lock = new String("");
 93        P p = new P(lock);
 94        C c = new C(lock);
 95        Thread pThread = new ThreadA(p);
 96        Thread cThread = new ThreadB(c);
 97         pThread.start();
 98         cThread.start();
 99     }
100 
101 }

以上程序体现的是单个生产者单个消费进行的数据交互,在控制台会set和get信息交互打印。

多个生产者消费者出现程序停止运行

如果在此基础上有多个生产者和多个消费者同时对这个值进行操作,即在main函数中有多个pThread和多个cThread进行多线程操作,那么可能会出现程序持续等待的局面。这个时候所有的线程都进入到了WAITING状态,所有程序都不再工作了,没有程序负责notify其他线程了,为什么这样呢?

因为在多个生产者多个消费者的情况下,大家监控的都是同一个对象资源,各个线程在执行wait / notify通信时,不能保证notify唤醒的是对方线程,也有可能是己方线程,即生产者唤醒的是生产者,消费者唤醒了消费者,这种情况持续下去的话,就会导致大家都处于等待状态,程序无法继续运行。

如果要解决上面这种程序停止运行的问题,该如何处理呢?

上面的问题主要是,每次执行notify操作时,不能保证唤醒的是对方线程,而其目的主要是唤醒对方线程继续执行,所以,如果要做到这一点,直接将notify()方法改成notifyAll()方法即可,这样的做法是能够通知到全部,就能保证对方一定能够被通知到了,这样程序就不会出现停止运行的情况了。

通过管道进行线程间通信

JDK中提供了各种各样的输入 / 输出流Stream来帮助我们方便的操作数据,其中管道流(pipeStream)是一种特殊的流,用于在不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道读取数据。

通过使用管道,实现不同线程间的通信,而无需借助于类似临时文件之类的东西。

JDK中提供了4个类来使线程间进行数据通信:

  • PipedInputStream 和 PipedOutputStream
  • PipedReader 和 PipedWriter

方法join的使用

在很多情况下,主线程创建并启动子线程,如果子线程中要进行大量的耗时运算,主线程往往将早于子线程结束,那如果主线程需要得到子线程的执行结果,就必须要等到子线程运行完才能获得这个值,这个时候就要用到join()方法了,这个方法的作用就是等待线程对象的销毁。

 1 public class MyThread13 extends Thread {
 2 
 3     @Override
 4     public void run() {
 5         try{
 6             Thread.sleep(3000L);
 7             System.out.println("Print thread:" + Thread.currentThread().getId()
 8                     + " : " + System.currentTimeMillis());
 9         }catch (Exception e){
10             e.printStackTrace();
11         }
12     }
13 
14     public static void main(String[] args) {
15         try{
16             Thread thread = new MyThread13();
17             thread.start();
18             thread.join();
19             System.out.println("The child thread has completed and the main thread continues to run.");
20         }catch (Exception e){
21            e.printStackTrace();
22         }
23     }
24 }
25 运行结果:
26 Print thread:11 : 1565319923537
27 The child thread has completed and the main thread continues to run.

方法join的作用就是使执行join操作的线程正常执行完run()方法中的任务,而使调用它线程的线程进行无限期的阻塞,等该线程执行完销毁之后再继续执行本线程后面的代码。

join()方法与异常

join()方法执行后如果还没有完成,遇到interrupt()方法的话就会出现异常,报InterruptedException.

join(long)的使用

join方法中带一个long类型的参数,表示设定等待的最长时间。

其实join(long)的功能在内部是使用wait(long)方法来实现的,所以join(long)方法具有释放锁的特点。

参考资料

1、Java多线程编程核心技术 / 高洪岩著.—北京:机械工业出版社,2015.6

原文地址:https://www.cnblogs.com/captainad/p/11327201.html