架构师养成记--8.Queue

一、ConcurrentLinkedQueue

是一个适合在高并发场景下,无锁,无界的,先进先出原则。不允许为null值,add()、offer()加入元素,这两个方法没区别;poll()、peek()取头元素节点,pull会删除,peek不会。

有一点要注意,轮询条件不能用queue.size();而是用 queue.isEmpty(); 看下面的代码:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedQueueTest {
    private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
    private static int count = 100000;
    private static int count2 = 2; // 线程个数
    private static CountDownLatch cd = new CountDownLatch(count2);
    public static void dothis() {
        for (int i = 0; i < count; i++) {
            queue.offer(i);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueueTest.dothis();
        for (int i = 0; i < count2; i++) {
            es.submit(new Poll());
        }
        cd.await();
        System.out.println("cost time "
                + (System.currentTimeMillis() - timeStart) + "ms");
        es.shutdown();
    }
    static class Poll implements Runnable {
        @Override
        public void run() {
            while (queue.size()>0) {
//            while (!queue.isEmpty()) {
                System.out.println(queue.poll());
            }
            cd.countDown();
        }
    }
}

运行结果是:

改用queue.isEmpty()后运行结果是:

结果居然相差那么大,看了下ConcurrentLinkedQueue的API 原来.size() 是要遍历一遍集合的,难怪那么慢,所以尽量要避免用size而改用isEmpty()。

二、ArrayBlockingQueue

基于数组的阻塞队列、有缓冲、定长、没有实现读写分离。有界队列。

下面是ArrayBlockingQueue实现的生产者消费者模式:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(100);
        for (int i = 0; i < 10; i++)
            new Thread(new ThreadProducer(queue)).start();
        for (int i = 0; i < 10; i++)
            new Thread(new ThreadConsumer(queue)).start();
    }
}

class ThreadProducer implements Runnable {
    ThreadProducer(BlockingQueue queue) {
        this.queue = queue;
    }

    BlockingQueue queue;
    static int cnt = 0;

