生产者/消费者模式实现

  wait/notify最经典的案例就是"生产者/消费者"模式。但是此模式有一些需要注意的地方。

  生产者-消费者也有多种实现方式。

    (1)常见的就是synchronized结合wait+notify实现

    (2)用Lock类实现

    (3)使用BlockingQueue阻塞队列实现

 一、 synchronized结合wait()、notify()实现生产者消费者模型

1. 一个简单的生产者消费者(相当于一个生产者,两个消费者)

  一个线程向集合中添加元素,两个线程从集合中删除元素,与之前等待/通知博客的最后一个案例类似。

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 利用之前的等待/通知实现一个简单的生产者消费者
 * 
 * @author Administrator
 *
 */
public class Demo1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo1.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 删除元素线程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                list.wait();
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 删除元素线程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                list.wait();
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素线程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        synchronized (list) {
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notifyAll();
                            list.wait();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "B");
        addThread.start();
    }
}

结果:

18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->0,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->0, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->1,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->1, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->2,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->2, threadName->sub2
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->3,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->3, threadName->sub1
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] 添加元素->4,threadName->B
18:16:23 [cn.qlq.thread.seven.Demo1]-[INFO] list.remove ->4, threadName->sub2

2. 多生产与多消费:操作值-假死 

  假死的现象其实就是进入waiting状态。如果全部线程都进入waiting状态,则程序就不再执行任何业务功能了,整个项目呈停止状态。

  例如两个生产者两个消费者最后处于假死状态的例子:

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**
 * 多生产与多消费:操作值-假死( 多生产与多消费保证只有一个元素生产与消费)
 * 
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 删除元素线程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 删除元素线程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素线程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add1");
        addThread.start();

        // 增加元素线程
        Thread.sleep(1 * 1000);
        Thread addThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                            list.notify();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add2");
        addThread2.start();

        Thread.sleep(10 * 1000);
        LOGGER.info("sub1 state->{}", sub1.getState());
        LOGGER.info("sub2 state->{}", sub2.getState());
        LOGGER.info("add1 state->{}", addThread.getState());
        LOGGER.info("add2 state->{}", addThread2.getState());
    }
}

结果:

18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub1
18:49:23 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->0,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] list.remove ->0, threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub1
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->sub2
18:49:24 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->sub2
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 添加元素->add2,threadName->{}
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 退出等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add1
18:49:25 [cn.qlq.thread.seven.Demo2]-[INFO] 进入等待***,threadName->add2
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] sub2 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add1 state->WAITING
18:49:35 [cn.qlq.thread.seven.Demo2]-[INFO] add2 state->WAITING

  

解释一下上面的线程假死的原因:

  由于唤醒线程调用的是notify()唤醒单个线程,所以有可能唤醒的是同类的线程,也就是生产者唤醒的是生产者,消费者唤醒的是消费者。导致最后四个线程都处于waiting状态。

解决办法:

  唤醒的时候采用notifyAll()唤醒所有的线程唤醒所有的线程,避免只唤醒同类线程。

3. 多生产与多消费:操作值

  为了避免上面的假死线下,唤醒的时候采用notifyAll()唤醒所有的线程唤醒所有的线程,避免只唤醒同类线程。

  唤醒的时候也唤醒异类,这样就不会出现假死的状态了,程序会一直运行下去。

package cn.qlq.thread.seven;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 多生产与多消费:操作值
 * 
 * @author Administrator
 *
 */
