手写迷你版BlockingQueue阻塞队列

手写MiniBlockingQueue阻塞队列

BlockingQueue接口

package com.xiaozhou;

public interface BlockingQueue<T> {

    void put(T element) throws InterruptedException;

    T take() throws InterruptedException;
}

MiniBlockingQueue

package com.xiaozhou;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MiniBlockingQueue<T> implements BlockingQueue<T>{

    //底层是一个数组
    private Object[] elementData;
    private int size;

    public MiniBlockingQueue(int size) {
        this.elementData = new Object[size];
        this.size = size;
    }

    //当前队列中的元素个数, put时的指针索引, take时的指针索引
    private int count , putIndex, takeIndex;

    private Lock lock = new ReentrantLock();

    private Condition fullCondition = lock.newCondition();

    private Condition emptyCondition = lock.newCondition();


    @Override
    public void put(T element) throws InterruptedException {

        lock.lock();

        try {
            //当阻塞队列满时 该put的线程需要进入条件队列中等待
            if (count == size){
                fullCondition.await();
            }

            elementData[putIndex] = element;

            putIndex++;

            count++;

            if (putIndex == size) putIndex = 0;

            //通知take阻塞的线程来消费
            emptyCondition.signal();
        } finally {
            lock.unlock();
        }


    }

    @Override
    public T take() throws InterruptedException {

        lock.lock();
        try {
            //当队列为空时,需要进入条件队列中等待
            if (count == 0){
                emptyCondition.await();
            }

            Object element = elementData[takeIndex];

            takeIndex++;
            count--;

            if (takeIndex == size) takeIndex = 0;

            fullCondition.signal();

            return (T) element;
        } finally {
            lock.unlock();
        }
    }

    //测试
    public static void main(String[] args) throws InterruptedException {

        MiniBlockingQueue<String> queue = new MiniBlockingQueue<String>(3);


        // 当阻塞队列中为空时take 会阻塞,直到有生产者放进去值才会take到
//        new Thread(()->{
//            try {
//                String element = queue.take();
//                System.out.println(Thread.currentThread().getName() + " 从队列中获取值为: " + element);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        },"A").start();
//
//
//        TimeUnit.SECONDS.sleep(2);
//
//        queue.put("zzp");


        // 当阻塞队列中值满时,再去put则会阻塞,直到有消费者消费后才会put进去
        new Thread(()->{
            for (int i = 0; i < 4; i++) {
                try {
                    queue.put(String.valueOf(i));
                    System.out.println(Thread.currentThread().getName() + "放入:" + String.valueOf(i));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"A").start();


        TimeUnit.SECONDS.sleep(2);


        for (int i = 0; i < 4; i++) {
            String res = queue.take();
            System.out.println(Thread.currentThread().getName() + "获取到:" + res);
        }
    }
}

万般皆下品,唯有读书高!
原文地址:https://www.cnblogs.com/s686zhou/p/15082977.html