队列同步器的实现

队列同步器主要包括:

  1. 同步队列

  2. 首节点(head)

  3. 尾节点(tail)

  4 独占式获取与释放同步状态

  5. 共享式获取与释放同步状态

  6. 超时获取与释放同步状态

 1.同步队列:

  1.1 一个FIFO双向队列

  1.2 当前线程获取同步状态失败时,会向队列中添加节点(Node)

    节点的组成:

      • 线程的引用(获取同步状态失败的线程) -- Thread thread
      • 等待状态 -- int waitStatus 
          • CANCELLED 值为1.
          • SIGNAL 值为-1
          • CONDITION 值为-2
          • PROPAGATE 值为-3
          • INITIAL 值为0
      • 前驱结点 -- Node prev 前驱节点 当节点加入同步队列时被设置
      • 后继节点 -- Node next 
      •  -- Node nextWaiter

2. 首节点和尾节点的操作逻辑   

2.1 当一个线程成功地获取了同步状态(或者锁),其他的线程将无法获取到同步状态,转而被构造称为节点并加入到同步队列的尾部。同时有多个尾节点插入同步队列是线程不安全的。同步器提供了一个基于CAS的设置尾节点的方法来保证线程安全:compareAndSetTail(Node except, Node update)---它需要传递当前线认为的尾点和当前点,只有置成功后,当前点才正式与之前的尾点建立关联。

 

2.2 首节点是获取同步状态成功的节点,首节点的线程在释放同步状态(或者解锁)时,将会唤醒后继节点,而后继节点会在获取同步状态成功时将自己设置为首节点(head)

 

独占式同步状态获取与放 

 过调用同步器的acquire(int arg)方法可以取同步状方法中断不敏感,也就是由于线取同步状入同步列中,后续对该线行中断操作线程不会从同列中移出 

1 //同步器的acquire方法 
2 public final void acquire(int arg) {
3      if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4          selfInterrupt();
5 }   

 主要逻辑是:首先用自定同步器实现tryAcquire(int arg)方法,方法保证线程安全的取同步状,如果同步状态获取失构造同步点(独占式Node.EXCLUSIVE,同一刻只能有一个线程成功取同步状)并通addWaiter(Node node)方法将该节点加入到同步列的尾部,最后acquireQueued(Node node,int arg)方法,使得该节点以死循的方式取同步状。如果取不到阻塞点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出或阻塞线程被中断来实现 

 1 //同步器的addWaiter和enq方法
 2 private Node addWaiter(Node mode) {
 3     Node node = new Node(Thread.currentThread(), mode);
 4     // 快速尝试在尾部添加
 5     Node pred = tail;
 6     if (pred != null) {
 7         node.prev = pred;
 8         if (compareAndSetTail(pred, node)) {
 9             pred.next = node;
10             return node;
11         }
12     }
13     enq(node);
14     return node;
15 }
16 //如果不存在尾节点,先初始化尾节点(和头节点),然后再设置尾节点
17 private Node enq(final Node node) {
18     for (;;) {
19         Node t = tail;
20         if (t == null) { // Must initialize
21             if (compareAndSetHead(new Node()))
22                 tail = head;
23         } else {
24             node.prev = t;
25             if (compareAndSetTail(t, node)) {
26                 t.next = node;
27                 return t;
28             }
29         }
30     }
31 }

enq(final Node node)方法中,同步器通死循来保证节点的正确添加,在死循环中只有通CAS置成点之后,当前线程才能从方法返回,否,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并添加点的求通CAS变得串行化了。


入同步列之后,就入了一个自旋的程,每个点(或者每个线程)都在自省地察,当条件足,取到了同步状,就可以从个自旋程中退出,否依旧留在这个自旋程中(并会阻塞点的线程),代码如下:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true;
 3     try {
 4         boolean interrupted = false;
 5         for (;;) {
 6             final Node p = node.predecessor();
 7             if (p == head && tryAcquire(arg)) {
 8                 setHead(node);
 9                 p.next = null; // help GC
10                 failed = false;
11                 return interrupted;
12            }
13             if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
14                 interrupted = true;
15         }
16     } finally {
17         if (failed)
18             cancelAcquire(node);
19      }
20 }

 acquireQueued(final Node node,int arg)方法中,当前线程在死循尝试获取同步状态只有前驱节点是头节点才能够尝试获取同步状态,如图:


独占式同步状态获取流程,也就是acquire(int arg)方法用流程,如5-5所示。



当前线取同步状行了相应逻辑之后,就会调用同步器的release(int arg)方法放同步状
 

 1 //同步器的release方法
 2 public final boolean release(int arg) {
 3     if (tryRelease(arg)) {
 4         Node h = head;
 5         if (h != null && h.waitStatus != 0)
 6             unparkSuccessor(h);
 7         return true;
 8     }
 9     return false;
10 }

 方法,会头节点的后继节线程,unparkSuccessor(Node node)方法使用LockSupport(在后面的章专门)来于等待状线
 

 总结:在取同步状态时,同步器维护一个同步列,取状线程都会被加入到列中并在列中行自旋;移出列(或停止自旋)的条件是前驱节为头节点且成功取了同步状。在放同步状态时,同步器tryRelease(int arg)方法放同步状,然后头节点的后继节点 

4.共享式同步状态获取与

同一刻可以有多个线程同时获取到同步状态。 

 

