Exchanger源代码剖析

    Exchanger是一个针对线程可以结对交换元素的同步器。每条线程把某个对象作为參数调用exchange方法,与伙伴线程进行匹配。然后再函数返回的时接收伙伴的对象。另外。Exchanger内部实现採用的是无锁算法。可以大大提高多线程竞争下的吞吐量以及性能。

算法实现
    基本方法是维持一个“槽”(slot)。这个槽是保持交换对象的结点的引用,同一时候也是一个等待填满的“洞”(hole)。

假设一个即将到来的“占据”(occupying)线程发现槽为空。然后它就会CAS(compareAndSet)一个结点到这个槽而且等待另外一个线程调用exchange方法。第二个“匹配”(fulfilling)线程发现槽为非空。则CAS它为空,而且通过CAS洞来交换对象,另外假设占据线程被堵塞,则会一并唤醒占据线程。在每一个样例里,CAS都可能因为槽一開始为非空但在CAS的时候为空。或者反之等情况而失败,所以线程须要重试这些动作。
    在仅仅有少量线程使用Exchanger的时候,这个简单的方法效果不错,可是在比較多线程使用同一个Exchanger的时候,因为CAS在同一个槽上竞争,性能就会急剧下降。因此我们使用一个“区域”(arena)。总的来说,就是一个槽数量能够动态变化的哈希表。当中随意一个槽都能够被线程用来交换。

到来的线程就能够用基于它们的线程id的哈希值来选择槽。

假设到来的线程在选择槽上CAS失败来,它就会选择另外一个槽。

类似地,假设一条线程成功CAS进去一个槽。可是没有其他线程到来,它也会尝试另外一个槽,直到第0槽,即使表缩小的时候第0槽也会一直存在。

这个特别的机制例如以下:

等待(Waiting):第0槽特别在于没有竞争的时候它是唯一存在的槽。当单条线程占据了第0槽后,假设没有线程匹配,那么该线程会在短暂的自旋之后堵塞。

在其他情况下,占据线程终于会放弃而且尝试另外的槽。在堵塞(假设是第0槽)或者放弃(其他的槽)或者又一次開始的时候,等待线程都会自旋片刻(比上下文切换时间略微短的一段时间)。除非不大可能有其他线程的存在,否则没有理由让线程堵塞。

为了避免内存竞争,所以竞争者会在静静地轮询一段比堵塞然后唤醒稍短的时间。因为缺少其他线程,非0槽会等待自旋时间结束。大概每次尝试都会浪费一次额外的上下文切换时间,平均依旧比另外的方法(堵塞然后唤醒)快非常多。

改变大小(Sizing):通常,使用少量槽可以降低竞争。

特别地当在少量线程时,使用太多槽会导致和使用太少槽的一样的糟糕性能。还有会导致空间不足的错误。变量“max”维持实际使用的槽的数量。当一条线程发现太多CAS失败的时候会添加“max”(这个类似于常规的基于一个目标加载因子来改变大小的哈希表,在这里不同的是,增长的速度是加一而不是按比例)。

增长须要在每一个槽上三次的失败竞争才会发生。须要多次失败才会增长可以处理这种情况。一些CAS的失败并不是因为竞争,可能在两条线程简单的竞争或者在读取和CAS过程中有线程抢先执行。同一时候,非常短暂的高峰竞争可能会大大高于平均可忍受的程度。

当非0槽等待超时没有被p匹配的时候,就会尝试降低最大槽数量(max)限制。线程经历了超时等待会移动到更加接近第0槽。所以即使因为不活跃导致表大小缩减,但终于也会发现存在(或者未来)的线程。这个增长和缩减的选择机制和阀值从本质上讲都会在交换代码里卷入索引和哈希。并且无法非常好地抽象出去。

哈希(Hashing):每条线程都会选择与简单的哈希码一直的初始槽来使用。

对于随意指定线程。每次相遇的顺序都是同样的。但实际上对于线程是随机的。

使用区域会遇到经典的哈希表的成本与质量权衡问题(cost vs quality tradeoffs)。

这里,我们使用基于当前线程的Thread.getId()返回值的one-step FNV-1a哈希值,还加上一个低廉的近似模数(mod)操作去选择一个索引。

以这种方式来优化索引选择的缺陷是须要硬编码去使用一个最大为32的最大表大小。可是这个值足以超过已知的平台。

探查(Probing):在侦查到已选的槽的竞争后,我们会按顺序探查整个表。类似与哈希表在冲突中的线性探查。

