ListenableFuture的状态同步和原子更新

前言


在Java8里的Future类实现中,引入了一种新的Future类:CompletableFuture。此类相比较于原来的Future类来说,最大的一点不同在于它可以完全异步执行结果回调。而在老Future模式下,用户等待结果是需要阻塞等待的,然后利用此结果做后续的操作。无疑这在效率上并不是很好。但其实在CompletableFuture出现之前,已经有类似特点的Future工具类的实现,它就是guava包里的ListenableFuture的实现。按照时间顺序,ListenableFuture实现在前,CompletableFuture在后。个人觉得,Java8的CompletableFuture在一定程度上应该还是借鉴了ListenableFuture的一些思想。二者的一个共通思想:一个可监听式的Future对象。今天笔者要聊的主题关乎里面的状态同步,关于ListenableFuture的子类内部实现,许多人可能未必清楚。

ListenableFuture的监听添加


ListenableFuture,ListenableFuture,关键词在里面的Listenable。而且它只有以下一个接口定义:

public interface ListenableFuture<V> extends Future<V> {
  void addListener(Runnable listener, Executor executor);
}

所以我们先来看看里面的监听是如何增加的。说是监听,其实就是一个回调执行操作,实现定义如下:

  @Override
  public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
  }

这里的executionList可不是线程池,而是一个执行列表,这里的监听runnable执行在给定的执行器内。executionList是被定义在ListenableFuture的实现子类中:

public abstract class AbstractFuture<V> implements ListenableFuture<V> {

  /** Synchronization control for AbstractFutures. */
  private final Sync<V> sync = new Sync<V>();

  // The execution list to hold our executors.
  private final ExecutionList executionList = new ExecutionList();
  ...

在这里我们看到了一个Sync对象,它是用来做什么的呢?我们继续往下看。

ListenableFuture内的状态同步控制


先不看Sync的具体实现,在ListenableFuture的子类实现的主要方法里,间接调用的都是Sync的同名方法,

  @Override
  public V get() throws InterruptedException, ExecutionException {
    return sync.get();
  }

  @Override
  public boolean isDone() {
    return sync.isDone();
  }

  @Override
  public boolean isCancelled() {
    return sync.isCancelled();
  }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (!sync.cancel()) {
      return false;
    }
    // Future执行cancal操作时,也执行一把监听回调操作
    executionList.execute();
    if (mayInterruptIfRunning) {
      interruptTask();
    }
    return true;
  }

换句话说,我们对一个ListenableFuture的操作调用其实是对Sync的一个操作调用。在这里我们基本可以有一个大概猜测:Sync是一个包装了Future实现的一个同步类,至于里面具体是怎么同步呢,为什么同步呢?我们继续往下看。

ListenableFuture内的Sync同步


