Java生产者消费者模型

  在Java中线程同步的经典案例,不同线程对同一个对象同时进行多线程操作,为了保持线程安全,数据结果要是我们期望的结果。

  生产者-消费者模型可以很好的解释这个现象:对于公共数据data,初始值为0,多个线程对其进行增加或者减少,但是我们的目的是无论多少个线程同时操作他,结果都是:当data=0时,只能进行增加,data=1时只能进行减少。

  由于代码比较简单,就把所有的类都写在同一个类里面,以静态内部类的形式出现,这样比较节省篇幅。

  1.线程不安全:

 1 /**
 2  * 线程不安全
 3  */
 4 public class NonThreadSafeTest {
 5 
 6     @Test
 7     public void nonSafeTest() {
 8         Resouce resouce = new Resouce();
 9         Thread add = new Thread(new Run1(resouce), "add");
10         Thread minus = new Thread(new Run2(resouce), "minus");
11         add.start();
12         minus.start();
13         try {
14             add.join();
15             minus.join();
16             System.err.println("result: " + resouce.data);
17         } catch (InterruptedException e) {
18             e.printStackTrace();
19         }
20     }
21 
22     static class Run1 implements Runnable {
23         private Resouce resouce;
24 
25         public Run1(Resouce resouce) {
26             this.resouce = resouce;
27         }
28 
29         public void run() {
30             for (int i = 0; i < 10000; i++)
31                 resouce.add();
32         }
33     }
34 
35     static class Run2 implements Runnable {
36         private Resouce resouce;
37 
38         public Run2(Resouce resouce) {
39             this.resouce = resouce;
40         }
41 
42         public void run() {
43             for (int i = 0; i < 10000; i++)
44                 resouce.minus();
45         }
46     }
47 
48     static class Resouce {
49         public int data = 0;
50 
51         public synchronized void add() {
52             data++;
53         }
54 
55         public synchronized void minus() {
56             data--;
57         }
58     }
59 }

  2.线程安全:

 1     @Test
 2     public void safeTest() throws Exception {
 3         Resouce resouce = new Resouce();
 4         for (;;) {
 5             new Thread(new Run1(resouce)).start();
 6             new Thread(new Run2(resouce)).start();
 7         }
 8     }
 9 
10     static class Run1 implements Runnable {
11         private Resouce resouce;
12 
13         public Run1(Resouce resouce) {
14             this.resouce = resouce;
15         }
16 
17         public void run() {
18             resouce.add();
19         }
20     }
21 
22     static class Run2 implements Runnable {
23         private Resouce resouce;
24 
25         public Run2(Resouce resouce) {
26             this.resouce = resouce;
27         }
28 
29         public void run() {
30             resouce.minus();
31         }
32     }
33 
34     static class Resouce {
35         public int data = 0;
36 
37         public synchronized void add() {
38             while (data != 0) {
39                 try {
40                     wait();
41                 } catch (InterruptedException e) {
42                     e.printStackTrace();
43                 }
44             }
45             data++;
46             System.out.println(data);
47             notify();
48         }
49 
50         public synchronized void minus() {
51             while (data == 0) {
52                 try {
53                     wait();
54                 } catch (InterruptedException e) {
55                     e.printStackTrace();
56                 }
57             }
58             data--;
59             System.out.println(data);
60             notify();
61         }
62     }
63 }

  来说明一下其中的信息:

  1.通过JDK源代码我们可以发现:对于new Thread(new Runnable() {@override run() {}}){@override run(){}}.start();这样的形式来说,事实上是new了一个Thread的子类,然后调用start(),同时参数又是实现了Runnable接口的类的实例,二者均重写了run()方法,那么此时如果都重写了run(),就调用new Thread() {}这里面的run方法,而如果仅仅是new Thread(new Runnable() {@override run() {}}).start();这样的形式就会调用参数里面的run方法,见Thread的源码:

    public void run() {
    if (target != null) {
        target.run();
    }
    }

调用Thread的run方法,如果此时Thread的run方法没有被重写,并且target不为null,就调用target的run方法,target就是参数传进来的Runnable接口的实现类。

  2.既然上面第一点说到补充些Thread的run方法就调用参数的run方法,所以我们的测试main方法:

    public static void main(String[] args) {
        Resouce resouce = new Resouce();
        for(;;) {
            new Thread(new Run1(resouce)).start();
            new Thread(new Run2(resouce)).start();
        }
    }

