java并发编程基础——线程通信

线程通信

当线程在系统内运行时,程序通常无法准确的控制线程的轮换执行,但我们可以通过一些机制来保障线程的协调运行

一、传统的线程通信

传统的线程通信主要是通过Object类提供的wait(),notify(),notifyAll() 3个方法实现,这三个方法必须由同步监视器对象来调用

thread.png

wait():导致当前线程等待,直到其他线程调用同步监视器的notify()方法或者notifyAll()方法来唤醒该线程。wait会释放当前同步监视器锁定

notify():唤醒此同步监视器上的等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程

notifyAll():唤醒此同步监视器上所有等待的线程,只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。

下面程序利用wait(),notify实现两个线程轮流对i自增

package threadtest;
  
public class ThreadTest  implements Runnable{
  
    static int i = 0;
    private int flag = 0 ;
    public  void incre() throws InterruptedException {
        synchronized (this) {
            if(flag == 0) {
                flag = 1;
                System.out.println(Thread.currentThread().getName());
                i++;
                 this.notify();//唤醒其他线程
                 this.wait();//等待,并释放同步监视器锁定
            }else if(flag == 1){
                flag = 0;
                System.out.println(Thread.currentThread().getName());
                i++;
                 this.notify();//唤醒其他线程
                 this.wait();//等待,并释放同步监视器锁定
            }else {
                this.notifyAll();
            }
        }
    }
      
    @Override
    public  void run() {
        for(int j=0;j<100;j++) {
            try {
                incre();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = 3;
        try {
            incre();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  
    public static void main(String[] args) throws InterruptedException {
        ThreadTest tt = new ThreadTest();
        Thread t1 = new Thread(tt);
        Thread t2 = new Thread(tt);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
          
}

结果:

Thread-0
Thread-1
Thread-0
...
Thread-1
Thread-0
Thread-1
200

二、使用Condition控制线程通信

如果不使用synchronized,而是使用Lock对象来保证同步,则系统中不存在隐士的同步监视器,也就能使用wait,notify,notifyAll来进行线程通信了。

当使用Lock对象来保证同步时,java提供了一个Condition类来保持协调。

Condition对象被绑定在一个Lock对象上,要获得,只要Lock对象的newCondition方法即可获得Condition对象。

Condition类包含如下3个方法

await():类似wait(),导致当前线程等待,直到其他线程调用该Condition的signal()方法或signalAll()方法来唤醒该线程。wait会释放当前同步监视器锁定

signal():唤醒在此Lock对象上等待的单个线程,如果该Lock上所有线程都在等待,则会选择唤醒其中一个线程

signalAll():唤醒在此Lock对象上等待的所有线程

下面程序利用lock,condition实现两个线程轮流对i自增

package threadtest;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class ThreadTest  implements Runnable{
  
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    static int i = 0;
    private int flag = 0 ;
    public  void incre() throws InterruptedException {
        try {
            lock.lock();
            if(flag == 0) {
                flag = 1;
                System.out.println(Thread.currentThread().getName());
                i++;
                cond.signal();
                cond.await();
                 
            }else if(flag == 1){
                flag = 0;
                System.out.println(Thread.currentThread().getName());
                i++;
                 cond.signal();
               cond.await();
                 
            }else {
                cond.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
      
    @Override
    public  void run() {
        for(int j=0;j<100;j++) {
            try {
                incre();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        flag = 3;
        try {
            incre();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  
    public static void main(String[] args) throws InterruptedException {
        ThreadTest tt = new ThreadTest();
        Thread t1 = new Thread(tt);
        Thread t2 = new Thread(tt);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
          
}

结果:

Thread-0
Thread-1
Thread-0
...
Thread-1
Thread-0
Thread-1
200

三、使用阻塞队列(BlockingQueue)控制线程通信

java5提供了BlockingQueue接口主要用于线程同步工具,它也是Queue的子接口。

BlockingQueue特点:当生产者试图向BlockingQueue中放入元素,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取元素时,如果该队列已空,则该线程被阻塞。

BlockingQueue接口中有两个方法支持阻塞:

put(E e):尝试把E元素放入BlockingQueue中,如果该队列已满,则阻塞该线程

take():尝试从BlockingQueue的头部取出元素,如果该队列已空,则阻塞该线程

BlockingQueue接口继承Queue接口,所以也可以用Queue接口中的方法:

在队尾部插入元素:add(E e)、offer(E e)、put(E e),当队列已满时,这三个方法会抛出异常、返回false、阻塞队列

在队列头部删除并返回删除的元素:remove()、poll、take(),当队列已空时,这个三个方法会抛出异常、返回false、阻塞队列

在队列头部取出但不删除元素:element()、peek(),当队列已空时,这两个方法分别抛出异常,返回false.

BlockingQueue接口常用实现类:

ArrayBlockingQueue:基于数组实现的BlockingQueue队列

LinkedBlockingQueue:基于链表实现的BlockingQueue队列

SynchronousQueue:同步队列。对该队列的存、取操作必须交替进行

BlockingQueue的小例子如下:

package threadtest;
 
import java.util.concurrent.BlockingQueue;
 
/**
 * 生产着
 * @author rdb
 *
 */
public class Producer implements Runnable{
 
    private BlockingQueue<Integer> bq ;
    public Producer(BlockingQueue<Integer> bq) {
        this.bq = bq;
    }
     
    @Override
    public void run() {
        for(int i=0;i<30;i++) {
            System.out.println(Thread.currentThread().getName() + "开始生产" + bq);
            try {
                bq.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "生产结束" + bq);
        }
    }
 
}
 
package threadtest;
 
import java.util.concurrent.BlockingQueue;
 
/**
 * 消费者
 * @author rdb
 *
 */
public class Consumer implements Runnable{
 
    private BlockingQueue<Integer> bq;
    public Consumer(BlockingQueue<Integer> bq) {
        this.bq = bq ;
    }
    @Override
    public void run() {
        while(true) {
            System.out.println(Thread.currentThread().getName() + "开始消费" + bq);
            try {
                bq.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "消费完成" + bq);
        }
    }
 
}
 
package threadtest;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class ThreadTest  {
 
        //启动三个生产者线程,一个消费者线程
    public static void main(String[] args) {
        BlockingQueue< Integer> bq = new  ArrayBlockingQueue<>(1);
        Producer p = new Producer(bq);
        Consumer c = new Consumer(bq);
        new Thread(p,"produce1").start();
        new Thread(p,"produce2").start();
        new Thread(p,"produce3").start();
        new Thread(c,"consumer").start();
                 
    }
  
     
}

结果:

produce2开始生产[]
consumer开始消费[]
produce3开始生产[]
produce1开始生产[]
produce2生产结束[1]
produce3生产结束[1]
produce2开始生产[1]
consumer消费完成[]
produce3开始生产[1]
consumer开始消费[1]
consumer消费完成[]
produce1生产结束[1]
consumer开始消费[1]
produce1开始生产[1]
produce2生产结束[1]
consumer消费完成[]
........
consumer消费完成[]
consumer开始消费[]
原文地址:https://www.cnblogs.com/jnba/p/10634435.html