Exchanger类

1.简述

  Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。

  Exchanger使用场景

  • 线程间交互数据。

2.Exchanger的常用方法

/**构造方法
 */
//创建一个新的Exchanger
Exchanger()

/**常用方法
 */
//exchange方法用于交互数据V
V exchange(V x)
//延迟一定时间交换数据
V exchange(V x, long timeout, TimeUnit unit)
View Code

3.Exchanger的源码分析

  Exchanger的算法核心是通过一个可以交换数据的slot和一个可以带有数据item的参与者。

  Exchanger的主要属性

/** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//arena(Slot数组)的容量。设置这个值用来避免竞争。
private static final int CAPACITY = 32;
//arena最大不会超过FULL,避免空间浪费。如果单核或者双核CPU,FULL=0,只有一个SLot可以用。
private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
//自旋等待次数。单核情况下,自旋次数为0;多核情况下为大多数系统线程上下文切换的平均值。该值设置太大会消耗CPU。
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
//若在超时机制下,自旋次数更少,因为多个检测超时的时间,这是一个经验值。
private static final int TIMED_SPINS = SPINS / 20;

private static final class Node extends AtomicReference<Object> {
    //创建这个节点的线程提供的用于交换的数据。
    public final Object item;
    //等待唤醒的线程
    public volatile Thread waiter;
    /**
     * Creates node with given item and empty hole.
     * @param item the item
     */
    public Node(Object item) {
        this.item = item;
    }
}  
 
//一个Slot就是一对线程交换数据的地方。这里对Slot做了缓存行填充,能够避免伪共享问题。虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
private static final class Slot extends AtomicReference<Object> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}  
//Slot数组,在需要时才进行初始化,用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
private volatile Slot[] arena = new Slot[CAPACITY];
//正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,这个值会递增。当一个线程自旋等待超时后,这个值会递减。
private final AtomicInteger max = new AtomicInteger(); 
View Code

  Exchanger的exchange方法

/**
 * 等待其他线程到达交换点,然后与其进行数据交换。
 * 如果其他线程到来,那么交换数据,返回。
 * 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
 *   1.有其他线程来进行数据交换。
 *   2.当前线程被中断。
 */
public V exchange(V x) throws InterruptedException {
    //检测当前线程是否被中断。
    if (!Thread.interrupted()) {
        //进行数据交换。
        Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
        //检测结果是否为null。
        if (v == NULL_ITEM)
            return null;
        //检测是否被取消。
        if (v != CANCEL)
            return (V)v;
        //清除中断标记。
        Thread.interrupted(); // Clear interrupt status on IE throw
    }
    throw new InterruptedException();
}
/**
 * 等待其他线程到达交换点,然后与其进行数据交换。
 * 如果其他线程到来,那么交换数据,返回。
 * 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
 *   1.有其他线程来进行数据交换。
 *   2.当前线程被中断。
 *   3.超时。
 */
public V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    //检测当前线程是否被中断。
    if (!Thread.interrupted()) {
        //进行数据交换。
        Object v = doExchange((x == null) ? NULL_ITEM : x,
                              true, unit.toNanos(timeout));
        //检测结果是否为null。
        if (v == NULL_ITEM)
            return null;
        //检测是否被取消。
        if (v != CANCEL)
            return (V)v;
        if (!Thread.interrupted())
            throw new TimeoutException();
    }
    throw new InterruptedException();
}

/**doExchange方法,进行数据交换
 */
private Object doExchange(Object item, boolean timed, long nanos) {
    Node me = new Node(item);
    //根据thread id计算出自己要去的那个交易位置(slot)
    int index = hashIndex();
    int fails = 0;

    for (;;) {
        Object y;
        Slot slot = arena[index];
        //slot = null,创建一个slot,然后会回到for循环,再次开始
        if (slot == null)
            createSlot(index);
        else if ((y = slot.get()) != null &&//slot里面有人等着(有Node),则尝试和其交换
                 slot.compareAndSet(y, null)) {//关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
            Node you = (Node)y;
            //把Node里面的东西,换成自己的
            if (you.compareAndSet(null, item)) {
                //唤醒对方
                LockSupport.unpark(you.waiter);
                //自己把对方的东西拿走
                return you.item;
            }//关键点2:如果运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始
        }
        else if (y == null &&//slot里面为空(没有Node),则自己把位置占住
                 slot.compareAndSet(null, me)) {
            //如果是0这个位置,自己阻塞,等待别人来交换
            if (index == 0)
                return timed ?
                    awaitNanos(me, slot, nanos) :
                    await(me, slot);
            //不是0这个位置,自旋等待
            Object v = spinWait(me, slot);
            //自旋等待的时候,运气好,有人来交换了,返回
            if (v != CANCEL)
                return v;
            //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环
            me = new Node(item);
            int m = max.get();
            if (m > (index >>>= 1))
                max.compareAndSet(m, m - 1);
        }
        else if (++fails > 1) {//失败 case1: slot有人,要交互,但被人家抢了  case2: slot没人,自己要占位置,又被人家抢了
            int m = max.get();
            //3次匹配失败,把index扩大,再次开始for循环
            if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                index = m + 1;
            else if (--index < 0)
                index = m;
        }
    }
}

/**
 * 在下标为0的Slot上等待获取其他线程填充的值。
 * 如果在Slot被填充之前超时或者被中断,那么操作失败。
 */
private Object awaitNanos(Node node, Slot slot, long nanos) {
    int spins = TIMED_SPINS;
    long lastTime = 0;
    Thread w = null;
    for (;;) {
        Object v = node.get();
        if (v != null)
            //如果已经被其他线程填充了值,那么返回这个值。
            return v;
        long now = System.nanoTime();
        if (w == null)
            w = Thread.currentThread();
        else
            nanos -= now - lastTime;
        lastTime = now;
        if (nanos > 0) {
            if (spins > 0)
                --spins; //先自旋几次。
            else if (node.waiter == null)
                node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
            else if (w.isInterrupted())
                tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
            else
                LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
        }
        else if (tryCancel(node, slot) && !w.isInterrupted())
            //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
            return scanOnTimeout(node);
    }
}
View Code

4.Exchanger的使用示例

public class Test {
    public static void main(String[] args) throws Exception {
        final Exchanger<String> exgr = new Exchanger<String>();
        new Thread((new Runnable() {
            @Override
            public void run() {
                try {
                    String A = Thread.currentThread().getName()+"的数据";
                    System.out.println(Thread.currentThread().getName()+"交互前:"+ A);
                    A = exgr.exchange(A);
                    System.out.println(Thread.currentThread().getName()+"交互后:"+ A);
                } catch (InterruptedException e) {
                }
            }
        }), "线程1").start();

        new Thread((new Runnable() {
            @Override
            public void run() {
                try {
                    String B = Thread.currentThread().getName()+"的数据";
                    System.out.println(Thread.currentThread().getName()+"交互前:"+ B);
                    B = exgr.exchange(B);
                    System.out.println(Thread.currentThread().getName()+"交互后:"+ B);
                } catch (InterruptedException e) {
                }
            }
        }), "线程2").start();
    }
}
View Code

5.总结

  Exchange和SynchronousQueue类似,都是通过两个线程操作同一个对象实现数据交换,只不过就像我们开始说的,SynchronousQueue使用的是同一个属性,通过不同的isData来区分,多线程并发时,使用了队列进行排队。Exchange使用了一个对象里的两个属性,item和match,不需要isData 属性了,因为在Exchange里面,没有isData这个语义。而多线程并发时,使用数组来控制,每个线程访问数组中不同的槽。

原文地址:https://www.cnblogs.com/bl123/p/14189113.html