ConCurrent in Practice小记 (3)

ConCurrent in Practice小记 (3)

高级同步技巧

Semaphore

Semaphore信号量,据说是Dijkstra大神发明的。内部维护一个许可集(Permits Set),用于发放许可和回收许可,存在内部计数器,主要用来计数能否得到资源(一般用来限制同时访问资源数)。当一个线程拿到许可,计数器减一;当线程释放资源则计数器加一;当计数器为0则阻塞线程。

特别地: Semaphore的同步锁机制仅仅用于对访问许可的同步,对于需要访问对象的池等的同步锁并不保证。如一个线程池需要访问一个资源池,此时对于每一个需要访问资源的线程,要先获得许可,这是在Semaphore中得到同步保护的,但是在得到许可后,对资源池本身的存取等依然是非同步保护的。需要自己实现。(或者是资源池本身的同步维护)——所以在方法中应该确保首先释放(线程所占)资源,再去释放许可。

JDK中的示例代码:

Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }

 }

Semaphore的构造方法为:
Semaphore(int permits); 给出允许发放的许可个数,默认不公平发放
Semaphore(int permits, boolead fair); 除给出许可个数外,boolean值代表是否允许公平发放许可
acquire(),release()分别是取得许可和释放许可。当Semaphore的许可数目为1时,即只有1和0两种状态,此时的Semaphore被称之为二进制信号量,是完全互斥的,仅允许线程一个一个执行。

另外同Lock类相同,Semaphore也有两种特殊acquire的方法:
acquireUninterruptibly(): 普通的acquire()方法在阻塞的时候是可以被中断的,且抛出异常。但是使用此方法会忽视中断信息且不会抛出异常。
tryAcquire(): 如果可以得到许可返回true,如果不能则立即返回false,并不会阻塞或者等待Semaphore的释放。

CountDownLatch

CountDownLatch提供这样一个类:目的在于同步线程之间的任务,它让一个或多个线程等待,直到一组操作全都执行完毕才返回。初始化时仅需要一个int参数,用来表示这一组操作共有多少任务,该正数传递到CountDownLatch对象的计数器中。当某个线程完成自己的任务时调用await()方法,该方法在内部计数器未自减到0之前让线程保持睡眠,当计数器减少到0时会唤醒所有调用await()而睡眠的线程并返回结果。CountDownLatch对象的countDown()方法会自减其内部计数器。

特别地:注意CountDownLatch对象只能使用一次,内部计数器初始化后均不可再更改值,当计数器减少到0时,再调用countDown()方法也不会有影响。如果需要下一次同步则必须生成新的对象。

CountDownLatch和临界区保护等没有关系,仅

示例代码:

package com.lyb.Section3;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by lyb on 15-7-28.
 */
public class VideoConferenceTest {

    public static void main(String[] args){
        VideoConference conference = new VideoConference(10);
        Thread conferenceThread = new Thread(conference);
        conferenceThread.start();

        for (int i = 0; i < 10; i++){
            Participant participant = new Participant(conference,"Participant " + i);
            Thread thread = new Thread(participant);
            thread.start();
        }

    }



}



class VideoConference implements Runnable{

    private final CountDownLatch countDownLatchControler;

    public VideoConference(int number){
        countDownLatchControler = new CountDownLatch(number);
    }

