HDFS HA支持多Standby节点机制

前言


在现有的HDFS中,为了保证其高可用性,社区在早些年就已经完成HDFS的HA机制,也就是One Active,One Standby。在此种情况下,HDFS能够容忍其中一个节点出现失败的情况。这套HA机制的实现的确给用户带来了很大的帮助,基于此特性,我们可以做很多集群上的热操作,比如热迁移NameNode,或者滚动升级HDFS等等。可能唯一让人感觉还不是最好的一点是,它不能容忍更多失败的情况,比如2个NameNode都发生失败的情况。在其他的一些分布式系统中,例如Zookeeper,它的内部就可以容忍其中2节点出现崩溃的情况,当它启动了5个节点的时候。在HDFS内部的block副本设计上,也是保证了3副本的设计理念,同样可以容忍2个副本损坏的情况。所以我们不禁开始联想,在HDFS中什么时候也能容忍更多的出错情况?更具体地说,就是在只有一个Active NameNode情况下,同时有多个Standby NameNode。这样的话,HDFS的HA特性看上去就非常的强大了。本文我们就来好好聊聊这个话题。

HDFS多Standby节点机制概述


前面的铺垫内容说了这么多,那么到底目前是否已经有多Standby节点的实现机制呢?答案是有的,但是它还没有发布,目标发布版本Hadoop 3.0.又是在3.0版本,之前本人介绍了许多很棒的特性都是在这个版本发布的(比如HDFS EC),大家敬请期待这个版本吧。社区JIRA HDFS-6440(Support more than 2 NameNodes)最终实现了HA中支持多Standby的特性。本文是我阅读完此JIRA上的设计文档以及代码实现后所写的总结性文章,更多设计细节可以查看原文档。

在之前HDFS的HA的设计实现中,其实已经帮我们实现了许多在未来可能有多Standby节点出现的情况。所以在这里,我们只需要在原来One Active,One Standby完善的机制下,做局部的修改,来满足多Standby的情况即可。以下为几个需要修改的点:

  • Zkfc的Active选举,此时不是只有另外一个可选节点,而是很多个Standby节点。
  • Checkpoint过程以及Active NameNode上的Fsimage同步问题,之前都是一个Standby NameNode定期发给Active NameNode,这个时候有多个Standby,怎么办。
  • Bootstrap过程。之前都是向另外一个Active NameNode进行bootstrap,而现在有多个节点。
  • Block token id的生成。

主要为以上4点,其中第2点最为重要,因为涉及到元数据的更新同步,逻辑也作为复杂。在下一小节中,我们将会针对这4点做详细的分析。

多Standby节点细节实现


此小节将会针对以上提出的4点展开分析,下面首先是zkfc相关的改造。

Zkfc的选举


与原先的HA机制相比,多Standby的情况会造成锁竞争的加剧,因为每个Standby节点上的zkfc进程都要尝试获取锁,然后才会将自己的状态切到Active。所以在此建议的Standby数量不宜过多,3~5个足够了。还有当进行手动切换的时候,这个时候要保证其他节点此时不发生切换动作。

Checkpoint元数据同步过程


先来回顾一下原先HA机制的元数据同步过程:

Standby节点周期性的读取JournalNode上的editlog,等到了一次checkpoint周期,然后做一次checkpoint,然后将新的fsimage同步到Active节点。

在这个如果是多个Standby节点的情况,这个处理过程就没有那么简单了,下面几个是主要要解决的问题:

  • 这么多个Standby节点,每个节点上都有自己的fsimage,该选哪个作为最终上传镜像文件的节点呢?
    答:选择元数据最新的Standby,评判标准是看当前最新的txid。
  • 如果Active节点当前已经同步了最新fsimage,而Standby节点又将稍老的fsimage同步过去,怎么办?
    答:Active节点会进行比较,如果的确是老的fsimage,会给出失败的回复应答。

以上两点在后面代码实现的部分会有具体的体现。

Bootstrap过程


我们知道bootstrap的用处一般是在集群开始搭建时,将Active上的fsimage等元数据同步到当前的节点上,然后启动当前节点。而在当前多Standby节点的变化是,由向原来另外一个Active获取元数据变为同时向多个其他节点抓取元数据,直到有一个节点能抓取到元数据为止。

Block token id的构造


在block token id的生成中,会根据当前NameNode index下标来生成serialNo序列号数字,然后将此数字应用到token id的生成。生成代码如下:

   public synchronized void setSerialNo(int serialNo) {
     this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31); 
   }

但是原先的处理逻辑,只适用于2个NameNode的情况,也就是下标0和1的情况。在多个Standby出现的情况,NameNode的下标就有可能出现2,3,4等情况。因此此逻辑也需要进行修改。具体改动可见HDFS-6440上的设计文档。

多Standby情况下的Checkpoint同步


因为在多Standby情况下的checkpoint,fsmage同步过程最为复杂,此节我们从源代码实现层面来学习一下其中的过程,主要涉及以下2个类的改造:

  • StandbyCheckpointer:Standby NameNode上专门控制做checkpoint以及上传fsimage到Active NameNode的服务。
  • ImageServlet:NameNode服务请求处理类,里面包含了fsimage上传请求的处理逻辑。

