iv008-线程之间通信之生产者消费者

1.wait和notify传统版本

示例代码:

class MyData {

    private int number = 0;

    public synchronized void add() throws InterruptedException {

        while (number != 0) {
            this.wait();
        }
        number++;
        this.notify();
        System.out.println(Thread.currentThread().getName() + "	 number:" + number);

    }

    public synchronized void subtract() throws InterruptedException {
        while (number == 0) {
            this.wait();
        }
        number--;
        this.notify();
        System.out.println(Thread.currentThread().getName() + "	 number:" + number);
    }
}

public class WaitNotifyDemo {

    public static void main(String[] args) {
        MyData myData = new MyData();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    myData.add();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "ADD").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    myData.subtract();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "SUB").start();
    }
}

运行结果:

ADD	 number:1
SUB	 number:0
ADD	 number:1
SUB	 number:0
ADD	 number:1
SUB	 number:0
ADD	 number:1
SUB	 number:0
ADD	 number:1
SUB	 number:0

2.await和signal传统版本

示例代码:

class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void add() {
        lock.lock();
        try {
            while (number != 0) {
                condition.await();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "	" + number);
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void subtract() {
        lock.lock();
        try {
            while (number == 0) {
                condition.await();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "	" + number);
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class AwaitSignalDemo {

    public static void main(String[] args) {

        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shareData.add();
            }
        }, "ADD").start();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shareData.subtract();
            }
        }, "SUB").start();
    }
}

运行结果:

ADD	1
SUB	0
ADD	1
SUB	0
ADD	1
SUB	0
ADD	1
SUB	0
ADD	1
SUB	0

3.阻塞队列版本

示例代码:

class MyResource {

    public volatile boolean FLAG = true;

    private AtomicInteger atomicInteger = new AtomicInteger();

    private BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    //生产
    public void myProd() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + "	启动");
        String data = null;
        while (FLAG) {
            data = atomicInteger.incrementAndGet() + "";
            blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            System.out.println(Thread.currentThread().getName() + "	 生产:" + data);
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName() + "	退出");
    }

    /**
     * 消费
     */
    public void myConsumer() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + "	启动");
        String data = null;
        while (FLAG) {
            data = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if (data == null || "".equals(data)) {
                System.out.println(Thread.currentThread().getName() + "	获取超时");
                break;
            }
            System.out.println(Thread.currentThread().getName() + "	 消费:" + data);
        }
        System.out.println(Thread.currentThread().getName() + "	退出");
    }

    public void stop() {
        FLAG = false;
    }
}

public class ProducerConsumerBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));

        new Thread(() -> {
            try {
                myResource.myProd();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "生产者").start();

        new Thread(() -> {
            try {
                myResource.myConsumer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "消费者").start();

        TimeUnit.SECONDS.sleep(5);


        myResource.stop();
        System.out.println(Thread.currentThread().getName() + "停止,FLAG:" + myResource.FLAG);
    }
}

运行结果:

生产者	启动
消费者	启动
生产者	 生产:1
消费者	 消费:1
生产者	 生产:2
消费者	 消费:2
生产者	 生产:3
消费者	 消费:3
生产者	 生产:4
消费者	 消费:4
生产者	 生产:5
消费者	 消费:5
main停止,FLAG:false
生产者	退出
消费者	获取超时
消费者	退出
原文地址:https://www.cnblogs.com/everyingo/p/14554461.html