HDFS的块Topology位置重分布

前言


最近一段时间笔者在工作过程中遇到了HDFS块Topology不准确导致的一系列潜在问题,需要重新更新DataNode的Topology位置,使其上的块数据保持有高可用性。以下是笔者对此想到的一些操作方案和需要注意的点,或许对同领域的小伙伴有所帮助。

DataNode/块 Topology不准确问题


这里我们说的Topology位置并不是指的是物理位置的概念,而是逻辑上的。意思是说在逻辑空间上,我们将各个DataNode进行位置划分。在HDFS的Topology中,它是一树型结构,Switch和Router是其中的父亲节点,叶子节点代表的是实际的DN节点。

在HDFS中,Topology的一个关键作用是用来保证数据的rack awareness属性的。通俗地理解,就是数据副本需要分布在不同的rack下,以此防止同一个rack崩溃导致数据不可用的情况。因此从这里,我们可以得出下面一个Topology位置和物理真实位置的一个关系:

不同逻辑rack位置对应的物理rack位置必然是物理隔离的,不同物理位置rack的节点它们的逻辑rack可能是相同的。

本小节标题所述的Topology不准确问题指的就是逻辑rack位置对应实际物理rack位置不完全隔离的问题。这种情况往往发生在集群管理员在初始设定集群Topology位置划分时没有考虑完全。当然,最简单的方案就是完全按照真实物理rack划分,逻辑rack名就按照物理rack名来。

数据Topology位置不准确不仅仅会带来数据丢失的可能性,还有其它方面的影响。比如当我们利用逻辑位置来减少机房跨楼层间的带宽使用,我们只保证3副本中的1个副本在不同的楼层,2个在同个楼层。如果此时按照楼层所划分的大location位置不准确的话,就可能引起大量的楼层间的数据传输。

因此笔者这里想重点强调的是,数据的placement在实际的运用场景中是十分重要的。

HDFS的块位置重分布


当我们发现自己的HDFS集群数据的Topology位置违背了rack awareness的原则时,我们需要尽快的纠正它,以免其在后续引发一系列的问题。

这里笔者简单聊聊在HDFS中,有哪些方式可以让block块按照新的Topology规则进行分布。

依赖现成Balancer工具做块迁移

首先是第一种方法,利用现有Balancer移动块的逻辑进行块的重分布。HDFS Balancer工具在除了用于正常的块数据平衡之外,它也有一定的数据打散重分布的作用。在数据重新move的过程中 ,Balancer会根据当前最新的Topology位置来进行placement policy的满足。于是乎,旧的Topology位置块就能够被挪至新Topology下了。

但是过度依赖于Balancer的块迁移动作往往覆盖不全所有的块,因为Balancer受限制于节点的存储使用空间进行源,目标块的选择。换句话说,如果一些节点存储使用空间都在平均值阈值附近,它上面的块就会很少得到被移动的机会了。

NameNode的failover行为触发块的重分布


HDFS在每次切换Active服务时,会做一次初始全量块的检查,里面检查的步骤包含以下几点:

  • 块副本数是否达到预期值,就是我们所说的mis-replica的block。如果replica少了,则发送一次replication请求。
  • 块副本数满足条件之后,再进行block placement的检查,是否满足rack awareness,意为是否副本分布于多个rack。如果全部位于一个rack下,则需要进行一次重分布。这里的重分布行为和Balancer就不一样了,它是通过发起一次replica replication请求,然后由系统再进行over replica清理老的replica来达到副本位置的更新的。

相关代码如下,首先是切换Active时的replica检查方法:

  /**
   * Start services required in active state
   * @throws IOException
   */
  void startActiveServices() throws IOException {
    startingActiveService = true;
    LOG.info("Starting services required for active state");
    writeLock();
    ...
        blockManager.setPostponeBlocksFromFuture(false);
        blockManager.getDatanodeManager().markAllDatanodesStale();
        blockManager.clearQueues();
        blockManager.processAllPendingDNMessages();

        // Only need to re-process the queue, If not in SafeMode.
        if (!isInSafeMode()) {
          LOG.info("Reprocessing replication and invalidation queues");
          // BlockManager初始化副本队列
          blockManager.initializeReplQueues();
        }
	...
}

这里进入BlockManager的initializeReplQueues方法:

  /**
   * Initialize replication queues.
   */
  public void initializeReplQueues() {
    LOG.info("initializing replication queues");
    processMisReplicatedBlocks();
    initializedReplQueues = true;
  }

可以看到,上面processMisReplicatedBlocks方法就是做miss副本块的检查的,此方法最终会进入到processMisReplicatedBlock方法:

  /**
   * A block needs reconstruction if the number of redundancies is less than
   * expected or if it does not have enough racks.
   */
  boolean isNeededReconstruction(BlockInfo storedBlock,
      NumberReplicas numberReplicas, int pending) {
    return storedBlock.isComplete() &&
        !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
  }

