SynchronousQueue类

使用示例:

public static void main(String[] args) {
    final SynchronousQueue<String> q = new SynchronousQueue<String>();
    //put线程
    class Putter implements Runnable {
        String title;
        public Putter(String title) {
            this.title = title;
        }
        @Override
        public void run() {
            try {
                q.put(this.title);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }  
    new Thread(new Putter("h1")).start();
    new Thread(new Putter("h2")).start();
    
    //take线程
    class Taker implements Runnable {
        @Override
        public void run() {
            String take;
            try {
                take = q.take();
                System.out.printf("%s take %s
", Thread.currentThread().getName(), take);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }  
    
    new Thread(new Taker()).start();
    new Thread(new Taker()).start();
}

 SynchronousQueue的put和take是阻塞的,一个线程put,然后阻塞,等待另一个线程take;或者一个线程take,然后阻塞,等待另一个线程put。

阻塞线程:

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    long lastTime = timed ? System.nanoTime() : 0;
    Thread w = Thread.currentThread();
    SNode h = head;
    //自旋次数
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
            if (nanos <= 0) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            //自旋结束后,设置waiter
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            //阻塞线程
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

 唤醒线程:

boolean tryMatch(SNode s) {
    if (match == null &&
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // waiters need at most one unpark
            waiter = null;
            //唤醒线程
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}

Executors.newCachedThreadPool使用的队列是SynchronousQueue,入队使用offer,出队使用poll。

put和offer的区别:put会阻塞,而offer不会,不满足入队条件即返回。

原文地址:https://www.cnblogs.com/allenwas3/p/8289261.html