Hadoop进入安全模式源码分析

Hadoop进入安全模式有三个条件具体源码如下

 private boolean needEnter() {
              // TODO-ZH DataNode汇报block状态为complete状态数量
      return (threshold != 0 && blockSafe < blockThreshold) ||
              // TODO-ZH 默认存活DataNode节点小于 datanodeThreshold(默认为0)
        (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) ||
              // TODO-ZH 元数据存储磁盘空间是不充足
        (!nameNodeHasResourcesAvailable());
    }
  1. 条件一:threshold != 0 && blockSafe < blockThreshold;DataNode汇报过来的complete的block个数占block总数是否达到设定阈值占比(默认0.99),若未达到此阈值则进入安全模式,计算complete状态数量如下

    // TODO-ZH 计算阈值
    this.blockThreshold = (int) (blockTotal * threshold);
    

    上述计算过程中blockTotal为Hadoop集群中block块总个数,threshold为参数配置阈值,默认配置阈值为0.99f

     this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
              DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
    

    参数配置如下:

      // block设置处于安全模式时的阈值占比,默认值为0.999f
      public static final String  DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY = "dfs.namenode.safemode.threshold-pct";
      // block设置处于安全模式时的阈值占比的默认值
      public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;
    

    根据以上代码和配置可得知,只要DataNode汇报到NameNode的block块状态为complete占总block块占比小于0.99则Hadoop集群处于安全模式。

  2. 条件二:datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold;如果DataNode存活个数小于一定个数(默认为0)时则进入安全模式

    datanodeThreshold通过参数配置获取,获取配置如下

    this.datanodeThreshold = conf.getInt(
      DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY,
      DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT); 
    

    参数配置具体配置值如下

      public static final String  DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY = "dfs.namenode.safemode.min.datanodes";
      public static final int     DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT = 0;
    

    getNumLiveDataNodes() 为获取存活DataNode存活数量,具体实现代码如下

      public int getNumLiveDataNodes() {
        return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
      }
    
     public int getNumLiveDataNodes() {
        int numLive = 0;
        synchronized (datanodeMap) {
          for(DatanodeDescriptor dn : datanodeMap.values()) {
            if (!isDatanodeDead(dn) ) {
              numLive++;
            }
          }
        }
        return numLive;
      }
    
     /** Is the datanode dead? */
      boolean isDatanodeDead(DatanodeDescriptor node) {
        return (node.getLastUpdateMonotonic() <
                (monotonicNow() - heartbeatExpireInterval));
      }
    

    判定一个DataNode节点是否存活,是根据上次更新汇报心跳时间距现在时间超过一定时间,则判定此DataNode节点为不存活状态(即dead状态),判定逻辑如下:

     /** Is the datanode dead? */
      boolean isDatanodeDead(DatanodeDescriptor node) {
        return (node.getLastUpdateMonotonic() <
                (monotonicNow() - heartbeatExpireInterval));
      }
    

    heartbeatExpireInterval为配置参数,即设定距离上次心跳更新时间超过此时间则判定此DataNode为不存活状态,设定计算如下

    this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
            + 10 * 1000 * heartbeatIntervalSeconds;
    
    final int heartbeatRecheckInterval = conf.getInt(
            DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
            DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5minutes
    
      public static final String  DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY = "dfs.namenode.heartbeat.recheck-interval";
      public static final int     DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT = 5*60*1000;
    
    final long heartbeatIntervalSeconds = conf.getLong(
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
    
      public static final String  DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
      public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
    

    有上述参数和计算代码可得知,若DataNode节点超过10分钟30秒后,NameNode未接收到DataNode心跳则判定此DataNode为不存活状态。

    综上代码分析可知,当DataNode节点存活个数小于0时,Hadoop集群进入安全模式。

  3. 条件三:nameNodeHasResourcesAvailable();检查NameNode写元数据磁盘目录空间是否大于设定阈值(默认100M),若磁盘目录空间小于设定阈值则NameNodeHasResourceAvailable = false,此时HDFS进入安全模式,具体代码如下:

    hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
    

    hasAvailableDiskSpace()方法时获取存储元数据存盘目录是否有足够存储空间

     return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
            minimumRedundantVolumes);
    

    volumes里存放的就是需要检查的目录集合,具体设置如下:

      private void addDirToCheck(URI directoryToCheck, boolean required)
          throws IOException {
        File dir = new File(directoryToCheck.getPath());
        if (!dir.exists()) {
          throw new IOException("Missing directory "+dir.getAbsolutePath());
        }
    
        // 一个目录就是一个checkVolume对象
        CheckedVolume newVolume = new CheckedVolume(dir, required);
        CheckedVolume volume = volumes.get(newVolume.getVolume());
        if (volume == null || !volume.isRequired()) {
          //volumes里面就会有多个目录
          volumes.put(newVolume.getVolume(), newVolume);
        }
      }
    

    判定设定存储元数据存储目录是否有足够存储空间逻辑如下

    static boolean areResourcesAvailable(
          Collection<? extends CheckableNameNodeResource> resources,
          int minimumRedundantResources) {
    
        // TODO: workaround:
        // - during startup, if there are no edits dirs on disk, then there is
        // a call to areResourcesAvailable() with no dirs at all, which was
        // previously causing the NN to enter safemode
        if (resources.isEmpty()) {
          return true;
        }
        
        int requiredResourceCount = 0;
        int redundantResourceCount = 0;
        int disabledRedundantResourceCount = 0;
        for (CheckableNameNodeResource resource : resources) {
          if (!resource.isRequired()) {
            redundantResourceCount++;
            if (!resource.isResourceAvailable()) {
              disabledRedundantResourceCount++;
            }
          } else {
            requiredResourceCount++;
            // TODO-ZH 判断磁盘资源是否充足
            if (!resource.isResourceAvailable()) {
              // Short circuit - a required resource is not available.
              return false;
            }
          }
        }
        
        if (redundantResourceCount == 0) {
          // If there are no redundant resources, return true if there are any
          // required resources available.
          return requiredResourceCount > 0;
        } else {
          return redundantResourceCount - disabledRedundantResourceCount >=
              minimumRedundantResources;
        }
      }
    }
    

    判定逻辑

        public boolean isResourceAvailable() {
          // TODO-ZH 获取当前磁盘目录空间大小
          long availableSpace = df.getAvailable();
          if (LOG.isDebugEnabled()) {
            LOG.debug("Space available on volume '" + volume + "' is "
                + availableSpace);
          }
          // TODO-ZH 如果磁盘空间大小小于100M则返回false
          if (availableSpace < duReserved) {
            LOG.warn("Space available on volume '" + volume + "' is "
                + availableSpace +
                ", which is below the configured reserved amount " + duReserved);
            return false;
          } else {
            return true;
          }
        }
    

    设定阈值获取如下

    // TODO-ZH 检查资源是否充足阈值
        duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
            DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
    

    设定阈值如下

      // 检查存储元数据磁盘低于多少的一个阈值,默认100M
      public static final String  DFS_NAMENODE_DU_RESERVED_KEY = "dfs.namenode.resource.du.reserved";
      // 元数据存储磁盘低于多少一个阈值默认值100M
      public static final long    DFS_NAMENODE_DU_RESERVED_DEFAULT = 1024 * 1024 * 100; // 100 MB
    

    根据上述参数设定和代码计算可知,当存储元数据磁盘目录小于100M时,Hadoop集群进入安全模式

原文地址:https://www.cnblogs.com/starzy/p/14401241.html