Phaser相位(工具的实战案例使用)

创建Phaser时如果没有指定parties,每个参与者都需要先调用register()完成注册

package com.dwz.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates dynamic registration on a {@link Phaser} that is created without an initial
 * party count.
 *
 * <p>Five workers each register themselves, simulate some work and then arrive at the phaser.
 * The main thread is registered as a sixth party and blocks until every worker has arrived.
 */
public class PhaserExample {
    private static final Random random = new Random(System.currentTimeMillis());

    /**
     * Worker thread that registers itself on the phaser and starts running as soon as it is
     * constructed.
     */
    static class Task extends Thread {
        private final Phaser phaser;

        /**
         * Registers one additional party on {@code phaser} and starts this thread.
         *
         * @param phaser the phaser this worker arrives at once its simulated work is done
         */
        public Task(Phaser phaser) {
            this.phaser = phaser;
            this.phaser.register();
            start();
        }

        /**
         * Sleeps for a random 0-4 seconds, then arrives at the phaser and waits for the
         * other registered parties.
         */
        @Override
        public void run() {
            System.out.println("The worker [" + getName() + "] is working ...");
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(5));
            } catch (InterruptedException e) {
                // Restore the flag so the interruption is not silently lost; the worker still
                // arrives below so that the other parties are not left waiting for it.
                Thread.currentThread().interrupt();
            }

            phaser.arriveAndAwaitAdvance();
        }
    }

    /**
     * Starts five workers and waits for all of them to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Phaser phaser = new Phaser();

        // Register the main thread BEFORE any worker is started. If main registered
        // afterwards, all workers could arrive first and complete phase 0 on their own;
        // main would then join a phase in which nobody else ever arrives and block forever.
        phaser.register();

        IntStream.rangeClosed(1, 5).forEach(i -> new Task(phaser));

        phaser.arriveAndAwaitAdvance();
        System.out.println("All of task finished the task.");
    }
}

run()里面每调用一次phaser.arriveAndAwaitAdvance(),就完成(消耗)phaser的一个阶段(phase)

package com.dwz.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
 * Five athletes go through three events (running, bicycle, long jump) in lock step.
 *
 * <p>Each {@code phaser.arriveAndAwaitAdvance()} call inside {@code run()} consumes one phase:
 * the next event only begins once every party has arrived for the current
 * {@code getPhase()} value.
 */
public class PhaserExample2 {
    private static final Random random = new Random(System.currentTimeMillis());

    /** One competitor; does not register itself, the phaser is pre-sized by the caller. */
    static class Athletes extends Thread {
        private final int num;
        private final Phaser phaser;

        /**
         * @param num    number printed in front of every message of this athlete
         * @param phaser barrier shared by all athletes
         */
        public Athletes(int num, Phaser phaser) {
            this.num = num;
            this.phaser = phaser;
        }

