自己做多线程的屏障

有时候我们令多个线程去共同完成一项任务,在他们做到某个阶段后我们需要merge一下结果,然后继续去做。那么如何让他们在一个阶段结束后都停下来,等merge完后在并发执行呢?也许有人会说用join不就好啦,等待所有线程。join真的能很好的完成我们的要求吗?

我们来看一下如果用join会是什么样的结果:

例子:如果我们分两个阶段merge结果一个是在中间,一个是在最后。那么它的执行过程是这样的:

显然由于join是等待线程结束,这样就要求我们把处理1作为一个进程,处理2也作为一个线程。这样显然比较浪费资源,如果能把这四个步骤写到一个run方法中,这样就不用kill线程,在创建这样的操作了。如果是个小的项目还好,如果是高并发性的,这可是一笔很大的开销。

还和往常一样我们先试着写出一个版本,在根据需求慢慢改,慢慢完善:

有什么需求呢?我们的功能Barrier类能做到是:

1.每个子线程运行到merge前时能停下来,

2.每个线程在merge执行完后还能互相不干扰的继续并发执行。

 1 package com.choi.tools;
 2 
 3 public class Barrier {
 4 
 5     private int count;
 6 
 7     private int countRegister;
 8 
 9     public Barrier(int count) {
10         this.count = count;
11         this.countRegister = count;
12     }
13 
14     public synchronized void waitForRest() {
15         if (count > 1) {
16             count--;
17             try {
18                 wait();
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             }
22         }
23         if (count == 1) {
24             count = countRegister;
25             notifyAll();
26         }
27     }
28 }

构造函数:把要停住的线程数传进去,count保持当前要等待的线程数,countRegister保存nThreads,这是为了恢复Barrier用的。

waitForRest():在需要停住线程的地方调用waitForRest(),所有的线程都会被wait()阻塞,当最后一个线程来的时候,恢复count的值,然后通知所有线程。(这里有个问题,我们是否还要阻塞这最后一个线程,如果不阻塞好像也没什么,但如果要求每个线程同时被唤醒,我们还是阻塞一下好,在下面的版本中选择阻塞,在同一唤醒。)

下载修改一下,这里我写错了,不能通过countRegister来重置,因为存在这么一种情况,如果线程超过了构造函数要求的线程数,那么这个线程就会继续被阻塞,相当于又来一遍。

修改后的类:

 1 package com.choi.tools;
 2 
 3 public class Barrier {
 4 
 5     private int count;
 6 
 7     private InterruptedException iex;
 8 
 9     public Barrier(int nThreads) {
10         this.count = nThreads;
11     }
12 
13     public synchronized int waitForRest() throws InterruptedException {
14         if (iex != null)
15             throw iex;
16         count--;
17         if (count <= 0) {
18             notifyAll();
19             return count;
20         }
21         while (count > 0) {
22             if(iex!=null)
23                 throw iex;
24             try {
25                 wait();
26             } catch (InterruptedException e) {
27                 iex = e;
28                 notifyAll();
29             }
30         }
31         return count;
32     }
33 }

这里加入了异常处理机制,这里我把if (count>0)换成了while,为什么做么做,其实一般我们都会把wait放入while循环中,那是因为wait一般被调用都是因为资源申请不到之类的情况,我们还想等被唤醒后继续申请,但并不是什么时候都要用while的。我们来说一下这里的情况,如果一个线程被中断了,我们会继续阻塞它,所以wait就好用了。

例子看这里:

原文地址:https://www.cnblogs.com/chaiwentao/p/4678734.html