就会调用参数(Run1和Run2)的run方法,与此同时,我们分别在Run1和Run2的run方法里面调用Resouce类的add和minus方法,我们让Run1和Run2都持有Reasource这个对象的引用,并且是同一个引用,就实现了多个线程同时操作Resource的同一个实例。然后我们将逻辑(这里就是add和minus这两个同步方法)写在Resource里面。

  3.我们发现在add和minus这两个同步方法里面,add或者minus wait了一段时间被唤醒,他就会执行while块下面的代码,也就是

            data++;
            System.out.println(data);
            notifyAll();

或者

            data--;
            System.out.println(data);
            notifyAll();

这两个其中的一个,此时我们使用while来判断被唤醒,使用if也可以,不过这里最好是用while,因为,当线程被唤醒之后,while会再检查while的条件,如果不满足就继续睡眠,而if就直接执行下面的代码,原因是,我隐约的记得以前听张孝祥老师说有正在等待的线程有可能被伪唤醒,如果是被伪唤醒的话,不检查while条件,那么就会出现很严重的问题,所以这里要用while。而JDK的原话解释是这样的:“对于某一个参数的版本,实现中断和虚假唤醒是可能的”,说白了就是有可能不是被notify或者notifyAll唤醒,如果不是被这二者唤醒的,那么是不能让他继续执行的。

前面使用了synchronized+wait/notifyAll的组合,这二者在线程同步上面是一组的,而JDK还提供了另外一组更为灵活强大的线程同步工具, ReentrantLock+Condition,ReentrantLock就相当于synchronized,而Condition就类似与wait/notify,下面给出例子。

public class Resource {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    
    private int num = 0;
    
    public void increase() {
        try {
            lock.lock();
            while(num == 1) {
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            num++;
            System.err.println(num + "-Thread ID: " + Thread.currentThread().getId());
            condition.signalAll();
        } finally {
            if(lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
    
    public void decrease() {
        try {
            lock.lock();
            while(num == 0) {
                condition.await();
            }
            num--;
            System.err.println(num + "-Thread ID: " + Thread.currentThread().getId());
            condition.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(lock.isHeldByCurrentThread()) lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        final Resource r = new Resource();
        new Thread() {
            public void run() {
                while(true) r.increase();
            };
        }.start();
        
        new Thread() {
            public void run() {
                while(true) r.decrease();
            };
        }.start();
    }
}

 其实ReentrantLock的Condition可以做到创建多个条件,每次唤醒通知可以定向唤醒,比如data == 1时候,addCondition等待,唤醒在minusCondition上的等待,而data == 0时,minusCondition等待,唤醒在addCondition上的等待。如下:

 1 public class LockTest {
 2 
 3     public static void main(String[] args) {
 4         Resource src = new Resource();
 5         List<Thread> ts = new ArrayList<>(20);
 6         for (int i = 0; i < 10; i++) {
 7             Thread add = new Thread(new AddThread(src), "add" + i);
 8             Thread minus = new Thread(new MinusThread(src), "minus" + i);
 9             ts.add(add);
10             ts.add(minus);
11         }
12         for (int i = 0; i < 20; i++) {
13             Thread t = ts.get(i);
14             t.start();
15         }
16         try {
17             Thread.sleep(Long.MAX_VALUE);
18         } catch (InterruptedException e) {
19             e.printStackTrace();
20         }
21     }
22 
23 }
24 
25 class AddThread implements Runnable {
26     private Resource resource;
27 
28     public AddThread(Resource resource) {
29         this.resource = resource;
30     }
31 
32     @Override
33     public void run() {
34         for (;;)
35             resource.add();
36     }
37 
38 }
39 
40 class MinusThread implements Runnable {
41     private Resource resource;
42 
43     public MinusThread(Resource resource) {
44         this.resource = resource;
45     }
46 
47     @Override
48     public void run() {
49         for (;;)
50             resource.minus();
51     }
52 
53 }
54 
55 class Resource {
56     int data = 0;
57     private ReentrantLock lock = new ReentrantLock();
58     private Condition addCon = lock.newCondition();
59     private Condition minusCon = lock.newCondition();
60 
61     void add() {
62         try {
63             lock.lock();
64             while (data == 1) {
65                 addCon.await();
66             }
67             data++;
68             System.err.println(data + "-----" + Thread.currentThread().getName());
69             minusCon.signal();
70         } catch (InterruptedException e) {
71             e.printStackTrace();
72         } finally {
73             lock.unlock();
74         }
75     }
76 
77     void minus() {
78         try {
79             lock.lock();
80             while (data == 0) {
81                 minusCon.await();
82             }
83             data--;
84             System.err.println(data + "-----" + Thread.currentThread().getName());
85             addCon.signal();
86         } catch (Exception e) {} finally {
87             lock.unlock();
88         }
89     }
90 }
原文地址:https://www.cnblogs.com/dreamroute/p/3870514.html