        /** Runs the three events in order; an interruption aborts the remaining events. */
        @Override
        public void run() {
            try {
                compete(num + ": start running...", num + ": end running...");
                compete(num + ": start bicycle...", num + ": end bicycle...");
                compete(num + ": start long jump...", num + ": end long jump...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Performs a single event: announce the start, sleep 0-4 seconds, announce the end,
         * print the current phase and wait for the other athletes.
         *
         * @param startLine message printed before the simulated work
         * @param endLine   message printed after the simulated work
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        private void compete(String startLine, String endLine) throws InterruptedException {
            System.out.println(startLine);
            TimeUnit.SECONDS.sleep(random.nextInt(5));
            System.out.println(endLine);
            System.out.println(num + "getPhase()=>" + phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
        }
    }

    /**
     * Creates a phaser with five parties and starts athletes 0 to 4.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(5);

        int athleteNo = 0;
        while (athleteNo < 5) {
            new Athletes(athleteNo, phaser).start();
            athleteNo++;
        }
    }
}

Phaser的参与者(parties)可以动态地注册,也可以动态地注销

package com.dwz.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
 * Shows that parties can leave a {@link Phaser} dynamically.
 *
 * <p>Four regular athletes (numbers 1-4) complete running, bicycle and long jump. Athlete 5
 * is injured after the bicycle event and calls {@code arriveAndDeregister()} instead of
 * taking part in the long jump, so the remaining athletes do not have to wait for it.
 */
public class PhaserExample3 {
    private static final Random random = new Random(System.currentTimeMillis());

    /**
     * Starts athletes 1-4 plus the injured athlete 5 on a phaser with five parties.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(5);

        int athleteNo = 1;
        while (athleteNo < 5) {
            new Athletes(athleteNo, phaser).start();
            athleteNo++;
        }

        new InjuredAthletes(5, phaser).start();
    }

    /**
     * Runs one event: print the start message, sleep 0-4 seconds, print the end message and
     * wait at the phaser for the other parties.
     *
     * @param phaser       barrier shared by all athletes
     * @param startMessage line printed before the simulated work
     * @param endMessage   line printed after the simulated work
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void sport(Phaser phaser, String startMessage, String endMessage)
            throws InterruptedException {
        System.out.println(startMessage);
        TimeUnit.SECONDS.sleep(random.nextInt(5));
        System.out.println(endMessage);
        phaser.arriveAndAwaitAdvance();
    }

    /** The two events that every athlete, injured or not, takes part in. */
    private static void runningThenBicycle(Phaser phaser, int num) throws InterruptedException {
        sport(phaser, num + ": start running...", num + ": end running...");
        sport(phaser, num + ": start bicycle...", num + ": end bicycle...");
    }

    /** Athlete that drops out after the second event. */
    static class InjuredAthletes extends Thread {
        private final int num;
        private final Phaser phaser;

        /**
         * @param num    number printed in front of this athlete's messages
         * @param phaser barrier shared by all athletes
         */
        public InjuredAthletes(int num, Phaser phaser) {
            this.num = num;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                runningThenBicycle(phaser, num);

                System.out.println("Oh shit, i am injured, i will exited.");
                // Leave the phaser so the other parties stop waiting for this one.
                phaser.arriveAndDeregister();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /** Athlete that completes all three events. */
    static class Athletes extends Thread {
        private final int num;
        private final Phaser phaser;

        /**
         * @param num    number printed in front of this athlete's messages
         * @param phaser barrier shared by all athletes
         */
        public Athletes(int num, Phaser phaser) {
            this.num = num;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                runningThenBicycle(phaser, num);
                sport(phaser, num + ": start long jump...", num + ": end long jump...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

phaser的onAdvance方法返回值true和false

返回值false时:phaser继续进入下一阶段,之后的phaser.arriveAndAwaitAdvance()可能会产生阻塞,等待尚未到达的参与者;返回值true时:phaser被终止,之后的phaser.arriveAndAwaitAdvance()会立即返回,不再阻塞

package com.dwz.phaser;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Shows the effect of the value returned by {@link Phaser#onAdvance(int, int)}.
 *
 * <p>Two tasks share a phaser with two parties. Both arrive once; only "Alex" arrives a
 * second time. With {@code onAdvance} returning {@code false} the phaser stays active, so
 * Alex's second {@code arriveAndAwaitAdvance()} waits for a party that never arrives again.
 * Returning {@code true} instead terminates the phaser after the first advance.
 */
public class PhaserExample4 {

    /** Task that arrives once, and a second time if its thread name is "Alex". */
    static class OnAdvanceTask extends Thread {
        private final Phaser phaser;

        /**
         * @param name   thread name, also used to decide whether a second round is run
         * @param phaser phaser shared by the tasks
         */
        OnAdvanceTask(String name, Phaser phaser) {
            super(name);
            this.phaser = phaser;
        }

        @Override
        public void run() {
            arriveAndReport();

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // Only Alex takes part in a second round.
            if (!"Alex".equals(getName())) {
                return;
            }
            arriveAndReport();
        }

        /** Prints the phase, arrives and waits for the other parties, prints the phase again. */
        private void arriveAndReport() {
            System.out.println(getName() + " I am start and the phaser " + phaser.getPhase());
            phaser.arriveAndAwaitAdvance();
            System.out.println(getName() + " I am end and the phaser " + phaser.getPhase());
        }
    }

    /**
     * Starts "Alex" and "Jack", then prints the arrived and unarrived party counts after
     * two seconds.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final Phaser phaser = new Phaser(2) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Switch to "return true;" to observe the terminating variant.
                return false;
            }
        };

        final Thread alex = new OnAdvanceTask("Alex", phaser);
        alex.start();
        final Thread jack = new OnAdvanceTask("Jack", phaser);
        jack.start();

        TimeUnit.SECONDS.sleep(2);
        System.out.println(phaser.getArrivedParties());
        System.out.println(phaser.getUnarrivedParties());
    }
}

arrive()不会阻塞调用它的线程:登记到达后立即返回,不等待其它参与者

示例代码:

package com.dwz.phaser;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Shows that {@code Phaser.arrive()} returns immediately instead of waiting for the other
 * parties.
 *
 * <p>Four workers call {@code arrive()} after their first piece of work and then carry on;
 * the main thread is the fifth party and waits with {@code arriveAndAwaitAdvance()}.
 */
public class PhaserExample5 {
    private static final Random random = new Random(System.currentTimeMillis());

    /** Worker that signals arrival without blocking and then keeps working. */
    private static class ArriveTask extends Thread {
        private final Phaser phaser;

        /**
         * @param phaser phaser to arrive at
         * @param no     worker number, used as the thread name
         */
        public ArriveTask(Phaser phaser, int no) {
            super(String.valueOf(no));
            this.phaser = phaser;
        }

        @Override
        public void run() {
            final String worker = getName();

            System.out.println(worker + " start working.");
            pauseRandomly();
            System.out.println(worker + " the phaser one is running.");
            // Non-blocking: records the arrival and continues straight away.
            phaser.arrive();

            pauseRandomly();
            System.out.println(worker + " keep to do other thing.");
        }
    }

    /** Sleeps for a random 0-4 seconds; an interruption only prints the stack trace. */
    private static void pauseRandomly() {
        final int seconds = random.nextInt(5);
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts workers 0 to 3 and waits until all of them have arrived.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws InterruptedException {
        final Phaser phaser = new Phaser(5);

        int workerNo = 0;
        while (workerNo < 4) {
            new ArriveTask(phaser, workerNo).start();
            workerNo++;
        }

        // Wait until the work that precedes arrive() has finished in every worker.
        phaser.arriveAndAwaitAdvance();
        System.out.println("The phaser 1 work finished done.");
    }
}

抛出问题:awaitAdvance 会减少已到达的 parties 数量吗?(Can awaitAdvance decrement the arrived parties?)

package com.dwz.phaser;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
/**
 * Explores the question "does {@code awaitAdvance} change the number of arrived parties?".
 *
 * <p>Notes recorded by this example:
 * <ul>
 *   <li>{@code awaitAdvance(phase)} blocks when {@code phase == phaser.getPhase()};
 *       otherwise it does not.</li>
 *   <li>{@code awaitAdvance} has no effect on the parties.</li>
 * </ul>
 */
public class PhaserExample6 {

    /**
     * Starts six tasks on a phaser with six parties and uses {@code awaitAdvance} in the main
     * thread, which is not a party itself, to observe when the current phase completes.
     *
     * @param args unused
     * @throws InterruptedException declared by the original signature
     */
    public static void main(String[] args) throws InterruptedException {
        // First experiment, kept for reference: a lone awaitAdvance leaves the party
        // counters untouched.
        //
        // final Phaser phaser = new Phaser(6);
        // new Thread(() -> phaser.awaitAdvance(phaser.getPhase())).start();
        // TimeUnit.SECONDS.sleep(3);
        //
        // System.out.println(phaser.getArrivedParties());
        // System.out.println(phaser.getUnarrivedParties());

        final Phaser phaser = new Phaser(6);
        IntStream.rangeClosed(0, 5).forEach(i -> new AwaitAdvanceTask(phaser));

        // Monitor whether all phaser.arriveAndAwaitAdvance() calls have completed.
        final int currentPhase = phaser.getPhase();
        phaser.awaitAdvance(currentPhase);
        System.out.println("========================");
    }

    /** Task that starts itself on construction, sleeps five seconds and then arrives. */
    private static class AwaitAdvanceTask extends Thread {
        private final Phaser phaser;

        /**
         * Stores the phaser and starts the thread.
         *
         * @param phaser phaser to arrive at; the task does not register itself
         */
        public AwaitAdvanceTask(Phaser phaser) {
            this.phaser = phaser;
            start();
        }

        @Override
        public void run() {
            simulateWork();

            phaser.arriveAndAwaitAdvance();
            System.out.println(getName() + " finished work.");
        }

        /** Sleeps five seconds; an interruption only prints the stack trace. */
        private void simulateWork() {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

打断相位执行:phaser.awaitAdvanceInterruptibly 与thread.interrupt()的配合使用

package com.dwz.phaser;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Combines {@code Phaser.awaitAdvanceInterruptibly} with {@code Thread.interrupt()} to abort
 * a wait on a phase that will never complete.
 */
public class PhaserExample7 {

    /**
     * Starts a thread that waits for the current phase of a three-party phaser on which
     * nobody arrives, then interrupts that thread after ten seconds.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Earlier variant, kept for reference: the same experiment with the thread waiting
        // in arriveAndAwaitAdvance() instead of awaitAdvanceInterruptibly().
        //
        // final Phaser phaser = new Phaser(3);
        // Thread thread = new Thread(phaser::arriveAndAwaitAdvance);
        // thread.start();
        // System.out.println("===================");
        // TimeUnit.SECONDS.sleep(10);
        //
        // thread.interrupt();
        // System.out.println("==============thread.interrupt()=============");

        final Phaser phaser = new Phaser(3);
        final Thread waiter = new Thread(() -> awaitCurrentPhase(phaser));
        waiter.start();

        System.out.println("===================");
        TimeUnit.SECONDS.sleep(10);

        waiter.interrupt();
        System.out.println("==============thread.interrupt()=============");
    }

    /**
     * Waits interruptibly for the phaser to leave its current phase and prints the stack
     * trace if the wait is interrupted.
     *
     * @param phaser the phaser to wait on
     */
    private static void awaitCurrentPhase(Phaser phaser) {
        try {
            phaser.awaitAdvanceInterruptibly(phaser.getPhase());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

相位销毁phaser.forceTermination()

package com.dwz.phaser;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates terminating a phaser with {@code Phaser.forceTermination()}.
 */
public class PhaserExample8 {

    /**
     * Lets one of three parties arrive, then prints {@code isTerminated()} before and after
     * forcing termination.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final Phaser phaser = new Phaser(3);

        // Only one of the three parties ever arrives, so this thread stays parked at the
        // phaser until termination is forced below.
        final Thread loneParty = new Thread(() -> phaser.arriveAndAwaitAdvance());
        loneParty.start();

        TimeUnit.SECONDS.sleep(3);
        final boolean terminatedBefore = phaser.isTerminated();
        System.out.println(terminatedBefore);

        phaser.forceTermination();
        final boolean terminatedAfter = phaser.isTerminated();
        System.out.println(terminatedAfter);
    }
}
原文地址:https://www.cnblogs.com/zheaven/p/13410661.html