HDFS异步访问模式

前言


在现有HDFS的RPC调用方式上,采用的基本是blocking call的形式,也就是阻塞式的调用方式.阻塞方式的一个明显的缺点是它的请求过程是同步的,也就是说,客户端必须等待当前请求结果的返回,才能接着发送下一次请求.如果此客户端打算在一个线程中发送大量请求的话,阻塞式的RPC调用将会非常耗时.但是如果为了每一次请求调用而专门单独开一个线程的话,系统资源将会被大幅度的使用,显然这也不是一个好的解决的办法.那么有没有什么好的办法呢,在HDFS中是否存在有异步模式的RPC请求接口呢?本文我们就来聊聊HDFS的异步访问模式.

HDFS异步访问模式


老实说,在目前Hadoop的发布版本中,确实还不存在HDFS异步访问的模式,但是这并不代表社区没有在关注这方面的问题.在许多特殊的场景下,HDFS的异步访问模式还是有它独到的用处的.社区在JIRA HDFS-9924([umbrella] Nonblocking HDFS Access)上对此功能特性进行了实现.在本文中,我们姑且取名”HDFS异步访问模式”为AsyncDistributedFileSystem,与DistributedFileSystem相对应.

HDFS异步访问模式原理


在HDFS异步访问模式的设计文档中,给出了新的异步的RPC调用模式,采用了Future-Get的异步调用模式,以FileSystem的rename方法为例:

同步的方式(现有的调用模式):

public boolean rename(Path src, Path dst) throws IOException;

异步的方式(AsyncDistributedFileSystem):

public Future<Boolean> rename(Path src, Path dst) ​throws IOException​;

rename操作在AsyncDistributedFileSystem中的实现代码如下:

public Future<Void> rename(Path src, Path dst,
    final Options.Rename... options) throws IOException {
  dfs.getFsStatistics().incrementWriteOps(1);

  final Path absSrc = dfs.fixRelativePart(src);
  final Path absDst = dfs.fixRelativePart(dst);
  // 获取Client原有Asynchronous的模式
  final boolean isAsync = Client.isAsynchronousMode();
  // 设置当前为异步调用模式
  Client.setAsynchronousMode(true);
  try {
    dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
        options);
    return getReturnValue();
  } finally {
    // 异步调用方式结束,Client类的异步设置回之前的模式
    Client.setAsynchronousMode(isAsync);
  }
}

这种异步的调用方式还能非常灵活地集成进行现有的同步的调用方式中,示例如下:

//DistributedFileSystem
public boolean rename(Path src, Path dst) throws IOException {
       if (isAsynchronousMode()) {
         return getFutureDistributedFileSystem().rename(src, dst).get();
       } else {
         //current implementation.
         ...
       }
}

其他方法的具体实现细节大家可以参考HDFS-9924下的各个子任务的实现.
我们拿到异步返回的Future对象后,只需调用get()方法即可拿到执行的结果.示例代码如下:

// asynchronous mode
AsyncDistributedFileSystem asyncDfs = dfs.getAsyncDistributedFileSystem();

// asynchronous calls
Future<Boolean> f3 = futureDfs.rename(src3, dst3); // non­blocking
Future<Boolean> f4 = futureDfs.rename(src4, dst4); // non­blocking

boolean r3 = f3.get(); // blocking
boolean r4 = f4.get(); // blocking

客户端异步请求的控制


在前面HDFS异步访问模式的过程中,有一点必须格外引起注意:客户端异步请求的控制.客户端应有异步请求数的限制,以此防止客户端利用大量的异步请求冲垮服务端.如果超过了此限制阈值,客户端的请求将会处于阻塞状态.这点必须要引起足够重视,否则客户端随随便便发起的请求将会摧毁NameNode.

下面以单元测试中的例子为例,我们来看看在HDFS-9924中是如何做到对此方面的控制的,同样以rename请求为例:

for (int i = 0; i < count; i++) {
  for (;;) {
    try {
      // 发起异步请求
      Future<Void> returnFuture = adfs.rename(srcs[i], dsts[i],
          Rename.OVERWRITE);
      renameRetFutures.put(i, returnFuture);
      break;
    } catch (AsyncCallLimitExceededException e) {
      // 如果超过异步请求的限制数量,则抛出异常
      start = end;
      end = i;
      // 阻塞式地等待结果
      waitForReturnValues(renameRetFutures, start, end);
    }
  }
}

上述waitForReturnValues的实现代码如下:

private void waitForReturnValues(final List<Future<Void>> futureList)
    throws InterruptedException, ExecutionException {
  // 进行逐个的阻塞式的获取结果
  for (int i = 0; i < futureList.size(); i++) {
    futureList.get(i).get();
  }
}

上面的操作意味着客户端要想继续调用异步请求,得需要等待当前请求结果的返回,否则还将会抛出AsyncCallLimitExceededException异常.

HDFS异步访问模式的优化点


各位看了上面HDFS异步访问模式的原理以及部分代码的实现之后,可能会觉得整个过程并不是太复杂.因为上面只是一个初始的功能性的实现,在未来的工作中,至少有以下几点需要去完善:

  • 第一, 保证异步请求的有序性.在某些场景下,我们需要保证异步请求能够按照请求发起的时间,顺序执行.
  • 第二, 客户端对HDFS读写异步请求的支持.

总结


HDFS Async调用模式的出现将会带给用户更灵活的RPC请求方式的选择,但是可能考虑到此种方式对比之前的方式而言,改动较大,目前这些异步调用相关的方法许多是打上了@Unstable标记的.HDFS异步调用的方式同样可以很好的运用在单元测试上.鉴于此特性是还暂未发布,大家可以根据自己的需要,进行部分的合入.

参考资料


[1].https://issues.apache.org/jira/browse/HDFS-9924
[2].https://issues.apache.org/jira/secure/attachment/12803347/AsyncHdfs20160510.pdf

原文地址:https://www.cnblogs.com/bianqi/p/12183732.html