Lock、ReentrantLock实现生产者,消费者

转载:https://blog.csdn.net/xiaojin21cen/article/details/87196937

写在前面

  灵活使用了锁机制,值得借鉴。

一、synchronized 实现

使用了 wait()、notify()和 notifyAll() 方法

package com.aop8.proAndcum;

import java.util.Random;
import java.util.Vector;
import java.util.concurrent.TimeUnit;

public class MyContainer1<T> {
	
	private final Vector<T> sum = new Vector<>();
	private final int MAX = 10; // 最多10个元素

	public synchronized void put(T t) {
		while (sum.size() == MAX) { // while正常情况下和wait()一起使用,
			try {
				System.err.println("产品已满");
				this.wait(); 
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		System.out.println("产品 add");
		sum.add(t);
		this.notifyAll(); // 通知消费者线程进行消费
	}

	public synchronized T get() {
		T t = null;
		while (sum.size() == 0) {
			try {
				System.err.println("缺货");
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		t = sum.remove(0);
		this.notifyAll(); // 通知生产者进行生产
		return t;
	}

	public static void main(String[] args) {
		MyContainer1<String> myContainer1 = new MyContainer1<>();
		// 启动消费者线程
		for (int i = 0; i < 4; i++) {
			new Thread(() -> {
				for (int j = 0; j < 5; j++) {
					try {
						TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("消费者--"+myContainer1.get());
				}
			}, "消费者-" + i).start();
		}

		
		// 启动生产者线程
		for (int i = 0; i < 2; i++) {
			new Thread(() -> {
				for (int j = 0; j < 10; j++) {
					try {
						TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					myContainer1.put(Thread.currentThread().getName() + "" + j);
				}
			}, "产品-" + i).start();
		}
	}
}

二、 Lock、ReentrantLock 实现

使用 await()、signal() 和 signalAll()

package com.aop8.proAndcum;

import java.util.Random;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyContainer2<T> {

	private final Vector<T> countNum = new Vector<>();
	private final int MAX = 10;
	private Lock lock = new ReentrantLock();
	private Condition producer = lock.newCondition();
	private Condition consumer = lock.newCondition();

	// 生产者
	public void put(T t) {
		try {
			lock.lock();
			while (countNum.size() == MAX) {
				System.err.println("产品已满");
				producer.await();
			}
			System.out.println("产品 add");
			countNum.add(t);
			consumer.signalAll(); // 通知消费者线程进行消费
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	// 消费者
	public T get() {
		T t = null;
		try {
			lock.lock();
			while (countNum.size() == 0) {
				System.err.println("缺货");
				consumer.await();
			}

			t = countNum.remove(0);
			producer.signalAll(); // 通知生产者进行生产
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
		return t;
	}

	public static void main(String[] args) {
		MyContainer2<String> myContainer2 = new MyContainer2<String>();

		// 启消费者线程
		for (int i = 0; i < 10; i++) {
			new Thread(() -> {
				for (int j = 0; j < 2; j++) {
					try {
						TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("消费者--"+myContainer2.get());
				}
			}, "consumer_" + i).start();
		}

		// 启动生产者线程
		for (int i = 0; i < 2; i++) {
			new Thread(() -> {
				for (int j = 0; j < 10; j++) {
					try {
						TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					myContainer2.put(Thread.currentThread().getName() + "" + j);
				}
			}, "产品-" + i).start();
		}
	}
}
原文地址:https://www.cnblogs.com/ysq0908/p/13509469.html