HDFS源码分析(四)-----节点Decommission机制

前言

在Hadoop集群中,按照集群规模来划分,规模可大可小,大的例如百度,据说有4000台规模大小的Hadoop集群,小的话,几十台机器组成的集群也都是存在的。但是不论说是大型的集群以及小规模的集群,都免不了出现节点故障的情况,尤其是超大型的集群,节点故障几乎天天发生,因此如何做到正确,稳妥的故障情况处理,就显得很重要了,这里提供一个在Hadoop集群中可以想到的办法,就是Decommission操作,节点下线操作,一般的情况是故障节点已经是一个dead节点,或是出现异常情况的节点。此时如若不处理,或许会影响到整个集群的性能。所以在这里分享一下Hadoop中的Decommision机制。


Hadoop节点Decommision操作

在分析相关源码之前,有必要了解一下,让一个数据节点下线的物理操作,操作步骤其实很简单,在以前老版本的Hadoop中,好像是可以通过hadoop dfsadmin带参数的形式执行,但是在最近新版的Hadoop中好像这类命令失效了,于是我在做测试的时候,用了一个更通用的办法来触发这一行为,就是把目标下线节点加入execlude文件中,就是拒绝接入Hadoop集群的节点名单,姑且可以理解为黑名单列表,对应的是include名单,默认这2个名单都没配,所以数据节点一启动,就会注册到namenode节点上。然后是再执行Hadoop 的refreshnode命令,此命令就会从对应的此配置文件中读取最新的数据节点信息,然后开始decommission操作,在50070的ui界面上就可以看到待下线节点的状态会从active状态变为decommision in progress,此时此数据节点的block将会被逐步的拷贝出去,最后随着操作的完成,最终状态就会被变为decommissioned,此时就可以正式下线此节点,用hadoop-demons.sh namenode stop即可。在执行过程的前后,可以执行hadoop fsck的方法观察block块的路径,判断block拷贝情况。


相关涉及类

为什么花了这么多的篇幅介绍,decommision操作呢,因为操作的顺序与实际代码的运行流程基本吻合,有很强的关联性。在下面具体的分析过程中将会逐步体现出来。下面简要列出相关的2个类。

1.DecommissionManager--decommission操作管理类,里面包含了decommission操作状态监控。

2.FSNamesystem--这是一个大的操作类,内部包含了许多模块的工作,包括之前介绍过的副本相关操作也是部分在此类中进行中转,与decommission主要的方法refreshNode()方法包含于此。


Decommision代码跟踪分析

在物理操作中,decommision操作的触发是因为添加了execlude文件,然后再输入refreshNode命令开始的,与此就会对应到了FSNamesystem的同名方法

/**
   * Rereads the config to get hosts and exclude list file names.
   * Rereads the files to update the hosts and exclude lists.  It
   * checks if any of the hosts have changed states:
   * 1. Added to hosts  --> no further work needed here.
   * 2. Removed from hosts --> mark AdminState as decommissioned. 
   * 3. Added to exclude --> start decommission.
   * 4. Removed from exclude --> stop decommission.
   * 重新从配置中读取节点列表,移除掉准备下线的列表等
   */
  public void refreshNodes(Configuration conf) throws IOException {
    checkSuperuserPrivilege();
    // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
    // Update the file names and refresh internal includes and excludes list
    if (conf == null)
      conf = new Configuration();
    //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
    hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                conf.get("dfs.hosts.exclude", ""));
    hostsReader.refresh();
....
果然在这里会重新去读exclude,include文件数据,然后这里会遍历当前的数据节点,与配置中新增的节点进行匹配

