使用并发队列实现生产者与消费者

完全利用了阻塞队列的特性,实现了消费者生产者的关系,非常好玩!

大家赶紧看看代码,然后动手默写一个吧~~

package com.toov5.thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ProducerThread implements Runnable{
      public volatile boolean FLAG = true;
      AtomicInteger atomicInteger = new AtomicInteger();  //作计数用的
      public BlockingQueue<String> blockingQueue;
      public ProducerThread(BlockingQueue<String> blockingQueue) {
         this.blockingQueue=blockingQueue;
    }
    
      @Override
    public void run() {
        System.out.println("生产者线程已启动");
        try {
            while (FLAG) {
                String  incrementAndGet = atomicInteger.incrementAndGet()+"";  //每次都安全的++ 可以这么理解  +""转为string类型    
                boolean offer =    blockingQueue.offer(incrementAndGet,2,TimeUnit.SECONDS);//如果超出了那么要等待两秒  
                if (offer) {
                    System.out.println("生产者存入队列成功:数据"+incrementAndGet);
                }else {
                    System.out.println("生产者存入队列失败:数据"+incrementAndGet);
                }    
            Thread.sleep(1000);    
            }
        } catch (Exception e) {
            // TODO: handle exception
        }finally{
            System.out.println("生产者结束......");
        }    
    }
    
    public void stop(){
        this.FLAG=false;
    }  
      
}
//消费者队列 获取队列
class ConsumerThread implements Runnable{
      public volatile boolean FLAG = true;
      AtomicInteger atomicInteger = new AtomicInteger();  //作计数用的
      public BlockingQueue<String> blockingQueue;
      public ConsumerThread(BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
    }
    @Override
    public void run() {
        System.out.println("消费者线程已经启动...");
    
        try {
            while (FLAG) {    
            String data = blockingQueue.poll(2, TimeUnit.SECONDS);
            if (data==null) {
                System.out.println("超过2秒时间、没有获取队列信息");
                FLAG=false;
                return;  //没有结果了 最后就推出啦
            }
            System.out.println("获取到结果data"+data);
        } 
        }catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            System.out.println("消费者停止.....");
        }
        
    }
}

public class ProducerAndConsumer {

    
    public static void main(String[] args) {
        
        LinkedBlockingDeque<String> linkedBlockingDeque = new LinkedBlockingDeque<>(8); //容量可传也可以不用 默认有个值 超过最大值要扩容
        ProducerThread producerThread = new ProducerThread(linkedBlockingDeque);
        ConsumerThread consumerThread = new ConsumerThread(linkedBlockingDeque);
        Thread p = new Thread(producerThread);
        Thread c = new Thread(consumerThread);
        p.start();
        c.start();
        try {
            //等待10s
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
       producerThread.stop();    
    }
    
    
}

看到运行结果

消费者获取不到什么的话 等待等待 再获取不到就退出了

原文地址:https://www.cnblogs.com/toov5/p/9833637.html