Phaser类详解

  Phaser允许并发多阶段任务。Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。

  一个Phaser对象有两种状态:

  • 活跃态(Active):当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
  • 终止态(Termination):当所有参与同步的线程都取消注册的时候,Phaser就处于终止态,在终止状态下,Phaser没有任何参与者。当Phaser对象onAdvance()方法返回True时,Phaser对象就处于终止态。当Phaser处于终止态时,同步方法arriveAndAwaitAdvance()会立即返回,而且不会做任何同步操作。

  Phaser对象的主要方法:

  • arrive():这个方法通知phaser对象一个参与者已经完成当前阶段,但是它不应该等待其他参与者都完成当前阶段任务。必须使用这个方法,因为它不会与其他线程同步。
  • awaitAdvance(int phase).如果传入的参数与当前阶段一直,这个方法将会将当前线程置于休眠,直到这个阶段的参与者都完成运行。如果传入的阶段参数与当前阶段不一致,立即返回。
  • arriveAndAwaitAdvance().当一个线程调用此方法时,Phaser对象将减1,并把这个线程至于休眠状态,直到所有其他线程完成这个阶段。
  • arriveAndDeregister().当一个线程调用此方法时,Phaser对象将减1,并且通知这个线程已经完成了当前语句,不会参加到下一个阶段中,因此phaser对象在开始下一个阶段时不会等待这个线程。
  • awaitAdvanceInterruptibly(int phase).这个方法跟awaitAdvance(int phase)一样,不同之处是,如果这个方法中休眠的线程被中断,它将抛出InterruptedException异常。
  • register():这个方法将一个新的参与者注册到phaser中,这个新的参与者将被当成没有执行完本阶段的线程。
  • bulkRegister(int Parties):这个方法将指定数目的参与者注册到Phaser中,所有的这些参与者都讲被当成没有执行完本阶段的线程。

  下面将通过具体的实例来讲解Phaser的用法。在实例中Phaser将同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内改过扩展名为。log的文件。这个任务分解为三个步骤:

  1.在指定文件夹及其子文件夹中获得扩展名为.log的文件。

  2.对第一步的结果过滤,删除修改时间超过24小时的文件。

  3.将结果打印数据到控制台。

public class FileSearch implements Runnable {
    private String initPath;// 查找路径
    private String end;// 文件后缀
    private List<String> results;// 结果集
    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser = phaser;
        this.results = new ArrayList<String>();
    }

    private void direactoryProcess(File file) {
        File list[] = file.listFiles();
        if (list != null) {
            for (File file2 : list) {
                if (file2.isDirectory()) {
                    direactoryProcess(file2);
                } else {
                    fileProcess(file2);
                }
            }
        }
    }

    private void fileProcess(File file) {
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }

    private void filterResult() {
        List<String> newResult = new ArrayList<String>();
        long actualDate = new Date().getTime();
        for (int i = 0; i < results.size(); i++) {
            File file = new File(results.get(i));
            long lastModifyTime = file.lastModified();
            if (actualDate - lastModifyTime < TimeUnit.MICROSECONDS.convert(1,
                    TimeUnit.DAYS)) {
                newResult.add(results.get(i));
            }
        }
        results = newResult;
    }

    private boolean checkResults() {
        if (results.isEmpty()) {
            System.out.println(Thread.currentThread().getName() + ": Phase "
                    + phaser.getPhase() + " 0 result");
            System.out.println(Thread.currentThread().getName() + ": Phase "
                    + phaser.getPhase() + " end");
            phaser.arriveAndDeregister();
            return false;
        } else {
            System.out.println(Thread.currentThread().getName() + ": Phase "
                    + phaser.getPhase() + " " + results.size() + " result");
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    private void showInfo() {
        for (int i = 0; i < results.size(); i++) {
            System.out.println(Thread.currentThread().getName() + ":"
                    + results.get(i));
        }
        phaser.arriveAndAwaitAdvance();
    }

    @Override
    public void run() {
        phaser.arriveAndAwaitAdvance();
        System.out.println(Thread.currentThread().getName()+": Starting");
        File file=new File(initPath);
        if(file.isDirectory()){
            direactoryProcess(file);
        }
        if(!checkResults()){
            return;
        }
        filterResult();
        if(!checkResults()){
            return;
        }
        showInfo();
        phaser.arriveAndDeregister();
        System.out.println(Thread.currentThread().getName()+": Work completed");
    }
}
public class PhaserMain {
public static void main(String[] args) {
    Phaser phaser=new Phaser(3);
    FileSearch system=new FileSearch("c:\Windows", "log", phaser);
    FileSearch apps=new FileSearch("C:\Programs Files", "log", phaser);
    FileSearch documents=new FileSearch("C:\Documents And Settings", "log", phaser);
    Thread systemThread=new Thread(system, "system");
    systemThread.start();
    Thread appsThread=new Thread(apps, "apps");
    appsThread.start();
    Thread documentsThread=new Thread(documents, "documents");
    documentsThread.start();
    try {
        systemThread.join();
        appsThread.join();
        documentsThread.join();
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.out.println("Terminated:"+ phaser.isTerminated());
}
}
原文地址:https://www.cnblogs.com/wxgblogs/p/5431671.html