经典并发问题:生产者-消费者

前置知识

此处以Java描述该问题,需要Java并发的知识,这里附上一些不错的教程:

Youtube上的一个教程,非常实战,能快速找到感觉。

Oracle的并发官方教程,很全面,但不深入。

国人的Java并发系列教程,很全面,但有地方深入到源码,有的浅停在使用方法。

免费的翻译书,没有看,据说不错。

问题描述

生产者-消费者(Producer-Consumer Problem)以下简称为PC问题。其描述以下问题:

  • 多个生产者生产产品,多个消费者消费产品,两者间有一个大小固定的缓冲区;
  • 生产者、消费者不可同时访问缓冲区;
  • 生产者不可向满缓冲区放产品;
  • 消费者不可从空缓冲区取产品。

信号量解法

public class PCSemaphore {
	private final static int BUFFER_SIZE = 10;

	public static void main(String[] args) {
		Semaphore mutex = new Semaphore(1);
		Semaphore full = new Semaphore(0);
		Semaphore empty = new Semaphore(BUFFER_SIZE);

		Queue<Integer> buffer = new LinkedList<>();

		Producer producer = new Producer(mutex, empty, full, buffer);
		Consumer consumer = new Consumer(mutex, empty, full, buffer);

		// 可以初始化多个生产者、消费者
		new Thread(producer, "p1").start();
		new Thread(producer, "p2").start();
		new Thread(consumer, "c1").start();
		new Thread(consumer, "c2").start();
		new Thread(consumer, "c3").start();
	}
}

class Producer implements Runnable {
	private Semaphore mutex, empty, full;
	private Queue<Integer> buffer;
	private Integer counter = 0;

	public Producer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
		this.mutex = mutex;
		this.empty = empty;
		this.full = full;
		this.buffer = buffer;
	}

	@Override
	public void run() {
		while (true) {
			try {
				empty.acquire();
				mutex.acquire();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			buffer.offer(counter++);
			
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			full.release();
			mutex.release();
		}
	}
}

class Consumer implements Runnable {
	private Semaphore mutex, empty, full;
	private Queue<Integer> buffer;

	public Consumer(Semaphore mutex, Semaphore empty, Semaphore full, Queue<Integer> buffer) {
		this.mutex = mutex;
		this.empty = empty;
		this.full = full;
		this.buffer = buffer;
	}

	@Override
	public void run() {
		String threadName = Thread.currentThread().getName();
		
		while (true) {
			try {
				full.acquire();
				mutex.acquire();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			Integer product = buffer.poll();
			int left = buffer.size();
			System.out.printf("%s consumed %d left %d%n", threadName, product, left);
			
			try {
				Thread.sleep(200);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			empty.release();
			mutex.release();
		}
	}
}

教科书的解法,问题有三:

  1. 不可交换消费者中的full和mutex信号量的请求顺序,若我们交换他们则会有:
    image_1camh79sc162efh2a42ir2eq19.png-20.5kB
    同样地不可交换生产者中的empty和mutex信号量的请求顺序。
    即必须遵守先资源信号量,再互斥信号量的请求顺序。
    教科书中使用AND型信号量解决该问题,但这对程序员来说算是一个Dirty Solution,为什么呢,见问题2。

  2. 将进程控制部分和业务逻辑放在一起,这种代码看着混乱不堪。

  3. 效率低,Java中是不会用信号量来实现这东西,信号量在Java中常用来限制对一个共享资源的最大并发访问数。

注意,因为这里写的写的是一个小示例,所以我们没有考虑产品的生产、使用和入队、出队(放入、拿出缓存区的过程)。但实际中我们只需要将产品的入队和出队进行互斥即可,产品的生产和使用可并发执行,甚至在每个生产者、消费者内部可建立单独的缓冲区,暂存生产出来但还不能放到公共缓冲区的产品,直到可以放入公共缓冲区。

wait() & notify()

public class PCWaitNotify {
	public final static int BUFFER_SIZE = 10;

	public static void main(String[] args) {
		Object mutex = new Object();
		AtomicInteger counter = new AtomicInteger(0);
		Queue<Integer> buffer = new LinkedList<>();

		Producer producer = new Producer(buffer, counter, mutex);
		Consumer consumer = new Consumer(buffer, mutex);

		new Thread(producer).start();
		new Thread(producer).start();
		new Thread(consumer).start();
	}
}

class Producer implements Runnable {
	private Random rand = new Random();
	private Queue<Integer> buffer;
	private AtomicInteger counter; // 支持原子操作的基本类型包装类
	private Object mutex;

	public Producer(Queue<Integer> buffer, AtomicInteger counter, Object mutex) {
		this.buffer = buffer;
		this.counter = counter;
		this.mutex = mutex;
	}

	@Override
	public void run() {
		while (true) {
			synchronized (mutex) {
				while (buffer.size() == PCWaitNotify.BUFFER_SIZE) {
					try {
						mutex.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}

				buffer.offer(counter.incrementAndGet());
				mutex.notify();
			}
			
			try {
				Thread.sleep(rand.nextInt(800));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

class Consumer implements Runnable {
	private Random rand = new Random();
	private Queue<Integer> buffer;
	private Object mutex;

	public Consumer(Queue<Integer> buffer, Object mutex) {
		this.buffer = buffer;
		this.mutex = mutex;
	}

	@Override
	public void run() {
		while (true) {
			synchronized (mutex) {
				while (buffer.size() == 0) {
					try {
						mutex.wait();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				
				System.out.println("consumed " + buffer.poll() + " left " + buffer.size());
				mutex.notify();
			}
			try {
				Thread.sleep(rand.nextInt(500));
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

以Java的底层并发API,wait()和notify()实现,效率虽高,但进程控制部分和业务逻辑同样混在一起,没有完全解决问题。

BlockingQueue

public class PCBlockingQueue {
	private final static int BUFFER_SIZE = 10;

	public static void main(String[] args) {
		BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(BUFFER_SIZE);
		AtomicInteger counter = new AtomicInteger(0);
		
		Producer producer = new Producer(buffer, counter);
		Consumer consumer = new Consumer(buffer);
		
		new Thread(producer).start();
		new Thread(producer).start();
		new Thread(consumer).start();
	}
}

class Producer implements Runnable {
	private Random rand = new Random();
	private AtomicInteger counter;
	private BlockingQueue<Integer> buffer;

	public Producer(BlockingQueue<Integer> buffer, AtomicInteger counter) {
		this.buffer = buffer;
		this.counter = counter;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(rand.nextInt(800));
				Integer product = counter.incrementAndGet();
				buffer.put(product);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

class Consumer implements Runnable {
	private Random rand = new Random();
	private BlockingQueue<Integer> buffer;
	
	public Consumer(BlockingQueue<Integer> buffer) {
		this.buffer = buffer;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(rand.nextInt(600));
				Integer product = buffer.take(); // 队列空时,会阻塞,直到有新元素,并将新元素返回。
				System.out.println("consumed " + product + " left " + buffer.size());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

你觉得像Java这样的企业级语言会不考虑到代码的功能分离问题吗(SRP原则)?Java中早就提供了一堆线程安全的数据结构,这里用了BlockingQueue,其他线程安全类参考java.util.concurrent包。

原文地址:https://www.cnblogs.com/sequix/p/8776716.html