JAVA多线程经典问题 -- 生产者 消费者

工作2年多来一直也没有计划写自己的技术博客,最近辞职在家翻看《thingking in JAVA》,偶尔看到了生产者与消费者的一个经典的多线程同步问题。本人在工作中很少使用到多线程以及高并发方面的知识,正好回忆下以前学过的一些知识和代码,以经典的多线程问题生产者消费者问题为开篇。

先上一段经典的生产者与消费者的代码

class Product {
    private final int orderNum;

    public Product(int orderNum) {
        this.orderNum = orderNum;
    }

    public String toString() {
        return "Product" + orderNum;
    }
}

class Consumer implements Runnable {
    private Main main;

    public Consumer(Main m) {
        this.main = m;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (main.product == null)
                        wait();
                }
                System.out.println("Consumer got " + main.product);

                synchronized (main.producter) {
                    main.product = null;
                    main.producter.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted!");
        }

    }

}

class Producter implements Runnable {
    private Main main;
    private int count = 0;

    public Producter(Main m) {
        this.main = m;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (main.product != null)
                        wait();
                }
                if (++count == 10) {
                    System.out.println("Out of product ,closing!");
                    main.exec.shutdownNow();
                }
                System.out.print("OK put it up!! ");
                synchronized (main.consumer) {
                    main.product = new Product(count);
                    main.consumer.notifyAll();
                }
                TimeUnit.MICROSECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Producter interrupted!");
        }

    }
}

public class Main {
    Product product;
    Consumer consumer = new Consumer(this);
    Producter producter = new Producter(this);

    ExecutorService exec = Executors.newCachedThreadPool();

    public Main() {
        exec.execute(producter);
        exec.execute(consumer);
    }

    public static void main(String[] args) {
        new Main();
    }
}

运行结果如下:

OK put it up!! Consumer got Product1
OK put it up!! Consumer got Product2
OK put it up!! Consumer got Product3
OK put it up!! Consumer got Product4
OK put it up!! Consumer got Product5
OK put it up!! Consumer got Product6
OK put it up!! Consumer got Product7
OK put it up!! Consumer got Product8
OK put it up!! Consumer got Product9
Out of product ,closing!
OK put it up!! Consumer interrupted!
Producter interrupted!

没什么好说的,基本上只要学过JAVA多线程的朋友对于以上代码都会清楚明白。

问题:如果在增加一个角色 这个角色在消费者得到一个产品后把产品销毁,然后再让生产者生产产品,这就好像一个生产者--中间商--消费者的问题,那么如果不用队列的思想,又该怎样解决呢?

分析:此时给product 类增加了一个boolean状态,标识一下是否从中间商处理过 public boolean isDo = false// false 没有处理过,true 已经处理过

那么消费者在拿商品的时候就应该多家一下判断,如果商品没有或者还没有被处理过,那么 就必须等待...

经过此番分析  将以上代码简单的修改了一下

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Product {
    private final int orderNum;
    public  boolean  isDo;  // false没有处理过
    
    public Product(int orderNum) {
        this.orderNum = orderNum;
        isDo = false;
    }

    public String toString() {
        return "Product" + orderNum;
    }
}

class Middleman implements Runnable{
    private Main main;

    public Middleman(Main m) {
        this.main = m;
    }
    
    public void run(){
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (main.product == null || (main.product !=null &&main.product.isDo))
                        wait();
                }
                System.out.println("Middleman Got " + main.product+" and do "+main.product);
                main.product.isDo = true;  //中间商处理
                synchronized (main.consumer) {
                    if(main.product != null)
                        main.consumer.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Middleman interrupted!");
        }

    }
}

class Consumer implements Runnable {
    private Main main;

    public Consumer(Main m) {
        this.main = m;
    }

