Implementing the producer-consumer scenario with synchronized, Lock, Semaphore, and BlockingQueue.

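Performance:

With small data volumes: synchronized > Lock, Semaphore.

With large data volumes: Lock > synchronized > Semaphore.

BlockingQueue implementations such as ArrayBlockingQueue are also built on ReentrantLock + Condition under the hood.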
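The ReentrantLock + Condition pattern mentioned for BlockingQueue can be illustrated with a minimal sketch of a bounded buffer. The class names BoundedBuffer and BoundedBufferDemo and their methods are hypothetical, written only for this illustration; this is not the JDK source, just the same general idea of one lock with a "not full" and a "not empty" condition.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer for illustration; not the JDK implementation.
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full.
    void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();
            }
            items.add(item);
            notEmpty.signal(); // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    // Blocks while the buffer is empty.
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.remove();
            notFull.signal(); // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}

class BoundedBufferDemo {
    // Moves the numbers 0..9 through a buffer of capacity 2 and returns their sum.
    static int run() {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 10; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Using two separate conditions lets a producer wake only consumers and a consumer wake only producers, which a single wait()/notify() monitor cannot express.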

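1. The synchronized approach

synchronized is applied to a method or an object and guarantees that the method or object is held by only one thread at any given moment. Combined with wait() and notify(), it lets threads communicate with each other. When stock runs out, the consumer thread pauses and waits. When the producer produces a new item, it notifies the consumer so that it can consume.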

package com.boot.demo;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author braska
 * @date 2020/3/9
 **/
public class SynchronizedTest {

    // Only accessed inside synchronized methods; kept as AtomicInteger as in the original.
    private final AtomicInteger stock = new AtomicInteger(0);

    public synchronized void produce() throws InterruptedException {
        // Re-check the condition in a loop to guard against spurious wakeups.
        while (stock.get() >= 5) {
            System.out.println("Warehouse is full.");
            wait();
        }
        System.out.println(String.format("Current stock: %s", stock.addAndGet(1)));
        // Producers and consumers wait on the same monitor, so wake all waiters.
        notifyAll();
    }

    public synchronized void consume(String name) throws InterruptedException {
        while (stock.get() <= 0) {
            System.out.println("Out of stock.");
            wait();
        }
        System.out.println(String.format("%s is consuming, current stock: %s", name, stock.addAndGet(-1)));
        notifyAll();
    }

    static class Producer implements Runnable {
        private final SynchronizedTest test;

        public Producer(SynchronizedTest test) {
            this.test = test;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000);
                    test.produce();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final SynchronizedTest test;
        private final String name;

        public Consumer(SynchronizedTest test, String name) {
            this.test = test;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(3000);
                    test.consume(name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        SynchronizedTest test = new SynchronizedTest();

        Thread p1 = new Thread(new Producer(test));
        Thread c1 = new Thread(new Consumer(test, "Consumer 1"));
        Thread c2 = new Thread(new Consumer(test, "Consumer 2"));

        p1.start();
        c1.start();
        c2.start();
    }
}
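The statement at the start of this section that synchronized can be applied to a method or to an object can be shown side by side in a small sketch. The names SyncCounter, SyncCounterDemo, and guard are hypothetical and exist only for this illustration.

```java
// Hypothetical counter showing both forms of synchronized.
class SyncCounter {
    private final Object guard = new Object();
    private int viaMethod = 0;
    private int viaBlock = 0;

    // synchronized method: the lock is the SyncCounter instance itself.
    synchronized void incrementViaMethod() {
        viaMethod++;
    }

    // synchronized block: the lock is the explicit guard object.
    void incrementViaBlock() {
        synchronized (guard) {
            viaBlock++;
        }
    }

    synchronized int getViaMethod() {
        return viaMethod;
    }

    int getViaBlock() {
        synchronized (guard) {
            return viaBlock;
        }
    }
}

class SyncCounterDemo {
    // Two threads increment each counter 1000 times; returns both totals.
    static int[] run() {
        SyncCounter counter = new SyncCounter();
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.incrementViaMethod();
                counter.incrementViaBlock();
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {counter.getViaMethod(), counter.getViaBlock()};
    }

    public static void main(String[] args) {
        int[] totals = run();
        System.out.println(totals[0] + " " + totals[1]);
    }
}
```

Both counters reach 2000 because every read and write of a field happens while holding the lock that guards it.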

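2. The Semaphore approach

Put simply, the Semaphore constructor defines how many shopping windows are open, for example new Semaphore(6). tryAcquire() means a consumer tries to grab a window; with no argument it takes one window, and a consumer can also grab several at once, for example tryAcquire(2). When all windows are taken, later consumers have to wait until earlier ones finish and leave (release), for example release(2). Note that tryAcquire() itself does not wait: it returns false immediately when no window is free, whereas acquire() blocks until one becomes available.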

package com.boot.demo;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author braska
 * @date 2020/3/9
 **/
public class SemaphoreTest {

    // Starts with 0 permits so that the permit count tracks the goods available.
    private final Semaphore semaphore = new Semaphore(0);
    private final AtomicInteger stock = new AtomicInteger(0);

    public void produce() {
        // This capacity check is only safe because there is a single producer thread.
        if (stock.get() < 5) {
            System.out.println(String.format("Current stock: %s", stock.addAndGet(1)));
            // One more item is available to consumers.
            semaphore.release();
        } else {
            System.out.println("Warehouse is full.");
        }
    }

    public void consume(String name) {
        // tryAcquire() returns false immediately when no item is available.
        if (semaphore.tryAcquire()) {
            System.out.println(String.format("%s is consuming, current stock: %s", name, stock.addAndGet(-1)));
        } else {
            System.out.println("Out of stock.");
        }
    }

    static class Producer implements Runnable {

        private final SemaphoreTest deal;

        public Producer(SemaphoreTest deal) {
            this.deal = deal;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000);
                    deal.produce();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final SemaphoreTest deal;
        private final String name;

        public Consumer(SemaphoreTest deal, String name) {
            this.deal = deal;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(3000);
                    deal.consume(name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        SemaphoreTest deal = new SemaphoreTest();

        Thread p = new Thread(new Producer(deal));

        Thread c1 = new Thread(new Consumer(deal, "Consumer 1"));
        Thread c2 = new Thread(new Consumer(deal, "Consumer 2"));

        p.start();
        c1.start();
        c2.start();
    }
}
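The shopping-window analogy from the start of this section can be traced step by step in a single thread. This is a minimal sketch; the class name WindowDemo and the variable names are hypothetical, while Semaphore, tryAcquire, and release are the standard java.util.concurrent API.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Hypothetical walkthrough of the "shopping window" analogy.
class WindowDemo {
    static boolean[] run() {
        Semaphore windows = new Semaphore(6);        // six windows are open
        boolean first = windows.tryAcquire(2);       // a customer takes 2, leaving 4
        boolean second = windows.tryAcquire(4);      // another takes the remaining 4
        boolean third = windows.tryAcquire();        // none left: returns false at once
        windows.release(2);                          // the first customer leaves
        boolean fourth = windows.tryAcquire(2);      // two windows are free again
        return new boolean[] {first, second, third, fourth};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

The third call fails without blocking, which is the difference between tryAcquire() and acquire().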

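3. The Lock approach

In principle this works the same way as synchronized: it locks an object or a block of code so that multiple threads cannot hold it at the same moment. The differences: synchronized is released automatically, whereas a Lock must be released manually, normally in a finally block; and synchronized is a built-in language keyword, whereas Lock is a library interface in java.util.concurrent.locks.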

package com.boot.demo;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author braska
 * @date 2020/3/9
 **/
public class LockTest {

    private final Lock lock = new ReentrantLock();
    private final AtomicInteger stock = new AtomicInteger(0);

    public void produce() {
        lock.lock();
        try {
            if (stock.get() < 5) {
                System.out.println(String.format("Current stock: %s", stock.addAndGet(1)));
            } else {
                System.out.println("Warehouse is full.");
            }
        } finally {
            // Always release in finally so an exception cannot leave the lock held.
            lock.unlock();
        }
    }

    public void consume(String name) {
        lock.lock();
        try {
            if (stock.get() > 0) {
                System.out.println(String.format("%s is consuming, current stock: %s", name, stock.addAndGet(-1)));
            } else {
                System.out.println("Out of stock.");
            }
        } finally {
            lock.unlock();
        }
    }

    static class Producer implements Runnable {

        private final LockTest deal;

        public Producer(LockTest deal) {
            this.deal = deal;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000);
                    deal.produce();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final LockTest deal;
        private final String name;

        public Consumer(LockTest deal, String name) {
            this.deal = deal;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(3000);
                    deal.consume(name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        LockTest deal = new LockTest();

        Thread p = new Thread(new Producer(deal));

        Thread c1 = new Thread(new Consumer(deal, "Consumer 1"));
        Thread c2 = new Thread(new Consumer(deal, "Consumer 2"));

        p.start();
        c1.start();
        c2.start();
    }
}
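Because Lock is a library interface rather than a keyword, it also offers operations that synchronized has no equivalent for, such as giving up after a timeout. The following is a minimal sketch of that difference; the class TryLockDemo and the method tryProduce are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example of a timed lock attempt with manual release.
class TryLockDemo {
    private final Lock lock = new ReentrantLock();
    private int stock = 0;

    // Adds one item, or returns false if the lock is not free within the timeout.
    boolean tryProduce(long timeoutMillis) {
        boolean acquired = false;
        try {
            acquired = lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!acquired) {
                return false;
            }
            stock++;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Manual release: only unlock if this call actually got the lock.
            if (acquired) {
                lock.unlock();
            }
        }
    }

    // Returns {result with a free lock, result while another thread holds the lock}.
    static boolean[] run() {
        TryLockDemo demo = new TryLockDemo();
        boolean whenFree = demo.tryProduce(50);

        boolean[] whenHeld = new boolean[1];
        demo.lock.lock(); // the calling thread now holds the lock
        try {
            // ReentrantLock is reentrant, so the attempt must come from another thread.
            Thread other = new Thread(() -> whenHeld[0] = demo.tryProduce(50));
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            demo.lock.unlock();
        }
        return new boolean[] {whenFree, whenHeld[0]};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]);
    }
}
```

The first attempt succeeds, and the second times out because the lock is held elsewhere for the whole 50 ms window.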

4. The BlockingQueue approach

Under the hood this also relies on the lock mechanism.

package com.boot.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author braska
 * @date 2020/3/9
 **/
public class BlockingQueueTest {

    private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
    private final AtomicInteger stock = new AtomicInteger(0);

    public void produce() {
        // offer() returns false when the queue is full instead of throwing like add().
        if (queue.offer(stock.addAndGet(1))) {
            System.out.println(String.format("Current stock: %s", stock.get()));
        } else {
            // Roll back the counter because nothing was stored.
            stock.addAndGet(-1);
            System.out.println("Warehouse is full.");
        }
    }

    public void consume(String name) {
        // poll() returns null immediately when the queue is empty.
        Integer good = queue.poll();
        if (good != null) {
            System.out.println(String.format("%s is consuming, current stock: %s", name, stock.addAndGet(-1)));
        } else {
            System.out.println("Out of stock.");
        }
    }

    static class Producer implements Runnable {

        private final BlockingQueueTest deal;

        public Producer(BlockingQueueTest deal) {
            this.deal = deal;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(1000);
                    deal.produce();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueueTest deal;
        private final String name;

        public Consumer(BlockingQueueTest deal, String name) {
            this.deal = deal;
            this.name = name;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Thread.sleep(3000);
                    deal.consume(name);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueueTest deal = new BlockingQueueTest();

        Thread p = new Thread(new Producer(deal));

        Thread c1 = new Thread(new Consumer(deal, "Consumer 1"));
        Thread c2 = new Thread(new Consumer(deal, "Consumer 2"));

        p.start();
        c1.start();
        c2.start();
    }
}
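The listing above only uses the non-blocking queue operations. As a complement, here is a minimal sketch of the blocking operations put() and take(), which make the threads wait instead of reporting a full or empty warehouse. The class name BlockingPutTakeDemo is hypothetical; put() and take() are standard BlockingQueue methods.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical example of the blocking put()/take() pair.
class BlockingPutTakeDemo {
    // Sends 1..20 through a queue of capacity 5 and returns the sum received.
    static int run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 20; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < 20; i++) {
                sum += queue.take(); // waits while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With put() and take() no separate stock counter or wait/notify code is needed, since the queue handles the waiting itself.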
Original article: https://www.cnblogs.com/braska/p/12449864.html