多生产者多消费者(第二种方式2.1)基于BlockingQueue

public class Producer implements Runnable {

    //静态变量只初始化一次
    private static AtomicInteger count = new AtomicInteger();
    
    private volatile boolean  isRunning = true;
    private BlockingQueue<String> queue;
    private String name;

    public Producer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    public void run() {
        String data = null;
        Random r = new Random();
        System.out.println("启动" + name);
        try {
            while (isRunning) {
                Thread.sleep(r.nextInt(100));
                data = "data:" + count.incrementAndGet();
                System.out.println(name + "生产数据:" + data);
                queue.put(data);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + "退出!");
    }

    public void stop() {
        isRunning = false;
    }

}
public class Consumer implements Runnable {
    private volatile boolean  isRunning = true;
    private BlockingQueue<String> queue;
    private String name;

    public Consumer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
    }

    public void run() {
        System.out.println("启动" + name);
        try {
            while (isRunning) {
                String data = queue.take();
                System.out.println(name + "消费数据:" + data);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(name + "退出!");
    }

    public void stop() {
        isRunning = false;
    }
}
public class Client {
 
    public static void main(String[] args) throws InterruptedException {
        // 声明一个容量为10的缓存队列
        BlockingQueue<String> queue = new  LinkedBlockingQueue<String>(5);
 
        Producer producer1 = new Producer(queue , "【生产者一】");
        Producer producer2 = new Producer(queue ,"【生产者二】");
        Consumer consumer1 = new Consumer(queue ,"【消费者一】");
        // 借助Executors
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动线程
        service.execute(producer1);
        service.execute(producer2);
        service.execute(consumer1);
        
        // 执行7s
        Thread.sleep(7 * 1000);
        producer1.stop();
        producer2.stop();
        service.shutdown();
        
    }
}
 

 

队列的操作:

offer 是向队列尾部添加元素,返回true或者false

poll 是从队列头部取数据,如果没有取到返回null

offer和poll都不会阻塞当前线程

阻塞操作

put  往队列当中中放入数据

take  从队列当中取数据

原文地址:https://www.cnblogs.com/moris5013/p/10882112.html