HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

在《HDFS源码分析心跳汇报之数据结构初始化》一文中,我们了解到HDFS心跳相关的BlockPoolManager、BPOfferService、BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的。那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程。

        首先,我们先看下

        那么,BPServiceActor线程是通过什么样的流程来实现心跳的呢?我们来看下它正常工作的run()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  /** 
  2.   * No matter what kind of exception we get, keep retrying to offerService(). 
  3.   * That's the loop that connects to the NameNode and provides basic DataNode 
  4.   * functionality. 
  5.   * 
  6.   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can 
  7.   * happen either at shutdown or due to refreshNamenodes. 
  8.   */  
  9.  @Override  
  10.  public void run() {  
  11.     
  12. // 记录日志信息:starting to offer service  
  13.    LOG.info(this + " starting to offer service");  
  14.   
  15.    try {  
  16.       
  17.      // 在一个while循环内,完成连接NameNode并握手操作,即初始化  
  18.      while (true) {  
  19.        // init stuff  
  20.        try {  
  21.          // setup storage  
  22.          // 连接NameNode并握手  
  23.          connectToNNAndHandshake();  
  24.          break;  
  25.        } catch (IOException ioe) {  
  26.           
  27.          // 如果存在异常  
  28.           
  29.          // Initial handshake, storage recovery or registration failed  
  30.           
  31.          // 现将运行状态runningState设置为初始化失败INIT_FAILED  
  32.          runningState = RunningState.INIT_FAILED;  
  33.            
  34.          // 调用shouldRetryInit()方法判断初始化失败时是否可以重试  
  35.          if (shouldRetryInit()) {  
  36.             
  37.            // Retry until all namenode's of BPOS failed initialization  
  38.         // 记录error日志信息  
  39.            LOG.error("Initialization failed for " + this + " "  
  40.                + ioe.getLocalizedMessage());  
  41.              
  42.            // 线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作  
  43.            sleepAndLogInterrupts(5000, "initializing");  
  44.              
  45.          } else {  
  46.             
  47.         // 不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回  
  48.            runningState = RunningState.FAILED;  
  49.            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);  
  50.            return;  
  51.          }  
  52.        }  
  53.      }  
  54.   
  55.      // 设置运行状态runningState为正在运行RUNNING  
  56.      runningState = RunningState.RUNNING;  
  57.   
  58.      // 进入另一个while循环,不停的调用offerService()方法,  
  59.      // 发送心跳给NameNode并接收来自NameNode,然后根据命令交给不同的组件去处理  
  60.      // 循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true  
  61.      while (shouldRun()) {  
  62.        try {  
  63.          offerService();  
  64.        } catch (Exception ex) {  
  65.           
  66.          // 存在异常的话,记录error日志,并休眠5s  
  67.          LOG.error("Exception in BPOfferService for " + this, ex);  
  68.          sleepAndLogInterrupts(5000, "offering service");  
  69.        }  
  70.      }  
  71.        
  72.      // 设置运行状态runningState为已退出EXITED  
  73.      runningState = RunningState.EXITED;  
  74.        
  75.    } catch (Throwable ex) {  
  76.      LOG.warn("Unexpected exception in block pool " + this, ex);  
  77.      runningState = RunningState.FAILED;  
  78.    } finally {  
  79.      LOG.warn("Ending block pool service for: " + this);  
  80.        
  81.      // 清空,释放占用的资源  
  82.      cleanUp();  
  83.    }  
  84.  }  

        在run()方法的开始,也就是BPServiceActor线程刚启动时,会在一个while循环内,完成连接NameNode并握手操作,即初始化。这里,是通过调用connectToNNAndHandshake()方法完成与NameNode的连接并进行两次握手的。值得一提的是,如果出现了IOException异常,会先将运行状态runningState设置为初始化失败INIT_FAILED,然后调用shouldRetryInit()方法判断初始化失败时是否可以重试:

        1、如果可以重试的话,记录error日志信息,线程休眠5s,并记录info日志信息,之后再进入循环重复执行之前的操作;

        2、不允许重试的情况下,将运行状态runningState设置为失败FAILED,退出循环,并返回。

        接下来,设置运行状态runningState为正在运行RUNNING,进入另一个while循环,不停的调用offerService()方法,发送心跳给NameNode并接收来自NameNode的命令,然后根据命令交给不同的组件去处理,循环的条件就是该线程的标志位shouldServiceRun为true,且dataNode的shouldRun()返回true;

        而当循环过程中存在异常Exception的话,记录error日志,并休眠5s,然后继续循环,只有当shouldRun()方法返回false,才退出循环,设置运行状态runningState为已退出EXITED,最终调用cleanUp()方法释放占用的资源等。

        上述就是BPServiceActor线程正常工作进行周期性心跳的主流程。下面,我们针对其中的某些细节进行详细描述。        

        首先,完成与NameNode的连接并进行两次握手的connectToNNAndHandshake()方法,实现如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  /** 
  2.   * 连接NameNode并握手 
  3.   */  
  4.  private void connectToNNAndHandshake() throws IOException {  
  5.      
  6. // get NN proxy  
  7. // 利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode  
  8.    bpNamenode = dn.connectToNN(nnAddr);  
  9.   
  10.    // First phase of the handshake with NN - get the namespace  
  11.    // info.  
  12.    // 与NameNode握手第一阶段:获取命名空间信息  
  13.    NamespaceInfo nsInfo = retrieveNamespaceInfo();  
  14.      
  15.    // Verify that this matches the other NN in this HA pair.  
  16.    // This also initializes our block pool in the DN if we are  
  17.    // the first NN connection for this BP.  
  18.    // 验证,并设置命名空间信息()  
  19.    bpos.verifyAndSetNamespaceInfo(nsInfo);  
  20.      
  21.    // Second phase of the handshake with the NN.  
  22.    // 与NameNode握手第二阶段,注册  
  23.    register();  
  24.  }  

        它的主要处理流程如下:

        1、利用DataNode实例dn的connectToNN()方法和NameNode地址nnAddr获得NameNode的代理bpNamenode
        2、与NameNode握手第一阶段:调用retrieveNamespaceInfo()方法获取命名空间信息nsInfo;

        3、通过bpos的verifyAndSetNamespaceInfo()方法进行验证,并设置命名空间信息nsInfo;

        4、与NameNode握手第二阶段,调用register()方法进行注册。

        接着,我们再看下实现周期性心跳的offerService()方法,代码如下:

原文地址:https://www.cnblogs.com/jirimutu01/p/5556212.html