Java并发---生产者消费者实现

生产者消费者

生产者消费者模型是并发编程中线程之间同步和通信的重要实现,本文主要用以下四种方式来实现

  1. wait()/notify()方法
  2. 显式Lock和Condition
  3. BlockingQueue阻塞队列方法
  4. PipedWriter/PipedReader方法

wait()/notify()方法

wait() / notify()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,让出CPU和放弃锁,使自己处于等待状态,让其他线程执行。
notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知。注意notifyAll()本身并不会释放锁,也不会让当前线程进入等待状态;被唤醒的线程要等到当前线程退出同步块(或调用wait())释放锁之后,才能重新竞争锁并继续执行。
实现代码如下:

package concurrency.interview;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Producer/consumer demo built on an intrinsic lock with {@code wait()} /
 * {@code notifyAll()}.
 */
public class ProducerAndConsumer_1 {

	/**
	 * Bounded warehouse that only tracks how many items it currently holds.
	 * All access to the counter goes through the monitor of a private lock
	 * object.
	 */
	static class Factory {

		/** Maximum number of items the warehouse can hold. */
		private static final int MAX_SIZE = 10;

		/** Simulated work time per item used by the no-arg constructor. */
		private static final long DEFAULT_WORK_MILLIS = 1000L;

		/** Monitor guarding {@link #number}; final so the lock identity never changes. */
		private final Object obj = new Object();

		/** Simulated time, in milliseconds, spent producing or consuming one item. */
		private final long workMillis;

		/** Current number of items; read and written only while holding {@link #obj}. */
		private int number = 0;

		/** Creates a warehouse whose simulated work takes one second per item. */
		Factory() {
			this(DEFAULT_WORK_MILLIS);
		}

		/**
		 * Creates a warehouse with a configurable simulated work time.
		 *
		 * @param workMillis milliseconds to sleep after each produce/consume; must be &gt;= 0
		 * @throws IllegalArgumentException if {@code workMillis} is negative
		 */
		Factory(long workMillis) {
			if (workMillis < 0) {
				throw new IllegalArgumentException("workMillis must be >= 0: " + workMillis);
			}
			this.workMillis = workMillis;
		}

		/**
		 * Adds one item, blocking while the warehouse is full.
		 *
		 * @throws InterruptedException if interrupted while waiting for space or while
		 *         simulating the work
		 */
		public void produce() throws InterruptedException {
			synchronized (obj) {
				// Loop, not "if": the condition must be re-checked after every wake-up.
				while (number >= MAX_SIZE) {
					System.out.println("仓库已满!请先消费");
					obj.wait();
				}

				number++;
				System.out.println("生产成功");
				obj.notifyAll();
			}
			// The simulated work runs outside the monitor so the other side is not
			// locked out for the whole duration of the sleep.
			TimeUnit.MILLISECONDS.sleep(workMillis);
		}

		/**
		 * Removes one item, blocking while the warehouse is empty.
		 *
		 * @throws InterruptedException if interrupted while waiting for an item or while
		 *         simulating the work
		 */
		public void consumer() throws InterruptedException {
			synchronized (obj) {
				while (number <= 0) {
					System.out.println("仓库是空的,不能消费");
					obj.wait();
				}

				number--;
				System.out.println("消费成功");
				obj.notifyAll();
			}
			TimeUnit.MILLISECONDS.sleep(workMillis);
		}

		/**
		 * Returns the number of items currently stored.
		 *
		 * @return the current item count, read under the lock
		 */
		int size() {
			synchronized (obj) {
				return number;
			}
		}

	}

	/** Task that keeps producing until its thread is interrupted. */
	static class Producer implements Runnable {
		private final Factory factory;

