并发Queue

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论是哪种,都继承自Queue

ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。

ConcurrentLinkedQueue重要方法:

  add()和offer()都是加入元素的重要的方法(ConcurrentLinkedQueue中,这两个方法没有任何区别)

  poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。


代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ConcurrentLinkedQueue;
 6 import java.util.concurrent.LinkedBlockingQueue;
 7 
 8 public class UseQueue {
 9     public static void main(String[] args) {
10         //高性能无阻塞队列
11         ConcurrentLinkedQueue<String>q = new ConcurrentLinkedQueue<String>();
12         q.add("add");
13         q.offer("offer");
14         
15         System.out.println("q.poll():"+q.poll());
16         System.out.println("q.size():"+q.size());
17         System.out.println("q.peek():"+q.peek());
18         System.out.println("q.size():"+q.size());
19         
20         
21         //阻塞队列
22         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();
23         b.offer("offer");
24         b.add("add1");
25         b.add("add2");
26         b.add("add3");
27         b.add("add4");
28         
29         List <String>list = new ArrayList<String>();
30         //drainTo:将b的元素取出1个放入list中
31         System.out.println("b.drainTo(list,3):"+b.drainTo(list,1));
32         System.out.println("b.size():"+b.size());
33         
34         System.out.println("*****遍历list*****");
35         for (String string : list) {
36             System.out.println(string);
37         }
38         
39         
40     }
41 }

运行结果:

1 q.poll():add
2 q.size():1
3 q.peek():offer
4 q.size():1
5 b.drainTo(list,3):1
6 b.size():4
7 *****遍历list*****
8 offer

BlockingQueue接口

ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定常的数组,以便缓存队列中的数据对象,其内部没有实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。

代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ArrayBlockingQueue;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8 import java.util.concurrent.TimeUnit;
 9 
10 public class UseQueue {
11     public static void main(String[] args) throws Exception {
12         System.out.println("********ConcurrentLinkedQueue********");
13         // 高性能无阻塞队列
14         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
15         q.add("add");
16         q.offer("offer");
17 
18         System.out.println("q.poll():" + q.poll());
19         System.out.println("q.size():" + q.size());
20         System.out.println("q.peek():" + q.peek());
21         System.out.println("q.size():" + q.size());
22 
23         System.out.println("******ArrayBlockingQueue********");
24         
25         // 阻塞队列
26         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
27         a.put("put1");
28         a.put("put2");
29         a.add("add1");
30         a.add("add2");
31         a.add("add3");
32         // offer方法是阻塞的
33         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
34         System.out.println("***遍历a****");
35         for (String string : a) {
36             System.out.println(string);
37         }
38         
39         
40         
41         System.out.println("******LinkedBlockingQueue*************");
42         
43         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();
44         b.offer("offer");
45         b.add("add1");
46         b.add("add2");
47         b.add("add3");
48         b.add("add4");
49 
50         List<String> list = new ArrayList<String>();
51         // drainTo:将b的元素取出1个放入list中
52         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
53         System.out.println("b.size():" + b.size());
54 
55         System.out.println("*****遍历list*****");
56         for (String string : list) {
57             System.out.println(string);
58         }
59 
60     }
61 }

运行结果:

 1 ********ConcurrentLinkedQueue********
 2 q.poll():add
 3 q.size():1
 4 q.peek():offer
 5 q.size():1
 6 ******ArrayBlockingQueue********
 7 a.offer():false
 8 ***遍历a****
 9 put1
10 put2
11 add1
12 add2
13 add3
14 ******LinkedBlockingQueue*************
15 b.drainTo(list,3):1
16 b.size():4
17 *****遍历list*****
18 offer

LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。它是一个无界队列。

代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ArrayBlockingQueue;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8 import java.util.concurrent.TimeUnit;
 9 