(循环地移动,依照相反的顺序,能够最好地配合表增长和缩减规则——表的增长和缩减都是从尾部開始,头部0槽保持不变)除了为了最小化错报和缓存失效的影响,我们会对第一个选择的槽进行两次探查。

填充(Padding):即使有了竞争管理,槽还是会被严重竞争,所以利用缓存填充(cache-jpadding)去避免糟糕的内存性能。因为这样。槽仅仅有在使用的时候延迟构造,避免浪费不必要的空间。当内存地址不是程序的优先问题的时候,随着时间消逝,垃圾回收器运行压缩,槽很可能会被移动到互相联结,除非使用了填充,否则会导致大量在多个内核上的快速缓存行无效。



    算法实现主要为了优化高竞争条件下的吞吐量,所以添加了较多的特性来避免各种问题,初始看上去较为复杂,因此建议先大致看一下流程,然后再看看源代码实现。再反过来看会有更加深刻的理解。



源代码实现
    Exchanger主要目的是不同线程间交换对象,因此exchange方法是Exchanger唯一的public方法。exchange方法有两个版本号。一个是仅仅抛出InterruptedException异常的无超时版本号,一个是抛出InterruptedException, TimeoutException的有超时版本号。

先来看看无超时版本号的实现

    public V exchange(V x) throws InterruptedException {
        if (!Thread.interrupted()) {
            Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
            if (v == NULL_ITEM)
                return null;
            if (v != CANCEL)
                return (V)v;
            Thread.interrupted(); // Clear interrupt status on IE throw
        }
        throw new InterruptedException();
    }
    函数首先推断当前线程是否已经被中断,假设是则抛出IE异常,否则调用doExchange函数,调用函数之前。为了防止传入交换对象的參数x为null,因此会当null时会传入NULL_ITEM,一个提前定义的作为标识的Object作为參数,另外,依据doExchange返回的对象来推断槽中的对象为null或者当前操作被中断,假设被中断则doExchange返回CANCEL对象,这样exchange就会抛出IE异常。


    private static final Object CANCEL = new Object();
    private static final Object NULL_ITEM = new Object();
    我们再来看看doExchange方法的实现。
    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);                 // Create in case occupying
        int index = hashIndex();                  // Index of current slot
        int fails = 0;                            // Number of CAS failures


        for (;;) {
            Object y;                             // Contents of current slot
            Slot slot = arena[index];
            if (slot == null)                     // Lazily initialize slots
                createSlot(index);                // Continue loop to reread
            else if ((y = slot.get()) != null &&  // Try to fulfill
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;               // Transfer item
                if (you.compareAndSet(null, item)) {
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }                                 // Else cancelled; continue
            }
            else if (y == null &&                 // Try to occupy
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   // Blocking wait for slot 0
                    return timed ?
                        awaitNanos(me, slot, nanos) :
                        await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node
                int m = max.get();
                if (m > (index >>>= 1))           // Decrease index
                    max.compareAndSet(m, m - 1);  // Maybe shrink table
            }
            else if (++fails > 1) {               // Allow 2 fails on 1st slot
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;                // Grow on 3rd failed slot
                else if (--index < 0)
                    index = m;                    // Circularly traverse
            }
        }
    }
    函数首先利用当前要交换对象作为參数构造Node变量me,类Node定义例如以下
    private static final class Node extends AtomicReference<Object> {
        public final Object item;
        public volatile Thread waiter;


        public Node(Object item) {
            this.item = item;
        }
    }
    内部类Node继承于AtomicReference,而且内部拥有两个成员对象item,waiter。假设线程1和线程2须要进行对象交换,类Node把线程1中须要交换的对象作为參数传递给Node构造函数,然后线程2假设在槽中发现此Node,则会利用CAS把当前原子引用从null变为须要交换的item对象,然后返回Node的成员变量item对象。构造Node的线程1调用get()方法发现原子引用非null的时候,就返回此对象。这样线程1和线程2就顺利交换对象。类Node的成员变量waiter一般在线程1假设须要堵塞和唤醒的情况下使用。
    我们顺便看看槽Slot以及其相关变量的定义
    private static final int CAPACITY = 32;

    private static final class Slot extends AtomicReference<Object> {
        // Improve likelihood of isolation on <= 128 byte cache lines.
        // We used to target 64 byte cache lines, but some x86s (including
        // i7 under some BIOSes) actually use 128 byte cache lines.
        long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
    }

    private volatile Slot[] arena = new Slot[CAPACITY];

    private final AtomicInteger max = new AtomicInteger();
    内部类Slot也是继承于AtomicReference,其内部变量一共定义了15个long型成员变量。这15个long成员变量的作用就是缓存填充(cache padding)。这样能够避免在大量CAS的时候减轻cache的影响。arena定义为大小为CAPACITY的数组,而max就是arena实际使用的数组大小,一般max会依据情况进行增长或者缩减。这样避免同一时候对一个槽进行CAS带来的性能下降影响。

    我们看回doExchange函数。函数接着调用hashIndex依据线程Id获取相应槽的索引。
   private final int hashIndex() {
        long id = Thread.currentThread().getId();
        int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;


        int m = max.get();
        int nbits = (((0xfffffc00  >> m) & 4) | // Compute ceil(log2(m+1))
                     ((0x000001f8 >>> m) & 2) | // The constants hold
                     ((0xffff00f2 >>> m) & 1)); // a lookup table
        int index;
        while ((index = hash & ((1 << nbits) - 1)) > m)       // May retry on
            hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
        return index;
    }
    hashIndex主要依据当前线程的id依据one-step FNV-1a的算出相应的哈希值。而且利用一个高速的模数估算来把哈希值限制在[0, max)之间(max是槽实际使用大小)。详细实现涉及各种运算。有兴趣能够自行研究。此处略去。



    doExchange函数接着会进入一个循环中,循环内部便是真正的算法逻辑,一共同拥有4个推断,每一个推断完之后假设没有返回再须要再次又一次推断。

