The Producer-Consumer Model

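Many definitions and implementations of the producer-consumer model can be found online. This article looks at the most commonly used variant, the bounded producer-consumer model, which can be summarized as follows:

  • A producer keeps producing until the buffer is full, then blocks; once the buffer is no longer full, it resumes producing
  • A consumer keeps consuming until the buffer is empty, then blocks; once the buffer is no longer empty, it resumes consuming
  • There can be multiple producers and multiple consumers

The correctness of an implementation can be checked against the following conditions:

  • For any given product, consumption always happens after production
  • At any moment, the buffer size is no less than 0 and no greater than the capacity limit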
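The two conditions above can be turned into a small self-checking run. The following is only a minimal sketch, not part of the original article: the class name InvariantCheck, the method run and the use of ArrayBlockingQueue as the buffer under test are all assumptions made for illustration. It records each task number before the task enters the buffer, checks that every consumed number was recorded, and checks that the observed buffer size stays within [0, capacity].

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class InvariantCheck {

    /** Runs a short bounded workload and returns the number of invariant violations observed. */
    static int run(int capacity, int producers, int consumers, int perProducer) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity); // hypothetical buffer under test
        Set<Integer> produced = ConcurrentHashMap.newKeySet();
        AtomicInteger nextNo = new AtomicInteger();
        AtomicInteger remaining = new AtomicInteger(producers * perProducer);
        AtomicInteger violations = new AtomicInteger();

        Thread[] threads = new Thread[producers + consumers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int k = 0; k < perProducer; k++) {
                        int no = nextNo.getAndIncrement();
                        produced.add(no); // record the task before it becomes visible to consumers
                        buffer.put(no);
                        checkSize(buffer.size(), capacity, violations);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int i = 0; i < consumers; i++) {
            threads[producers + i] = new Thread(() -> {
                try {
                    // Each successful claim corresponds to exactly one take(), so consumers
                    // stop once every produced task has been claimed.
                    while (remaining.getAndDecrement() > 0) {
                        int no = buffer.take();
                        if (!produced.contains(no)) {
                            violations.incrementAndGet(); // consumed before produced
                        }
                        checkSize(buffer.size(), capacity, violations);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return violations.get();
    }

    private static void checkSize(int size, int capacity, AtomicInteger violations) {
        if (size < 0 || size > capacity) {
            violations.incrementAndGet(); // buffer size left the range [0, capacity]
        }
    }

    public static void main(String[] args) {
        System.out.println("violations: " + run(3, 5, 2, 20));
    }
}
```

A run that reports zero violations does not prove an implementation correct, but a non-zero count points to a real bug.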

Preparation - Interface Definitions

Consumer:

public abstract class AbstractConsumer implements Runnable {

    protected abstract void consume() throws InterruptedException;

    @Override
    public void run() {
        while (true) {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}

Producer:

public abstract class AbstractProducer implements Runnable {

    protected abstract void produce() throws InterruptedException;

    @Override
    public void run() {
        while (true) {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
                break;
            }
        }
    }
}

Model:

public interface Model {
    Runnable newRunnableConsumer();

    Runnable newRunnableProducer();
}

bean:

public class Task {
    private int no;
    public Task(int no) {
        this.no = no;
    }

    public int getNo() {
        return no;
    }
}

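Implementation 1: BlockingQueue

The BlockingQueue version is the simplest to write. The core idea is to encapsulate concurrency and capacity control inside the buffer.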

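import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingQueueModel implements Model {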

    private final BlockingQueue<Task> blockingQueue;

    BlockingQueueModel(int capacity) {
        this.blockingQueue = new LinkedBlockingQueue<>(capacity);
    }

    private final AtomicInteger taskNo = new AtomicInteger(0);

    @Override
    public Runnable newRunnableConsumer() {
        return new AbstractConsumer() {
            @Override
            public void consume() throws InterruptedException {
                Task task = blockingQueue.take();
                // Consumption takes a bounded amount of time, simulating a relatively stable server-side processing step
                TimeUnit.MILLISECONDS.sleep(500 + (long) (Math.random() * 500));
                System.out.println("consume: " + task.getNo());
            }
        };
    }

    @Override
    public Runnable newRunnableProducer() {
        return new AbstractProducer() {
            @Override
            public void produce() throws InterruptedException {
                // Produce at irregular intervals, simulating random user requests
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                Task task = new Task(taskNo.getAndIncrement());
                blockingQueue.put(task);
                System.out.println("produce: " + task.getNo());
            }
        };
    }

    public static void main(String[] args) {
        Model model = new BlockingQueueModel(3);
        Arrays.asList(1, 2).forEach(x -> new Thread(model.newRunnableConsumer()).start());
        Arrays.asList(1, 2, 3, 4, 5).forEach(x -> new Thread(model.newRunnableProducer()).start());
    }
}
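About the run output: because the data operation and the log output are two separate steps rather than one atomic action, the absolute order of the log lines is not necessarily the real order of the data operations. For any given task number task.getNo(), however, its consume line always appears after its produce line; that is, a task is always consumed after it is produced.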

Implementation 2: wait && notify

This version uses the wait() and notifyAll() methods provided by the Object class.

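import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitNotifyModel implements Model {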

    private final Object BUFFER_LOCK = new Object();

    private final Queue<Task> queue = new LinkedList<>();
    private final int capacity;

    public WaitNotifyModel(int capacity) {
        this.capacity = capacity;
    }

    private final AtomicInteger taskNo = new AtomicInteger(0);

    @Override
    public Runnable newRunnableConsumer() {
        return new AbstractConsumer() {
            @Override
            public void consume() throws InterruptedException {
                synchronized (BUFFER_LOCK) {
                    while (queue.isEmpty()) {
                        BUFFER_LOCK.wait();
                    }

                    Task task = queue.poll();
                    assert task != null;
                    TimeUnit.MILLISECONDS.sleep(500 + (long) (Math.random() * 500));
                    System.out.println("consume: " + task.getNo());
                    BUFFER_LOCK.notifyAll();
                }
            }
        };
    }

    @Override
    public Runnable newRunnableProducer() {
        return new AbstractProducer() {
            @Override
            public void produce() throws InterruptedException {
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                synchronized (BUFFER_LOCK) {
                    while (queue.size() == capacity) {
                        BUFFER_LOCK.wait();
                    }
                    Task task = new Task(taskNo.getAndIncrement());
                    queue.offer(task);
                    System.out.println("produce: " + task.getNo());
                    BUFFER_LOCK.notifyAll();
                }
            }
        };
    }

    public static void main(String[] args) {
        Model model = new WaitNotifyModel(3);
        Arrays.asList(1, 2).forEach(x -> new Thread(model.newRunnableConsumer()).start());
        Arrays.asList(1, 2, 3, 4, 5).forEach(x -> new Thread(model.newRunnableProducer()).start());
    }
}
The plain wait && notify mechanism is not very flexible, but it is simple enough.

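Implementation 3: A Simple Lock && Condition

This version uses Lock && Condition from the java.util.concurrent.locks package and is a straightforward variation of Implementation 2.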

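import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionModel implements Model {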

    private final Lock BUFFER_LOCK = new ReentrantLock();
    private final Condition CONDITION = BUFFER_LOCK.newCondition();
    private final Queue<Task> queue = new LinkedList<>();

    private final int capacity;

    public LockConditionModel(int capacity) {
        this.capacity = capacity;
    }

    private final AtomicInteger taskNo = new AtomicInteger(0);

    @Override
    public Runnable newRunnableConsumer() {
        return new AbstractConsumer() {
            @Override
            public void consume() throws InterruptedException {
                BUFFER_LOCK.lockInterruptibly();
                try {
                    while (queue.isEmpty()) {
                        CONDITION.await();
                    }

                    Task task = queue.poll();
                    assert task != null;
                    TimeUnit.MILLISECONDS.sleep(500 + (long) (Math.random() * 500));
                    System.out.println("consume: " + task.getNo());
                    CONDITION.signalAll();
                } finally {
                    BUFFER_LOCK.unlock();
                }
            }
        };
    }

    @Override
    public Runnable newRunnableProducer() {
        return new AbstractProducer() {
            @Override
            public void produce() throws InterruptedException {
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));

                BUFFER_LOCK.lockInterruptibly();

                try {
                    while (queue.size() == capacity) {
                        CONDITION.await();
                    }
                    Task task = new Task(taskNo.getAndIncrement());
                    queue.offer(task);
                    System.out.println("produce: " + task.getNo());
                    CONDITION.signalAll();
                } finally {
                    BUFFER_LOCK.unlock();
                }
            }
        };
    }

    public static void main(String[] args) {
        Model model = new LockConditionModel(3);
        Arrays.asList(1, 2).forEach(x -> new Thread(model.newRunnableConsumer()).start());
        Arrays.asList(1, 2, 3, 4, 5).forEach(x -> new Thread(model.newRunnableProducer()).start());
    }
}
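Implementation 4: Lock && Condition with Higher Concurrency

Implementation 3 has a problem. In practice, Implementations 2 and 3 turn out to be noticeably slower than Implementation 1, and the concurrency bottleneck is obvious: from the point of view of the lock BUFFER_LOCK, consumer threads and producer threads are indistinguishable. In other words, at any moment at most one thread (either a producer or a consumer) may operate on the buffer.

In fact, if the buffer is a queue, the two operations "a producer enqueues a product" and "a consumer dequeues a product" do not need to be synchronized with each other: one thread can dequeue at the head while another enqueues at the tail. In the ideal case, performance can improve by up to a factor of two.

Removing the bottleneck

The idea is then simple: use two locks, CONSUMER_LOCK and PRODUCER_LOCK. CONSUMER_LOCK serializes the consumer threads that dequeue, and PRODUCER_LOCK serializes the producer threads that enqueue. Correspondingly, two condition variables are needed, NOT_EMPTY_CONDITION and NOT_FULL_CONDITION: NOT_EMPTY_CONDITION controls the state of consumer threads (blocked or running), and NOT_FULL_CONDITION controls the state of producer threads (blocked or running). With this optimization, consumers remain serial with respect to other consumers (and producers with respect to other producers), while consumers and producers run in parallel.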

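import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionPreferModel implements Model {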

    private final Lock CONSUMER_LOCK = new ReentrantLock();
    private final Condition NOT_EMPTY_CONDITION = CONSUMER_LOCK.newCondition();

    private final Lock PRODUCER_LOCK = new ReentrantLock();
    private final Condition NOT_FULL_CONDITION = PRODUCER_LOCK.newCondition();
    private final AtomicInteger bufLen = new AtomicInteger(0);
    private final Buffer<Task> buffer = new Buffer<>();

    private final int capacity;

    public LockConditionPreferModel(int capacity) {
        this.capacity = capacity;
    }

    private final AtomicInteger taskNo = new AtomicInteger(0);

    @Override
    public Runnable newRunnableConsumer() {
        return new AbstractConsumer() {
            @Override
            public void consume() throws InterruptedException {
                int newBufSize;
                CONSUMER_LOCK.lockInterruptibly();
                try {
                    while (bufLen.get() == 0) {
                        System.out.println("buffer is empty...");
                        NOT_EMPTY_CONDITION.await();
                    }

                    Task task = buffer.poll();
                    newBufSize = bufLen.decrementAndGet();
                    assert task != null;
                    TimeUnit.MILLISECONDS.sleep(500 + (long) (Math.random() * 500));
                    System.out.println("consume: " + task.getNo());
                    if (newBufSize > 0) {
                        NOT_EMPTY_CONDITION.signalAll();
                    }
                } finally {
                    CONSUMER_LOCK.unlock();
                }

                if (newBufSize < capacity) {
                    PRODUCER_LOCK.lockInterruptibly();
                    try {
                        NOT_FULL_CONDITION.signalAll();
                    } finally {
                        PRODUCER_LOCK.unlock();
                    }
                }
            }
        };
    }

    @Override
    public Runnable newRunnableProducer() {
        return new AbstractProducer() {
            @Override
            public void produce() throws InterruptedException {
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 1000));
                int newBufSize;
                PRODUCER_LOCK.lockInterruptibly();

                try {
                    while (bufLen.get() == capacity) {
                        System.out.println("buffer is full...");
                        NOT_FULL_CONDITION.await();
                    }
                    Task task = new Task(taskNo.getAndIncrement());
                    buffer.offer(task);
                    newBufSize = bufLen.incrementAndGet();
                    System.out.println("produce: " + task.getNo());
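                    // Wake other producers only if there is still room, mirroring the consumer side
                    if (newBufSize < capacity) {
                        NOT_FULL_CONDITION.signalAll();
                    }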
                } finally {
                    PRODUCER_LOCK.unlock();
                }

                if (newBufSize > 0) {
                    CONSUMER_LOCK.lockInterruptibly();
                    try {
                        NOT_EMPTY_CONDITION.signalAll();
                    } finally {
                        CONSUMER_LOCK.unlock();
                    }
                }
            }
        };
    }

    private static class Buffer<E> {
        private Node head;
        private Node tail;

        Buffer() {
            head = tail = new Node(null);
        }

        private void offer(E e) {
            tail.next = new Node(e);
            tail = tail.next;
        }

        private E poll() {
            head = head.next;
            E e = head.item;
            head.item = null;
            return e;
        }

        private class Node {
            E item;
            Node next;

            Node(E item) {
                this.item = item;
            }
        }
    }

    public static void main(String[] args) {
        Model model = new LockConditionPreferModel(3);
        Arrays.asList(1, 2).forEach(x -> new Thread(model.newRunnableConsumer()).start());
        Arrays.asList(1, 2, 3, 4, 5).forEach(x -> new Thread(model.newRunnableProducer()).start());
    }
}
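Note that because consumption and production now operate at the same time on a buffer that is not thread-safe, the queue used in Implementations 2 and 3 can no longer be used. We need to implement a simple buffer, Buffer, ourselves. Buffer must satisfy the following conditions:

  • Dequeue at the head, enqueue at the tail
  • The poll() method operates only on head
  • The offer() method operates only on tail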
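To make these three conditions concrete, here is a minimal single-threaded sketch of such a linked buffer with a dummy head node. The names TwoEndBufferDemo and fifoOrder are hypothetical and used only for this illustration. The sketch shows just the FIFO behaviour and which field each method touches; it deliberately leaves out the locks and the element counter that a concurrent model would need around it.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoEndBufferDemo {

    /** Singly linked FIFO with a dummy head node; offer() touches only tail, poll() only head. */
    static final class Buffer<E> {
        private static final class Node<E> {
            E item;
            Node<E> next;

            Node(E item) {
                this.item = item;
            }
        }

        private Node<E> head = new Node<>(null); // dummy node; head.next is the first real element
        private Node<E> tail = head;

        void offer(E e) {
            Node<E> node = new Node<>(e);
            tail.next = node; // link the new node behind the current tail
            tail = node;      // advance tail; head is never read or written here
        }

        /** The caller must guarantee that the buffer is non-empty before calling poll(). */
        E poll() {
            Node<E> first = head.next;
            E e = first.item;
            first.item = null; // the old first node becomes the new dummy head
            head = first;      // advance head; tail is never read or written here
            return e;
        }
    }

    /** Interleaves offers and polls and returns the order in which elements came out. */
    static List<Integer> fifoOrder() {
        Buffer<Integer> buffer = new Buffer<>();
        List<Integer> out = new ArrayList<>();
        buffer.offer(1);
        buffer.offer(2);
        buffer.offer(3);
        out.add(buffer.poll());
        buffer.offer(4);
        out.add(buffer.poll());
        out.add(buffer.poll());
        out.add(buffer.poll());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fifoOrder()); // prints [1, 2, 3, 4]
    }
}
```

The dummy node is what keeps the two ends apart: even when only one element is stored, poll() advances head while offer() advances tail, so neither method needs the other's field.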

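Key implementation points:

1. Hold two kinds of locks.

2. After each production (or consumption), check the buffer state and signal the condition that belongs to the other lock. The signaling itself must of course be done while holding that lock.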

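Can It Be Optimized Further?

We have removed the bottleneck between consumers and producers. Can we go further?

If so, it would have to be by improving concurrency among consumers (or among producers). However, consumers must be serial with respect to one another, so at the level of the concurrency model there is nothing left to optimize.

In a concrete business scenario, though, there is usually still room to optimize. For example:

  • At a moderate level of concurrency, consider using CAS instead of a reentrant lock
  • The model itself cannot be optimized, but a single consume action may be decomposed and optimized further, reducing consumption latency
  • When the concurrency of a single queue reaches its limit, use multiple queues (for example, a distributed message queue)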
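As an illustration of the first point, the sketch below reserves and releases buffer slots with a compare-and-set retry loop on an AtomicInteger instead of taking a lock. It is an assumption-laden example rather than the article's own code: the class CasSlotCounter and its methods tryAcquire, tryRelease, inUse and contend are hypothetical names, and the counter is non-blocking, so a caller still has to decide what to do when a reservation fails.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasSlotCounter {
    private final int capacity;
    private final AtomicInteger used = new AtomicInteger();

    CasSlotCounter(int capacity) {
        this.capacity = capacity;
    }

    /** Tries to reserve one slot without blocking; returns false if all slots are taken. */
    boolean tryAcquire() {
        while (true) {
            int current = used.get();
            if (current >= capacity) {
                return false;
            }
            if (used.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread changed the counter in between; re-read and retry.
        }
    }

    /** Tries to free one slot without blocking; returns false if no slot is in use. */
    boolean tryRelease() {
        while (true) {
            int current = used.get();
            if (current <= 0) {
                return false;
            }
            if (used.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    int inUse() {
        return used.get();
    }

    /** Hypothetical stress helper: threads race to acquire; returns the slots in use afterwards. */
    static int contend(int capacity, int threads, int attemptsPerThread) {
        CasSlotCounter counter = new CasSlotCounter(capacity);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < attemptsPerThread; k++) {
                    counter.tryAcquire();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.inUse();
    }

    public static void main(String[] args) {
        System.out.println(contend(100, 8, 1000)); // never exceeds the capacity of 100
    }
}
```

Under heavy contention the retry loop can spin many times, which is one reason the article limits this suggestion to moderate concurrency.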

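A supplementary note on how the insertion and removal methods of LinkedBlockingQueue differ:

In general, offer and poll are recommended; they return immediately. The timed versions of offer and poll wait synchronously for at most the given time.

To wait indefinitely, use put and take (see Implementation 1 above).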
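The differences can be seen in a short single-threaded sketch on a queue of capacity 1. The class name QueueMethodsDemo, the method demo and the 50 ms timeout are arbitrary choices made for this illustration; the queue methods themselves are the standard BlockingQueue API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueMethodsDemo {

    /** Exercises each insertion/removal method once and returns the results joined by commas. */
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();
        try {
            out.append(queue.offer("a")).append(',');                            // true: there was room
            out.append(queue.offer("b")).append(',');                            // false: full, returns at once
            out.append(queue.offer("b", 50, TimeUnit.MILLISECONDS)).append(','); // false: gave up after the timeout
            out.append(queue.poll()).append(',');                                // a
            out.append(queue.poll()).append(',');                                // null: empty, returns at once
            out.append(queue.poll(50, TimeUnit.MILLISECONDS)).append(',');       // null: gave up after the timeout
            queue.put("c");                                                      // would block forever if the queue stayed full
            out.append(queue.take());                                            // c; would block forever if it stayed empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,false,a,null,null,c
    }
}
```

The return values of offer and poll have to be checked by the caller, whereas put and take report nothing and simply block until they succeed or the thread is interrupted.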

Original article: https://www.cnblogs.com/balfish/p/7794494.html