java 并发编程

闭锁

  一种可以延迟线程的进度直到其到达终止状态.可以用来确保某些活动直到其他活动都完成后才继续执行

  例如:

  1. 确保某个计算在其需要的所有资源都被初始化了之后才继续执行.
  2. 确保某个服务在其他依赖的服务都启动了之后才开始执行
  3. 等待某个操作的所有参与者(如LOL) 都就绪了之后再继续执行.

锁的实现

1.CountDownLatch

  CountDownLatch 是一种灵活的闭锁实现. 可以在以上的各种类型情况下使用.它可以使一个或多个线程等待一组事件的发生. 

  闭锁状态包括一个计数器,该计数器被初始化为一个正数.表示需要被等待的事件的数量. countDown 方法用于递减计数器,表示有一个事件已经发生,而await 方法等待计数器到达零时就会执行, 否则会一直阻塞直到计数器为零,或者等待中的线程中断, 或者等待超时.

import java.util.concurrent.*;

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;
    }

    public static void main(String[] args) throws InterruptedException {
        new TestHarness().timeTasks(9, new Runnable() {
            public void run() {
                System.out.println(this);
            }
        });
    }
}

  以上程序.它使用两个闭锁,分别表示起始门 "startGate" 和 结束门 "endGate" 来确保所有线程都准备就绪后才继续执行,而每个线程做的最后一件事都是让 "endGate" 减一,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间.

2.FutureTask

  futureTask 也可以用作闭锁. futureTask 是通过Callable 来实现的. 相当于一种可用于生产结果的runnable , 并且可以用于以下3钟等待状态

  1. 等待运行(Waiting to run )
  2. 正在运行(Running)
  3. 运行完成(completed)

执行完成 ,表示计算的所有可能结束的方式. 包括 正常结束,由于取消而结束和由于异常而结束等.

Future.get() 的行为取决于任务的状态,如果任务已完成,那么get 会立即返回结果,get 将阻塞直到这个任务进去完成状态.然后返回结果或者抛出异常. FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程, 而 FutureTask 的规范确保了这种传递的过程能实现结果的安全发布.

public class Preloader {
    ProductInfo loadProductInfo() throws DataLoadException {
        return null;
    }

    private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
            new Callable<ProductInfo>() {
                public ProductInfo call() throws DataLoadException {
                    return loadProductInfo();
                }
            });
    private final Thread thread = new Thread(future);

    public void start() {
        thread.start();
    }

    public ProductInfo get() throws DataLoadException, InterruptedException {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;
            else
                throw LaunderThrowable.launderThrowable(cause);
        }
    }

    interface ProductInfo {
    }
}

class DataLoadException extends Exception {
}

  

3. Semaphore 

  计数信号量(Counting Semaphore) 用来控制同是访问某个特定资源的操作数量,或者同时执行某个指定操作的数量. 计数信号量还可以用来实现某种资源池,或者对容器施加边界(如:blockingQueue)

  Semaphore 中管理着一组虚拟许可(permit), 许可的初始化数量可以通过构造函数来指定,在执行操作前先获取许可(只要还有剩余许可),并在使用后释放,如果没有许可,那么acquire 将阻塞到有许可(或者直到被中断或者操作超时). release 方法将返回一个许可给信号量(许可与线程无关,一个许可可以在一个线程获取,在另一个线程释放,且不具备重入性)

public class BoundedHashSet <T> {
    private final Set<T> set;
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                sem.release();
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

 4. Barrier

  栅栏 类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的区别关键在于:所有线程必须同时到达栅栏的位置,才能继续,闭锁用于等待某件事情,而栅栏用于实现一些协议.例如: 几个人决定在某个地方集合:'所有人6:00 在 麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事.'

  CyclicBarrier 可以使一定数量的参与方法反复在栅栏位置会聚,它在并行迭代算法中非常有用,这种算法通常将一个问题拆分成一系列相互独立的子问题,当线程到达栅栏位置时将调用await 方法, 这个方法将阻塞直到所有线程都到达栅栏位置,如果所有线程都到达栅栏位置,那么栅栏将打开, 此时所有线程都被释放,而栅栏将被重置以便下次使用, 如果对await的调用超时,或者await 阻塞的线程被中断, 那么栅栏将被认为是打破了, 所有阻塞的await 调用都将终止并抛出 BrokenBarrierException . 如果成功通过栅栏,那么await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举" 产生一个领导线程.并在下一次迭代中由该领导线程执行一些特殊的工作.CyclicBarrier 还可以是你将一个栅栏操作传递给构造函数,这是一个Runnable ,当成功通过栅栏时会(在一个子任务线程中) 执行它.但在阻塞线程被释放钱是不能执行的.

