DataNode启动优化改进:磁盘检测并行化

前言


在本篇文章中,本人打算聊一个大家平常都会遇见到的场景:HDFS中的DataNode启动的问题。DataNode启动不是一件非常迅速的事情吗?这其中能有大问题?看到这里,如果你也是这么想的话,那说明一点:你所运维的集群碰到的异常场景还不太多。本文所要讲述的问题并不是DataNode无法启动的问题,而是它启动有时会过慢的问题。DataNode进程启动过慢造成的直接影响是其上数据的服务延时。集群规模在一个很大规模量级的情况下,如果出现了大量DataNode慢启动的现象,这将会对集群本身对外提供服务造成不小的影响。本文所要讲述的内容是对于目前DataNode的启动优化,以此加速其启动时间,优化的主要点在于将DataNode启动时的磁盘检测行为。

现有DataNode启动时的磁盘检测


在讲述本节主要内容之前,我们要大概了解DataNode启动时的相关操作。在HDFS中,一个DataNode从启动开始到最终提供数据服务,中间会做很多的操作步骤。这里主要概括为以下几点:

  • 1.读取解析数据目录即datadir所配置的目录。
  • 2.检查这些目录对应磁盘是否是坏的磁盘(此步骤现有的逻辑为串行执行)。
  • 3.DataNode发送心跳信息,向NameNode进行注册。
  • 4.扫描各个数据目录下的数据块,并将这些数据块初次汇报给NameNode。

主要为以上的逻辑,而本节我们要优化的点在于其中第2点,也就是磁盘检测相关的操作。在DataNode的启动过程中,为什么要对磁盘做一次健康检查呢?因为它是保证节点本身数据可用性的一个重要指标,如果DataNode在磁盘检测中发现坏盘的个数超出了可容忍阈值(可配)的情况下,会直接让DataNode启动失败,并抛出异常。由此可见,HDFS对其磁盘可用性的一个重视。在正常情况下,这部分的检测操作会非常顺利,但是在某些情况下,可能会出现检测十分耗时的情况,比如下面两类情况:

第一个,如果节点内配置的磁盘目录非常多,比如一个机器,上面有10来块,20来块盘,然后我配置了对应盘数的目录。由于目前磁盘健康检测的逻辑是串行执行,所以总执行时间会线性增长。当然,如果机器磁盘本身都比较健康,它所花的总时间也不会多多少时间。可怕的是第二种情况。
第二个,个别DataNode磁盘数据目录检测出现非常慢的现象,可能是这个目录对应磁盘本身的性能问题(DataNode磁盘健康检测时会尝试在目录下创建文件、目录动作以此确定磁盘是否可用)。这个时候后面待检测的目录就会被迫等待当前磁盘检测动作的完成,最后就会导致总检测时间过长。

所以为了避免出现第二点提到的个别磁盘检测极慢影响到整体的问题,我们可以对其进行改造,改造的核心点就在于将原有磁盘检测的执行逻辑由串行化改为并行化。这个改进想法是目前社区在做的,JIRA编号HDFS-11086DataNode disk check improvements)。本文的主要思想和代码也是借鉴于这个JIRA上的。

现有DataNode内部磁盘检测代码


接下来我们来看一下目前DataNode磁盘检测代码,既然我们已经知道它是串行执行的逻辑了,那么DataNode内部到底是怎么执行的呢?

首先它是在初始化DataNode节点的操作中,代码如下:

  public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();

    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }

    // 解析DataNode启动参数
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf));
    // 进入DataNode构建实例方法
    return makeInstance(dataLocations, conf, resources);
  }

r然后我们进入makeInstance方法,

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // 初始化磁盘检测对象
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 传入磁盘检测对象进行磁盘检测
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