10 public class UseQueue {
11     public static void main(String[] args) throws Exception {
12         System.out.println("********ConcurrentLinkedQueue********");
13         // 高性能无阻塞队列
14         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
15         q.add("add");
16         q.offer("offer");
17 
18         System.out.println("q.poll():" + q.poll());
19         System.out.println("q.size():" + q.size());
20         System.out.println("q.peek():" + q.peek());
21         System.out.println("q.size():" + q.size());
22 
23         System.out.println("******ArrayBlockingQueue********");
24         
25         // 阻塞队列
26         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
27         a.put("put1");
28         a.put("put2");
29         a.add("add1");
30         a.add("add2");
31         a.add("add3");
32         // offer方法是阻塞的
33         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
34         System.out.println("***遍历a****");
35         for (String string : a) {
36             System.out.println(string);
37         }
38         
39         
40         
41         System.out.println("******LinkedBlockingQueue*************");
42         
43         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
44         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
45         b.offer("offer");
46         b.add("add1");
47         b.add("add2");
48         b.add("add3");
49         b.add("add4");
50         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
51         b.offer("add5");
52         
53         System.out.println("b.size():"+b.size());
54         List<String> list = new ArrayList<String>();
55         // drainTo:将b的元素取出1个放入list中
56         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
57         System.out.println("b.size():" + b.size());
58 
59         System.out.println("*****遍历list*****");
60         for (String string : list) {
61             System.out.println(string);
62         }
63 
64     }
65 }

运行结果:

 1 ********ConcurrentLinkedQueue********
 2 q.poll():add
 3 q.size():1
 4 q.peek():offer
 5 q.size():1
 6 ******ArrayBlockingQueue********
 7 a.offer():false
 8 ***遍历a****
 9 put1
10 put2
11 add1
12 add2
13 add3
14 ******LinkedBlockingQueue*************
15 b.size():5
16 b.drainTo(list,3):1
17 b.size():4
18 *****遍历list*****
19 offer

SynchronizedQueue:一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费

不能直接添加元素,直接添加会报异常

代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ArrayBlockingQueue;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8 import java.util.concurrent.SynchronousQueue;
 9 import java.util.concurrent.TimeUnit;
10 
11 public class UseQueue {
12     public static void main(String[] args) throws Exception {
13         System.out.println("********ConcurrentLinkedQueue********");
14         // 高性能无阻塞队列
15         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
16         q.add("add");
17         q.offer("offer");
18 
19         System.out.println("q.poll():" + q.poll());
20         System.out.println("q.size():" + q.size());
21         System.out.println("q.peek():" + q.peek());
22         System.out.println("q.size():" + q.size());
23 
24         System.out.println("******ArrayBlockingQueue********");
25         
26         // 阻塞队列
27         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
28         a.put("put1");
29         a.put("put2");
30         a.add("add1");
31         a.add("add2");
32         a.add("add3");
33         // offer方法是阻塞的
34         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
35         System.out.println("***遍历a****");
36         for (String string : a) {
37             System.out.println(string);
38         }
39         
40         
41         
42         System.out.println("******LinkedBlockingQueue*************");
43         
44         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
45         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
46         b.offer("offer");
47         b.add("add1");
48         b.add("add2");
49         b.add("add3");
50         b.add("add4");
51         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
52         b.offer("add5");
53         
54         System.out.println("b.size():"+b.size());
55         List<String> list = new ArrayList<String>();
56         // drainTo:将b的元素取出1个放入list中
57         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
58         System.out.println("b.size():" + b.size());
59 
60         System.out.println("*****遍历list*****");
61         for (String string : list) {
62             System.out.println(string);
63         }
64 
65         //SynchronizedQueue
66         SynchronousQueue<String> s = new SynchronousQueue<String>();
67         s.add("test");
68         
69         
70     }
71 }

运行结果:

 1 ********ConcurrentLinkedQueue********
 2 q.poll():add
 3 q.size():1
 4 q.peek():offer
 5 q.size():1
 6 ******ArrayBlockingQueue********
 7 a.offer():false
 8 ***遍历a****
 9 put1
10 put2
11 add1
12 add2
13 add3
14 ******LinkedBlockingQueue*************
15 b.size():5
16 b.drainTo(list,3):1
17 b.size():4
18 *****遍历list*****
19 offer
20 Exception in thread "main" java.lang.IllegalStateException: Queue full
21     at java.util.AbstractQueue.add(Unknown Source)
22     at com.java.day03_queue.UseQueue.main(UseQueue.java:67)

SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

 代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 import java.util.concurrent.ArrayBlockingQueue;
 6 import java.util.concurrent.ConcurrentLinkedQueue;
 7 import java.util.concurrent.LinkedBlockingQueue;
 8 import java.util.concurrent.SynchronousQueue;
 9 import java.util.concurrent.TimeUnit;
10 
11 public class UseQueue {
12     public static void main(String[] args) throws Exception {
13         System.out.println("********ConcurrentLinkedQueue********");
14         // 高性能无阻塞队列
15         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
16         q.add("add");
17         q.offer("offer");
18 
19         System.out.println("q.poll():" + q.poll());
20         System.out.println("q.size():" + q.size());
21         System.out.println("q.peek():" + q.peek());
22         System.out.println("q.size():" + q.size());
23 
24         System.out.println("******ArrayBlockingQueue********");
25         
26         // 阻塞队列
27         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
28         a.put("put1");
29         a.put("put2");
30         a.add("add1");
31         a.add("add2");
32         a.add("add3");
33         // offer方法是阻塞的
34         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
35         System.out.println("***遍历a****");
36         for (String string : a) {
37             System.out.println(string);
38         }
39         
40         
41         
42         System.out.println("******LinkedBlockingQueue*************");
43         
44         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
45         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
46         b.offer("offer");
47         b.add("add1");
48         b.add("add2");
49         b.add("add3");
50         b.add("add4");
51         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
52         b.offer("add5");
53         
54         System.out.println("b.size():"+b.size());
55         List<String> list = new ArrayList<String>();
56         // drainTo:将b的元素取出1个放入list中
57         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
58         System.out.println("b.size():" + b.size());
59 
60         System.out.println("*****遍历list*****");
61         for (String string : list) {
62             System.out.println(string);
63         }
64 
65         //SynchronizedQueue
66         System.out.println("**********SynchronizedQueue***************");
67         final SynchronousQueue<String> s = new SynchronousQueue<String>();
68         Thread t1 = new Thread(new Runnable() {
69             public void run() {
70                 try {
71                     System.out.println(s.take());
72                 } catch (InterruptedException e) {
73                     e.printStackTrace();
74                 }
75             }
76         });
77         
78         t1.start();
79         
80         Thread t2 = new Thread(new Runnable() {
81             public void run() {
82         
83                 s.add("nanami");
84             }
85         });
86         
87         t2.start();
88     }
89 }

运行结果:

 1 ********ConcurrentLinkedQueue********
 2 q.poll():add
 3 q.size():1
 4 q.peek():offer
 5 q.size():1
 6 ******ArrayBlockingQueue********
 7 a.offer():false
 8 ***遍历a****
 9 put1
10 put2
11 add1
12 add2
13 add3
14 ******LinkedBlockingQueue*************
15 b.size():5
16 b.drainTo(list,3):1
17 b.size():4
18 *****遍历list*****
19 offer
20 **********SynchronizedQueue***************
21 nanami

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。

(优先级的排列:并不是在加入的时候排序,而是在第一次take的时候按照优先级进行选择排序)

代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.Iterator;
 4 import java.util.concurrent.PriorityBlockingQueue;
 5 
 6 //比较器有问题
 7 public class UsePriorityBlockingQueue {
 8     public static void main(String[] args) throws Exception {
 9         PriorityBlockingQueue<Task> p = new PriorityBlockingQueue<>();
10         
11         Task t1= new Task();
12         t1.setId(3);
13         t1.setName("nanami-----3");
14         
15         Task t2= new Task();
16         t2.setId(8);
17         t2.setName("tomoe---8");
18         
19         Task t3= new Task();
20         t3.setId(2);
21         t3.setName("t3---2");
22         
23         Task t4= new Task();
24         t4.setId(6);
25         t4.setName("t4---6");
26         
27         Task t5= new Task();
28         t5.setId(7);
29         t5.setName("t5---7");
30         
31         
32         p.add(t1);
33         p.add(t2);
34         p.add(t3);
35         p.add(t4);
36         p.add(t5);
37         
38         //排序:在第一次take之后,按照优先级进行排序
39         System.out.println(p.take());
40         for (Iterator iterator = p.iterator(); iterator.hasNext();) {
41             Task task = (Task) iterator.next();
42             System.out.println(task);
43         }
44         
45         
46     }
47 }
 1 package com.java.day03_queue;
 2 
 3 public class Task implements Comparable<Task>{
 4 
 5     private int id;
 6     private String name;
 7     public int getId() {
 8         return id;
 9     }
10     public void setId(int id) {
11         this.id = id;
12     }
13     public String getName() {
14         return name;
15     }
16     public void setName(String name) {
17         this.name = name;
18     }
19     @Override
20     public int compareTo(Task task) {
21         return this.getId()>task.getId()?1: (this.getId()<task.getId()?-1:0);
22     }
23     @Override
24     public String toString() {
25         return "Task [id=" + id + ", name=" + name + "]";
26     }
27     
28     
29     
30 }

