生产/消费者问题

生产/消费者问题是个非常典型的多线程问题,涉及到的对象包括“生产者”、“消费者”、“仓库”和“产品”。他们之间的关系如下:

① 生产者仅仅在仓储未满时候生产,仓满则停止生产。

② 消费者仅仅在仓储有产品时候才能消费,仓空则等待。

③ 当消费者发现仓库没产品可消费时候会通知生产者生产。

④ 生产者在生产出可消费产品时候,应该通知等待的消费者去消费。

用wait/notify/notifyAll实现和用Lock的Condition实现。

用wait/notify/notifyAll 实现生产者消费者模型:

方法一:用五个类来实现,分别为Produce(实现生产过程), Consumer(实现消费过程), ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main等。需要注意的是有两个地方。

① 用while判断当前list是否为空;

② 调用的是object的notifyAll()方法而不是notify()方法。

方法二:用四个类实现,分别为MyService(实现生产和消费过程用synchronized关键字实现同步),ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main。需要注意的也是方法一中的两个地方while和notifyAll()。

用Lock和Condition实现。共有四个类,分别是分别为MyService(实现生产和消费过程,用lock实现线程间同步),ProduceThread(实现生产者线程),ConsumeThread(实现消费者线程),Main。需要注意的也是方法一中的两个地方while和signalAll()。

方法一:


 
  1. package ProduceConsumer;  
  2.   
  3. import java.util.ArrayList;  
  4.   
  5. public class Produce {  
  6.       
  7.     public Object object;  
  8.     public ArrayList<Integer> list;//用list存放生产之后的数据,最大容量为1  
  9.               
  10.     public Produce(Object object,ArrayList<Integer> list ){  
  11.         this.object = object;  
  12.         this.list = list;  
  13.     }  
  14.       
  15.     public void produce() {  
  16.           
  17.         synchronized (object) {  
  18.             /*只有list为空时才会去进行生产操作*/  
  19.             try {  
  20.             while(!list.isEmpty()){  
  21.                     System.out.println("生产者"+Thread.currentThread().getName()+" waiting");  
  22.                     object.wait();  
  23.                 }   
  24.             int value = 9999;  
  25.             list.add(value);  
  26.             System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");  
  27.             object.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  28.         }catch (InterruptedException e) {  
  29.                 e.printStackTrace();  
  30.             }  
  31.         }  
  32.     }  
  33.   
  34. }  

 
  1. package ProduceConsumer;  
  2.   
  3. import java.util.ArrayList;  
  4.   
  5. public class Consumer {  
  6.       
  7.     public Object object;  
  8.     public ArrayList<Integer> list;//用list存放生产之后的数据,最大容量为1  
  9.               
  10.     public Consumer(Object object,ArrayList<Integer> list ){  
  11.         this.object = object;  
  12.         this.list = list;  
  13.     }  
  14.       
  15.     public void consmer() {  
  16.           
  17.         synchronized (object) {  
  18.             try {  
  19.                 /*只有list不为空时才会去进行消费操作*/  
  20.                 while(list.isEmpty()){  
  21.                     System.out.println("消费者"+Thread.currentThread().getName()+" waiting");  
  22.                     object.wait();  
  23.                 }   
  24.             list.clear();  
  25.             System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");  
  26.             object.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  27.               
  28.         }catch (InterruptedException e) {  
  29.             e.printStackTrace();  
  30.         }  
  31.     }  
  32.     }  
  33.       
  34. }  

 
  1. package ProduceConsumer;  
  2.   
  3. public class ProduceThread extends Thread {  
  4.     private Produce p;  
  5.     public ProduceThread(Produce p){  
  6.         this.p = p;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             p.produce();  
  12.         }  
  13.     }  
  14. }  

  1. package ProduceConsumer;  
  2.   
  3. public class ConsumeThread extends Thread {  
  4.     private Consumer c;  
  5.     public ConsumeThread(Consumer c){  
  6.         this.c = c;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             c.consmer();  
  12.         }  
  13.     }  
  14. }  

 
  1. package ProduceConsumer;  
  2.   
  3. import java.util.ArrayList;  
  4.   
  5. public class Main {  
  6.     public static void main(String[] args) {  
  7.         Object object = new Object();  
  8.         ArrayList<Integer> list = new ArrayList<Integer>();  
  9.       
  10.         Produce p = new Produce(object, list);  
  11.         Consumer c = new Consumer(object, list);  
  12.           
  13.         ProduceThread[] pt = new ProduceThread[2];  
  14.         ConsumeThread[] ct = new ConsumeThread[2];  
  15.           
  16.         for(int i=0;i<2;i++){  
  17.             pt[i] = new ProduceThread(p);  
  18.             pt[i].setName("生产者 "+(i+1));  
  19.             ct[i] = new ConsumeThread(c);  
  20.             ct[i].setName("消费者"+(i+1));  
  21.             pt[i].start();  
  22.             ct[i].start();  
  23.         }  
  24.     }  
  25. }  


