java blockingQueue 原理

blockingQueue 阻塞队列,一般用于消费者生产者模式。 就是通过ReentrantLock和condition实现的阻塞,

1     /** Main lock guarding all access */
2     final ReentrantLock lock;
3     /** Condition for waiting takes */
4     private final Condition notEmpty;
5     /** Condition for waiting puts */
6     private final Condition notFull;

在我上一篇文章中简单用condition实现了消费者生产者模式。

https://www.cnblogs.com/liumy/p/11564375.html

本文的jdk版本是 jdk1.7

下面是阻塞对列的几个重要方法。

首先, 队列是 FIFO, 先进先出。 队列的头部是在队列中存活时间长的元素,而在队列尾部是存货时间短的元素。

下面是根据ArrayBlockingQueue来说明这些方法的实现。BlockingQueue的实现类有很多种,这里只是把共通的实现逻辑讲一下。

其中

抛出异常是指 当队列已满的时候,添加元素(add(e)),抛出IllegalStateException。当添加的对象为null时,会有空指针异常。在队列为空时,删除元素(remove()),会抛出NoSuchElementException。当执行element()方法时,如果队列为空,则抛出NoSuchElementException,反之,返回队列的头元素。

返回特殊值 是指,当队列已满时,执行offer方法在队列尾部添加元素时,会返回false,如果成功添加,则返回true。 当队列为空时,执行poll方法,返回为null,反之,则返回头部元素。

阻塞是指, 当队列已满,执行put方法,添加元素时,线程会阻塞,直到队列变得不满时,线程才会被唤醒,继续添加元素。当队列为空时,执行take方法,拉去元素时,线程会阻塞,直到队列有数据时,线程才继续执行。

超时是指 当队列已满,执行offer(e,time,unit)方法,添加元素时,线程会等待指定的时间,如果这个时间内队列还是满的,那么j就返回false,反之,则返回true。当队列为空,执行poll(time,unit)方法,拉取元素时,线程会等待指定的时间,如果这个时间内队列还是空的,则返回false,反之,返回true。

ArrayBlockingQueue.class

有下面这些成员变量。

/**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

 blockingQueue 简单例子: 生产者消费者模式

package com.citi.test.mutiplethread.demo0503;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> queue=new ArrayBlockingQueue<String>(4);
        new Thread(new Producer(queue)).start();
        new Thread(new Producer(queue)).start();
        new Thread(new Producer(queue)).start();
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        
        new Thread(new Consumer(queue)).start();
    }
}
package com.citi.test.mutiplethread.demo0503;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{
    private BlockingQueue<String> queue;
    public Producer(BlockingQueue<String> queue) {
        this.queue=queue;
    }
    @Override
    public void run() {
        while(true){
            try {
                String str=UUID.randomUUID().toString();
                System.out.println(Thread.currentThread().getName()+"生产者生产"+str);
                queue.put(str);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}
package com.citi.test.mutiplethread.demo0503;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{
    
    private BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        this.queue=queue;
    }

    @Override
    public void run() {
        while(true){
            try {
                String take = queue.take();
                System.out.println(Thread.currentThread().getName()+"消费了"+take);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

参考下面资料:

https://www.cnblogs.com/zaizhoumo/p/7786793.html

https://www.jianshu.com/p/69ad22627b2b

原文地址:https://www.cnblogs.com/liumy/p/11565425.html