    public void run() {
        String cmd;
        while (true) {
            cmd = "" + (cnt);
            cnt = (cnt + 1) & 0xFFFFFFFF;
            try {
                queue.put(cmd);
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ThreadConsumer implements Runnable {
    ThreadConsumer(BlockingQueue queue) {
        this.queue = queue;
    }

    BlockingQueue queue;

    public void run() {
        String cmd;
        while (true) {
            try {
                System.out.println(queue.take());
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

三、LinkedBlockingQueue

基于链表的阻塞队列、有缓冲、读写分离锁(从而实现生产者和消费者操作的完全并行运行)、无界队列

LinkedBlockingQueue实现是线程安全的,实现了FIFO(先进先出)等特性. 是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

工厂生产制造  生产高大上洒, 还有美女.

消费者有X二代,也有导演.

让消费者抢资源吧.

生产者:

import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private BlockingQueue<String> queue;
    private String produce;
    public Producer(BlockingQueue<String> queue, String produce) {
        this.queue = queue;
        if (null != produce)
         this.produce = produce;
        else this.produce = "null ";
    }

    @Override
    public void run() {
        String uuid = UUID.randomUUID().toString();
        try {
         Thread.sleep(200);//生产需要时间
            queue.put(produce + " : " + uuid);
            System.out.println("Produce "" + produce + "" : " + uuid + " " + Thread.currentThread());
           
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}      

消费者:

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
 private BlockingQueue<String> queue;
 private String consumer;

 public Consumer(BlockingQueue<String> queue, String consumer) {
  this.queue = queue;
  if (null != consumer)
   this.consumer = consumer;
  else
   this.consumer = "null ";
 }

 @Override
 public void run() {
  try {
   String uuid = queue.take();
   System.out.println(consumer + " decayed " + uuid
     + " " + Thread.currentThread());
  } catch (InterruptedException e) {
   System.out.println(e.getMessage());
  }
 }
}

调用:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Tester {

    public Tester(){
        // 队列
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

       
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 6; i++) {
         service.submit(new Consumer(queue, "X二代" + i));
         service.submit(new Consumer(queue, "导演" + i));
        }
        for (int i = 0; i < 6; i++) {
         service.submit(new Producer(queue, "黄金酒," + i));
         service.submit(new Producer(queue, "美女演员" + i));
        }
        service.shutdown();
    }
}   

四、SynchronousQueue

没有缓冲的队列,生产者查收的数据或直接被消费者获取并消费。在add前必须take,否则报错。

 五、PriorityBlockingQueue


元素必须实现Comparable接口,在第一次调用take方法的时候才会被排序

public class Task implements Comparable<Task>{
    
    private int id ;
    private String name;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    
    @Override
    public int compareTo(Task task) {
        return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);  
    }
    
    public String toString(){
        return this.id + "," + this.name;
    }
    
}


import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

public class UsePriorityBlockingQueue {

    
    public static void main(String[] args) throws Exception{
        
        
        PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
        
        Task t1 = new Task();
        t1.setId(3);
        t1.setName("id为3");
        Task t2 = new Task();
        t2.setId(4);
        t2.setName("id为4");
        Task t3 = new Task();
        t3.setId(1);
        t3.setName("id为1");
        
        //return this.id > task.id ? 1 : 0;
        q.add(t1);    //3
        q.add(t2);    //4
        q.add(t3);  //1
        
        // 1 3 4
        System.out.println("容器:" + q);
        System.out.println(q.take().getId());
        System.out.println("容器:" + q);
//        System.out.println(q.take().getId());
//        System.out.println(q.take().getId());
        

        
    }
}

 六、DelayQueue 延迟队列

元素需要实现Delayed接口,DelayQueue是没有大小限制的队列。调用take方法不会立即拿到元素,得等到设定的延迟时间到了才能拿到元素。下面是网吧上网流程使用DelayQueue的例子。

  1 package com.bjsxt.base.coll013;
  2 
  3 import java.util.concurrent.DelayQueue;
  4 
  5 public class WangBa implements Runnable {  
  6     
  7     private DelayQueue<Wangmin> queue = new DelayQueue<Wangmin>();  
  8     
  9     public boolean yinye =true;  
 10       
 11     public void shangji(String name,String id,int money){  
 12         Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());  
 13         System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");  
 14         this.queue.add(man);  
 15     }  
 16       
 17     public void xiaji(Wangmin man){  
 18         System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");  
 19     }  
 20   
 21     @Override  
 22     public void run() {  
 23         while(yinye){  
 24             try {  
 25                 Wangmin man = queue.take();  
 26                 xiaji(man);  
 27             } catch (InterruptedException e) {  
 28                 e.printStackTrace();  
 29             }  
 30         }  
 31     }  
 32       
 33     public static void main(String args[]){  
 34         try{  
 35             System.out.println("网吧开始营业");  
 36             WangBa siyu = new WangBa();  
 37             Thread shangwang = new Thread(siyu);  
 38             shangwang.start();  
 39               
 40             siyu.shangji("路人甲", "123", 1);  
 41             siyu.shangji("路人乙", "234", 10);  
 42             siyu.shangji("路人丙", "345", 5);  
 43         }  
 44         catch(Exception e){  
 45             e.printStackTrace();
 46         }  
 47   
 48     }  
 49 }  
 50 
 51 --------------------------------------------------------
 52 
 53 
 54 package com.bjsxt.base.coll013;
 55 
 56 import java.util.concurrent.Delayed;
 57 import java.util.concurrent.TimeUnit;
 58 
 59 public class Wangmin implements Delayed {  
 60     
 61     private String name;  
 62     //身份证  
 63     private String id;  
 64     //截止时间  
 65     private long endTime;  
 66     //定义时间工具类
 67     private TimeUnit timeUnit = TimeUnit.SECONDS;
 68       
 69     public Wangmin(String name,String id,long endTime){  
 70         this.name=name;  
 71         this.id=id;  
 72         this.endTime = endTime;  
 73     }  
 74       
 75     public String getName(){  
 76         return this.name;  
 77     }  
 78       
 79     public String getId(){  
 80         return this.id;  
 81     }  
 82       
 83     /** 
 84      * 用来判断是否到了截止时间 
 85      */  
 86     @Override  
 87     public long getDelay(TimeUnit unit) { 
 88         //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
 89         return endTime - System.currentTimeMillis();
 90     }  
 91   
 92     /** 
 93      * 相互批较排序用 
 94      */  
 95     @Override  
 96     public int compareTo(Delayed delayed) {  
 97         Wangmin w = (Wangmin)delayed;  
 98         return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;  
 99     }      
100     
101 }  
原文地址:https://www.cnblogs.com/sigm/p/6186401.html