多线程-线程通信

一、线程的通信机制

  1、wait/notify/notifyAll机制

  3、Lock+Condition


 二、wait/notify机制

  该机制涉及三个方法,wait()/notify()/notifyAll()。三个方法均非Thread类中的方法,而是Object类中声明的方法。均为final方法,无法被重写。

  wait():让当前线程(Thread.concurrentThread()方法所返回的线程)释放对象锁并进入等待(阻塞)状态,直到线程被唤醒。并且当前线程必须拥有此调用对象的锁。

  notify():唤醒一个正在等待相应对象锁的线程,使其进入就绪队列,等待当前线程释放锁后竞争锁,得到CPU的执行。如果有多个线程在等待这个对象的锁,只能唤醒其中一个线程。

  notify():唤醒所有正在等待这个对象的锁的线程。

为何这三个方法不是Thread类声明中的方法,而是Object类中声明的方法?
    
   用这几个方法必须拿到当前对象的monitor对象。monitor存在于引用指针中,而synchronized关键字可以获取monitor。

  意:一个线程被唤醒不代表立即获取了对象的锁,只有等调用完了notify()/notifyAll()并退出synchronized块,释放对象锁后,其他线程才可获得执行


 Demo:利用synchronized、wait、notifyAll实现生产者消,费者模型

Resource:共享资源池

package com.imooc.demo.thread;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Title: Resource
 * @Description: 公共资源类
 * @date 2019/1/189:56
 */
public class Resource {
    private static final Logger logger = LoggerFactory.getLogger(Resource.class);
    //当前资源池数
    private int num = 0;
    //池中最大数
    private int maxSize = 10;

    /**
     * 取资源
     */
    public synchronized void remove(){
        //当有资源,消费者可以调用,无资源消费者等待
        if (num > 0){
            num--;
            logger.info("消费者:"+Thread.currentThread().getName()+"消耗了一个资源,池中剩余资源:"+num);
            //唤醒生产者
            notifyAll();
        }else {
            try {
                //消费者等待
                wait();
                logger.info("资源池已无资源,消费者开始等待");
            } catch (InterruptedException e) {
                logger.error("remove wait error {}",e);
            }
        }
    }

    /**
     * 添加资源
     */
    public synchronized void add(){
        //资源小于可以添加,满了生产者等待
        if (num < maxSize){
            num++;
            logger.info(Thread.currentThread().getName()+"生产了一个资源,池中还有资源:"+num);
            //唤醒消费者
            notifyAll();
        }else {
            try {
                //生产者等待
                wait();
                logger.info(Thread.currentThread().getName()+"进入等待");
            } catch (InterruptedException e) {
                logger.error("add wait error {}",e);
            }
        }
    }
}
ConsumerThread:消费者+ProducerThread:生产者 
//消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Title: ConsumerThread
 * @Description: 消费者
 * @date 2019/1/1810:07
 */
public class ConsumerThread extends Thread{
    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
    private Resource resource;
    public ConsumerThread(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("ConsumerThread error {}",e);
            }
            resource.remove();
        }
    }
}
//生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @Title: ProducerThread
 * @Description: 生产者
 * @date 2019/1/1810:08
 */
public class ProducerThread extends Thread{
    private static final Logger logger = LoggerFactory.getLogger(ProducerThread.class);
    private Resource resource;
    public ProducerThread(Resource resource){
        this.resource = resource;

    }
    public void run(){
        while (true){
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                logger.error("ProducerThread error {}",e);
            }
            resource.add();
        }
    }
}

Client:客户端

package com.imooc.demo.thread;

/**
 * @Title: Client
 * @Description: 客户端
 * @date 2019/1/1810:10
 */
public class Client {
    public static void main(String[] args) {
        Resource resource =new Resource();
        ProducerThread pt1 = new ProducerThread(resource);
        ProducerThread pt2 = new ProducerThread(resource);
        ProducerThread pt3 = new ProducerThread(resource);

        ConsumerThread ct1 = new ConsumerThread(resource);
        ConsumerThread ct2 = new ConsumerThread(resource);
        pt1.start();
        pt2.start();
        pt3.start();
        ct1.start();
    }
}