我们首先进入StandbyCheckpointer类,

  public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
      throws IOException {
    this.namesystem = ns;
    this.conf = conf;
    // checkpoint配置类初始化
    this.checkpointConf = new CheckpointConf(conf);
    // 定期checkpoint线程初始化
    this.thread = new CheckpointerThread();
    this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("TransferFsImageUpload-%d").build();
    // active结点地址初始化
    setNameNodeAddresses(conf);
  }

这里会有active节点地址的初始化,

  private void setNameNodeAddresses(Configuration conf) throws IOException {
    // Look up our own address.
    myNNAddress = getHttpAddress(conf);

    // 获取其他NameNode节点配置,作为可能的Active NameNode
    List<Configuration> confForActive = HAUtil.getConfForOtherNodes(conf);
    activeNNAddresses = new ArrayList<URL>(confForActive.size());
    for (Configuration activeConf : confForActive) {
      URL activeNNAddress = getHttpAddress(activeConf);

      // sanity check each possible active NN
      Preconditions.checkArgument(checkAddress(activeNNAddress),
          "Bad address for active NN: %s", activeNNAddress);
      // 将此地址作为active的地址
      activeNNAddresses.add(activeNNAddress);
    }

    ...
  }

其实从这里可以看出一点:Standby节点其实并不知道哪个是当前真正的Active NameNode

接下来进入checkpoint的线程服务内的doWork工作方法,

    private void doWork() {
      // 获取checkpoint动作的执行周期时间,默认1小时
      final long checkPeriod = 1000 * checkpointConf.getCheckPeriod();
      // 重置checkpoint时间,以及最近上传时间
      lastCheckpointTime = monotonicNow();
      lastUploadTime = monotonicNow();
      while (shouldRun) {
        boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
        if (!needRollbackCheckpoint) {
          try {
            // 进行checkpoint周期时间睡眠
            Thread.sleep(checkPeriod);
          } catch (InterruptedException ie) {
          }
          if (!shouldRun) {
            break;
          }
        }
        // 这里开始准备checkpoint
        ...

我们继续来看后面执行的方法,

          final long now = monotonicNow();
          // 获取未checkpoint的tx事务数
          final long uncheckpointed = countUncheckpointedTxns();
          // 计算距离上次未更新checkpoint的时间
          final long secsSinceLast = (now - lastCheckpointTime) / 1000;

          // if we need a rollback checkpoint, always attempt to checkpoint
          boolean needCheckpoint = needRollbackCheckpoint;

          if (needCheckpoint) {
            LOG.info("Triggering a rollback fsimage for rolling upgrade.");
          } else if (uncheckpointed >= checkpointConf.getTxnCount()) {
            // 如果当前未checkpoint的事务数已经超过默认值,就是100w,则也需要进行一次checkpoint
            LOG.info("Triggering checkpoint because there have been " + 
                uncheckpointed + " txns since the last checkpoint, which " +
                "exceeds the configured threshold " +
                checkpointConf.getTxnCount());
            needCheckpoint = true;
          } else if (secsSinceLast >= checkpointConf.getPeriod()) {
            // 如果未更新时间已超出了周期时间,就是1小时,则需要进行一次checkpoint操作
            LOG.info("Triggering checkpoint because it has been " +
                secsSinceLast + " seconds since the last checkpoint, which " +
                "exceeds the configured interval " + checkpointConf.getPeriod());
            needCheckpoint = true;
          }

以上代码表明了一个Standby节点做一次checkpoint需要达到的2个条件(满足一个条件即可):

  • 未做checkpoint的tx事务数超过100w
  • 超过1小时的checkpoint周期

我们继续看下面的执行过程,

            ...
            // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
            // rollback request, are the checkpointer, are outside the quiet period.
            final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
            // sendRequest表示是否要发送新的fsimage给Active NameNode,需满足以下2个条件:
            // 1.isPrimaryCheckPointer标记为true,也就是上次已经发送过fsimage给Active NameNode
            // 2.距离上次发送fsimage给Active NameNode的时间已经超过了Standby专门发送Active NameNode的时间,
            //   默认1.5倍的checkpoint的周期时间
            boolean sendRequest = isPrimaryCheckPointer
                || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
            // 执行checkpoint动作
            doCheckpoint(sendRequest);
            ...

在上述过程中,会判断是否要将checkpoint过后的fsimage传到Active NameNode上。然后我们继续进入doCheckpoint方法内部,

  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
    assert canceler != null;
    final long txid;
    final NameNodeFile imageType;
    // 这里开始准备进行checkpoint操作
    namesystem.cpLockInterruptibly();
    ...

    //如果不需要进发送fsimage动作,则在这里会直接结束
    if(!sendCheckpoint){
      return;
    }

    // 新建线程池用来执行上传fsimage的动作
    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
        uploadThreadFactory);
    List<Future<TransferFsImage.TransferResult>> uploads =
        new ArrayList<Future<TransferFsImage.TransferResult>>();
    // 遍历潜在的Active NameNode地址,添加上传fsimage的请求、
    for (final URL activeNNAddress : activeNNAddresses) {
      Future<TransferFsImage.TransferResult> upload =
          executor.submit(new Callable<TransferFsImage.TransferResult>() {
            @Override
            public TransferFsImage.TransferResult call() throws IOException {
              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                  .getFSImage().getStorage(), imageType, txid, canceler);
            }
          });
      uploads.add(upload);
    }
    ...
    for (; i < uploads.size(); i++) {

      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
      try {
        // 获取上传请求结果,如果成功了,则直接退出,无须获取下个请求的处理结果
        if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
          success = true;
          break;
        }

      } catch (ExecutionException e) {
      ...
    }
        // 重新设置上次上传成功时间
    lastUploadTime = monotonicNow();

    // 记录此次上传结果,表明此Standby是当前领先的checkpointer节点
    this.isPrimaryCheckPointer = success;
    ...
  }

