LinkedBlockingQueue、ArrayBlockingQueue、DelayQueue、TransferQueue、SynchronousQueue

1.LinkedBlockingQueue

/**
 * 使用阻塞同步队列 LinkedBlockingQueue 完成生产者消费者模式
 * 使用场景较多。
 */
public class T05_LinkedBlockingQueue {
    public static void main(String[] args) {

        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // 启动生产者线程生产
        new Thread(() -> {
            for (int j = 0; j < 100; j++) {
                try {
            //若果容器满了,它会在这里等待,容器有空余了在插入数据 queue.put(
"aaa" + j); // put 方法,给容器添加元素,如果容器已经满了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "p").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } // 启用消费者线程消费 for (int i = 0; i < 5; i++) { new Thread(() -> { while (true) { try { System.out.println(Thread.currentThread().getName() + ":" + queue.take()); // 从队列中拿数据,如果空了,则会阻塞等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }

2.ArrayBlockingQueue

/**
 * 使用阻塞有界同步队列 ArrayBlockingQueue 完成生产者消费者模式
 */
public class T06_ArrayBlockingQueue {
    public static void main(String[] args) throws InterruptedException {

        BlockingQueue queue = new ArrayBlockingQueue<>(10);
        for (int i = 0; i < 10; i++) {
            queue.put("a" + i);
        }
      //put的话在容器满了的情况下进行等待
//queue.put("a11"); // 会阻塞 //queue.add("a11"); // 会抛出异常 //System.out.println(queue.offer("a11")); // 会返回false System.out.println(queue.offer("a11", 1, TimeUnit.SECONDS)); // 会等待1s,返回false, 如果1s内有空闲,则添加成功后返回true } }

3.DelayQueue可以当做定时任务

/**
 * DelayQueue,
 * 出队有个时间限制, 每个元素有一个等待时间, 可以按照等待时间排序元素
 * DelayQueue元素必须为 Delayed类型的,即必须设置元素的等待时间
 * 
 * 用途,定时执行任务
 */
public class T07_DelayQueue {

    public static void main(String[] args) throws InterruptedException {
        long timestamp = System.currentTimeMillis();
        MyTask myTask1 = new MyTask(timestamp + 1000); // 1s后执行
        MyTask myTask2 = new MyTask(timestamp + 2000);
        MyTask myTask3 = new MyTask(timestamp + 1500);
        MyTask myTask4 = new MyTask(timestamp + 2500);
        MyTask myTask5 = new MyTask(timestamp + 500);

        DelayQueue<MyTask> tasks = new DelayQueue<>();
        tasks.put(myTask1);
        tasks.put(myTask2);
        tasks.put(myTask3);
        tasks.put(myTask4);
        tasks.put(myTask5);

        System.out.println(tasks);  // 确实按照我们拍的顺序执行的

        for (int i = 0; i < tasks.size(); i++) {
            System.out.println(tasks.take());
        }
    }
    static class MyTask implements Delayed {
        private long runningTime;

        public MyTask(long runTime) {
            this.runningTime = runTime;
        }
        
        // 这是每个元素的等待时间, 越是后加入的元素,时间等待的越长
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 这是排序规律, 执行等待时间最短的排在上面
        @Override
        public int compareTo(Delayed o) {
            return (int) (o.getDelay(TimeUnit.MILLISECONDS) - this.getDelay(TimeUnit.MILLISECONDS));
        }      
        @Override
        public String toString() {
            return runningTime + "";
        }
    }
}

4.TransferQueue

/**
 * TransferQueue,
 * 拥有transfer方法,传输,当transfer一个元素时,如果有take方法阻塞等待获取元素,则不向队列中保存,直接给等待的take方法
 * 如果没有消费者线程,transfer线程将会阻塞
 * 
 * 情景:如果将元素放入队列,再拿给消费者线程,太慢了,如果需要的效率更高,可以使用TransferQueue来解决更高的并发
 * 
 */
public class T08_TransferQueue {

    public static void main(String[] args) {
        
        TransferQueue mq = new LinkedTransferQueue();
        
        // 先让消费者线程等待
        new Thread(() -> {
            try {
                System.out.println(mq.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 再让生产者线程生产
        try {
       //一般应用于消息的实时处理,若是消费者都在忙着,可以先放到队列里面,但消费者必须必生产者先启动 mq.transfer(
"aaa"); // put add 都不会阻塞,会添加到容器中,只有transfer才有此种功能(等待消费者直接获取),所以transfer是有容量的 } catch (InterruptedException e) { e.printStackTrace(); } //若是先起生产者再起消费者,这里就不会再执行了,他找不到消费者了 /*new Thread(() -> { try { System.out.println(mq.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ } }

 5.SynchronousQueue

这个是容量为0的队列,生产者直接把数据交给消费者

/**
 * SynchronousQueue,
 * 一种特殊的TransferQueue,容量为0
 * 
 * TransferQueue是有容量的,可以通过add/put等方法向队列中加入元素
 * 但是SynchronousQueue是灭有
 */
public class T09_SynchronousQueue {

    public static void main(String[] args) throws InterruptedException {
        
        BlockingQueue queue = new SynchronousQueue();
        
        new Thread(() -> {
            try {
                System.out.println(queue.take()); // 取不到就阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        //queue.add("aaa"); // IllegalStateException: Queue full  抛出异常,因为没有容量
        queue.put("aaa");  // 会阻塞等待消费者线程获取,内部是transfer
        
        System.out.println(queue.size()); // 长度为0 
    }

}

总结

Map/Set:
    无并发:
        HashMap
        TreeMap
        LinkedHashMap
    低并发:
        HashTable
        Collections.synchronizedMap()
    高并发:
        ConcurrentHashMap - 并发高
        ConcurrentSkipListMap - 并发高 且 需要排序

队列:
    无并发:
        ArrayList
        LinkedList
    低并发:
        Vector
        Collections.synchronizedList()
    写少读多:
        CopyOnWriteList
    高并发
        Queue:
            ConcurrentLinkedQueue 非阻塞同步队列
            BlockQueue
                LinkedBlockingQueue
                ArrayBlockingQueue 
                TransferQueue
                SynchronousQueue
            DelayQueue

 

原文地址:https://www.cnblogs.com/gxlaqj/p/11699740.html