首先从arena获取当前选中的Slot。因为hashIndex保证小于max值。因此不会数组越界。

我们来看第一个推断,当第一次使用Slot的时候,该Slot为null,因此调用createSlot进行初始化。


    private void createSlot(int index) {
        Slot newSlot = new Slot();
        Slot[] a = arena;
        synchronized (a) {
            if (a[index] == null)
                a[index] = newSlot;
        }
    }
    createSlot的实现非常easy,仅仅是依据index參数把数组中的相应位置加入引用。

但要注意并发问题,因此在给数组赋值的时候还要利用synchronizedkeyword进行同步。


    接着看回doExchange循环。来看看第二个推断,假设选择的slot已经初始化,则调用当前slot.get()方法尝试获取Node节点,假设当前Node节点非null。则表明之前已有线程占据此Slot,则此时继续尝试CAS此slot为null,假设成功。则表示当前线程已经和此前的占据线程进行了匹配,接下来则CAS替换Node的原子引用为交换对象item,然后唤醒Node的占据线程waiter,接着返回Node.item完毕了交换。


    第三个推断中,假设获取槽中的Node为null。则表明选中的槽没有被占据,于是CAS把当前槽从null变为一開始以交换对象item构造的Node结点me,假设CAS成功,则要依照选择的槽索引分为两种处理,首先对于第0槽,须要进行堵塞等待。因为我们这里是非超时等待,因此调用await函数。

    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    private static final int SPINS = (NCPU == 1) ?

0 : 2000; private static Object await(Node node, Slot slot) { Thread w = Thread.currentThread(); int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) // Spin-wait phase --spins; else if (node.waiter == null) // Set up to block next node.waiter = w; else if (w.isInterrupted()) // Abort on interrupt tryCancel(node, slot); else // Block LockSupport.park(node); } }

    首先看看SPINS变量的定义,SPINS表示的是在堵塞或者等待匹配中超时放弃前须要自旋轮询变量的次数,在当仅仅有单个CPU时为0,否则为2000。SPINS在多核CPU上可以在交换中,假设当中一条线程因为GC或者被抢占等原因暂停时,可以仅仅等待短暂的轮询后就可以又一次进行交换操作。来看看await的实现,相同在循环里有四个推断:
    第一个推断。调用Node的get方法,假设非null。则证明已经有线程成功交换对象又或者由于线程中断被取消了此次等待,因此直接返回对象v。
    第二个推断,则get方法返回null。则要进行自旋等待。自旋的值是依据SPINS来决定。
    第三个推断,此时自旋已经完结,因此须要进入堵塞状态,堵塞之前。首先把node.waiter赋值为当前线程,这样等后面有线程进行交换的时候能够唤醒此线程;
    第四个推断,在最后进入堵塞前,假设发现当前线程已经被中断,则须要调用tryCancel取消此次等待
    最后。调用LockSupport.park进入堵塞。


    private static boolean tryCancel(Node node, Slot slot) {
        if (!node.compareAndSet(null, CANCEL))
            return false;
        if (slot.get() == node) // pre-check to minimize contention
            slot.compareAndSet(node, null);
        return true;
    }
    tryCancel的实现非常easy,首先须要CAS把当前结点的原子引用从null变为CANCEL对象,假设CAS失败。则有可能已经有线程顺利与当前结点进行匹配,而且调用CAS进行了交换。否则的话,再调用CAS把node所在的slot改动为null。

