生产者消费者模式

  学习Java的多线程,生产者消费者模式是避免不了的。下面将以wait/notify,await/singal,blockingquene几种方式来实现生产者消费者模式。

  下面的仅仅是例子,实际应用中应该使用Executor来完成,并且保证线程及线程池可以正常关闭。实际使用中需要考虑的情况是生产者生产的较慢,消费者较快,消费者线程也不能一直等待,这就需要正确的标识出生产者线线程池什么时候结束。需要使用volatile boolean和AtomaticInteger来设置生产者线线程池的状态,消费者线程才可以正常结束而不是一直等待。

  使用wait/notify先了解以下知识:

  每一个同步锁lock下面都挂了几个线程队列,包括就绪(Ready)队列,等待(Waiting)队列等。当线程A因为得不到同步锁lock,从而进入的是lock.ReadyQueue(就绪队列),一旦同步锁不被占用,JVM将自动运行就绪队列中的线程而不需要任何notify()的操作。
  但是当线程A被wait()了,那么将进入lock.WaitingQuene(等待队列),同时如果占据的同步锁也会放弃。而此时如果同步锁不唤醒等待队列中的进程(lock.notify()),这些进程将永远不会得到运行的机会。
  wait不是改变锁的状态,是把当前线程放到锁的等待队列里面,notify就是从锁的等待队列里面选择第一个等待的线程进行调度。每个对象都有一个唯一的锁。


  wait, notify操作首先保证当前线程持有锁,否则会抛出异常。

  也就是一般需要如下方式来使用wait/notify。

Object o;

//A thread
synchronized(o){
    ...
    o.wait();
    ...
}

//B thread
synchronized(o){
    ...
    o.notify();
    ...
}

  下面是用wait, notify来完成生产者消费者的例子 

package multiThread;

import java.util.LinkedList;
class Storage
{
    // 仓库最大存储量
    private final int MAX_SIZE = 100;

    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<Object>();