  在模拟程序中经常使用栅栏.

  

public class CellularAutomata {
    private final Board mainBoard;
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        mainBoard.commitNewValues();
                    }});
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++)
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) { this.board = board; }
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++)
                    for (int y = 0; y < board.getMaxY(); y++)
                        board.setNewValue(x, y, computeValue(x, y));
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        private int computeValue(int x, int y) {
            // Compute the new value that goes in (x,y)
            return 0;
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++)
            new Thread(workers[i]).start();
        mainBoard.waitForConvergence();
    }

    interface Board {
        int getMaxX();
        int getMaxY();
        int getValue(int x, int y);
        int setNewValue(int x, int y, int value);
        void commitNewValues();
        boolean hasConverged();
        void waitForConvergence();
        Board getSubBoard(int numPartitions, int index);
    }
}

张孝祥的案例:

public class CyclicBarrierTest {
    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final CyclicBarrier cb = new CyclicBarrier(3); // 三个线程同时到达
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("线程"
                                + Thread.currentThread().getName()
                                + "即将到达集合地点1,当前已有"
                                + (cb.getNumberWaiting() + 1)
                                + "个已到达"
                                + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                        : "正在等候"));
                        try {
                            cb.await();
                        } catch (BrokenBarrierException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("线程"
                                + Thread.currentThread().getName()
                                + "即将到达集合地点2,当前已有"
                                + (cb.getNumberWaiting() + 1)
                                + "个已到达"
                                + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                        : "正在等候"));
                        try {
                            cb.await();
                        } catch (BrokenBarrierException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("线程"
                                + Thread.currentThread().getName()
                                + "即将到达集合地点3,当前已有"
                                + (cb.getNumberWaiting() + 1)
                                + "个已到达"
                                + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
                                        : "正在等候"));
                        try {
                            cb.await();
                        } catch (BrokenBarrierException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

  另一种形式的栅栏是 Exchanger ,它是一种两方(Two-Party)栅栏 , 各方在栅栏位置上交换数据,当两方执行不对称的操作时, Exchanger 会非常有用.

  例如: 当一个线程想缓冲区写入数据, 而另一个线程用缓冲区中读取数据.这些线程可以使用Exchanger来汇聚,并将满的缓冲区与空的缓冲区交换.当两个线程通过Exchanger交换对象时.这种交换就把两个对象安全地发布给另一方.

  数据交换的实际取决于应用程序的相应需求. 最简单的方案是. 当缓冲区被填满时,由填充任务进行交换. 当缓冲区为空时,由清空任务进行交换. 这样会把需要交换的次数降至最低, 但如果新数据的到达不可预测,那么一些数据的处理过程就将延迟.另一个方法是,不仅当缓冲区被填满时进行交换. 并且当缓冲区被充到一定程度,并保持一段时间后.也进行交换.

/**
 * 
 */
package mjorcen.nio.test2;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Exchanger;

/**
 * 
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Jan 19, 2015 6:57:56 PM
 * @version 1
 */
public class ExchangerTest {
    final Exchanger<List<String>> exchanger;

    public ExchangerTest(Exchanger<List<String>> exchanger) {
        super();
        this.exchanger = exchanger;
    }

    public static void main(String[] args) {
        Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

        new Thread(new ExchangerThread01(exchanger)).start();
        new Thread(new ExchangerThread02(exchanger)).start();

    }
}

class ExchangerThread01 implements Runnable {
    final Exchanger<List<String>> exchanger;

    public ExchangerThread01(Exchanger<List<String>> exchanger) {
        super();
        this.exchanger = exchanger;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    public void run() {
        System.out.println("ExchangerThread01 begin ... ");
        try {
            List<String> list = new LinkedList<String>();
            for (int i = 0; i < 20; i++) {
                list.add("str_01_" + i);
            }
            list = exchanger.exchange(list);
            for (String string : list) {
                System.out.println("Thread01 is " + string);
            }
            System.out.println("ExchangerThread01 end ... ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

class ExchangerThread02 implements Runnable {
    final Exchanger<List<String>> exchanger;

    public ExchangerThread02(Exchanger<List<String>> exchanger) {
        super();
        this.exchanger = exchanger;
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Runnable#run()
     */
    public void run() {
        System.out.println("ExchangerThread02 begin... ");
        List<String> list = new LinkedList<String>();
        for (int i = 0; i < 10; i++) {
            list.add("str_02_" + i);
        }
        try {
            Thread.sleep(1000);
            list = exchanger.exchange(list);
            for (String string : list) {
                System.out.println("Thread02 is " + string);
            }
            System.out.println("ExchangerThread02 end ... ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

6. 构建高效且可伸缩的结果缓存

public class Memoizer <A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache
            = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(arg, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw LaunderThrowable.launderThrowable(e.getCause());
            }
        }
    }
}

以上内容出自: <<java并发编程实践>>

原文地址:https://www.cnblogs.com/mjorcen/p/4233897.html