    public void run() {
        try {
            while(!Thread.interrupted()){
                synchronized (this) {
                    if(main.product == null)
                        wait();
                }
                
                //如果生产者已经生产完毕 并且中间商已经处理过了
                if(main.product !=null && main.product.isDo){
                    System.out.println("Consumer Got  "+main.product);
                }
                synchronized (main.producter) {
                    if(main.product!= null){
                        main.product = null;
                        main.producter.notifyAll();
                    }
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted!");
        }
    }

}

class Producter implements Runnable {
    private Main main;
    private int count = 0;

    public Producter(Main m) {
        this.main = m;
    }

    public void run() {
        try {
            while (!Thread.interrupted()) {
                synchronized (this) {
                    while (main.product != null)
                        wait();
                }
                if (++count == 10) {
                    System.out.println("Out of product ,closing!");
                    main.exec.shutdownNow();
                }
                System.out.print(" The Product is OK ,middleman put it up!! ");
                synchronized (main.middleman) {
                    main.product = new Product(count);
                    main.middleman.notifyAll();// 此时 唤起的应该是中间商的线程
                }
                TimeUnit.MICROSECONDS.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Producter interrupted!");
        }

    }
}

public class Main {
    Product product;
    Consumer consumer = new Consumer(this);
    Producter producter = new Producter(this);
    Middleman middleman = new Middleman(this);

    ExecutorService exec = Executors.newCachedThreadPool();

    public Main() {
        exec.execute(producter);
        exec.execute(middleman);
        exec.execute(consumer);
    }

    public static void main(String[] args) {
        new Main();
    }
}

运行结果如下:

The Product is OK ,middleman put it up!! Middleman Got Product1 and do Product1
Consumer Got Product1
The Product is OK ,middleman put it up!! Middleman Got Product2 and do Product2
Consumer Got Product2
The Product is OK ,middleman put it up!! Middleman Got Product3 and do Product3
Consumer Got Product3
The Product is OK ,middleman put it up!! Middleman Got Product4 and do Product4
Consumer Got Product4
The Product is OK ,middleman put it up!! Middleman Got Product5 and do Product5
Consumer Got Product5
The Product is OK ,middleman put it up!! Middleman Got Product6 and do Product6
Consumer Got Product6
The Product is OK ,middleman put it up!! Middleman Got Product7 and do Product7
Consumer Got Product7
The Product is OK ,middleman put it up!! Middleman Got Product8 and do Product8
Consumer Got Product8
The Product is OK ,middleman put it up!! Middleman Got Product9 and do Product9
Consumer Got Product9
Out of product ,closing!
The Product is OK ,middleman put it up!! Consumer interrupted!
Middleman interrupted!
Producter interrupted!

这里补充说明:Main是三个角色的焦点,三个角色必须围绕着Main运行,以便于拿取main.product.

为什么使用notifyAll ?

多个任务在某个特定对象锁上等待,因此有可能我们不知道哪个任务应该被唤醒,因此用notifyAll更加安全些。

为什么要加isDo 真的是逻辑上需要吗?

出了逻辑上的需要还有一个更重要的问题就是,在并发中,某个其他任务很可能会在中间商被唤醒的时候突然插足并且拿走product 导致了中间商一遍又一遍的拿到的都是同一个产品进行处理,而直到消费者把产品拿走,并且唤醒了生产者,才会去拿取下一件商品。所以需要在while(?){wait();}?中加以判断。

使用的是JAVASE5 以后的创建线程的方式---线程池。

这里我用了一些很有意思的东西,在product角色中当生产到10个的时候 调用了exec.showdownNow()方法,该方法向所有的任务发送interrupt,但是在product中任务没有在获得interrupt后关闭,为啥呢? 因为当一个任务视图进入一个阻塞的状态时,这个操作只能抛出InterruptedExctption,所以我们看到了最后的The Product is OK ,middleman put it up!! Consumer interrupted! 然后再调用sleep方法的时候抛出了InterruptedExctption 这也就解释了为什么没有直接结束而是抛出了异常后打印Producter interrupted! 可以推导出 如果我们去掉了sleep方法会是什么样呢?应该可以正常结束了吧

去掉sleep方法的结果

The Product is OK ,middleman put it up!! Middleman Got Product1 and do Product1
Consumer Got Product1
The Product is OK ,middleman put it up!! Middleman Got Product2 and do Product2
Consumer Got Product2
The Product is OK ,middleman put it up!! Middleman Got Product3 and do Product3
Consumer Got Product3
The Product is OK ,middleman put it up!! Middleman Got Product4 and do Product4
Consumer Got Product4
The Product is OK ,middleman put it up!! Middleman Got Product5 and do Product5
Consumer Got Product5
The Product is OK ,middleman put it up!! Middleman Got Product6 and do Product6
Consumer Got Product6
The Product is OK ,middleman put it up!! Middleman Got Product7 and do Product7
Consumer Got Product7
The Product is OK ,middleman put it up!! Middleman Got Product8 and do Product8
Consumer Got Product8
The Product is OK ,middleman put it up!! Middleman Got Product9 and do Product9
Consumer Got Product9
Out of product ,closing!
The Product is OK ,middleman put it up!! Middleman interrupted!
Consumer interrupted!

当已经不生产第10个产品了 (哦 这里多打印了一句话)线程结束了,producter 结束,middlenman结束,此时consmer还在等待middleman的product,但是exec向他发起了interruped  所以抛出异常结束了!

其实经典的生产者和消费者问题可以用队列存储的思想解决,今后在写如果大家有好例子,回复在评论上,并讨论,不胜感激啊。

原文地址:https://www.cnblogs.com/zm112358/p/3230527.html