AQS学习

1.大纲

  aqs的思路

  为什么要学aqs

  aqs的作用

  aqs的重要性

  aqs的原理

  应用实例,源码解析

  aqs实现自己的门闩

一:AQS的思路

1.

  先从应用层面理解为什么需要他,如何使用

  了解使用场景

  再去分析它的结构

二:为什么要学习

1.锁与协作类的共同点

  闸门

2.协作同步功能

  类似的还有CountDownLatch

  他们的底层都有一个共同的基类,就是AQS

三:为什么要学AQS

1.

  很多工作都是类似的,如果能提起一个工具类,对于一些类而言,就可以屏蔽很多细节,只要关注业务逻辑了

  

四:AQS的重要性

1.Semaphore与AQS的关系

  Semaphore内部有一个Sync类,Sync类继承了AQS

  

2.CountDownLatch与AQS的关系

  

3.ReenTractLock与AQS

  

4.AQS的作用

  是一个用于构建锁,同步器,协作工具类的工具类。有了AQS,很多协作工具类都可以被方便的写出来

五:AQS原理

1.核心三大部分

  state

  控制线程抢锁和配合的FIFO队列

  协作工具类去实现的获取与释放的重要方法

2.state

  根据具体的实现类的不同而不同,例如在信号量中,表示剩余的许可证的数量,而countDownLatch里,它表示还需要倒数的数量

  state是volatile修饰的,会被并发的修改,所以都需要保证线程安全。getState,setState,compareAndSetState操作读取更新,都是依赖于atomic的支持。

  其中,在AbstractQueueSynchronizer中的方法:

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

  可以发现,底层是保证线程安全的。

  在ReentractLock中,state是锁的占有情况,包括可重入计数,当state是0的时候,表示lock不被任何线程占有

  

2.FIFO队列

  这个队列是存在等待的线程,AQS就是排队管理器。

  当多个线程用同一个锁时,必须有排队机制将没能拿到锁的线程串在一起。当锁释放的时候,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁

  是一个双向队列

3.需要实现的释放获取的方法

  获取方法:

    会依赖state变量,经常会阻塞

    在Semaphore中,获取就是acquire方法,作用是获取许可证

    在CountDownLatch中,获取就是await方法,作用是等待,知道结束

  释放方法:

    释放不会阻塞

  

4.需要重写tryAcquire和tryRelease方法

5.Aqs用法

  

六:AQS在CountDownLatch中的应用

1.构造函数

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

  然后,进入Sync:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

  然后进入的是aqs的setState方法:

protected final void setState(int newState) {
        state = newState;
    }

  

2.getCount方法

public long getCount() {
        return sync.getCount();
    }

  进入getCount:

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

  进入aqs中:

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

  

3.await方法

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

  然后进入acquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

  然后,进入tryAcquireShared方法,在CountDownLatch里已经实现了:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

  当不等于0的时候,表示需要进行等待,具体的doAcquireSharedInterruptibly,在aqs中:

  /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  这个方法是入队列进行等待,然后进行阻塞。

  先对当前的线程包装成Node节点,如下:

    

  阻塞是parkAndCheckInterrupt方法做的,进入看一下源码:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

  再进入park方法:

public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

  在上面可以知道UNSAFE.park是一个native方法,就是讲当前线程进行挂起。

  总结:

  doAcquireSharedInterruptibly就是讲当前的线程进行挂起

4.countDown方法

public void countDown() {
        sync.releaseShared(1);
    }

  进入releaseShared方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

  分析tryReleaseShared方法

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

  返回false,表示这次不需要进行释放,已经被释放过了。进行state-1,使用cas进行更新;如果不成功,再进行for循环,进行更新,一旦等于0,则返回true

  然后,在返回true的时候,会进行doReleaseShared方法,这个方法是唤醒等待的线程

七::AQS在Semaphore中的应用

1.state

  表示许可证的剩余数量

2.acquire方法

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

  进入acqiureSharedInterruptibly:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

  发现和上面的countDownLatch使用的一样

  针对参数不同,有公平与不公平两种方式:

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

  这个是不公平的方式,进入nonfairTryAcquireShared

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

  获取当前可用许可证,然后进行计算。如果小于0,则返回一个负数,外面的方法就是进行等待阻塞;如果不小于0,则使用cas将剩余的许可证给设置进去,如果成功,同时返回一个正数,说明有可用的许可证;如果cas失败,则新一轮的循环

八:AQS在ReenTrantLock中的应用

1.unlock方法

public void unlock() {
        sync.release(1);
    }

  进入release方法

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  进入tryRelease方法:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

  如果,当前线程没有持有锁,则抛出异常

  计算一个c,其中getState是已经重入的次数

  如果不等于0,则将c设置

  如果等于0,则要释放锁,让free为true,同时,将当前的线程不再持有锁,null即可

  

  再回到上面的代码。

  unparkSuccessor方法,后面的节点会被唤醒

2.lock方法

public void lock() {
        sync.lock();
    }

  然后进行lock

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    /**
     * Acquires the lock.
     *
     * <p>Acquires the lock if it is not held by another thread and returns
     * immediately, setting the lock hold count to one.
     *
     * <p>If the current thread already holds the lock then the hold
     * count is incremented by one and the method returns immediately.
     *
     * <p>If the lock is held by another thread then the
     * current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired,
     * at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }

  因为有公平与非公平的不同实现方式,具体是那一个,可以看到上面有一个sync的判断

  先看不公平的实现:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

  进行cas操作,如果是0,表示没有锁,将当前的线程进行加锁

  如果失败,则进入else:

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  然后看非公平的tryAcquire

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

  在sync中,看nobfairTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  如果是0,表示没有线程持有锁,则加锁就行

  否则,如果线程恰好是这个锁的持有者,就是一个重入的操作,在当前的基础上加上acquire,如果小于0,表示溢出了。不然就setState。

·  再继续,如果又不是当前持有的锁,返回false。

  所以,返回上一层,tryAcquire表示获取锁失败,因为是取非,则执行acquireQueued,当前的线程被包装,放入等待队列进行等待

九:实现一个自己的门闩

1.程序

package com.jun.juc.aqs;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * 使用aqs实现一个简单的门闩
 */
public class OneShotLatch {
    // 不知道是使用独占的还是共享的,所以,不强制重写
    private class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected int tryAcquireShared(int arg) {
            return (getState()==1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    /**
     * 等待
     */
    public void await(){
        sync.acquireShared(0);
    }

    public void signal(){
        sync.releaseShared(0);
    }

    public static void main(String[] args) throws Exception{
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i=0; i<10; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("尝试获取");
                    oneShotLatch.await();
                    System.out.println("门闩开了");
                }
            }).start();
        }
        Thread.sleep(5000);
        oneShotLatch.signal();
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("尝试获取");
                oneShotLatch.await();
                System.out.println("门闩开了");
            }
        }).start();
    }
}

  效果:

Connected to the target VM, address: '127.0.0.1:64474', transport: 'socket'
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
尝试获取
Disconnected from the target VM, address: '127.0.0.1:64474', transport: 'socket'
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
门闩开了
尝试获取
门闩开了

Process finished with exit code 0

  

原文地址:https://www.cnblogs.com/juncaoit/p/13227944.html