CountDownLatch

1:CountDownLatch

// A latch with a count of 1: it opens after a single countDown() call.
// Threads that need to wait for that moment call countDownLatch.await().
CountDownLatch countDownLatch = new CountDownLatch(1);

new Thread(() -> {
  try {
      // Simulate 10 seconds of work before releasing the latch.
      Thread.sleep(10000L);
  } catch (InterruptedException e) {
      e.printStackTrace();
      // sleep() cleared the interrupt flag when it threw; restore it so the
      // interruption stays visible to the rest of this thread.
      Thread.currentThread().interrupt();
  }
  // Release the latch even if the sleep was interrupted, so waiters are not stuck.
  countDownLatch.countDown();
}).start();

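CountDownLatch的功能基于AQS(AbstractQueuedSynchronizer)实现。上面的示例只展示了countDown()一侧;需要等待的线程调用countDownLatch.await()阻塞,直到计数减为0。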

1.1 初始化

设置CountDownLatch的数量,本质上就是设置state的值,state是volatile关键字修饰的字段。

/**
 * Creates the latch and backs it with a new {@code Sync} initialised to {@code count}.
 *
 * @param count the initial count; must not be negative
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
  if (count < 0) {
      throw new IllegalArgumentException("count < 0");
  }
  this.sync = new Sync(count);
}
// Sync constructor: stores the latch count as the synchronizer state via setState(count).
Sync(int count) {
  setState(count);
}

1.2 countDown()方法

countDown()内部调用sync.releaseShared(1),执行共享模式下的释放操作:

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
  }
  return false;
}
1.2.1 tryReleaseShared(arg)

每调用一次countDown(),state值会减1。当state的值减到0时,返回true;如果state在调用前已经是0,则直接返回false。

protected boolean tryReleaseShared(int releases) {
  // Decrement count; signal when transition to zero
  for (;;) {
      int c = getState();
      if (c == 0)
          return false;
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
          return nextc == 0;
  }
}
1.2.2 doReleaseShared();

当countDownLatch的state值降为0时,需要唤醒调用await()而阻塞的线程;这些阻塞线程保存在一个双向链表(AQS等待队列)中。当头结点的waitStatus为SIGNAL时,该方法会先用CAS把它置为0,再调用unparkSuccessor(h)唤醒头结点之后的等待节点。

如果多个线程被阻塞,唤醒流程在后续进行分析。

/**
 * Shared-mode release step: loops on the queue head until the head is
 * observed unchanged across one full iteration.
 *
 * In each iteration, if the queue has a head that is not also the tail:
 * a head in SIGNAL status is CAS-reset to 0 and its successor is unparked;
 * a head in status 0 is CAS-set to PROPAGATE. A failed CAS restarts the loop.
 */
private void doReleaseShared() {
  for (;;) {
      // Snapshot the head; it is compared against the live head at the end.
      Node h = head;
      // Only act when there is at least one node queued behind the head.
      if (h != null && h != tail) {
          int ws = h.waitStatus;
          if (ws == Node.SIGNAL) {
              // Only the caller that wins this CAS goes on to unpark the successor.
              if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;           // loop to recheck cases
              unparkSuccessor(h);
          }
          else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
              continue;               // loop on failed CAS
      }
      // Exit only if the head did not change while this iteration ran.
      if (h == head)                   // loop if head changed
          break;
  }
}
1.2.3 unparkSuccessor(h);

唤醒给定节点(这里是头结点)之后第一个未被取消的等待线程。

/**
 * Unparks the thread of the first usable node after {@code node}, if any.
 *
 * @param node the node whose successor should be woken
 */
private void unparkSuccessor(Node node) {
  /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
  int ws = node.waitStatus;
  if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

  /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
      s = null;
      // Walk from the tail towards node; the loop does not stop at the first
      // match, so s ends up as the matching node closest to node.
      for (Node t = tail; t != null && t != node; t = t.prev)
          if (t.waitStatus <= 0)
              s = t;
  }
  // Nothing is unparked when no eligible node was found.
  if (s != null)
      LockSupport.unpark(s.thread);
}
 

1.3 await()方法

如果tryAcquireShared(arg) < 0,表示state不等于0(计数尚未归零),当前线程需要进入阻塞等待。

public final void acquireSharedInterruptibly(int arg)
      throws InterruptedException {
  if (Thread.interrupted())
      throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
      doAcquireSharedInterruptibly(arg);
}
1.3.1 doAcquireSharedInterruptibly(arg)
/**
 * Slow path of the interruptible shared acquire: enqueues the current thread
 * as a SHARED node and loops until the acquire succeeds or an
 * InterruptedException is thrown.
 *
 * @param arg the acquire argument, passed through to {@code tryAcquireShared}
 * @throws InterruptedException if {@code parkAndCheckInterrupt()} reports an interrupt
 */
private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
  // Add a node for the current thread to the wait queue.
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
      for (;;) {
          // Get the node immediately before the current node.
          final Node p = node.predecessor();
          if (p == head) {
              // Only the node directly behind the head retries the acquire.
              // NOTE(review): tryAcquireShared is not shown here; for CountDownLatch it
              // presumably returns 1 when state is 0 and a negative value otherwise - confirm.
              int r = tryAcquireShared(arg);
              if (r >= 0) {
                  // Key step: make this node the new head and propagate the wake-up.
                  setHeadAndPropagate(node, r);
                  p.next = null; // help GC
                  failed = false;
                  return;
              }
          }
          // Per the original note: sets the predecessor's waitStatus and parks the
          // current thread (both helpers are defined outside this excerpt).
          if (shouldParkAfterFailedAcquire(p, node) &&
              parkAndCheckInterrupt())
              throw new InterruptedException();
      }
  } finally {
      // Reached with failed == true only when the loop exits by exception.
      if (failed)
          cancelAcquire(node);
  }
}
1.3.2 setHeadAndPropagate(node, r)

将当前节点设置为头结点,并唤醒下一个被阻塞的await线程。

/**
 * Makes {@code node} the queue head and, when any of the checks below holds,
 * continues the shared release via {@code doReleaseShared()}.
 *
 * @param node      the node that just acquired
 * @param propagate the value returned by {@code tryAcquireShared}
 */
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; // Record old head for check below
  setHead(node);

  // Proceed if propagate is positive, or if either the old head or the new
  // head (re-read into h by the embedded assignment) is null or has a
  // negative waitStatus.
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
      Node s = node.next;
      // Release when the next node is unknown (null) or is waiting in shared mode.
      if (s == null || s.isShared())
          doReleaseShared();
  }
}
唤醒机制:先唤醒一个线程(第一个被阻塞的线程),被唤醒的线程执行到这里后又会唤醒下一个线程,如此重复下去,最终所有线程都会被唤醒。这也是AQS共享锁的唤醒原理。

2:CyclicBarrier

使用代码:

public static void main(String[] args) throws InterruptedException {
      CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
          @Override
          public void run() {
              System.out.println("线程组执行结束");
          }
      });
      for (int i = 0; i < 5; i++) {
          new Thread(new readNum(i,cyclicBarrier)).start();
      }
//       CyclicBarrier 可以重复利用,
//         这个是CountDownLatch做不到的
      for (int i = 11; i < 16; i++) {
          new Thread(new readNum(i,cyclicBarrier)).start();
      }
  }
  static class readNum implements Runnable{
      private int id;
      private CyclicBarrier cyc;
      public readNum(int id,CyclicBarrier cyc){
          this.id = id;
          this.cyc = cyc;
      }
      @Override
      public void run() {
          synchronized (this){
              System.out.println("id:"+id);
              try {
                  cyc.await();
                  System.out.println("线程组任务" + id + "结束,其他任务继续");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
  }

2.1 await()方法

public int await() throws InterruptedException, BrokenBarrierException {
  try {
      return dowait(false, 0L);
  } catch (TimeoutException toe) {
      throw new Error(toe); // cannot happen
  }
}
2.2 dowait(boolean timed, long nanos)方法

当前功能主要就是该方法实现:

/**
 * Core barrier logic, executed entirely while holding {@code lock}.
 *
 * The arriving thread decrements {@code count}. The thread that brings it to
 * zero runs the barrier command (if any) in its own thread, starts the next
 * generation and returns 0. Every other thread waits on the {@code trip}
 * condition until the generation changes, the barrier is broken, the thread
 * is interrupted, or the timeout elapses.
 *
 * @param timed whether the wait is bounded by {@code nanos}
 * @param nanos remaining wait time in nanoseconds; only used when {@code timed}
 * @return the value of {@code count} after this thread's decrement (0 for the last arrival)
 * @throws InterruptedException   if the thread is interrupted on entry, or while
 *                                waiting in a generation that is still current and unbroken
 * @throws BrokenBarrierException if the generation is found broken
 * @throws TimeoutException       if {@code timed} and {@code nanos} has run out
 */
private int dowait(boolean timed, long nanos)
  throws InterruptedException, BrokenBarrierException,
          TimeoutException {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
      // Remember the generation this thread arrived in.
      final Generation g = generation;

      if (g.broken)
          throw new BrokenBarrierException();

      if (Thread.interrupted()) {
          breakBarrier();
          throw new InterruptedException();
      }
      // Each thread that reaches this point decrements count.
      int index = --count;
      // Once all parties have called await(), the last arrival runs the barrier command.
      if (index == 0) { // tripped
          boolean ranAction = false;
          try {
              final Runnable command = barrierCommand;
              if (command != null)
                  command.run(); // run() rather than start(): executed in the current thread, no new thread is created
              ranAction = true;
              nextGeneration();
              return 0;
          } finally {
              // ranAction is still false here only if command.run() threw.
              if (!ranAction)
                  breakBarrier();
          }
      }

      // loop until tripped, broken, interrupted, or timed out
      for (;;) {
          try {
              if (!timed)
                  // Block the current thread on the trip condition.
                  trip.await();
              else if (nanos > 0L)
                  nanos = trip.awaitNanos(nanos);
          } catch (InterruptedException ie) {
              if (g == generation && ! g.broken) {
                  breakBarrier();
                  throw ie;
              } else {
                  // We're about to finish waiting even if we had not
                  // been interrupted, so this interrupt is deemed to
                  // "belong" to subsequent execution.
                  Thread.currentThread().interrupt();
              }
          }

          if (g.broken)
              throw new BrokenBarrierException();

          // The generation was replaced, so the barrier tripped: report the arrival index.
          if (g != generation)
              return index;

          if (timed && nanos <= 0L) {
              breakBarrier();
              throw new TimeoutException();
          }
      }
  } finally {
      lock.unlock();
  }
}

(1):使用到ReentrantLock lock来保证线程安全;

(2):nextGeneration()会唤醒所有在trip条件上等待的线程,并把count重置为parties、创建新的generation,使屏障可以在下一轮继续使用。

/**
 * Finishes the current generation and prepares the next one: signals every
 * thread waiting on {@code trip}, resets {@code count} to {@code parties},
 * and installs a fresh {@code Generation}.
 */
private void nextGeneration() {
  // signal completion of last generation
  trip.signalAll();
  // set up next generation
  count = parties;
  generation = new Generation();
}

 

原文地址:https://www.cnblogs.com/mayang2465/p/14653201.html