Fixing and Optimizing a Piece of Blocking Queue Code

I came across the following code somewhere and immediately noticed that it has a problem.

    import java.util.LinkedList;
    import java.util.Queue;

    public class BlockingQ {

        private Object notEmpty = new Object();
        private Object notFull = new Object();
        private Queue<Object> linkedList = new LinkedList<Object>();
        private int maxLength = 10;

        public Object take() throws InterruptedException {
            synchronized (notEmpty) {
                if (linkedList.size() == 0) {
                    notEmpty.wait();
                }
                synchronized (notFull) {
                    if (linkedList.size() == maxLength) {
                        notFull.notifyAll();
                    }
                    return linkedList.poll();
                }
            }
        }

        public void offer(Object object) throws InterruptedException {
            synchronized (notEmpty) {
                if (linkedList.size() == 0) {
                    notEmpty.notifyAll();
                }
                synchronized (notFull) {
                    if (linkedList.size() == maxLength) {
                        notFull.wait();
                    }
                    linkedList.add(object);
                }
            }
        }
    }

The simplest failing case is one producer and two consumers. Look at this check in take():

    if (linkedList.size() == 0) {
        notEmpty.wait();
    }

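Because this is an if rather than a loop, a consumer that wakes up from wait() does not check again whether there is still an element to consume. If both consumers are waiting and the producer adds a single element, notifyAll() wakes both of them: the first one takes the element, and the second one goes straight on to poll() and returns null.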
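
To make the failure easier to see, here is a minimal sketch that reproduces it. The class names IfGuardDemo and IfGuardQ are my own, not from the original code, and the queue is deliberately cut down to a single lock so that the if guard is the only defect left. It parks two consumers in wait(), offers one element, and counts how many consumers come back with null.

```java
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical demo, not part of the original code.
class IfGuardDemo {

    // Single-lock queue that keeps the if-based guard from the code above.
    static class IfGuardQ {
        private final Queue<Object> items = new LinkedList<>();

        public synchronized Object take() throws InterruptedException {
            if (items.isEmpty()) {   // checked only once: the bug under discussion
                wait();
            }
            return items.poll();     // null when another consumer got here first
        }

        public synchronized void offer(Object object) {
            items.add(object);
            notifyAll();             // wakes every waiting consumer
        }
    }

    // Starts two consumers, waits until both are parked in wait(),
    // offers one element, and returns how many consumers received null.
    static int countNullTakes() throws InterruptedException {
        IfGuardQ q = new IfGuardQ();
        Object[] results = new Object[2];
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < consumers.length; i++) {
            final int slot = i;
            consumers[i] = new Thread(() -> {
                try {
                    results[slot] = q.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[i].start();
        }
        // A thread blocked in Object.wait() reports the WAITING state.
        for (Thread t : consumers) {
            while (t.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
        }
        q.offer("item");
        for (Thread t : consumers) {
            t.join();                // join also makes the results visible here
        }
        int nulls = 0;
        for (Object r : results) {
            if (r == null) {
                nulls++;
            }
        }
        return nulls;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumers that received null: " + countNullTakes());
    }
}
```

Barring a spurious wakeup, one of the two consumers should come back with null. Changing the if in take() to a while makes that consumer go back to waiting instead.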

    public Object take() throws InterruptedException {
        synchronized (notEmpty) {
            while (linkedList.size() == 0) {
                notEmpty.wait();
            }
            synchronized (notFull) {
                if (linkedList.size() == maxLength) {
                    notFull.notifyAll();
                }
                return linkedList.poll();
            }
        }
    }

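The corrected take() is shown above: the if becomes a while, so the condition is checked again after every wakeup.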

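The code is mainly meant as an example, so it contains a lot of redundancy. The nested locks are also a hazard: offer() calls notFull.wait() while still holding notEmpty, so once the queue is full no consumer can enter take() to free a slot. Below is the optimized version, which uses a single lock: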

    import java.util.LinkedList;
    import java.util.Queue;

    public class SimpleBlockingQ {
        private Queue<Object> linkedList = new LinkedList<Object>();
        private int maxLength = 10;

        public synchronized Object take() throws InterruptedException {
            // Re-check after every wakeup: another consumer may have emptied the queue.
            while (linkedList.size() == 0) {
                wait();
            }
            notifyAll(); // a slot is about to be freed, wake waiting producers
            return linkedList.poll();
        }

        public synchronized void offer(Object object) throws InterruptedException {
            // while, not if: notifyAll() also wakes other producers, and a woken
            // producer must not add to a queue that is still full.
            while (linkedList.size() == maxLength) {
                wait();
            }
            linkedList.add(object);
            notifyAll(); // notify after the add so a waiting consumer is not missed
        }
    }
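
For comparison, the first version's idea of separate notEmpty and notFull conditions can be kept without nesting two monitors. The sketch below is my own and not from the original code; the class name ConditionBlockingQ and its constructor parameter are hypothetical. It uses ReentrantLock and Condition from java.util.concurrent.locks, so both conditions belong to the same lock and await() releases that lock while the thread waits.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class, not from the original code: one lock, two conditions.
class ConditionBlockingQ {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // signalled after an add
    private final Condition notFull = lock.newCondition();  // signalled after a removal
    private final Queue<Object> items = new LinkedList<>();
    private final int maxLength;

    ConditionBlockingQ(int maxLength) {
        this.maxLength = maxLength;
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();        // releases the lock while waiting
            }
            Object head = items.poll();
            notFull.signal();            // one slot freed, wake one producer
            return head;
        } finally {
            lock.unlock();
        }
    }

    public void offer(Object object) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == maxLength) {
                notFull.await();         // releases the lock while waiting
            }
            items.add(object);
            notEmpty.signal();           // one element added, wake one consumer
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBlockingQ q = new ConditionBlockingQ(2);
        q.offer("a");
        q.offer("b");
        System.out.println(q.take());
        System.out.println(q.take());
    }
}
```

With two conditions, a take() only wakes producers and an offer() only wakes consumers, so signal() is enough where the synchronized version needs notifyAll(). For real projects, java.util.concurrent.ArrayBlockingQueue already provides a bounded blocking queue.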

Here is the test code:

    import java.util.concurrent.atomic.AtomicInteger;

    public class BlockingQTest {

        public static AtomicInteger index = new AtomicInteger(0);

        public static void main(String[] args) {
            int tCount = 10; // thread count

            final BlockingQ BQ = new BlockingQ();
            final SimpleBlockingQ SBQ = new SimpleBlockingQ();

            // provider
            Runnable pr = new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(100);

                            int tindex = index.getAndIncrement();
                            //BQ.offer(tindex);
                            //System.out.println("BQ offer: " + tindex);
                            SBQ.offer(tindex);
                            System.out.println("SBQ offer: " + tindex);

                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };

            // consumer
            Runnable cr = new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(100);

                            //System.out.println("BQ take: " + BQ.take());
                            System.out.println("SBQ take: " + SBQ.take());

                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };

            for (int i = 0; i < tCount; i++) {
                new Thread(cr).start();
            }

            for (int i = 0; i < tCount; i++) {
                new Thread(pr).start();
            }
        }
    }
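
The test above runs forever and has to be judged by reading the output. As a rough sketch of a variant that terminates and checks itself, the code below gives every producer and consumer a fixed number of items and then counts what was received. The names BoundedQTest, GuardedQ and run are hypothetical, and GuardedQ is a stand-in queue with while guards on both sides, included only to keep the example self-contained.

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical terminating test, not part of the original code.
class BoundedQTest {

    // Stand-in for the queue under test: one monitor, while guards on both sides.
    static class GuardedQ {
        private final Queue<Object> items = new LinkedList<>();
        private final int maxLength = 10;

        public synchronized Object take() throws InterruptedException {
            while (items.isEmpty()) {
                wait();
            }
            notifyAll();
            return items.poll();
        }

        public synchronized void offer(Object object) throws InterruptedException {
            while (items.size() == maxLength) {
                wait();
            }
            items.add(object);
            notifyAll();
        }
    }

    // Runs `threads` producers and `threads` consumers, each handling `perThread`
    // items. Returns the number of distinct values taken, or -1 if any take()
    // returned null.
    static int run(int threads, int perThread) throws InterruptedException {
        GuardedQ q = new GuardedQ();
        AtomicInteger index = new AtomicInteger(0);
        AtomicInteger nulls = new AtomicInteger(0);
        Set<Object> seen = ConcurrentHashMap.newKeySet();

        Thread[] workers = new Thread[threads * 2];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int j = 0; j < perThread; j++) {
                        q.offer(index.getAndIncrement());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[threads + i] = new Thread(() -> {
                try {
                    for (int j = 0; j < perThread; j++) {
                        Object value = q.take();
                        if (value == null) {
                            nulls.incrementAndGet();
                        } else {
                            seen.add(value);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (Thread t : workers) {
            t.start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return nulls.get() == 0 ? seen.size() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct items taken: " + run(10, 100));
    }
}
```

If every offered value is taken exactly once, run(10, 100) should return 1000. A smaller number would mean duplicates, and -1 would mean a consumer got null.
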
Original article: https://www.cnblogs.com/cdtarena/p/3026515.html