		Producer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.produce();
				}
			} catch (InterruptedException e) {
				// Keep the interrupt status visible to whoever owns this thread.
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/** Task that keeps consuming until its thread is interrupted. */
	static class Consumer implements Runnable {
		private final Factory factory;

		Consumer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.consumer();
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/**
	 * Runs one producer and one consumer for two seconds, then interrupts both
	 * and waits for them to finish.
	 *
	 * @param args unused
	 * @throws Exception if the main thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		Factory factory = new Factory();
		ExecutorService executor = Executors.newCachedThreadPool();
		try {
			executor.execute(new Producer(factory));
			executor.execute(new Consumer(factory));

			TimeUnit.SECONDS.sleep(2);
		} finally {
			// shutdownNow() interrupts the workers, which ends their loops.
			executor.shutdownNow();
		}
		executor.awaitTermination(5, TimeUnit.SECONDS);
	}
}

显式Lock和Condition

JDK1.5引入了Lock和Condition,使用它们会更安全。可以通过在Condition上调用await()方法来挂起任务,类似wait(),调用signal()/signalAll(),类似notify()/notifyAll()

代码如下:

package concurrency.interview;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo built on an explicit {@link Lock} with two
 * {@link Condition}s.
 */
public class ProducerAndConsumer_2 {

	/**
	 * Bounded warehouse that only tracks how many items it currently holds.
	 * The counter is guarded by a {@link ReentrantLock}; producers wait on
	 * {@code notFull} and consumers wait on {@code notEmpty}.
	 */
	static class Factory {

		/** Maximum number of items the warehouse can hold. */
		private static final int MAX_SIZE = 10;

		/** Simulated work time per item used by the no-arg constructor. */
		private static final long DEFAULT_WORK_MILLIS = 1000L;

		private final Lock lock = new ReentrantLock();

		/** Signalled when an item is removed, i.e. there is room for a producer. */
		private final Condition notFull = lock.newCondition();

		/** Signalled when an item is added, i.e. there is work for a consumer. */
		private final Condition notEmpty = lock.newCondition();

		/** Simulated time, in milliseconds, spent producing or consuming one item. */
		private final long workMillis;

		/** Current number of items; read and written only while holding {@link #lock}. */
		private int number = 0;

		/** Creates a warehouse whose simulated work takes one second per item. */
		Factory() {
			this(DEFAULT_WORK_MILLIS);
		}

		/**
		 * Creates a warehouse with a configurable simulated work time.
		 *
		 * @param workMillis milliseconds to sleep after each produce/consume; must be &gt;= 0
		 * @throws IllegalArgumentException if {@code workMillis} is negative
		 */
		Factory(long workMillis) {
			if (workMillis < 0) {
				throw new IllegalArgumentException("workMillis must be >= 0: " + workMillis);
			}
			this.workMillis = workMillis;
		}

		/**
		 * Adds one item, blocking while the warehouse is full.
		 *
		 * @throws InterruptedException if interrupted while acquiring the lock, while
		 *         waiting for space, or while simulating the work
		 */
		public void produce() throws InterruptedException {
			lock.lockInterruptibly();
			try {
				// Loop, not "if": the condition must be re-checked after every wake-up.
				while (number >= MAX_SIZE) {
					System.out.println("仓库已满!请先消费");
					notFull.await();
				}

				number++;
				System.out.println("生产成功");
				// Exactly one item became available, so waking one consumer is enough.
				notEmpty.signal();
			} finally {
				lock.unlock();
			}
			// The simulated work runs after unlock() so the other side is not
			// locked out for the whole duration of the sleep.
			TimeUnit.MILLISECONDS.sleep(workMillis);
		}

		/**
		 * Removes one item, blocking while the warehouse is empty.
		 *
		 * @throws InterruptedException if interrupted while acquiring the lock, while
		 *         waiting for an item, or while simulating the work
		 */
		public void consumer() throws InterruptedException {
			lock.lockInterruptibly();
			try {
				while (number <= 0) {
					System.out.println("仓库是空的,不能消费");
					notEmpty.await();
				}

				number--;
				System.out.println("消费成功");
				// Exactly one slot became free, so waking one producer is enough.
				notFull.signal();
			} finally {
				lock.unlock();
			}
			TimeUnit.MILLISECONDS.sleep(workMillis);
		}

		/**
		 * Returns the number of items currently stored.
		 *
		 * @return the current item count, read under the lock
		 */
		int size() {
			lock.lock();
			try {
				return number;
			} finally {
				lock.unlock();
			}
		}

	}

	/** Task that keeps producing until its thread is interrupted. */
	static class Producer implements Runnable {
		private final Factory factory;

		Producer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.produce();
				}
			} catch (InterruptedException e) {
				// Keep the interrupt status visible to whoever owns this thread.
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/** Task that keeps consuming until its thread is interrupted. */
	static class Consumer implements Runnable {
		private final Factory factory;

		Consumer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.consumer();
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/**
	 * Runs one producer and one consumer for two seconds, then interrupts both
	 * and waits for them to finish.
	 *
	 * @param args unused
	 * @throws Exception if the main thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		Factory factory = new Factory();
		ExecutorService executor = Executors.newCachedThreadPool();
		try {
			executor.execute(new Producer(factory));
			executor.execute(new Consumer(factory));

			TimeUnit.SECONDS.sleep(2);
		} finally {
			// shutdownNow() interrupts the workers, which ends their loops.
			executor.shutdownNow();
		}
		executor.awaitTermination(5, TimeUnit.SECONDS);
	}
}

BlockingQueue阻塞队列方法

BlockingQueue也是JDK1.5的新增内容,它是已经在内部实现了同步的队列。主要有以下两个方法
put()方法:容量达到最大时,自动阻塞。
take()方法:容量为0时,自动阻塞。
我们可以看看ArrayBlockingQueue的上面两个方法实现

/**
 * Inserts the given element, waiting while the queue is full
 * (excerpt quoted from ArrayBlockingQueue).
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 */
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // Interruptible acquisition: a pending interrupt aborts the call here.
        lock.lockInterruptibly();
        try {
            // Re-check in a loop after every wake-up; wait until a slot is free.
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            // Always release the lock, even when await() throws.
            lock.unlock();
        }
    }

/**
 * Removes and returns an element, waiting while the queue is empty
 * (excerpt quoted from ArrayBlockingQueue).
 *
 * @return the element produced by {@code dequeue()}
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 */
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // Interruptible acquisition: a pending interrupt aborts the call here.
        lock.lockInterruptibly();
        try {
            // Re-check in a loop after every wake-up; wait until an element exists.
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            // Always release the lock, even when await() throws.
            lock.unlock();
        }
    }

可以看到ArrayBlockingQueue内部的同步就是使用的Lock和Condition。
使用BlockingQueue生产者和消费者代码如下:

package concurrency.interview;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Producer/consumer demo that delegates all synchronization to a bounded
 * {@link BlockingQueue}.
 */
public class ProducerAndConsumer_3 {

	/**
	 * Bounded warehouse backed by an {@link ArrayBlockingQueue}; blocking on
	 * full/empty is handled entirely by {@code put()} and {@code take()}.
	 */
	static class Factory {

		/** Capacity of the backing queue. */
		private static final int MAX_SIZE = 10;

		/** Simulated work time per item used by the no-arg constructor. */
		private static final long DEFAULT_WORK_MILLIS = 1000L;

		private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(MAX_SIZE);

		/** Simulated time, in milliseconds, spent producing or consuming one item. */
		private final long workMillis;

		/** Creates a warehouse whose simulated work takes one second per item. */
		Factory() {
			this(DEFAULT_WORK_MILLIS);
		}

		/**
		 * Creates a warehouse with a configurable simulated work time.
		 *
		 * @param workMillis milliseconds to sleep after each produce/consume; must be &gt;= 0
		 * @throws IllegalArgumentException if {@code workMillis} is negative
		 */
		Factory(long workMillis) {
			if (workMillis < 0) {
				throw new IllegalArgumentException("workMillis must be >= 0: " + workMillis);
			}
			this.workMillis = workMillis;
		}

		/**
		 * Puts one new item into the queue, blocking while it is full, then
		 * simulates the work and reports success.
		 *
		 * @throws InterruptedException if interrupted while blocked in {@code put()}
		 *         or while simulating the work
		 */
		public void produce() throws InterruptedException {
			queue.put(new Object());
			TimeUnit.MILLISECONDS.sleep(workMillis);
			System.out.println("生产成功");
		}

		/**
		 * Takes one item from the queue, blocking while it is empty, then
		 * simulates the work and reports success. The item itself is discarded.
		 *
		 * @throws InterruptedException if interrupted while blocked in {@code take()}
		 *         or while simulating the work
		 */
		public void consumer() throws InterruptedException {
			queue.take();
			TimeUnit.MILLISECONDS.sleep(workMillis);
			System.out.println("消费成功");
		}

		/**
		 * Returns the number of items currently queued.
		 *
		 * @return the current size of the backing queue
		 */
		int size() {
			return queue.size();
		}

	}

	/** Task that keeps producing until its thread is interrupted. */
	static class Producer implements Runnable {
		private final Factory factory;

		Producer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.produce();
				}
			} catch (InterruptedException e) {
				// Keep the interrupt status visible to whoever owns this thread.
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/** Task that keeps consuming until its thread is interrupted. */
	static class Consumer implements Runnable {
		private final Factory factory;

		Consumer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.consumer();
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				System.out.println("结束");
			}
		}

	}

	/**
	 * Runs two producers and one consumer for five seconds, then interrupts
	 * them and waits for them to finish.
	 *
	 * @param args unused
	 * @throws Exception if the main thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		Factory factory = new Factory();
		ExecutorService executor = Executors.newCachedThreadPool();
		try {
			executor.execute(new Producer(factory));
			executor.execute(new Producer(factory));
			executor.execute(new Consumer(factory));

			TimeUnit.SECONDS.sleep(5);
		} finally {
			// shutdownNow() interrupts the workers, which ends their loops.
			executor.shutdownNow();
		}
		executor.awaitTermination(5, TimeUnit.SECONDS);
	}
}

PipedWriter/PipedReader方法

在jdk引入BlockingQueue之前大多是使用这种方式来实现同步和通信,它基本上可以看做是一个阻塞队列,代码如下:

package concurrency.interview;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Producer/consumer demo that passes fixed-size text messages through a
 * {@link PipedWriter} / {@link PipedReader} pair.
 */
public class ProducerAndConsumer_4 {

	/**
	 * Owns the connected pipe. Every produced message is the same fixed
	 * string, so the reading side knows how many characters form one message.
	 * Reading is written for a single consumer thread.
	 */
	static class Factory {

		/** Payload written by every {@link #produce()} call. */
		private static final String MESSAGE = "hello world!";

		/** Simulated work time per message used by the no-arg constructor. */
		private static final long DEFAULT_WORK_MILLIS = 1000L;

		private final PipedWriter out;
		private final PipedReader in;

		/** Simulated time, in milliseconds, spent producing or consuming one message. */
		private final long workMillis;

		/**
		 * Creates a connected pipe whose simulated work takes one second per message.
		 *
		 * @throws IOException if the pipe cannot be connected
		 */
		Factory() throws IOException {
			this(DEFAULT_WORK_MILLIS);
		}

		/**
		 * Creates a connected pipe with a configurable simulated work time.
		 *
		 * @param workMillis milliseconds to sleep per produce/consume; must be &gt;= 0
		 * @throws IOException if the pipe cannot be connected
		 * @throws IllegalArgumentException if {@code workMillis} is negative
		 */
		Factory(long workMillis) throws IOException {
			if (workMillis < 0) {
				throw new IllegalArgumentException("workMillis must be >= 0: " + workMillis);
			}
			this.workMillis = workMillis;
			out = new PipedWriter();
			in = new PipedReader(out);
		}

		/**
		 * Writes one message into the pipe, then simulates the work and reports
		 * success.
		 *
		 * @throws IOException if the pipe is closed or broken
		 * @throws InterruptedException if interrupted while simulating the work
		 */
		public void produce() throws IOException, InterruptedException {
			out.write(MESSAGE);
			// flush() notifies a reader waiting for data instead of leaving it
			// to find the new characters on its own.
			out.flush();
			TimeUnit.MILLISECONDS.sleep(workMillis);
			System.out.println("生产成功");
		}

		/**
		 * Reads one complete message from the pipe, simulates the work, then
		 * prints the success line followed by the message.
		 *
		 * @throws IOException if the pipe is closed or broken
		 * @throws InterruptedException if interrupted while simulating the work
		 */
		public void consumer() throws IOException, InterruptedException {
			String message = readMessage();
			TimeUnit.MILLISECONDS.sleep(workMillis);
			System.out.println("消费成功");
			System.out.println(message);
		}

		/**
		 * Reads exactly one message worth of characters from the pipe.
		 * A single {@code read} may return fewer characters than requested, so
		 * this keeps reading until the message is complete.
		 *
		 * @return the message text
		 * @throws IOException if the pipe fails or ends before a full message arrived
		 */
		String readMessage() throws IOException {
			char[] buf = new char[MESSAGE.length()];
			int filled = 0;
			while (filled < buf.length) {
				int count = in.read(buf, filled, buf.length - filled);
				if (count < 0) {
					throw new IOException("pipe closed after " + filled + " of "
							+ buf.length + " characters");
				}
				filled += count;
			}
			return new String(buf);
		}

		/**
		 * Closes both ends of the pipe.
		 *
		 * @throws IOException if closing either end fails
		 */
		void close() throws IOException {
			try {
				out.close();
			} finally {
				in.close();
			}
		}

	}

	/** Task that keeps producing until its thread is interrupted or the pipe fails. */
	static class Producer implements Runnable {
		private final Factory factory;

		Producer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.produce();
				}
			} catch (InterruptedException e) {
				// Keep the interrupt status visible to whoever owns this thread.
				Thread.currentThread().interrupt();
				System.out.println("结束");
			} catch (IOException e) {
				// Pipe operations report an interrupt as InterruptedIOException.
				if (e instanceof java.io.InterruptedIOException) {
					Thread.currentThread().interrupt();
				}
				System.out.println("结束");
			}
		}

	}

	/** Task that keeps consuming until its thread is interrupted or the pipe fails. */
	static class Consumer implements Runnable {
		private final Factory factory;

		Consumer(Factory factory) {
			this.factory = factory;
		}

		@Override
		public void run() {
			try {
				while (!Thread.currentThread().isInterrupted()) {
					factory.consumer();
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				System.out.println("结束");
			} catch (IOException e) {
				if (e instanceof java.io.InterruptedIOException) {
					Thread.currentThread().interrupt();
				}
				System.out.println("结束");
			}
		}

	}

	/**
	 * Runs two producers and one consumer for five seconds, then interrupts
	 * them, waits for them to finish and closes the pipe.
	 *
	 * @param args unused
	 * @throws Exception if the pipe cannot be created or closed, or the main
	 *         thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		Factory factory = new Factory();
		ExecutorService executor = Executors.newCachedThreadPool();
		try {
			executor.execute(new Producer(factory));
			executor.execute(new Producer(factory));
			executor.execute(new Consumer(factory));

			TimeUnit.SECONDS.sleep(5);
		} finally {
			// shutdownNow() interrupts the workers, which ends their loops.
			executor.shutdownNow();
			try {
				executor.awaitTermination(5, TimeUnit.SECONDS);
			} finally {
				factory.close();
			}
		}
	}
}

参考资料

http://blog.csdn.net/monkey_d_meng/article/details/6251879/
https://zhuanlan.zhihu.com/p/20300609
《Java编程思想》

原文地址:https://www.cnblogs.com/yangtong/p/7155078.html