public class Demo3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);

    public static void main(String[] args) throws InterruptedException {
        final List<String> list = new ArrayList<String>();

        // 删除元素线程1
        Thread sub1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub1");
        sub1.start();

        // 删除元素线程2
        Thread sub2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (list) {
                        while (true) {
                            while (list.size() == 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }

                            LOGGER.info("list.remove ->{}, threadName->{}", list.get(0),
                                    Thread.currentThread().getName());
                            list.remove(0);
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "sub2");
        sub2.start();

        // 增加元素线程
        Thread.sleep(1 * 1000);
        Thread addThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", i, Thread.currentThread().getName());
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add1");
        addThread.start();

        // 增加元素线程
        Thread.sleep(1 * 1000);
        Thread addThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 50000; i++) {
                        synchronized (list) {
                            while (list.size() != 0) {
                                LOGGER.info("进入等待***,threadName->{}", Thread.currentThread().getName());
                                list.wait();
                                LOGGER.info("退出等待***,threadName->{}", Thread.currentThread().getName());
                            }
                            list.add(i + "");
                            LOGGER.info("添加元素->{},threadName->{}", Thread.currentThread().getName());
                            list.notifyAll();
                        }
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("InterruptedException error", e);
                }
            }
        }, "add2");
        addThread2.start();

        Thread.sleep(10 * 1000);
        LOGGER.info("sub1 state->{}", sub1.getState());
        LOGGER.info("sub2 state->{}", sub2.getState());
        LOGGER.info("add1 state->{}", addThread.getState());
        LOGGER.info("add2 state->{}", addThread2.getState());
    }
}

4.  多个生产者一个消费者

  附上一个多生产与一个消费者的例子。大体的思路是对资源加锁,在线程中通过while(true)不停的调用资源的add()方法和remove()方法,两个方法都是同步方法,需要同步是对当前对象加锁。每个对象都有一个阻塞队列,一个就绪队列。

  在学习完wait/notify之后再次重写下面代码就清晰多了。

package cn.qlq.thread.seven;

public class Demo4 {
    public static void main(String[] args) {
        MyResource myResource = new MyResource();
        // 多个生产者一个消费者
        MyConsumerThread myConsumerThread = new MyConsumerThread(myResource);
        // MyConsumerThread myConsumerThread1 = new
        // MyConsumerThread(myResource);
        // MyConsumerThread myConsumerThread2 = new
        // MyConsumerThread(myResource);
        MyProducerThread myProducerThread = new MyProducerThread(myResource);
        MyProducerThread myProducerThread1 = new MyProducerThread(myResource);
        MyProducerThread myProducerThread2 = new MyProducerThread(myResource);
        myProducerThread.start();
        myProducerThread1.start();
        myProducerThread2.start();
        myConsumerThread.start();
        // myConsumerThread1.start();
        // myConsumerThread2.start();
    }
}

/**
 * 资源类 一个加一个减(都有同步锁)
 * 
 * @author: qlq
 * @date : 2018年6月15日上午11:38:37
 */
class MyResource {
    private int num;// 资源数量
    private int capacity = 10;// 资源容量