synchronized (this) {
      //遍历数据节点
      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
           it.hasNext();) {
        DatanodeDescriptor node = it.next();
        // Check if not include.
        //判断数据节点是否在允许的主机列表内
        if (!inHostsList(node, null)) {
          //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
          node.setDecommissioned();  // case 2.
        } else {
inHost系列的判断方法如下,inExclude与此方法同,不附上代码

/** 
   * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
   * 如何判断节点是否包含在允许接入列表中的判断方法,exclude列表是同样的道理
   */
  private boolean inHostsList(DatanodeID node, String ipAddr) {
    //从hostReader中读取最新的host列表
    Set<String> hostsList = hostsReader.getHosts();
    //利用主机名去判断
    return (hostsList.isEmpty() || 
            (ipAddr != null && hostsList.contains(ipAddr)) ||
            hostsList.contains(node.getHost()) ||
            hostsList.contains(node.getName()) || 
            ((node instanceof DatanodeInfo) && 
             hostsList.contains(((DatanodeInfo)node).getHostName())));
  }
判断完毕之后,会进行逻辑判断,如果节点在exclude名单中,代表准备下线,则修改其状态,如果是正在下线的节点,则无须操作。完整逻辑如下

/**
   * Rereads the config to get hosts and exclude list file names.
   * Rereads the files to update the hosts and exclude lists.  It
   * checks if any of the hosts have changed states:
   * 1. Added to hosts  --> no further work needed here.
   * 2. Removed from hosts --> mark AdminState as decommissioned. 
   * 3. Added to exclude --> start decommission.
   * 4. Removed from exclude --> stop decommission.
   * 重新从配置中读取节点列表,移除掉准备下线的列表等
   */
  public void refreshNodes(Configuration conf) throws IOException {
    checkSuperuserPrivilege();
    // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
    // Update the file names and refresh internal includes and excludes list
    if (conf == null)
      conf = new Configuration();
    //重新读取配置文件中的dfs.hosts以及dfs.hosts.exclude属性
    hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
                                conf.get("dfs.hosts.exclude", ""));
    hostsReader.refresh();
    synchronized (this) {
      //遍历数据节点
      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
           it.hasNext();) {
        DatanodeDescriptor node = it.next();
        // Check if not include.
        //判断数据节点是否在允许的主机列表内
        if (!inHostsList(node, null)) {
          //如果不是,则把此节点的状态设为Decommissioned,代表着此节点准备下线
          node.setDecommissioned();  // case 2.
        } else {
           //入如果此节点是包含在不允许接入的列表名单中时
          if (inExcludedHostsList(node, null)) {
            //判断此时状态是否为还没开始下线操作,如果是开始decommission
            if (!node.isDecommissionInProgress() && 
                !node.isDecommissioned()) {
              startDecommission(node);   // case 3.
            }
          } else {
            //如果是其他的情况,如果节点处于decommsion操作,则停止操作
            if (node.isDecommissionInProgress() || 
                node.isDecommissioned()) {
              stopDecommission(node);   // case 4.
            } 
          }
        }
      }
    } 
      
  }
然后开始沿着decommision操作继续往里走,就是startDecommision方法

/**
   * Start decommissioning the specified datanode. 
   * 对指定节点开始进行decommission操作
   */
  private void startDecommission (DatanodeDescriptor node) 
    throws IOException {

    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
      LOG.info("Start Decommissioning node " + node.getName());
      node.startDecommission();
      //设置节点decommison开始时间
      node.decommissioningStatus.setStartTime(now());
      //
      // all the blocks that reside on this node have to be 
      // replicated.
      //检查此时的decommission操作状态
      checkDecommissionStateInternal(node);
    }
  }

在这个方法主要操作还是在于让节点启动相应的decommision下线操作,开始一波副本的拷贝工作了,在这里感觉第一次启动,就在这里判断decommision状态,个人感觉没有必要,一般操作不会很快结束,一般的decomission监控会有额外的线程周期性的监控此类操作。而这个线程进行检查的函数也是checkDecommissionStateInternal方法,他是如何进行检查判断的呢

/**
   * Change, if appropriate, the admin state of a datanode to 
   * decommission completed. Return true if decommission is complete.
   * decommision的状态检测是根据其上的副本量来衡量的
   */
  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
    //
    // Check to see if all blocks in this decommissioned
    // node has reached their target replication factor.
    //
    if (node.isDecommissionInProgress()) {
      //调用副本进度判断函数
      if (!isReplicationInProgress(node)) {
        node.setDecommissioned();
        LOG.info("Decommission complete for node " + node.getName());
      }
    }
    if (node.isDecommissioned()) {
      return true;
    }
    return false;
  }
答案就在上面,通过剩余副本的拷贝情况,如果isReplicationInPogress返回FALSE代表了,已经全部完成拷贝工作了,状态就可以修改为decomissioned结束状态了,下面仔细看看这个isReplicationInProgress方法

