AbstractQueuedSynchronizer、ReentrantLock源码分析——从未曾了解到精通原理

在这里分享下对AbstractQueuedSynchronizer原理和源码的学习资料,对于核心思想这里介绍的比较简单,建议大家还是网上搜下aqs的论文,先看论文,然后再看java concurrents包里的源码,会理解的更加透彻。

一、核心思想:

  1、  同步状态的原子操作

  2、  线程的阻塞和唤醒

  3、  维护一个队列,出入队列都是原子操作

二、 基本操作:

  1、  入队列:

    a) 添加到chl队列,并且设置其前任节点的singal状态,并阻塞

  2、  出队列:

    a) 队列的head node 才能获得执行权限

三、非公平的获得锁的设计思路:

nonfirelock
if(!tryAcquire){

         node = create a new node and add to the chl queue

         pre = node’s precedueer

         while(pre != head || !tryAcquire(args)){

                   if(pre’s signal bit is true){

                            park this thread;

} else {

         set the pre’s singal bit is true

}

 

pre = node’s precedueer

}

 

head = node;

}

四、释放锁的思路

free the lock
if(tryRelease(args) && head node’s singal is set){

         compare and set its singal to false;

         unpark its successor if exsits;

}

五、整体的设计思路:模版方法的完美体现,其主要的逻辑都已经实现,比如:添加等待节点、原子操作插入节点、节点入阻塞队列、阻塞前的检查等,而把tryAcquire()方法交给其子类自己去实现;实现了高可扩展性

ReentrantLock的对象调用lock()方法后

  1、 根据其锁的性质(公平和非公平),调用不同的lock()方法

    a) 公平锁:直接调用AbstractQueuedSynchronizer的acquire(1)方法

    b) 非公平锁:先竞争锁,如果获得锁,则设置互斥锁的拥有线程(ExclusiveOwnerThread)为当前线程、AbstractQueuedSynchronizer的state为1;否则请求AbstractQueuedSynchronizer的acquire(1)方法

  2、  AbstractQueuedSynchronizer的acquire(int args)方法分为下面几步

    a) 应该尝试获得锁,调用子类具体的boolean tryAcquire(args)方法

                         i. 公平锁:先判断state? 0(则判断头结点的线程是否是当前线程,是则能获得锁,设置本线程拥有互斥锁,state = args,返回true):(判断互斥锁的拥有线程是否是当前线程,是,则重入该所,state++,返回true);最后返回false

                       ii. 非公平锁:state?0(判断拥有互斥锁的线程是否是当前线程,是,重入锁,state++,返回true):(返回false)

    b) 如果上一步返回true,说明此线程获得锁成功;否则应该添加到锁队列(CLH改进队列)中去,并且阻塞,直到被唤醒

                         i. AbstractQueuedSynchronizer的addWaiter(Node)方法添加一个新的节点至锁队列的尾部,并且返回这个节点:首先根据Node.Exclusive和当前Thread构造一个节点t,如果tail节点p 为non-null,则把新节点添加到队尾t.pred = p;comparseAndSetTail(p,t)成功,则t.pred = p;返回p;否则(加上p为null一起控制),调用enq(Node)方法,原子的将节点添加到队尾,返回t,然后判定tail t是否为空,

            a) 如果为空,则创建head h;h.next = node;node.pred = h;compareAndSetHead(h)成功则将tail = node;return h;

            b) 不为空,t.next = node; compareAndSetTail(t,node)成功,则node.pred = t,返回t

            c) 如果没有返回则无限循环直到return

                       ii. 对得到的前任节点进行入队列阻塞操作boolean queueAcquire(Node,args):

      1. 先判定p == head && tryAcquire(args)是否为true,是:则setHead(p.next){head = p.next; head.pred = null; head.thread = null},setExclusiveOwnerThread(Thread.currentThread());setSate(args);
      2. 否则设置pred节点p的状态位signal,然后阻塞该线程

            a) 修改状态位shouldParkAfterFailedAcquire(p,node):判断s = p.waitStatus是否<0,是直接return true;

            b) s>0;标识着p节点已经被取消,则将 node.pred = p = p.pred,直到p.waitStatus > 0为止;s=0;原子操作compareAndSetWaitStatus(p,0,Node.signal),;最后返回false

          1. 用boolean parkFailedAcquire()阻塞线程:LockSupport.park(Thread. currentThread();return Thread.interruptted(用来判断是否由于取消而中断的);
          2. 被唤醒后,执行interruptted = true
          3. 无限循环直到return interruptted;

                      iii. 如果线程!tryAcquire(args) && queueAcquire(Node,args),表明线程是被中断操作唤醒,执行selfInterrupt(){Thread.currentThread().interrupt()}

调用unlock()方法后,执行的是AbstractQueuedSynchronizer的release(1)和tryRelease(int)

  1、 首先release(int)要判断是否能够释放锁 boolean tryRelease(int)

    a) 首先消减锁的重入次数c = getState() – args;如果c == 0成立,setExclusiveOwnerThread(null);setState(c);返回true,否则setState(c);return false;

    b) 如果成功的释放锁,则判断head != null && head.waitStatus != 0是否成立,成立,则唤醒继任者unparkPreducesser(head);

                         i. 首先(ws = head.waitStatus) < 0:compareAndSetWaitStatus(head,ws,0);

                       ii. ws > 0:标识哑节点取消了;则从tail往前找,直到head,将waitStatus<0的节点赋值给next

                      iii. LockSupport.unpark(next.thread);