方法二:


 
  1. package ProduceConsumer2;  
  2.   
  3. import java.util.ArrayList;  
  4.   
  5. public class MyService {  
  6.       
  7.     public ArrayList<Integer> list = new ArrayList<Integer>();//用list存放生产之后的数据,最大容量为1  
  8.     synchronized public void produce() {  
  9.               
  10.         try {  
  11.             /*只有list为空时才会去进行生产操作*/  
  12.             while(!list.isEmpty()){  
  13.                     System.out.println("生产者"+Thread.currentThread().getName()+" waiting");  
  14.                     this.wait();  
  15.                 }   
  16.             int value = 9999;  
  17.             list.add(value);  
  18.             System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");  
  19.             this.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  20.         }catch (InterruptedException e) {  
  21.                 e.printStackTrace();  
  22.             }  
  23.     }  
  24.       
  25.     synchronized public void consmer() {  
  26.         try {     
  27.             /*只有list不为空时才会去进行消费操作*/  
  28.             while(list.isEmpty()){  
  29.                     System.out.println("消费者"+Thread.currentThread().getName()+" waiting");  
  30.                     this.wait();  
  31.             }  
  32.             list.clear();  
  33.             System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");  
  34.             this.notifyAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  35.         } catch (InterruptedException e) {  
  36.             e.printStackTrace();  
  37.         }  
  38.     }  
  39.       
  40. }  

 
  1. package ProduceConsumer2;  
  2.   
  3. public class ProduceThread extends Thread {  
  4.     private MyService p;  
  5.     public ProduceThread(MyService p){  
  6.         this.p = p;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             p.produce();  
  12.         }  
  13.     }  
  14. }  

 
  1. package ProduceConsumer2;  
  2.   
  3. public class ConsumeThread extends Thread {  
  4.     private MyService c;  
  5.     public ConsumeThread(MyService c){  
  6.         this.c = c;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             c.consmer();  
  12.         }  
  13.     }  
  14. }  

用Lock和Condition实现


 
  1. package ConditionProduceConsumer;  
  2.   
  3. import java.util.concurrent.locks.Condition;  
  4. import java.util.concurrent.locks.ReentrantLock;  
  5.   
  6. public class MyService {  
  7.       
  8.     private ReentrantLock lock = new ReentrantLock();  
  9.     private Condition condition = lock.newCondition();  
  10.     private boolean hasValue = false;  
  11.       
  12.       
  13.      public void produce() {  
  14.         lock.lock();  
  15.         try {  
  16.             /*只有list为空时才会去进行生产操作*/  
  17.             while(hasValue == true){  
  18.                 System.out.println("生产者"+Thread.currentThread().getName()+" waiting");  
  19.                 condition.await();  
  20.             }  
  21.             hasValue = true;  
  22.             System.out.println("生产者"+Thread.currentThread().getName()+" Runnable");  
  23.             condition.signalAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  24.         } catch (InterruptedException e) {  
  25.             e.printStackTrace();  
  26.         }finally{  
  27.             lock.unlock();  
  28.         }  
  29.               
  30.     }  
  31.       
  32.      public void consmer() {  
  33.         lock.lock();  
  34.         try {  
  35.             /*只有list为空时才会去进行生产操作*/  
  36.             while(hasValue == false){  
  37.                 System.out.println("消费者"+Thread.currentThread().getName()+" waiting");  
  38.                 condition.await();  
  39.             }  
  40.             hasValue = false;  
  41.             System.out.println("消费者"+Thread.currentThread().getName()+" Runnable");  
  42.             condition.signalAll();//然后去唤醒因object调用wait方法处于阻塞状态的线程  
  43.         } catch (InterruptedException e) {  
  44.             e.printStackTrace();  
  45.         }finally{  
  46.             lock.unlock();  
  47.         }  
  48.       
  49.     }  
  50. }  

 
  1. package ConditionProduceConsumer;  
  2.   
  3. public class ProduceThread extends Thread {  
  4.     private MyService p;  
  5.     public ProduceThread(MyService p){  
  6.         this.p = p;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             p.produce();  
  12.         }  
  13.     }  
  14. }  

 
  1. package ConditionProduceConsumer;  
  2.   
  3. public class ConsumeThread extends Thread {  
  4.     private MyService c;  
  5.     public ConsumeThread(MyService c){  
  6.         this.c = c;  
  7.     }  
  8.     @Override  
  9.     public void run() {  
  10.         while (true) {  
  11.             c.consmer();  
  12.         }  
  13.     }  
  14. }  

 
    1. package ConditionProduceConsumer;  
    2.   
    3.   
    4. public class Main {  
    5.     public static void main(String[] args) {  
    6.   
    7.         MyService service = new MyService();  
    8.           
    9.         ProduceThread[] pt = new ProduceThread[2];  
    10.         ConsumeThread[] ct = new ConsumeThread[2];  
    11.           
    12.         for(int i=0;i<1;i++){  
    13.             pt[i] = new ProduceThread(service);  
    14.             pt[i].setName("Condition 生产者 "+(i+1));  
    15.             ct[i] = new ConsumeThread(service);  
    16.             ct[i].setName("Condition 消费者"+(i+1));  
    17.             pt[i].start();  
    18.             ct[i].start();  
    19.         }  
    20.     }  
    21. }  
原文地址:https://www.cnblogs.com/BlingSun/p/7488514.html