假设这里CAS成功,则CANCEL对象会被返回到exchange方法里。让exchange方法推断后。抛出InterruptedException异常。

    接着我们看回doExchange第三个推断。假设选择的是非0槽。则会调用spinWait进行自旋等待。

    private static Object spinWait(Node node, Slot slot) {
        int spins = SPINS;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)
                --spins;
            else
                tryCancel(node, slot);
        }
    }
    spinWait的实现与await类似,但稍有不同,主要逻辑是假设经过SPINS次自旋以后。仍然无法被匹配。则会调用tryCancel把当前结点调用tryCancel取消,这样返回doExchange的时候,假设发现当前结点已经被取消,则又一次构造一个新结点Node。而且把index的值右移一位(即整除2),另外此处还须要考虑把槽的数量降低,于是推断假设max的值比整除后的index要大。则通过CAS把max值减去一。



    doExchange的第四个推断里。假设前三个推断都失败。则表明CAS失败,CAS的失败有可能仅仅是由于两条线程之间的竞争。也有可能大量线程的并发。因此我们先把fails值加一记录此次的失败,然后继续循环前面的推断。假设连续两次都失败,则大量线程并发的可能性较大,此时假设失败次数大于3次。而且max仍然小于FULL(定义max的最大值),则尝试CAS把max添加1,假设成功的话,则把index赋值为m+1。下次选择的槽则为新分配的索引。假设失败次数还不够3次,则把当前索引减去一,循环遍历整个Slot表。

    于是doExchange大致逻辑便是如此,exchange的超时版本号大体逻辑类似,在调用doExchange传入相应超时參数,这样在第0槽须要等待的时候会调用另外的函数awaitNanos。

   private Object awaitNanos(Node node, Slot slot, long nanos) {
        int spins = TIMED_SPINS;
        long lastTime = 0;
        Thread w = null;
        for (;;) {
            Object v = node.get();
            if (v != null)
                return v;
            long now = System.nanoTime();
            if (w == null)
                w = Thread.currentThread();
            else
                nanos -= now - lastTime;
            lastTime = now;
            if (nanos > 0) {
                if (spins > 0)
                    --spins;
                else if (node.waiter == null)
                    node.waiter = w;
                else if (w.isInterrupted())
                    tryCancel(node, slot);
                else
                    LockSupport.parkNanos(node, nanos);
            }
            else if (tryCancel(node, slot) && !w.isInterrupted())
                return scanOnTimeout(node);
        }
    }
    awaitNanos大体逻辑基本与await同样,但加入了一些关于超时推断的逻辑。当中最基本的是在超时之后,会尝试调用scanOnTimeout函数。
    private Object scanOnTimeout(Node node) {
        Object y;
        for (int j = arena.length - 1; j >= 0; --j) {
            Slot slot = arena[j];
            if (slot != null) {
                while ((y = slot.get()) != null) {
                    if (slot.compareAndSet(y, null)) {
                        Node you = (Node)y;
                        if (you.compareAndSet(null, node.item)) {
                            LockSupport.unpark(you.waiter);
                            return you.item;
                        }
                    }
                }
            }
        }
        return CANCEL;
    }
    scanOnTimeout把整个槽表都扫描一次。假设发现有线程在另外的槽位中,则进行CAS交换。这样就能够降低超时的可能性。

注意CAS替换的是node.item,并非get()方法返回的先前在tryCancel中被CAS掉的原子引用。

总结
    Exchanger使用了无锁算法,使用了一个能够在多线程下两组线程相互交换对象引用的同步器。该同步器在激烈竞争的环境下,做了大量的优化,并在对于CAS的内存竞争也採用了padding来避免cache带来的影响。当中的无锁算法以及其优化值得细致品味和理解。

原文地址:https://www.cnblogs.com/yjbjingcha/p/6918961.html