生产者和消费者

  • 如果生产者的队列满了(while循环判断是否满),则等待。如果生产者的队列没满,则生产数据并唤醒消费者进行消费。

  • 如果消费者的队列空了(while循环判断是否空),则等待。如果消费者的队列没空,则消费数据并唤醒生产者进行生产。

package test;

import java.util.Random;

import java.util.Vector;

import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

    // true--->生产者一直执行,false--->停掉生产者

    private volatile boolean isRunning = true;

    // 公共资源

    private final Vector sharedQueue;

    // 公共资源的最大数量

    private final int SIZE;

    // 生产数据

    private static AtomicInteger count = new AtomicInteger();

    public Producer(Vector sharedQueue, int SIZE) {

        this.sharedQueue = sharedQueue;

        this.SIZE = SIZE;

    }

    @Override

    public void run() {

        int data;

        Random r = new Random();

        System.out.println("start producer id = " + Thread.currentThread().getId());

        try {

            while (isRunning) {

                // 模拟延迟

                Thread.sleep(r.nextInt(1000));

                // 当队列满时阻塞等待

                while (sharedQueue.size() == SIZE) {

                    synchronized (sharedQueue) {

                        System.out.println("Queue is full, producer " + Thread.currentThread().getId()

                                + " is waiting, size:" + sharedQueue.size());

                        sharedQueue.wait();

                    }

                }

                // 队列不满时持续创造新元素

                synchronized (sharedQueue) {

                    // 生产数据

                    data = count.incrementAndGet();

                    sharedQueue.add(data);

                    System.out.println("producer create data:" + data + ", size:" + sharedQueue.size());

                    sharedQueue.notifyAll();

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupted();

        }

    }

    public void stop() {

        isRunning = false;

    }

}

package test;

import java.util.Random;

import java.util.Vector;

public class Consumer implements Runnable {

    // 公共资源

    private final Vector sharedQueue;

    public Consumer(Vector sharedQueue) {

        this.sharedQueue = sharedQueue;

    }

    @Override

    public void run() {

        Random r = new Random();

        System.out.println("start consumer id = " + Thread.currentThread().getId());

        try {

            while (true) {

                // 模拟延迟

                Thread.sleep(r.nextInt(1000));

                // 当队列空时阻塞等待

                while (sharedQueue.isEmpty()) {

                    synchronized (sharedQueue) {

                        System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()

                                + " is waiting, size:" + sharedQueue.size());

                        sharedQueue.wait();

                    }

                }

                // 队列不空时持续消费元素

                synchronized (sharedQueue) {

                    System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size());

                    sharedQueue.notifyAll();

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupt();

        }

    }

}

package test;

import java.util.Vector;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class Test2 {

    public static void main(String[] args) throws InterruptedException {

        // 1.构建内存缓冲区

        Vector sharedQueue = new Vector();

        int size = 4;

        // 2.建立线程池和线程

        ExecutorService service = Executors.newCachedThreadPool();

        Producer prodThread1 = new Producer(sharedQueue, size);

        Producer prodThread2 = new Producer(sharedQueue, size);

        Producer prodThread3 = new Producer(sharedQueue, size);

        Consumer consThread1 = new Consumer(sharedQueue);

        Consumer consThread2 = new Consumer(sharedQueue);

        Consumer consThread3 = new Consumer(sharedQueue);

        service.execute(prodThread1);

        service.execute(prodThread2);

        service.execute(prodThread3);

        service.execute(consThread1);

        service.execute(consThread2);

        service.execute(consThread3);

        // 3.睡一会儿然后尝试停止生产者(结束循环)

        Thread.sleep(10 * 1000);

        prodThread1.stop();

        prodThread2.stop();

        prodThread3.stop();

        // 4.再睡一会儿关闭线程池

        Thread.sleep(3000);

        // 5.shutdown()等待任务执行完才中断线程(因为消费者一直在运行的,所以会发现程序无法结束)

        service.shutdown();

    }

}

原文地址:https://www.cnblogs.com/chinaifae/p/10443305.html