过调用同步器的acquireShared(int arg)方法可以共享式地取同步状态 

 1 //同步器的acquireShared和doAcquireShared方法
 2 public final void acquireShared(int arg) {
 3     if (tryAcquireShared(arg) < 0)//tryAcquireShared(int arg)方法返回值为int型,当返回大于等于0,表示能够获取到同
步状
4 doAcquireShared(arg); 5 } 6 private void doAcquireShared(int arg) { 7 final Node node = addWaiter(Node.SHARED); 8 boolean failed = true; 9 try { 10 boolean interrupted = false; 11 for (;;) { 12 final Node p = node.predecessor(); 13 if (p == head) { 14 int r = tryAcquireShared(arg); 15 if (r >= 0) { 16 setHeadAndPropagate(node, r); 17 p.next = null; 18 if (interrupted) 19 selfInterrupt(); 20 failed = false; 21 return; 22 } 23 } 24 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 25 interrupted = true; 26 } 27 } finally { 28 if (failed) 29 cancelAcquire(node); 30 } 31 }

与独占式一,共享式取也需要放同步状,通过调releaseShared(int arg)方法可以释放同步状

1 //释放同步状态
2 public final boolean releaseShared(int arg) {
3     if (tryReleaseShared(arg)) {
4         doReleaseShared();
5         return true;
6     }
7     return false;
8 }

tryReleaseShared(int arg)方法必确保同步状(或者源数)线程安全放,一般是通CAS来保的,因为释放同步状的操作会同来自多个线

5.独占式超时获取同步状


过调用同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以响中断和时获取同步状态,即在指定的时间段内取同步状,如果取到同步状态则返回true,否,返回false

 1 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
 2     long lastTime = System.nanoTime();final Node node = addWaiter(Node.EXCLUSIVE);
 3     boolean failed = true;
 4     try {
 5         for (;;) {
 6             final Node p = node.predecessor();
 7             if (p == head && tryAcquire(arg)) {
 8                 setHead(node);
 9                 p.next = null; // help GC
10                 failed = false;
11                 return true;
12             }
13             if (nanosTimeout <= 0)
14                 return false;
15             if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
16                 LockSupport.parkNanos(this, nanosTimeout);
17             long now = System.nanoTime();
18             //计算时间,当前时间now减去睡眠之前的时间lastTime得到已经睡眠
19             //的时间delta,然后被原有超时时间nanosTimeout减去,得到了
20             //还应该睡眠的时间
21             nanosTimeout -= now - lastTime;
22             lastTime = now;
23             if (Thread.interrupted())
24                 throw new InterruptedException();
25         }
26      } finally {
27           if (failed)
28               cancelAcquire(node);
29      }
30 }    

如果当前线取同步状判断是否超nanosTimeout小于等于0表示已),如果没有超,重新算超时间nanosTimeout,然后使当前线程等待nanosTimeout秒(当已到置的超时时间该线程会从LockSupport.parkNanos(Objectblocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold1000秒),将不会使该线行超等待,而是入快速的自旋程。 


6.自定同步——TwinsLock (该工具在同一刻,只允至多两个线程同时访问,超两个线程的访问将被阻塞 )

  步骤:

  1.  重写tryAcquireShared(int args)方法和tryReleaseShared(int args)方法
  2. 义资源数。TwinsLock在同一刻允至多两个线程的同时访问,表明同步源数2这样可以置初始状status2,当一个线取,status1该线放,则status1,状的合法范围为012,其中0表示当前已有两个线取了同步源。

 1 public class TwinsLock implements Lock {
 2     private final Sync sync = new Sync(2);//定义资源数为2
 3     private static final class Sync extends AbstractQueuedSynchronizer {
 4         Sync(int count) {
 5             if (count <= 0) {
 6                 throw new IllegalArgumentException("count must large than zero.");
 7             }
 8             setState(count);//初始化同步状态--同步状态的初始值为同步资源数
 9         }
10         public int tryAcquireShared(int reduceCount) {
11             for (;;) {
12                 int current = getState();
13           int newCount = current - reduceCount;
14                 if (newCount < 0 || compareAndSetState(current, newCount)) {
15                     return newCount;
16                 }
17             }
18         }
19         public boolean tryReleaseShared(int returnCount) {
20             for (;;) {
21                 int current = getState();
22                 int newCount = current + returnCount;
23                 if (compareAndSetState(current, newCount)) {
24                     return true;
25                 }
26             }
27         }
28     }
29     public void lock() {
30         sync.acquireShared(1);
31     }
32     public void unlock() {
33         sync.releaseShared(1);
34     }
35     // 其他接口方法略
36 }   

同步器会先算出取后的同步状,然后通CAS确保状的正确置,当tryAcquireShared(int reduceCount)方法返回大于等于0,当前线程才取同步状态(锁)

测试TwinsLock:

测试用例中,定了工作者线Worker该线程在程中,当之后使当前线程睡眠1秒(并不),随后打印当前线程名称,最后再次睡眠1秒并锁 

 1 //测试TwinsLock
 2 public class TwinsLockTest {
 3   @Test
 4   public void test() {
 5     final Lock lock = new TwinsLock();
 6     class Worker extends Thread {
 7     public void run() {
 8       while (true) {
 9         lock.lock();
10         try {
11           SleepUtils.second(1);
12           System.out.println(Thread.currentThread().getName());
13           SleepUtils.second(1);
14         } finally {
15           lock.unlock();
16         }
17       }
18      }
19     }
20     // 启动10个线程
21     for (int i = 0; i < 10; i++) {
22       Worker w = new Worker();
23       w.setDaemon(true);
24       w.start();
25     }
26     // 每隔1秒换行
27     for (int i = 0; i < 10; i++) {
28       SleepUtils.second(1);
29       System.out.println();
30     }
31   }
32 }
原文地址:https://www.cnblogs.com/jimboi/p/6410517.html