Ok,以上就是StandbyCheckpointer内部的相关执行逻辑。在这里upload请求倒是发出去了,那么后面是怎么被处理的呢?接下来我们就来 ImageServlet内部的请求处理逻辑。

我们进入ImafeServlet的doPut处理方法,因为我们是上传文件的请求,不是Get,

  protected void doPut(final HttpServletRequest request,
      final HttpServletResponse response) throws ServletException, IOException {
    try {
      ServletContext context = getServletContext();
      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
      final Configuration conf = (Configuration) getServletContext()
          .getAttribute(JspHelper.CURRENT_CONF);
      final PutImageParams parsedParams = new PutImageParams(request, response,
          conf);
      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
      // 验证请求信息
      validateRequest(context, conf, request, response, nnImage,
          parsedParams.getStorageInfoString());

      UserGroupInformation.getCurrentUser().doAs(
          new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
              // 获取当前节点的服务状态
              HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                  .getNameNodeStateFromContext(getServletContext());
              // 如果当前处理的节点不是Active节点,说明请求发送到错误的目标节点上了
              if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
                // 在此给出错误的回复码以及出错信息
                response.sendError(HttpServletResponse.SC_EXPECTATION_FAILED,
                    "Nameode "+request.getLocalAddr()+" is currently not in a state which can "
                        + "accept uploads of new fsimages. State: "+state);
                return null;
              }
              ...

因为之前Standby节点并不知道当前具体的Active NameNode,所以采用的是一种轮询遍历的方式,这样的话同为Standby的其他节点也会处理到。如果当前处理节点的确是Active NameNode,还会进行如下2个判断逻辑,

...
              // 通过tailSet方法,来比较是否此请求是来自于更旧的事务
              SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
              // 如果是,则给出错误回复,表明当前已经在处理更新的fsimage文件
              if (larger.size() > 0) {
                response.sendError(HttpServletResponse.SC_CONFLICT,
                    "Another checkpointer is already in the process of uploading a" +
                        " checkpoint made up to transaction ID " + larger.last());
                return null;
              }

              // 保证当前只处理一份请求,不处理重复的请求
              if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                response.sendError(HttpServletResponse.SC_CONFLICT,
                    "Either current namenode is checkpointing or another"
                        + " checkpointer is already in the process of "
                        + "uploading a checkpoint made at transaction ID "
                        + txid);
                return null;
              }
...

前面判断如果都没问题,最后会进行fsimage的下载动作,Active NameNode会从目标Standby NameNode上download文件,


                InputStream stream = request.getInputStream();
                try {
                  long start = monotonicNow();
                  // 此处进行镜像文件下载操作
                  MD5Hash downloadImageDigest = TransferFsImage
                      .handleUploadImageRequest(request, txid,
                          nnImage.getStorage(), stream,
                          parsedParams.getFileSize(), getThrottler(conf));
                  nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                      downloadImageDigest);
                  ...
                } finally {
                  // 下载完成后移除镜像文件的请求
                  currentlyDownloadingCheckpoints.remove(imageRequest);

                  stream.close();
                }

具体上传的细节处理大家可以查阅TransferFsImage类里的代码。

总结


所以总的来看,HDFS-6440支持多Standby特性更多的是一些适配的改造,而不是对原先HA机制的大改。但是依然不可否认,这个特性要做的周边工作还是很多的,比如对应的unit test的构造,这些工作量也是很大的。最后一个小小的建议,配置多Standby的时候,建议数量不宜过多,3~5个足够了,2点原因:第一,zkfc切换选取Active时锁竞争的问题;第二,这些Standby节点同时tail editlog时造成的JournalNode带宽使用上升问题。

参考资料


[1].https://issues.apache.org/jira/browse/HDFS-6440
[2].https://issues.apache.org/jira/secure/attachment/12677453/Multiple-Standby-NameNodes_V1.pdf

原文地址:https://www.cnblogs.com/bianqi/p/12183726.html