运行结果:

1 Task [id=2, name=t3---2]
2 Task [id=3, name=nanami-----3]
3 Task [id=6, name=t4---6]
4 Task [id=7, name=t5---7]
5 Task [id=8, name=tomoe---8]

DelayQueue:带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景有很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等。(延迟时间一到,从队列中自动出来,如:网吧上网)

代码:

 1 package com.java.day03_queue;
 2 
 3 import java.util.concurrent.DelayQueue;
 4 
 5 public class WangBa implements Runnable{
 6 
 7     private DelayQueue<WangMin>queue = new DelayQueue<WangMin>();
 8     private boolean yinye = true;
 9     
10     
11     //上机
12     public void shangji(String name,int id,int money){
13         WangMin man = new WangMin(name,id,1000*money+System.currentTimeMillis());
14         System.out.println("网名:"+name +"   身份证:"+id+"交钱:"+money+"元,开始上机。。。。");
15         queue.add(man);
16     }
17     
18     //下机
19     public void xiaji(WangMin man){
20         System.out.println("网民:"+man.getName()+"身份证:"+man.getId()+"   时间到,下机。。。。");
21     }
22     
23     
24     @Override
25     public void run() {
26         while(yinye){
27             try {
28                 WangMin man  = queue.take();
29                 xiaji(man);
30             } catch (InterruptedException e) {
31                 e.printStackTrace();
32             }
33         }
34     }
35     
36     public static void main(String[] args) {
37         System.out.println("网吧开始营业");
38         WangBa siyu = new WangBa();
39         //创建了一个线程
40         Thread shangwang  = new Thread(siyu);
41         shangwang.start();
42         
43         
44         siyu.shangji("nanami", 1, 3);
45         siyu.shangji("tomoe", 2, 5);
46     }
47     
48     
49 
50 }
 1 package com.java.day03_queue;
 2 
 3 import java.util.concurrent.Delayed;
 4 import java.util.concurrent.TimeUnit;
 5 
 6 public class WangMin implements Delayed{
 7 
 8     private int id;
 9     private String name;
10     private long endTime;
11     private TimeUnit timeUtil = TimeUnit.SECONDS ;
12     
13     public WangMin(String name ,int id,long endTime){
14         this.name=name;
15         this.id=id;
16         this.endTime=endTime;
17     }
18     
19     public int getId() {
20         return id;
21     }
22 
23     public void setId(int id) {
24         this.id = id;
25     }
26 
27     public String getName() {
28         return name;
29     }
30 
31     public void setName(String name) {
32         this.name = name;
33     }
34 
35     
36     
37     public long getEndTime() {
38         return endTime;
39     }
40 
41     public void setEndTime(long endTime) {
42         this.endTime = endTime;
43     }
44 
45     @Override
46     public int compareTo(Delayed delayed) {
47         WangMin w = (WangMin)delayed;
48         return  (this.getDelay(this.timeUtil)-this.getDelay(this.timeUtil))>0?1:0;
49     }
50 
51     @Override
52     public long getDelay(TimeUnit unit) {
53         return endTime-System.currentTimeMillis();
54     }
55 
56 }

运行结果:因为上面为while死循环,所以,程序并没有停止

1 网吧开始营业
2 网名:nanami   身份证:1交钱:3元,开始上机。。。。
3 网名:tomoe   身份证:2交钱:5元,开始上机。。。。
4 网民:nanami身份证:1   时间到,下机。。。。
5 网民:tomoe身份证:2   时间到,下机。。。。
原文地址:https://www.cnblogs.com/syousetu/p/6732709.html