我们直接来看里面的Sync定义说明:

  /**
   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
   * private subclass to hold the synchronizer.  This synchronizer is used to
   * implement the blocking and waiting calls as well as to handle state changes
   * in a thread-safe manner.  The current state of the future is held in the
   * Sync state, and the lock is released whenever the state changes to either
   * {@link #COMPLETED} or {@link #CANCELLED}.
   *
   * <p>To avoid races between threads doing release and acquire, we transition
   * to the final state in two steps.  One thread will successfully CAS from
   * RUNNING to COMPLETING, that thread will then set the result of the
   * computation, and only then transition to COMPLETED or CANCELLED.
   *
   * <p>We don't use the integer argument passed between acquire methods so we
   * pass around a -1 everywhere.
   */
  static final class Sync<V> extends AbstractQueuedSynchronizer {

一句话简单概括,它是一个继承了AbstractQueuedSynchronizer(AQS)类的用于做future对象状态同步控制的操作类。这里其实假设了一个情况,会存在多线程同时操作某future对象的情况,通过AQS进行这些调用的阻塞同步控制,从而保证状态的原子更新操作,获取不到锁的线程会被置入一个FIFO的队列中进行等待。在AQS里,有分为互斥和共享两种模式,前后者的核心区别在于是否支持多个线程同时能够获取锁的情况。

下面是Sync里面的complete实现,通过CAS操作来更新状态:

    private boolean complete(@Nullable V v, @Nullable Throwable t,
        int finalState) {
      boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
      if (doCompletion) {
        // If this thread successfully transitioned to COMPLETING, set the value
        // and exception and then release to the final state.
        this.value = v;
        this.exception = t;
        releaseShared(finalState);
      } else if (getState() == COMPLETING) {
        // If some other thread is currently completing the future, block until
        // they are done so we can guarantee completion.
        acquireShared(-1);
      }
      return doCompletion;
    }

其实在JDK lock包下的许多锁的实现类中,都有用到类似Sync(AQS子类)做多线程的同步控制,比如ReentrantLock。

其它状态原子更新方法


在Hadoop中,同样有对于ListenableFuture的一个抽象类实现,不过它的内部不是AQS类做状态同步控制,而是实现3种更为高效的原子更新方法。按照不同情况,做逐一fall back的选择。以下代码供大家学习参考使用、

以下代码的目的是更新AbstractFuture类中的waiters,listener这样的volatile类型变量。

  /**
   * This field encodes the current state of the future.
   * <p>
   * <p>The valid values are:
   * <ul>
   * <li>{@code null} initial state, nothing has happened.
   * <li>{@link Cancellation} terminal state, {@code cancel} was called.
   * <li>{@link Failure} terminal state, {@code setException} was called.
   * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
   * <li>Any other non-null value, terminal state, {@code set} was called with
   * a non-null argument.
   * </ul>
   */
  private volatile Object value;

  /**
   * All listeners.
   */
  private volatile Listener listeners;

  /**
   * All waiting threads.
   */
  private volatile Waiter waiters;

  /**
   * Constructor for use by subclasses.
   */
  protected AbstractFuture() {
  }

首先是,调用比较底层的Unsafe包,更新对象按照地址偏移量进行对象更新。

  /**
   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
   * <p>
   * <p>Static initialization of this class will fail if the
   * {@link sun.misc.Unsafe} object cannot be accessed.
   */
  private static final class UnsafeAtomicHelper extends AtomicHelper {
    static final sun.misc.Unsafe UNSAFE;
    static final long LISTENERS_OFFSET;
    static final long WAITERS_OFFSET;
    static final long VALUE_OFFSET;
    static final long WAITER_THREAD_OFFSET;
    static final long WAITER_NEXT_OFFSET;

    static {
      sun.misc.Unsafe unsafe = null;
      try {
        unsafe = sun.misc.Unsafe.getUnsafe();
      } catch (SecurityException tryReflectionInstead) {
        ...
      }
      try {
        Class<?> abstractFuture = AbstractFuture.class;
        WAITERS_OFFSET = unsafe
            .objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
        LISTENERS_OFFSET = unsafe
            .objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
        VALUE_OFFSET = unsafe
            .objectFieldOffset(abstractFuture.getDeclaredField("value"));
        WAITER_THREAD_OFFSET = unsafe
            .objectFieldOffset(Waiter.class.getDeclaredField("thread"));
        WAITER_NEXT_OFFSET = unsafe
            .objectFieldOffset(Waiter.class.getDeclaredField("next"));
        UNSAFE = unsafe;
      } catch (Exception e) {
        throwIfUnchecked(e);
        throw new RuntimeException(e);
      }
    }

    ...

    @Override
    void putThread(Waiter waiter, Thread newValue) {
      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
    }

    @Override
    void putNext(Waiter waiter, Waiter newValue) {
      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
    }


    /**
     * Performs a CAS operation on the {@link #listeners} field.
     */
    @Override
    boolean casListeners(
        AbstractFuture<?> future, Listener expect, Listener update) {
      return UNSAFE
          .compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
    }

    /**
     * Performs a CAS operation on the {@link #value} field.
     */
    @Override
    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
    }
  }

方法二,通过基于反射原理的原子引用类型AtomicReferenceFieldUpdater类,来做变量的原子更新。

  /**
   * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
   */
 private static final class SafeAtomicHelper extends AtomicHelper {
    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Listener>
        listenersUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;

    SafeAtomicHelper(
        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
      this.waiterThreadUpdater = waiterThreadUpdater;
      this.waiterNextUpdater = waiterNextUpdater;
      this.waitersUpdater = waitersUpdater;
      this.listenersUpdater = listenersUpdater;
      this.valueUpdater = valueUpdater;
    }

    @Override
    void putThread(Waiter waiter, Thread newValue) {
      waiterThreadUpdater.lazySet(waiter, newValue);
    }

    @Override
    void putNext(Waiter waiter, Waiter newValue) {
      waiterNextUpdater.lazySet(waiter, newValue);
    }

    @Override
    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
        update) {
      return waitersUpdater.compareAndSet(future, expect, update);
    }

    @Override
    boolean casListeners(
        AbstractFuture<?> future, Listener expect, Listener update) {
      return listenersUpdater.compareAndSet(future, expect, update);
    }

    @Override
    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
      return valueUpdater.compareAndSet(future, expect, update);
    }
  }

方法三,基于synchronized关键字做字段更新,在效果上不及前二者:

  /**
   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
   * <p>
   * <p>This is an implementation of last resort for when certain basic VM
   * features are broken (like AtomicReferenceFieldUpdater).
   */
  private static final class SynchronizedHelper extends AtomicHelper {
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      waiter.thread = newValue;
    }

    @Override
    void putNext(Waiter waiter, Waiter newValue) {
      waiter.next = newValue;
    }

    @Override
    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter
        update) {
      synchronized (future) {
        if (future.waiters == expect) {
          future.waiters = update;
          return true;
        }
        return false;
      }
    }

    @Override
    boolean casListeners(
        AbstractFuture<?> future, Listener expect, Listener update) {
      synchronized (future) {
        if (future.listeners == expect) {
          future.listeners = update;
          return true;
        }
        return false;
      }
    }

    @Override
    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
      synchronized (future) {
        if (future.value == expect) {
          future.value = update;
          return true;
        }
        return false;
      }
    }
  }

上面的CAS操作已经基本是标准的CAS算法步骤了。
以上就是3个简单的原子更新器的简单实现类。

引用


[1].https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AbstractFuture.java

原文地址:https://www.cnblogs.com/bianqi/p/12183558.html