...

   // Check if the number of live + pending replicas satisfies
  // the expected redundancy.
  boolean hasEnoughEffectiveReplicas(BlockInfo block,
      NumberReplicas numReplicas, int pendingReplicaNum) {
    int required = getExpectedLiveRedundancyNum(block, numReplicas);
    int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
    return (numEffectiveReplicas >= required) &&
        (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
  }

当BlockManager检测出当前块placement不正确的时候,会返回一个UNDER_REPLICATE的处理结果同时将此block加入待复制块列表中。

...
    // add to low redundancy queue if need to be
    if (isNeededReconstruction(block, num)) {
      if (neededReconstruction.add(block, numCurrentReplica,
          num.readOnlyReplicas(), num.outOfServiceReplicas(),
          expectedRedundancy)) {
        return MisReplicationResult.UNDER_REPLICATED;
      }
    }
...

因为BlockManager在此过程中是对全量块做检查的,为了避免同时触发大量块复制所引发的性能问题,这里是将此过程拆分为小的iteration,每次iteration还会有睡眠间隔时间。

同时为了避免block副本队列初始化过程实际太长,此过程已改为异步进行,不会阻后续服务的初始化。综上所述,通过failover行为方式触发的block块重分布是一个可行且高效的办法,但是需要我们控制好相关速率参数,相关参数如下:

<property>
  <name>dfs.block.misreplication.processing.limit</name>
  <value>10000</value>
  <description>
    Maximum number of blocks to process for initializing replication queues.
  </description>
</property>

局部数据文件块位置迁移


上面小节介绍的是集群全量块级别的块Topology位置迁移,但在有的使用场景中,我们可能只需要对极个别文件目录或是具体一些DN节点上的数据做块检查迁移,以此缩短Topology变更带来的影响。

这个功能是在前段时间不久被合入的,HDFS-14053: Provide ability for NN to re-replicate based on topology changes.

这个JIRA的的本质核心还是利用了上文讲述的BlockManager的process miss replica的方法。不过此时它传入的不是全量块,而是收集了指定目录下的块列表,作为参数传入。此功能命令被集成进了fsck命令来方便用户的使用。

核心patch改动如下:

   }
@@ -683,6 +694,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
     StringBuilder report = new StringBuilder();
     int blockNumber = 0;
     final LocatedBlock lastBlock = blocks.getLastLocatedBlock();
+    List<BlockInfo> misReplicatedBlocks = new LinkedList<>();
     for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
       ExtendedBlock block = lBlk.getBlock();
       if (!blocks.isLastBlockComplete() && lastBlock != null &&
@@ -791,6 +803,9 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
         }
         out.println(" Replica placement policy is violated for " +
                     block + ". " + blockPlacementStatus.getErrorDescription());
+        if (doReplicate) {
+ 		   //如果需要replica重新replication,则收集fsck扫描的块结果
+          misReplicatedBlocks.add(storedBlock);
+        }
       }
 
       // count storage summary
@@ -888,6 +903,19 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
         out.print(report + "
");
       }
     }
+
+    if (doReplicate && !misReplicatedBlocks.isEmpty()) {
+      // 将收集好的列表传入BlockManager的processMisReplicatedBlocks的方法
+      int processedBlocks = this.blockManager.processMisReplicatedBlocks(
+              misReplicatedBlocks);
+      if (processedBlocks < misReplicatedBlocks.size()) {
+        LOG.warn("Fsck: Block manager is able to process only " +
+                processedBlocks +
+                " mis-replicated blocks (Total count : " +
+                misReplicatedBlocks.size() +
+                " ) for path " + path);
+      }
+      res.numBlocksQueuedForReplication += processedBlocks;
+    }
   }

如果是按照单台DN来的话,相似的道理,我们需要去收集目标DN上的所有块数据,g感兴趣尝试的同学可以参考BlockManager现有的移除节点的方法,下面的toRemove就是DN上的当前块汇总。

  /** Remove the blocks associated to the given datanode. */
  void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
    providedStorageMap.removeDatanode(node);
    for (DatanodeStorageInfo storage : node.getStorageInfos()) {
      final Iterator<BlockInfo> it = storage.getBlockIterator();
      //add the BlockInfos to a new collection as the
      //returned iterator is not modifiable.
      Collection<BlockInfo> toRemove = new ArrayList<>();
      while (it.hasNext()) {
        toRemove.add(it.next());
      }

      for (BlockInfo b : toRemove) {
        removeStoredBlock(b, node);
      }
    }
    // Remove all pending DN messages referencing this DN.
    pendingDNMessages.removeAllMessagesForDatanode(node);

    node.resetBlocks();
    invalidateBlocks.remove(node);
  }

以上就是笔者今天要阐述的所有内容了,希望对大家有所帮助。

引言


[1].https://issues.apache.org/jira/browse/HDFS-14053: Provide ability for NN to re-replicate based on topology changes

原文地址:https://www.cnblogs.com/bianqi/p/12183532.html