    /**
     * 同步方法增加资源 如果数量大于容量,线程进入阻塞状态 否则通知消费者进行消费
     */
    public synchronized void add() {
        if (num >= capacity) {// 大于等于的话进入阻塞状态
            try {
                wait();
                System.out.println(Thread.currentThread().getName() + "进入线程等待。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            num++;// 生产一件资源
            System.out.println(Thread.currentThread().getName() + "生产一件资源,目前剩余资源" + num + "件");
            notifyAll();// 通知消费者进行消费
        }
    }

    /**
     * 同步方法移除资源 如果num>0,消费资源,通知生产者进行生产 否则的话进入阻塞队列
     */
    public synchronized void remove() {
        if (num > 0) {
            num--;
            System.out.println(Thread.currentThread().getName() + "消费一件资源,目前剩余" + num + "件");
            notifyAll();// 唤醒生产者进行生产
        } else {
            try {
                wait();
                System.out.println(Thread.currentThread().getName() + "进入线程等待。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * 生产者类
 * 
 * @author: qlq
 * @date : 2018年6月16日上午11:08:05
 */
class MyProducerThread extends Thread {
    private MyResource resource;

    protected MyProducerThread(MyResource resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.add();
        }
    }
}

/**
 * 消费者线程
 * 
 * @author: qlq
 * @date : 2018年6月16日上午11:09:06
 */
class MyConsumerThread extends Thread {
    private MyResource resource;

    protected MyConsumerThread(MyResource resource) {
        super();
        this.resource = resource;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            resource.remove();
        }
    }
}

  

总结:   

  每个对象都有一个阻塞队列,一个就绪队列。同步方法中才可以使用wait()/notify(),而且获得哪个锁才能调用对应锁的对应方法,否则会报非法监视器异常。

 

二、 ReentrantLock结合Condition的await()、sianalAll()实现生产者消费者模式

1.单个生产线程,多个消费线程的例子

  生产者和消费者分别阻塞在两个条件下面的例子

package cn.qlq.thread.eleven;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ReentrantLock結合Condition实现生产者-消费者模型(单生产-多消费)
 * 
 * @author Administrator
 *
 */
public class Demo4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);
    private Lock lock = new ReentrantLock();
    private Condition producerCon = lock.newCondition();
    private Condition consumerCon = lock.newCondition();
    private volatile List<String> list = new ArrayList<String>();// 模拟是一个容器
    private volatile int capacity = 3;// 最多3个

    public void removeEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                while (list.size() == 0) {
                    consumerCon.await();// 阻塞消费者
                }
                LOGGER.info("threadName - > {} 消费元素 {}", Thread.currentThread().getName(), list.get(0));
                list.remove(0);
                producerCon.signal();// 唤醒生产者,由于是单个生产者,所以可以用signal()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void addEle() {
        try {
            lock.lock();
            for (int i = 0; i < 10; i++) {
                // 超过容量阻塞生产者
                while (list.size() >= capacity) {
                    producerCon.await();
                }
                LOGGER.info("threadName - > {} 生产元素 {}", Thread.currentThread().getName(), i);
                list.add(i + "");
                consumerCon.signalAll();// 唤醒消费者,由于是多个消费者,所以可以用signalAll()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Demo4 demo4 = new Demo4();

        // 两个消费者
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con2").start();

        // 单个生产者
        Thread.sleep(1 * 1000);
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro1").start();
    }
}

结果:

13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 0
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 1
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 2
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 0
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 1
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 2
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 3
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 4
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 5
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 3
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con1 消费元素 4
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 5
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 6
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 7
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 8
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 6
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 7
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 8
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > pro1 生产元素 9
13:31:28 [cn.qlq.thread.eleven.Demo4]-[INFO] threadName - > con2 消费元素 9

2.多个生产线程,多个消费线程的例子

package cn.qlq.thread.eleven;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ReentrantLock結合Condition实现生产者-消费者模型(单生产-多消费)
 * 
 * @author Administrator
 *
 */
public class Demo5 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo5.class);
    private Lock lock = new ReentrantLock();
    private Condition producerCon = lock.newCondition();
    private Condition consumerCon = lock.newCondition();
    private volatile List<String> list = new ArrayList<String>();// 模拟是一个容器
    private volatile int capacity = 3;// 最多十个

    public void removeEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                while (list.size() == 0) {
                    consumerCon.await();// 阻塞消费者
                }
                LOGGER.info("threadName - > {} 消费元素 {}", Thread.currentThread().getName(), list.get(0));
                list.remove(0);
                producerCon.signalAll();// 唤醒生产者,由于是单个生产者,所以可以用signal()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void addEle() {
        try {
            lock.lock();
            for (int i = 0; i < 5; i++) {
                // 超过容量阻塞生产者
                while (list.size() >= capacity) {
                    producerCon.await();
                }
                LOGGER.info("threadName - > {} 生产元素 {}", Thread.currentThread().getName(), i);
                list.add(i + "");
                consumerCon.signalAll();// 唤醒消费者,由于是多个消费者,所以可以用signalAll()
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Demo5 demo4 = new Demo5();

        // 两个消费者
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.removeEle();
            }
        }, "con2").start();

        // 单个生产者
        Thread.sleep(1 * 1000);
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro1").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                demo4.addEle();
            }
        }, "pro2").start();
    }
}

结果:

13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro1 生产元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con1 消费元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 0
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 1
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 2
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 3
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > pro2 生产元素 4
13:36:49 [cn.qlq.thread.eleven.Demo5]-[INFO] threadName - > con2 消费元素 4

三、 使用BlockingQueue阻塞队列实现生产者消费者

  参考我的另一篇博客:https://www.cnblogs.com/qlqwjy/p/10175201.html 

原文地址:https://www.cnblogs.com/qlqwjy/p/10115756.html