    // 生产num个产品
    public void produce(int num)
    {
        // 同步代码段
        synchronized (list)
        {
            // 如果仓库剩余容量不足
            // 这里用的是while,不能是if
            while (list.size() + num > MAX_SIZE)
            {
                System.out.println("【要生产的产品数量】:" + num + "	【库存量】:"
                        + list.size() + "	暂时不能执行生产任务!");
                try
                {
                    // 由于条件不满足,生产阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }

            // 生产条件满足情况下,生产num个产品
            for (int i = 1; i <= num; ++i)
            {
                list.add(new Object());
            }

            System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }

    // 消费num个产品
    public void consume(int num)
    {
        // 同步代码段
        synchronized (list)
        {
            // 如果仓库存储量不足
            while (list.size() < num)
            {
                System.out.println("【要消费的产品数量】:" + num + "	【库存量】:"
                        + list.size() + "	暂时不能执行消费任务!");
                try
                {
                    // 由于条件不满足,消费阻塞
                    list.wait();
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }

            // 消费条件满足情况下,消费num个产品
            for (int i = 1; i <= num; ++i)
            {
                list.remove();
            }

            System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + list.size());

            list.notifyAll();
        }
    }

    // get/set方法
    public LinkedList<Object> getList()
    {
        return list;
    }

    public void setList(LinkedList<Object> list)
    {
        this.list = list;
    }

    public int getMAX_SIZE()
    {
        return MAX_SIZE;
    }
}

class producer extends Thread
{
    // 每次生产的产品数量
    private int num;

    // 所在放置的仓库
    private Storage storage;

    // 构造函数,设置仓库
    public producer(Storage storage)
    {
        this.storage = storage;
    }

    // 线程run函数
    public void run()
    {
        produce(num);
    }

    // 调用仓库Storage的生产函数
    public void produce(int num)
    {
        storage.produce(num);
    }

    // get/set方法
    public int getNum()
    {
        return num;
    }

    public void setNum(int num)
    {
        this.num = num;
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }
}

class consumer extends Thread
{
    // 每次消费的产品数量
    private int num;

    // 所在放置的仓库
    private Storage storage;

    // 构造函数,设置仓库
    public consumer(Storage storage)
    {
        this.storage = storage;
    }

    // 线程run函数
    public void run()
    {
        consume(num);
    }

    // 调用仓库Storage的生产函数
    public void consume(int num)
    {
        storage.consume(num);
    }

    // get/set方法
    public int getNum()
    {
        return num;
    }

    public void setNum(int num)
    {
        this.num = num;
    }

    public Storage getStorage()
    {
        return storage;
    }

    public void setStorage(Storage storage)
    {
        this.storage = storage;
    }
}

public class WaitNotifyTest
{
    public static void main(String[] args)
    {
        // 仓库对象
        Storage storage = new Storage();

        // 生产者对象
        producer p1 = new producer(storage);
        producer p2 = new producer(storage);
        producer p3 = new producer(storage);
        producer p4 = new producer(storage);
        producer p5 = new producer(storage);
        producer p6 = new producer(storage);
        producer p7 = new producer(storage);

        // 消费者对象
        consumer c1 = new consumer(storage);
        consumer c2 = new consumer(storage);
        consumer c3 = new consumer(storage);

        // 设置生产者产品生产数量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);

        // 设置消费者产品消费数量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);

        // 线程开始执行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
    }
}

  await/singal是Lock所对应的操作,上面的一个Objec对应的wait队列只能有一个,但是一个Lock对应的await却可以有多个,下面将使用getLock和putLock 2个队列来完成生产者消费者模式。使用await/singal也需要当前线程持有Lock,和上面的是一样的。ArrayBlockingQueue的底层就是用到了同一个Lock的2个队列。

package multiThread;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Storage2 extends Storage
{
    // 仓库最大存储量
    private final int MAX_SIZE = 100;

    // 仓库存储的载体
    private LinkedList<Object> list = new LinkedList<Object>();
    
    private Lock lock = new ReentrantLock();
    private Condition getLock = lock.newCondition();
    private Condition putLock = lock.newCondition();

    // 生产num个产品
    public void produce(int num)
    {
            lock.lock();
            try {                
                // 如果仓库剩余容量不足
                // 这里用的是while,不能是if
                while (list.size() + num > MAX_SIZE) {
                    System.out.println("【要生产的产品数量】:" + num + "	【库存量】:" + list.size() + "	暂时不能执行生产任务!");
                    try {
                        // 由于条件不满足,生产阻塞
                        putLock.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 生产条件满足情况下,生产num个产品
                for (int i = 1; i <= num; ++i) {
                    list.add(new Object());
                }
                System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + list.size());
                getLock.signalAll();
            } finally{
                lock.unlock();
            }
    }

    // 消费num个产品
    public void consume(int num)
    {
            lock.lock();
            try {
                // 如果仓库存储量不足
                while (list.size() < num) {
                    System.out.println("【要消费的产品数量】:" + num + "	【库存量】:" + list.size() + "	暂时不能执行消费任务!");
                    try {
                        // 由于条件不满足,消费阻塞
                        getLock.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 消费条件满足情况下,消费num个产品
                for (int i = 1; i <= num; ++i) {
                    list.remove();
                }
                System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + list.size());
                putLock.signalAll();
            } finally{
                lock.unlock();
            }
    }

    // get/set方法
    public LinkedList<Object> getList()
    {
        return list;
    }

    public void setList(LinkedList<Object> list)
    {
        this.list = list;
    }

    public int getMAX_SIZE()
    {
        return MAX_SIZE;
    }
}


public class AwaitSingalTest
{
    public static void main(String[] args)
    {
        // 仓库对象
        Storage2 storage = new Storage2();

        // 生产者对象
        producer p1 = new producer(storage);
        producer p2 = new producer(storage);
        producer p3 = new producer(storage);
        producer p4 = new producer(storage);
        producer p5 = new producer(storage);
        producer p6 = new producer(storage);
        producer p7 = new producer(storage);

        // 消费者对象
        consumer c1 = new consumer(storage);
        consumer c2 = new consumer(storage);
        consumer c3 = new consumer(storage);

        // 设置生产者产品生产数量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);

        // 设置消费者产品消费数量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);

        // 线程开始执行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
                
    }
}

  用blockingquene来实现生产者消费者模式和上面2中的实现略有不同。上面2中由于都基于了锁,所以增加10个对象时,会一次将这个10个对象增加到仓库中。

  blockingquene的底层也是基于await/singal的,保证了多个线程中对blockingquene的插入、读取操作只能有一个线程进行。blockingquene中对自己进行了加锁,所以生产者和消费者就不能再次对blockingquene加锁。

  但是多次运行该程序,仓库中最后的结果肯定是正确的,这个是由blockingquene来保证的。

package multiThread;

import java.util.concurrent.LinkedBlockingQueue;

class Storage3 extends Storage {
    // 仓库最大存储量
    private final int MAX_SIZE = 100;

    // 仓库存储的载体,使用LinkedBlockingQueue的put和take方法
    // 使用LinkedBlockingQueue时就不要在显示的使用lock或者synchronized
    public static LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<Object>(100);

    // 生产num个产品
    public void produce(int num) {

            // 如果仓库剩余容量不足
            if (lbq.size() + num > MAX_SIZE) {
                System.out.println("【要生产的产品数量】:" + num + "	【库存量】:" + lbq.size() + "	暂时不能执行生产任务!");
            }
            // 生产条件满足情况下,生产num个产品
            for (int i = 1; i <= num; ++i) {
                try {
                    if (!Thread.currentThread().isInterrupted()){
                        lbq.put(new Object());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("【已经生产产品数】:" + num + "	【现仓储量为】:" + lbq.size());

    }

    // 消费num个产品
    public void consume(int num) {
            // 如果仓库存储量不足
            if (lbq.size() < num) {
                System.out.println("【要消费的产品数量】:" + num + "	【库存量】:" + lbq.size() + "	暂时不能执行消费任务!");
            }
            // 消费条件满足情况下,消费num个产品
            for (int i = 1; i <= num; ++i) {
                try {
                    if (!Thread.currentThread().isInterrupted()){
                        lbq.take();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("【已经消费产品数】:" + num + "	【现仓储量为】:" + lbq.size());
    }

    public void setList(LinkedBlockingQueue<Object> lbq) {
        this.lbq = lbq;
    }

    public int getMAX_SIZE() {
        return MAX_SIZE;
    }
}

public class BlockingQueneTest {
    public static void main(String[] args) {
        // 仓库对象
        Storage3 storage = new Storage3();

        // 生产者对象
        producer p1 = new producer(storage);
        producer p2 = new producer(storage);
        producer p3 = new producer(storage);
        producer p4 = new producer(storage);
        producer p5 = new producer(storage);
        producer p6 = new producer(storage);
        producer p7 = new producer(storage);

        // 消费者对象
        consumer c1 = new consumer(storage);
        consumer c2 = new consumer(storage);
        consumer c3 = new consumer(storage);

        // 设置生产者产品生产数量
        p1.setNum(10);
        p2.setNum(10);
        p3.setNum(10);
        p4.setNum(10);
        p5.setNum(10);
        p6.setNum(10);
        p7.setNum(80);

        // 设置消费者产品消费数量
        c1.setNum(50);
        c2.setNum(20);
        c3.setNum(30);

        // 线程开始执行
        c1.start();
        c2.start();
        c3.start();
        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
        
        try {
            c1.join();
            c2.join();
            c3.join();
            p1.join();
            p2.join();
            p3.join();
            p4.join();
            p5.join();
            p6.join();
            p7.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("最后的库存为:" + Storage3.lbq.size());
    }
}
原文地址:https://www.cnblogs.com/lnlvinso/p/4657287.html