/**
   * Return true if there are any blocks on this node that have not
   * yet reached their replication factor. Otherwise returns false.
   * 如果当前数据节点block块的副本系数还没有满足期望的副本数值值,则表明还要添加复制请求
   */
  private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
    boolean status = false;
    int underReplicatedBlocks = 0;
    int decommissionOnlyReplicas = 0;
    int underReplicatedInOpenFiles = 0;
    
    //遍历此节点上的所有数据块
    for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
      final Block block = i.next();
      INode fileINode = blocksMap.getINode(block);

      if (fileINode != null) {
        NumberReplicas num = countNodes(block);
        //获取此数据块当前的副本数
        int curReplicas = num.liveReplicas();
        //获取此副本块的期望副本块数
        int curExpectedReplicas = getReplication(block);
        //如果期望副本块数大于当前副本块数,表明block还需要复制
        if (curExpectedReplicas > curReplicas) {
          // Log info about one block for this node which needs replication
          if (!status) {
            //做状态的修改,表明block还需要复制
            status = true;
            logBlockReplicationInfo(block, srcNode, num);
          }
          underReplicatedBlocks++;
          if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
            decommissionOnlyReplicas++;
          }
          if (fileINode.isUnderConstruction()) {
            underReplicatedInOpenFiles++;
          }

          if (!neededReplications.contains(block) &&
            pendingReplications.getNumReplicas(block) == 0) {
            //
            // These blocks have been reported from the datanode
            // after the startDecommission method has been executed. These
            // blocks were in flight when the decommission was started.
            //
            //添加新的副本复制请求
            neededReplications.add(block, 
                                   curReplicas,
                                   num.decommissionedReplicas(),
                                   curExpectedReplicas);
          }
        }
      }
    }
    srcNode.decommissioningStatus.set(underReplicatedBlocks,
        decommissionOnlyReplicas, underReplicatedInOpenFiles);

    return status;
  }
原理很简单,对于待撤销数据节点上的每个block块,判断当前副本与期望副本数之间的差,如果不满足,就增强复制请求,至此,decommision这个核心流程就走通了.下面看看一个常驻的监控线程,毕竟他才是主要做监控进度这项任务的.


DecommissionManager

decommisionMannager,decommision操作管理器,所包含的变量很少

/**
 * Manage node decommissioning.
 * 节点Decommission操作状态管理器
 */
class DecommissionManager {
  static final Log LOG = LogFactory.getLog(DecommissionManager.class);
  
  //名字空间系统
  private final FSNamesystem fsnamesystem;

  DecommissionManager(FSNamesystem namesystem) {
    this.fsnamesystem = namesystem;
  }

  /** Periodically check decommission status. */
  //监控方法
  class Monitor implements Runnable {
...
}

关键看他内部的核心monitor runnable

/** Periodically check decommission status. */
  //监控方法
  class Monitor implements Runnable {
    /** recheckInterval is how often namenode checks
     *  if a node has finished decommission
     * 定期检查周期
     */
    private final long recheckInterval;
    /** The number of decommission nodes to check for each interval */
    private final int numNodesPerCheck;
    /** firstkey can be initialized to anything. */
    private String firstkey = "";

    Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
      this.recheckInterval = recheckIntervalInSecond * 1000L;
      this.numNodesPerCheck = numNodesPerCheck;
    }

    /**
     * Check decommission status of numNodesPerCheck nodes
     * for every recheckInterval milliseconds.
     */
    public void run() {
      for(; fsnamesystem.isRunning(); ) {
        synchronized(fsnamesystem) {
          //调用check()方法
          check();
        }
  
        try {
          Thread.sleep(recheckInterval);
        } catch (InterruptedException ie) {
          LOG.info("Interrupted " + this.getClass().getSimpleName(), ie);
        }
      }
    }
for循环内持周期性的续调check()方法,如果系统没有结束的话,docheck方法又会调用之前提到的checkDecommission的方法

private void check() {
      int count = 0;
      //遍历每个数据节点
      for(Map.Entry<String, DatanodeDescriptor> entry
          : new CyclicIteration<String, DatanodeDescriptor>(
              fsnamesystem.datanodeMap, firstkey)) {
        final DatanodeDescriptor d = entry.getValue();
        firstkey = entry.getKey();
 
        //如果数据节点正处于decommison操作的话,则做检查
        if (d.isDecommissionInProgress()) {
          try {
            //调用fsnamesystem的checkDecommissionStateInternal方法,此方法内部又会调用isReplicationInProgress进行副本的
            //情况判断
            fsnamesystem.checkDecommissionStateInternal(d);
          } catch(Exception e) {
            LOG.warn("entry=" + entry, e);
          }
          if (++count == numNodesPerCheck) {
            return;
          }
        }
      }
    }
  }
他是每个数据节点遍历着判断.OK,希望通过我的分析,大家对Decommission有更多的了解,有所收获.

全部代码的分析请点击链接https://github.com/linyiqun/hadoop-hdfs,后续将会继续更新HDFS其他方面的代码分析。


参考文献

《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌等





原文地址:https://www.cnblogs.com/bianqi/p/12183881.html