多线程之生产者消费者模式

最近在项目中需要使用使用多线程实现一种功能,和生产者消费者模式类似,因此,学习了下生产者消费者模式的多线程实现。在生产者消费者模式中,通常有两类线程,

即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责处理生产者提交的任务。生产者和消费者之间则通过共享内存缓冲区进行通信。

在这里我们选择BlockingQueue做为共享内存缓冲区。

首先,我们构建生产者生产的,和消费者需要处理的数据PCData,即相关任务数据。

public class PCData {

private final int intData;

public PCData(int d){
intData = d ;
}

public PCData(String d){
intData = Integer.valueOf(d);
}

public int getData(){
return intData;
}

@Override
public String toString() {
return "data:"+intData;
}
}

接下来,实现生产者线程,它构建PCData对象,并放入BlockingQueue队列中。
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<PCData> queue;
private static AtomicInteger count = new AtomicInteger(); //总数,原子操作
private static final int SLEEPTIME = 1000;

public Producer(BlockingQueue<PCData> queue){
this.queue = queue;
}

@Override
public void run() {
PCData data = null;
Random random = new Random();

System.out.println("start produce id =" + Thread.currentThread().getId());
while (isRunning){
try {
Thread.sleep(random.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
System.out.println(data + "is put into queue" );
if(!queue.offer(data, 2, TimeUnit.SECONDS)){
System.err.println("failed to put data: " + data);
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

public void stop(){
isRunning = false;
}
}
接下来,实现消费者,它从BlockingQueue队列中取出PCData对象进行处理:
public class Comsumer implements Runnable {
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;

public Comsumer(BlockingQueue<PCData> queue){
this.queue = queue;
}

@Override
public void run() {
System.out.println("start Comsume id = " + Thread.currentThread().getId());

Random random = new Random();

try {
while(true){
PCData data = queue.take();
if(null != data){
int re = data.getData() * data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}",data.getData(),
data.getData(), re));
Thread.sleep(random.nextInt(SLEEPTIME));
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
最后,在主函数中,我们创建三个生产者和三个消费者,并让他们协作运行。
public class MainTest {

public static void main(String[] args) throws InterruptedException{
BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
Producer pro1 = new Producer(queue);
Producer pro2 = new Producer(queue);
Producer pro3 = new Producer(queue);
Comsumer coms1 = new Comsumer(queue);
Comsumer coms2 = new Comsumer(queue);
Comsumer coms3 = new Comsumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(pro1);
service.execute(pro2);
service.execute(pro3);
service.execute(coms1);
service.execute(coms2);
service.execute(coms3);
Thread.sleep(10*1000);
pro1.stop();
pro2.stop();
pro3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
原文地址:https://www.cnblogs.com/junjiang3/p/7096747.html