Java Thread系列(十)生产者消费者模式

Java Thread系列(十)生产者消费者模式

生产者消费者问题(producer-consumer problem),是一个多线程同步问题的经典案例。该问题描述了两个共亨固定大小缓冲区的线程—即所谓的“生产者”和“消费者—在实际运行时会发生的问题。

一、信号灯法

信号灯法实际上就是保证同一时间只有一个线程在操作数据,操作完成后通知其它线程,从而避免死锁。

(1) 生产者

public class Producer implements Runnable {

    private Data data;

    public Producer(Data data) {
        this.data = data;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            data.produce(String.format("product-%s", i));
        }
    }
}

(2) 消费者

public class Consumer implements Runnable {
    private Data data;

    public Consumer(Data data) {
        this.data = data;
    }

    @Override
    public void run() {
        while (true) {
            data.consume();
        }
    }
}

(3) 数据

public class Data {

    private String data;

    // flag = true  生产者生产,消费者等待,生产完毕后通知消费者消费
    // flag = false 消费者消费,生产者等待,消费完毕后通知生产者生产
    private boolean flag = true;

    public synchronized void consume() {
        if (flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                ;
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            ;
        }

        notify();
        System.out.println("消费者消费:" + getData());
        flag = true;
    }

    public synchronized void produce(String data) {
        if (!flag) {
            try {
                wait();
            } catch (InterruptedException e) {
                ;
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            ;
        }

        notify();
        System.out.println("生产者生产:" + data);
        setData(data);
        flag = false;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}

(4) 测试

public static void main(String[] args) {
    Data data = new Data();
    new Thread(new Producer(data)).start();
    new Thread(new Consumer(data)).start();
}

二、容器法

(1) Producter

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producter implements Runnable {

    private BlockingQueue<Data> data;
    //用于生成id
    private static AtomicInteger count = new AtomicInteger(0);

    public Producter(BlockingQueue<Data> data) {
        this.data = data;
    }

    public Producter() { }

    @Override
    //生产者生产数据
    public void run() {
        try {
            Thread.sleep(2000);
            int id = count.incrementAndGet();
            if (!this.data.offer(new Data(id, "data-" + id), 2, TimeUnit.SECONDS)) {
                System.out.printf("%s生产:data-%s失败
", Thread.currentThread().getName(), id);
            } else {
                System.out.printf("%s生产:data-%s
", Thread.currentThread().getName(), id);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

(2) Consumer

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue<Data> data;

    public Consumer(BlockingQueue<Data> data) {
        this.data = data;
    }
    public Consumer() { }

    @Override
    //消费者消费数据
    public void run() {
        Data d = null;
        try {
            d = this.data.take();
            System.out.printf("%s消费:%s
", Thread.currentThread().getName(), d.getName());

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Data {
    private int id;
    private String name;
}

(3) 测试

LinkedBlockingQueue<Data> data = new LinkedBlockingQueue<Data>();

Producter p1 = new Producter(data);
Producter p2 = new Producter(data);
Producter p3 = new Producter(data);

Consumer c1 = new Consumer(data);
Consumer c2 = new Consumer(data);
Consumer c3 = new Consumer(data);

ExecutorService pool = Executors.newCachedThreadPool();
pool.execute(p1);
pool.execute(p2);
pool.execute(p3);
pool.execute(c1);
pool.execute(c2);
pool.execute(c3);
pool.shutdown();

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/8999708.html