最后我们进入其中真正的磁盘检测动作,

  static List<StorageLocation> checkStorageLocations(
      Collection<StorageLocation> dataDirs,
      LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
          throws IOException {
    ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
    StringBuilder invalidDirs = new StringBuilder();
    // 遍历数据目录,注意这里就是串行的方式
    for (StorageLocation location : dataDirs) {
      final URI uri = location.getUri();
      try {
        // 利用磁盘检测对象进行磁盘目录的检测
        dataNodeDiskChecker.checkDir(localFS, new Path(uri));
        // 检测完毕没有抛出异常,则说明目录可用,加入到可用列表
        locations.add(location);
      } catch (IOException ioe) {
        // 如果出现IO异常,说明此磁盘目录不可用,加入到目录中
        LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
            + location + " : ", ioe);
        invalidDirs.append(""").append(uri.getPath()).append("" ");
      }
    }
    // 如果可用目录数量为0,表明所有的目录都不可用
    if (locations.size() == 0) {
      throw new IOException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + invalidDirs);
    }
    // 返回可用数据目录列表
    return locations;
  }

OK,由此我们可以明确地知道我们将要改造的地方了。

DataNode磁盘检测并行化改造


在此小节部分,我们将要介绍社区目前在这方面的改进,主要集中在JIRA HDFS-11086下的2个子JIRA:HDFS-11119Support for parallel checking of StorageLocations on DataNode startup)和HDFS-11148Update DataNode to use StorageLocationChecker at startup)。

在HDFS-11086中,作者引入了类似于Future-Get的异步执行模式,但是它没有用JDK中原生的Future-Get,而是名叫ListenableFuture的类(包名com.google.common.util.concurrent.ListenableFuture)。大家可以试着用用这个Future类

还有2个小点是本人同样认为是不错的优化点:第一,它额外保留了最近一次磁盘检测的结果,以及新定义了最小检测需要间隔的时间大小。这一点其实是非常有意义的,它可以避免短时间内的重复检测动作。也就是说,如果某块磁盘在最小检测间隔时间内又一次被检测了,则将直接返回上次的检测结果,不执行真正的检测操作。第二,它的内部新定义了磁盘检测的最大超时时间,换句话说,如果某磁盘检测处于非常慢的情况下时,直接抛出IO异常来终止此操作,进行下一个磁盘检测结果的返回

磁盘检测改造相关类设计


在本次磁盘检测改造的相关类设计中,定义了下面几个类,这几个类被放在了新的package:org.apache.hadoop.hdfs.server.datanode.checker下(大家获取最新的hadoop-trunk的代码就能找到)。

  • AsyncChecker:最基本的接口类,内部定义了用于发启异步检测与停止的操作方法。
  • ThrottledAsyncChecker:异步检测接口的具体实现类。
  • StorageLocationChecker:磁盘检测对象类,此对象会调用上面异步检测磁盘类来对各个磁盘进行并行地检测。
  • VolumeCheckResult:磁盘检测结果类,里面定义了3种检测结果:HEALTHY、DEGRADED、FAILED。

磁盘检测具体代码实现


了解完相关类的设计之后,我们最后要来真正学习此部分的代码实现了。首先要改造的地方就是前面篇幅makeInstance方法中的串行检测的逻辑,原始代码如下:

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 这里就是要改造的地方
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");

    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

在此处代码部分,将会改造为如下代码:

  static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    // 改造后的结果,利用StorageLocationChecker类来做并行检测
    List<StorageLocation> locations;
    StorageLocationChecker storageLocationChecker =
        new StorageLocationChecker(conf, new Timer());
    try {
      locations = storageLocationChecker.check(conf, dataDirs);
    } catch (InterruptedException ie) {
      throw new IOException("Failed to instantiate DataNode", ie);
    }
    DefaultMetricsSystem.initialize("DataNode");
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }

然后我们来重点查看StorageLocationChecker内的检测方法,

  public List<StorageLocation> check(
      final Configuration conf,
      final Collection<StorageLocation> dataDirs)
      throws InterruptedException, IOException {

    // 定义可用磁盘、不可用磁盘列表
    final ArrayList<StorageLocation> goodLocations = new ArrayList<>();
    final Set<StorageLocation> failedLocations = new HashSet<>();
    // 磁盘检测结果
    final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =
        Maps.newHashMap();
    final LocalFileSystem localFS = FileSystem.getLocal(conf);
    final CheckContext context = new CheckContext(localFS, expectedPermission);

    // 遍历磁盘目录,将待检测目录加入到AsyncChecker中,进行并行检测
    for (StorageLocation location : dataDirs) {
      futures.put(location,
          delegateChecker.schedule(location, context));
    }

    // 记录当前开始检测时间
    final long checkStartTimeMs = timer.monotonicNow();

    // Retrieve the results of the disk checks.
    for (Map.Entry<StorageLocation,
             ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {

      // 计算当前已经等待的执行时间
      final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);
      // 用最大允许检测时间减去当前已经执行的时间来计算可允许再等待的时间
      final long timeLeftMs = Math.max(0,
          maxAllowedTimeForCheckMs - waitSoFarMs);
      final StorageLocation location = entry.getKey();

      try {
        // 阻塞式获取执行结果,最长阻塞时间为剩余可等待时间,超出此时间会抛出超时异常
        final VolumeCheckResult result =
            entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);
        // 根据检测结果加入到不同的列表最中
        switch (result) {
        case HEALTHY:
          goodLocations.add(entry.getKey());
          break;
        case DEGRADED:
          LOG.warn("StorageLocation {} appears to be degraded.", location);
          break;
        case FAILED:
          LOG.warn("StorageLocation {} detected as failed.", location);
          failedLocations.add(location);
          break;
        default:
          LOG.error("Unexpected health check result {} for StorageLocation {}",
              result, location);
          goodLocations.add(entry.getKey());
        }
      } catch (ExecutionException|TimeoutException e) {
        // 如果抛出异常,也加入到失败列表中
        LOG.warn("Exception checking StorageLocation " + location,
            e.getCause());
        failedLocations.add(location);
      }
    }
    // 如果不可用磁盘目录数超过阈值,则抛出IO异常
    if (failedLocations.size() > maxVolumeFailuresTolerated) {
      throw new IOException(
          "Too many failed volumes: " + failedLocations.size() +
          ". The configuration allows for a maximum of " +
          maxVolumeFailuresTolerated + " failed volumes.");
    }
    // 如果没有可用磁盘目录,也抛出异常
    if (goodLocations.size() == 0) {
      throw new IOException("All directories in "
          + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "
          + failedLocations);
    }
    // 返回可用磁盘目录列表
    return goodLocations;
  }

上面执行的逻辑非常的清晰,最后我们再来看ThrottledAsyncChecker的异步检测逻辑,入口即为上面的schedule方法。

  public synchronized ListenableFuture<V> schedule(
      final Checkable<K, V> target,
      final K context) {
    LOG.debug("Scheduling a check of {}", target);
    // 如果此对象已经是在检测中的状态时,则返回之前的对象
    if (checksInProgress.containsKey(target)) {
      return checksInProgress.get(target);
    }

    // 如果此结果已包含在完成列表中的情况
    if (completedChecks.containsKey(target)) {
      // 取出此对象的检测结果
      final LastCheckResult<V> result = completedChecks.get(target);
      // 计算距离上次检测结果的时间
      final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
      // 如果此间隔时间在最小间隔时间范围内,则直接返回上次的检测结果
      if (msSinceLastCheck < minMsBetweenChecks) {
        LOG.debug("Skipped checking {}. Time since last check {}ms " +
            "is less than the min gap {}ms.",
            target, msSinceLastCheck, minMsBetweenChecks);
        return result.result != null ?
            Futures.immediateFuture(result.result) :
            Futures.immediateFailedFuture(result.exception);
      }
    }
    // 否则提交到线程池中进行异步检测
    final ListenableFuture<V> lf = executorService.submit(
        new Callable<V>() {
          @Override
          public V call() throws Exception {
            return target.check(context);
          }
        });
    // 将Future对象加入到正在执行列表中
    checksInProgress.put(target, lf);
    addResultCachingCallback(target, lf);
    return lf;
  }

这里的目标对象target.check方法会调用到DiskCheker的真正磁盘检测方法,也就是StorageLocation的check方法,相关代码如下:

  public VolumeCheckResult check(CheckContext context) throws IOException {
    // 调用真正磁盘检测类DiskChecker的检测操作
    DiskChecker.checkDir(
        context.localFileSystem,
        new Path(baseURI),
        context.expectedPermission);
    return VolumeCheckResult.HEALTHY;
  }

OK,以上就是本文所要阐述的DataNode启动优化改造之磁盘检测并行化的内容了。大家可不能小看这仅仅是一个小小的改造,在某些极端情况下,可能就会帮助我们避免了后续的许多问题。

参考资料


[1].DataNode disk check improvements
[2].Support for parallel checking of StorageLocations on DataNode startup
[3].Update DataNode to use StorageLocationChecker at startup

原文地址:https://www.cnblogs.com/bianqi/p/12183709.html