三、Condition

  在JDK5中出现,替代传统的Object的wait、notify实现线程间的协作。Condition是个接口,基本方法是await()、signal()/signalAll()

  依赖于Lock接口,生成一个Condition的方式是lock.newCondition()。await()和signal()/signalAll()方法都必须在lock的保护之内

Demo:使用lock、condition、await、signalAll实现生产者、消费者模型

Resource

package com.imooc.demo.treadForCondition;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @Title: Resource
 * @Description: 资源池
 * @date 2019/1/1810:31
 */
public class Resource {
    private static final Logger logger = LoggerFactory.getLogger(com.imooc.demo.thread.Resource.class);
    private volatile int num = 0;//当前资源数量
    private int maxSize = 10;//资源池中允许存放的资源数目
    private Lock lock;
    //生产者和消费者的Condition 实例
    private Condition producerCondition;
    private Condition consumerCondition;
    public Resource(Lock lock,Condition producerCondition,Condition consumerCondition){
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }
    /**
     * 取资源
     */
    public synchronized void remove(){
        lock.lock();
        //当有资源,消费者可以调用,无资源消费者等待
        try {
            if (num > 0){
                num--;
                logger.info("消费者:"+Thread.currentThread().getName()+"消耗了一个资源,池中剩余资源:"+num);
                //唤醒生产者
                consumerCondition.signalAll();
            }else {
                try {
                    //消费者等待
                    producerCondition.await();
                    logger.info("资源池已无资源,消费者开始等待");
                } catch (InterruptedException e) {
                    logger.error("remove wait error {}",e);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 添加资源
     */
    public synchronized void add(){
        //资源小于可以添加,满了生产者等待
        lock.lock();
        try {
            if (num < maxSize){
                num++;
                logger.info(Thread.currentThread().getName()+"生产了一个资源,池中还有资源:"+num);
                //唤醒消费者
                producerCondition.signalAll();
            }else {
                try {
                    //生产者等待
                    consumerCondition.await();
                    logger.info(Thread.currentThread().getName()+"进入等待");
                } catch (InterruptedException e) {
                    logger.error("add wait error {}",e);
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
ConsumerThread+ProducerThread+Client
//ConsumerThread

import com.imooc.demo.thread.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @Title: ConsumerThread
 * @Description: 消费者
 * @date 2019/1/1810:07
 */
public class ConsumerThread extends Thread{
    private static final Logger logger = LoggerFactory.getLogger(ConsumerThread.class);
    private Resource resource;
    public ConsumerThread(Resource resource){
        this.resource = resource;
    }
    public void run(){
        while (true){
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                logger.error("ConsumerThread error {}",e);
            }
            resource.remove();
        }
    }
}

//ProducerThread

import com.imooc.demo.thread.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @Title: ProducerThread
 * @Description: 生产者
 * @date 2019/1/1810:08
 */
public class ProducerThread extends Thread{
    private static final Logger logger = LoggerFactory.getLogger(ProducerThread.class);
    private Resource resource;
    public ProducerThread(Resource resource){
        this.resource = resource;

    }
    public void run(){
        while (true){
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("ProducerThread error {}",e);
            }
            resource.add();
        }
    }
}
//Client

import com.imooc.demo.thread.Resource;

/**
 * @Title: Client
 * @Description: 客户端
 * @date 2019/1/1810:10
 */
public class Client {
    public static void main(String[] args) {
        Resource resource =new Resource();
        ProducerThread pt1 = new ProducerThread(resource);

        ConsumerThread ct1 = new ConsumerThread(resource);
        ConsumerThread ct2 = new ConsumerThread(resource);
        pt1.start();
        ct1.start();
        ct2.start();
    }
}
原文地址:https://www.cnblogs.com/zhangbLearn/p/9837809.html