七、条件队列构建生产者、消费者示例

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionQueueTest {

    private static ReentrantLock lock     = new ReentrantLock();
    private static Condition     empty    = lock.newCondition();
    private static Condition     notEmpty = lock.newCondition();

    private static String data;

    public static void main(String[] args) throws InterruptedException {
        // 启动生产者线程
        startThread0("生产者");
        // 并发启动消费者线程
        CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            startThread1(String.format("消费者%s", i), latch);
        }
        // 休眠1秒,保证所有的Thread都调用start方法
        Thread.sleep(1000);
        latch.countDown();
        Thread.sleep(Integer.MAX_VALUE);
    }

    public static void startThread0(String threadName) {
        new Thread(() -> {
            // 生产者生产数据:1)校验数据;2)等待空数据信号;3)校验数据
            for (; ; ) {
                lock.lock();
                if (data == null) {
                    // 如果没有数据,那么生产数据
                    data = String.valueOf(new Random().nextInt(1000));
                    System.out.println(threadName + "生产数据:" + data);
                    // 通知所有消费者生产了一份数据
                    notEmpty.signalAll();
                }
                try {
                    // 进入等待
                    empty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }).start();
    }

    public static void startThread1(String threadName, CountDownLatch latch) {
        new Thread(() -> {
            try {
                // 等待其它消费者
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 消费者消费数据,1)校验数据是否为空;2)等待非空数据信号;3)再校验数据
            for (; ; ) {
                lock.lock();
                if (data != null) {
                    // 消费到数据跳出循环
                    System.out.println(threadName + "消费数据:" + data);
                    data = null;
                    lock.unlock();
                    break;
                }
                // 无数据信号发送给生产者
                empty.signal();
                try {
                    // 等待数据非空
                    notEmpty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();
            }
        }).start();
    }
}

以上代码构建了1个生产者线程,多个消费者线程:

1)生产者等待empty的信号去生产数据,并发送notEmpty信号通知所有消费者去消费数据;

2)消费者发送empty信号,并等待notEmpty信号去消费数据;

注意:条件对列很容易造成信号丢失,必须保证先校验、再等待、等待结束再校验一次这样的逻辑才能防止丢失。

原文地址:https://www.cnblogs.com/lay2017/p/10997811.html