ConditionObject则指出java风格的await()和signal:

         在其内部有一个单向的等待队列

    调用await()方法时:

    1、先判断线程是否被终止,是则抛出InterrupttedException

    2、Node s = addCondtionWaiter();给等待队列添加一个新的节点

      a) 首先判断firstWaiter.waitStatus == Node.CANCELLED;是则表明头节点取消,调用unlinkCancelledWaiter()方法去掉队列中的取消节点

                         i. 对单向队列遍历,依次判断节点的waitStatus

      b) 否则,构造一个Node s = new Node(Thread.currentThread(),Node.CONDITION);如果头节点为null,则firstWaiter = s;否则firstWaiter.next = s;最后lastWaiter = s;return s;

    3、释放锁int saveState = fullyRelease();调用release(getState());

    4、循环判断此节点是否在锁队列中!isSyncQueue(s):

      a) 如果s.waitStatus == Node.CONDITION || s.pred == null,表明此节点没有在锁队列中,return false;

      b) 如果s.pred != null; return true;

      c) 最后从tail到head依次寻找,是否有node.thread == Thread.currentThread();有则return true;否则return false;

    5、如果不在sync队列中,则Locksupport.park(Thread.currentThread());如果线程被唤醒,则if(interruptMode = checkInterruptWhileWaiting(s) != 0) break;

      a) Thread.interruptted?(transforCancelledWaiter(s)?THROW_IE:REINTERRUPT):0根据线程是否被中断来判断决定后面的操作

      b) transforCancelledWaiter(s):

                         i. 如果compareAndSetStatus(p,Node.CONDITION,0)成功,表明取消操作在唤醒之前,则此线程要重新获锁,并且抛出IE异常,因此enq(s);return THROW_IE;

                       ii. 否则表明取消操作在唤醒之后,while(!isSyncQueue()) Thread.yield();如果节点还没进入锁队列,等待,return REINTERRUPT;

    6、if(queueAcquire(s,saveState) && interruptMode!=THROW_IE) interruptMode = REINTERRUPT;如果线程被中断则queueAcquire()会返回false;

    7、判断s.nextWaiter != null 成立,则unlinkCancelledWaiter();

    8、如果interruptMode != 0;根据不同的interruptMode进行不同的处理reportInterrupt(interruptMode);

      a) 抛出异常或者是selfInterrupt();

条件的signal():将firstWaiter移入锁队列

    1、判断firstWaiter != null; doSignal (firstWaiter);

    2、doSignal (firstWaiter):如果tranferForSignal(first)失败,则移动下一个节点,直到下个节点为null

    3、boolean tranferForSignal(first)要和取消操作进行判定

      a) !compareAndSetStatus(first,Node.CONDITON,0);说明此节点waitStatus已经被设置为CANCELLED,所以不能移入锁队列,return false;

      b) 否则Node p = enq(first);如果p.waitStatus > 0 || !compareAndSetStatus(p,0,Node.SIGNAL) 成立,说明此节点原子操作插入锁队列失败唤醒此节点的线程LockSupport.unpark(first.thread);此后,方法await()中会执行进一步判定isSyncQueue()为true,从而调用queueAcquire()去获得锁

原文地址:https://www.cnblogs.com/uttu/p/2909059.html