    public void arrive(String name){
        System.out.printf("%s has arrived. 
",name);
        countDownLatchControler.countDown();
        System.out.printf("VideoConference : Waiting for %d conferences 
", countDownLatchControler.getCount());
    }

    public void run(){
        System.out.printf("VideoConference Initialization : %d participants. 
",
                countDownLatchControler.getCount());
        try {
            countDownLatchControler.await();
            System.out.printf("All the participants come 
");
            System.out.printf("Let's begin the conference .... 
");
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

}

class Participant implements Runnable{

    private VideoConference conference;
    private String name;

    public Participant(VideoConference conference, String name){
        this.conference = conference;
        this.name = name;
    }

    public void run(){
        try {
            long duration = (long)(Math.random()*10);
            TimeUnit.SECONDS.sleep(duration);
        }catch (InterruptedException e ){
            e.printStackTrace();
        }
        conference.arrive(name);
    }

}

CyclicBarrier

CyclicBarrier跟CountDownLatch类相似,也是在多个线程到达某一同步点之后睡眠,等待一系列操作完成之后,再进行下一个任务。这里仍然是同步的线程数目,当有一个线程到达时,先睡眠,到达的数目为制定数目后可以进行初始化参数的Runnable任务。
(这里有一个疑问,如果是线程池中,仅有3条线程,但是在这里CyclicBarrier设置为5,拿是不是永远达不到Barrier了?)

另一个版本的await()方法:
await(long time, TimeUnit unit): 唤醒条件为休眠被中断,内部计数器到达,等待时间到。
getNumberWaiting() 返回当前在屏障前等待的参与者的数目
getParties() 返回同步的任务数,即总共需要达到屏障前的数目

CyclicBarrier同CountDownLatch最大的不同是CyclicBarrier可以被重用,使用reset()方法可以重置barrier,但是全部正在await()的线程将抛出broken异常。Broken状态是Barrier的特殊状态,在此状态下不能操作,状态的引起是由于正在await()的某个线程被中断。isBroken()可以检测该状态。

Phaser

Phaser移相器是JDK1.7引入的新类,可以运行阶段性的并发任务。当任务是分为多个步骤来做,则Phaser可以在每个阶段的的结尾同步线程,所以除非完成第一个阶段,否则不可能开始第二个步骤。

package com.lyb.Section3;

import com.sun.xml.internal.stream.util.ThreadLocalBufferAllocator;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

/**
 * Created by lyb on 15-7-30.
 */
public class PhaserTest {
    public static void main(String[] args){
        Phaser phaser = new Phaser(3);

        FileSearch ahome = new FileSearch("/home/lyb/WorkSpace","md",phaser);
        FileSearch bhome = new FileSearch("/home/lyb/Book","md",phaser);
        FileSearch chome = new FileSearch("/home/lyb/Software","md",phaser);

        Thread ahomeThread = new Thread(ahome,"WorkSpace");
        ahomeThread.start();

        Thread bhomeThread = new Thread(bhome,"Book");
        bhomeThread.start();

        Thread chomeThread = new Thread(chome,"Software");
        chomeThread.start();

        try {
            ahomeThread.join();
            bhomeThread.join();
            chomeThread.join();
        }catch (InterruptedException e){
            e.printStackTrace();
        }

        System.out.printf("Terminated:" + phaser.isTerminated());
    }

}

class FileSearch implements Runnable{
    private String initPath;
    private String end;
    private Phaser phaser;
    private List<String> results;

    public FileSearch(String initPath, String end, Phaser phaser){
        this.initPath = initPath;
        this.end = end;
        this.phaser = phaser;
        results = new ArrayList<>();
    }

    private void directoryProcessed(File file){
        File filelist[] = file.listFiles();
        if (filelist != null){
            for (File file1 : filelist){
                if (file1.isDirectory()){
                    directoryProcessed(file1);
                }else {
                    fileProcessed(file1);
                }
            }
        }
    }

    private void fileProcessed(File file){
        if (file.getName().endsWith(end)){
            results.add(file.getAbsolutePath());
        }
    }

    private void filterResults(){
        List<String> newResults = new ArrayList<>();
        long actualTime = new Date().getTime();
        for (String filePath : results){
            File file = new File(filePath);
            long modifyTime = file.lastModified();
            if (actualTime - modifyTime < TimeUnit.MILLISECONDS.convert(100,TimeUnit.DAYS)){
                newResults.add(filePath);
            }
        }
        results = newResults;
    }

    private boolean checkResults(){
        if (results.isEmpty()){
            System.out.printf("%s : Phase %d: 0 results 
",
                    Thread.currentThread().getName(),
                    phaser.getPhase());
            System.out.printf("%s : Phase %d: end 
",
                    Thread.currentThread().getName(),
                    phaser.getPhase());
            phaser.arriveAndDeregister();
            return false;
        }else {
            System.out.printf("%s : Phase %d : %d results. 
",
                    Thread.currentThread().getName(),
                    phaser.getPhase(),results.size());
            phaser.arriveAndAwaitAdvance();
            return true;
        }
    }

    private void showInfo(){
        for (String filePath : results){
            File file = new File(filePath);
            System.out.printf("%s : %s 
",Thread.currentThread().getName(),file.getAbsoluteFile());
        }
        phaser.arriveAndAwaitAdvance();
    }

    public void run(){
        phaser.arriveAndAwaitAdvance();
        System.out.printf("%s Starting. 
", Thread.currentThread().getName());
        File file = new File(initPath);
        if (file.isDirectory()){
            directoryProcessed(file);
        }

        if (!checkResults()){
            return;
        }

        filterResults();
        if (!checkResults()){
            return;
        }

        showInfo();
        phaser.arriveAndDeregister();
        System.out.printf("%s Work completed. 
", Thread.currentThread().getName());
    }


}

Phaser的构造函数接受的参数是指在phase末端控制的同步的线程数目,也称之为参与者数目。在run方法中首先调用arriveAndAwaitAdvance(),这样每个线程都在自己创建完毕后等待其他线程,同时每次调用该方法,会更新phase内部的线程计数,减去完结的线程数,得到现在实际应该同步的线程数,并使当前线程睡眠,等待最后一个同步线程到达,使所有的线程唤醒,开始执行第二个phase的操作。

arriveAndDeregister()是在线程到达phase时,已经判定为应该终止的线程,Deregister之后,phase内部计数减一,不会再计算该线程。当然每个phase执行到最后,都应每个线程调用该方法,退出phase计数。

最终,由main方法中调用isTerminated()退出phaser。

Phaser 对象可能是在这2中状态:

  • Active: 当 Phaser 接受新的参与者注册,它进入这个状态,并且在每个phase的末端同步。 (在此状态,Phaser像在这个指南里解释的那样工作。此状态不在Java 并发 API中。)
  • Termination: 默认状态,当Phaser里全部的参与者都取消注册,它进入这个状态,所以这时 Phaser 有0个参与者。更具体的说,当onAdvance() 方法返回真值时,Phaser 是在这个状态里。如果你覆盖那个方法,你可以改变它的默认行为。当 Phaser 在这个状态,同步方法 arriveAndAwaitAdvance()会立刻返回,不会做任何同步。

Phaser 类的一个显著特点是你不需要控制任何与phaser相关的方法的异常。不像其他同步应用,线程们在phaser休眠不会响应任何中断也不会抛出 InterruptedException 异常。只有一个异常InterruptedException在特定中才会出现。

The Phaser类还提供了其他相关方法来改变phase。他们是:

  • arrive(): 此方法示意phaser某个参与者已经结束actual phase了,但是他应该等待其他的参与者才能继续执行。小心使用此法,因为它并不能与其他线程同步。(不懂。。。)

  • awaitAdvance(int phase): 如果我们传递的参数值等于phaser的actual phase,此方法让当前线程进入睡眠直到phaser的全部参与者结束当前的phase。如果参数值与phaser 的 actual phase不等,那么立刻返回。

  • awaitAdvanceInterruptibly(int phaser): 此方法等同与之前的方法,只是在线程正在此方法中休眠而被中断时候,它会抛出InterruptedException 异常。

当你创建一个 Phaser 对象,你表明了参与者的数量。但是Phaser类还有2种方法来增加参与者的数量。他们是:

  • register(): 此方法为Phaser添加一个新的参与者。这个新加入者会被认为是还未到达 actual phase.
  • bulkRegister(int Parties): 此方法为Phaser添加一个特定数量的参与者。这些新加入的参与都会被认为是还未到达 actual phase.
  • Phaser类提供的唯一一个减少参与者数量的方法是arriveAndDeregister() 方法,它通知phaser线程已经结束了actual phase,而且他不想继续phased的操作了。

当phaser有0个参与者,它进入一个称为Termination的状态。Phaser 类提供 forceTermination() 来改变phaser的状态,让它直接进入Termination 状态,不在乎已经在phaser中注册的参与者的数量。此机制可能会很有用在一个参与者出现异常的情况下来强制结束phaser.

当phaser在 Termination 状态, awaitAdvance() 和 arriveAndAwaitAdvance() 方法立刻返回一个负值,而不是一般情况下的正值如果你知道你的phaser可能终止了,那么你可以用这些方法来确认他是否真的终止了。

个人总结:

  1. 任意时刻都可以加入phase,通过register和bulkregister,第一次到达phase才开始同步
  2. onArrive和onAdvance会手动调用线程的到达,onarrive并不等待其他的线程,而如果onAdvance返回true(其实是判断内部的计数是否是0),则即将进入termination。
  3. 可以级联,成为phase树,而且child phase的参与者的register是自动在parent的计数上注册的。当子phase参与者为0,则自动从树上裂解。
  4. 有效的镜像,可以通过随时调用getxxx方法得到参与者数量,为到达数量等等。

generated by haroopad

原文地址:https://www.cnblogs.com/putuotingchan/p/4701176.html