HDFS源码分析数据块校验之DataBlockScanner

 DataBlockScanner是运行在数据节点DataNode上的一个后台线程。它为所有的块池管理块扫描。针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描、校验数据块。当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新。

        我们先看下DataBlockScanner的成员变量,如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // 所属数据节点DataNode实例  
  2. private final DataNode datanode;  
  3. // 所属存储FsDatasetSpi实例  
  4. private final FsDatasetSpi<? extends FsVolumeSpi> dataset;  
  5. // 配置信息Configuration实例  
  6. private final Configuration conf;  
  7.   
  8. // 线程休眠周期,5s  
  9. static final int SLEEP_PERIOD_MS = 5 * 1000;  
  10.   
  11. /** 
  12.  * Map to find the BlockPoolScanner for a given block pool id. This is updated 
  13.  * when a BPOfferService becomes alive or dies. 
  14.  * 存储块池ID到对应BlockPoolScanner实例的映射。 
  15.  * 当一个BPOfferService服务变成活跃或死亡状态,blockPoolScannerMap将会随之更新。 
  16.  */  
  17. private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap =   
  18.   new TreeMap<String, BlockPoolSliceScanner>();  
  19.   
  20. // 数据块扫描线程  
  21. Thread blockScannerThread = null;  

        首先是由构造函数确定的三个成员变量:所属数据节点DataNode实例datanode、所属存储FsDatasetSpi实例dataset、配置信息Configuration实例conf,对应构造函数如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // 构造函数  
  2. DataBlockScanner(DataNode datanode,  
  3.     FsDatasetSpi<? extends FsVolumeSpi> dataset,  
  4.     Configuration conf) {  
  5.   this.datanode = datanode;  
  6.   this.dataset = dataset;  
  7.   this.conf = conf;  
  8. }  

        然后设定了一个静态变量,5s的线程休眠周期,即SLEEP_PERIOD_MS,另外两个重要的成员变量是:

       1、TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap

             存储块池ID到对应BlockPoolScanner实例的映射。当一个BPOfferService服务变成活跃或死亡状态,blockPoolScannerMap将会随之更新。

        2、Thread blockScannerThread

              数据块扫描线程。

        既然DataBlockScanner实现了Runnable接口,那么它肯定是作为一个线程在DataNode节点上运行的,我们看下DataNode是如何对其进行构造及启动的,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  /** 
  2.   * See {@link DataBlockScanner} 
  3.   */  
  4.  private synchronized void initDataBlockScanner(Configuration conf) {  
  5.      
  6. // 如果blockScanner不为null,直接返回  
  7. if (blockScanner != null) {  
  8.      return;  
  9.    }  
  10.   
  11. // 数据块校验功能无法开启的原因  
  12.    String reason = null;  
  13.    assert data != null;  
  14.      
  15.    // 如果参数dfs.datanode.scan.period.hours未配置,或者配置为0,说明数据块校验功能已关闭  
  16.    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,  
  17.                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {  
  18.      reason = "verification is turned off by configuration";  
  19.        
  20.    // SimulatedFSDataset不支持数据块校验  
  21.    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {  
  22.      reason = "verifcation is not supported by SimulatedFSDataset";  
  23.    }  
  24.      
  25.    // 如果数据块校验功能无法开启的原因为null,构造DataBlockScanner实例,并调用其start()方法启动该线程  
  26.    if (reason == null) {  
  27.      blockScanner = new DataBlockScanner(this, data, conf);  
  28.      blockScanner.start();  
  29.    } else {  
  30.       
  31.      // 否则在日志文件中记录周期性数据块校验扫描无法启用的原因  
  32.      LOG.info("Periodic Block Verification scan disabled because " + reason);  
  33.    }  
  34.  }  

        首先,如果blockScanner不为null,直接返回,说明之前已经初始化并启动了,然后,确定数据块校验功能无法开启的原因reason:

        1、如果参数dfs.datanode.scan.period.hours未配置,或者配置为0,说明数据块校验功能已关闭;

        2、SimulatedFSDataset不支持数据块校验;

        如果数据块校验功能无法开启的原因为null,构造DataBlockScanner实例,并调用其start()方法启动该线程,否则在日志文件中记录周期性数据块校验扫描无法启用的原因。

        DataBlockScanner线程启动的start()方法如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public void start() {  
  2.    
  3. / 基于DataBlockScanner实例创建一个线程blockScannerThread  
  4.   blockScannerThread = new Thread(this);  
  5.   // 将线程blockScannerThread设置为后台线程  
  6.   blockScannerThread.setDaemon(true);  
  7.   // 启动线程blockScannerThread  
  8.   blockScannerThread.start();  
  9. }  

        实际上它是基于DataBlockScanner实例创建一个线程blockScannerThread,将线程blockScannerThread设置为后台线程,然后启动线程blockScannerThread。

        DataBlockScanner线程已创建,并启动,那么我们看下它是如何工作的,接下来看下它的run()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.   // 线程核心run()方法  
  2.   @Override  
  3.   public void run() {  
  4.         
  5.     // 当前块池ID,默认为空  
  6.     String currentBpId = "";  
  7.       
  8.     // 第一次运行标志,默认当然应该为true  
  9.     boolean firstRun = true;  
  10.       
  11.     // 如果所属数据节点DataNode实例datanode正常运行,且当前线程没有被中断  
  12.     while (datanode.shouldRun && !Thread.interrupted()) {  
  13.       //Sleep everytime except in the first iteration.  
  14.           
  15.       // 如果不是第一次运行,线程休眠5s  
  16.       if (!firstRun) {  
  17.         try {  
  18.           Thread.sleep(SLEEP_PERIOD_MS);  
  19.         } catch (InterruptedException ex) {  
  20.           // Interrupt itself again to set the interrupt status  
  21.               
  22.           // 如果发生InterruptedException异常,中断blockScannerThread线程,然后跳过,继续下一轮循环  
  23.           blockScannerThread.interrupt();  
  24.           continue;  
  25.         }  
  26.       } else {  
  27.         // 第一次运行时先将firstRun标志设置为false  
  28.         firstRun = false;  
  29.       }  
  30.         
  31.       // 获取下一个块池切片扫描器BlockPoolSliceScanner实例bpScanner  
  32.       BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);  
  33.         
  34.       // 如果bpScanner为null,跳过,继续下一轮循环  
  35.       if (bpScanner == null) {  
  36.         // Possible if thread is interrupted  
  37.         continue;  
  38.       }  
  39.         
  40.       // 设置当前块池ID,即currentBpId,从块池切片扫描器BlockPoolSliceScanner实例bpScanner中获取  
  41.       currentBpId = bpScanner.getBlockPoolId();  
  42.         
  43.       // If BPOfferService for this pool is not alive, don't process it  
  44.       // 如果当前块池对应的心跳服务BPOfferService不是活跃的,不对它进行处理,调用removeBlockPool()方法从blockPoolScannerMap中移除数据,  
  45.       // 并关闭对应BlockPoolSliceScanner,然后跳过,执行下一轮循环  
  46.       if (!datanode.isBPServiceAlive(currentBpId)) {  
  47.         LOG.warn("Block Pool " + currentBpId + " is not alive");  
  48.         // Remove in case BP service died abruptly without proper shutdown  
  49.         removeBlockPool(currentBpId);  
  50.         continue;  
  51.       }  
  52.         
  53.       // 调用块池切片扫描器BlockPoolSliceScanner实例bpScanner的scanBlockPoolSlice()方法,  
  54.       // 扫描对应块池里的数据块,进行数据块校验  
  55.       bpScanner.scanBlockPoolSlice();  
  56.     }  
  57.   
  58.     // Call shutdown for each allocated BlockPoolSliceScanner.  
  59.     // 退出循环后,遍历blockPoolScannerMap中的每个BlockPoolSliceScanner实例bpss,  
  60.     // 挨个调用对应shutdown()方法,停止块池切片扫描器BlockPoolSliceScanner  
  61.     for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {  
  62.       bpss.shutdown();  
  63.     }  
  64.   }  

        run()方法逻辑比较清晰,大体如下:

        1、首先初始化当前块池ID,即currentBpId,默认为空,再确定第一次运行标志firstRun,默认当然应该为true;

        2、接下来进入一个while循环,循环的条件是如果所属数据节点DataNode实例datanode正常运行,且当前线程没有被中断:

               2.1、处理第一次运行标志位firstRun:

                         2.1.1、如果不是第一次运行,线程休眠5s:即firstRun为false,这时如果发生InterruptedException异常,中断blockScannerThread线程,然后跳过,继续下一轮循环;

                         2.1.2、第一次运行时先将firstRun标志设置为false;

               2.2、获取下一个块池切片扫描器BlockPoolSliceScanner实例bpScanner,通过调用getNextBPScanner()方法,传入当前块池ID,即currentBpId来实现,首次循环,currentBpId为空,后续会传入之前处理的值,下面会对其进行更新;

               2.3、如果bpScanner为null,跳过,继续下一轮循环;

               2.4、设置当前块池ID,即currentBpId,从块池切片扫描器BlockPoolSliceScanner实例bpScanner中获取;

               2.5、如果当前块池对应的心跳服务BPOfferService不是活跃的,不对它进行处理,调用removeBlockPool()方法从blockPoolScannerMap中移除数据,并关闭对应BlockPoolSliceScanner,然后跳过,执行下一轮循环;

               2.6、调用块池切片扫描器BlockPoolSliceScanner实例bpScanner的scanBlockPoolSlice()方法,扫描对应块池里的数据块,进行数据块校验;

        3、退出循环后,遍历blockPoolScannerMap中的每个BlockPoolSliceScanner实例bpss,挨个调用对应shutdown()方法,停止块池切片扫描器BlockPoolSliceScanner。

        我们接下来看下比较重要的getNextBPScanner()方法,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. /** 
  2.  * Find next block pool id to scan. There should be only one current 
  3.  * verification log file. Find which block pool contains the current 
  4.  * verification log file and that is used as the starting block pool id. If no 
  5.  * current files are found start with first block-pool in the blockPoolSet. 
  6.  * However, if more than one current files are found, the one with latest  
  7.  * modification time is used to find the next block pool id. 
  8.  * 寻找下一个块池ID以进行scan。 
  9.  * 此时应该只有一个当前验证日志文件。 
  10.  */  
  11. private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {  
  12.     
  13.   String nextBpId = null;  
  14.     
  15.   // 如果所属数据节点DataNode实例datanode正常运行,且当前blockScannerThread线程没有被中断  
  16.   while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {  
  17.       
  18.     // 等待初始化  
  19.     waitForInit();  
  20.       
  21.     synchronized (this) {  
  22.         
  23.     // 当blockPoolScannerMap大小大于0,即存在BlockPoolSliceScanner实例时,做以下处理:  
  24.       if (getBlockPoolSetSize() > 0) {            
  25.         // Find nextBpId by the minimum of the last scan time  
  26.         // lastScanTime用于记录上次浏览时间  
  27.         long lastScanTime = 0;  
  28.           
  29.         // 遍历blockPoolScannerMap集合,取出每个块池ID,即bpid  
  30.         for (String bpid : blockPoolScannerMap.keySet()) {  
  31.             
  32.         // 根据块池ID,即bpid,取出其对应BlockPoolSliceScanner实例的上次浏览时间t  
  33.           final long t = getBPScanner(bpid).getLastScanTime();  
  34.             
  35.           // 如果t不为0,且如果块池ID为null,或者t小于lastScanTime,则将t赋值给lastScanTime,bpid赋值给nextBpId  
  36.           // 也就是计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId  
  37.           if (t != 0L) {  
  38.             if (bpid == null || t < lastScanTime) {  
  39.               lastScanTime =  t;  
  40.               nextBpId = bpid;  
  41.             }  
  42.           }  
  43.         }  
  44.           
  45.         // nextBpId can still be null if no current log is found,  
  46.         // find nextBpId sequentially.  
  47.           
  48.         // 如果对应块池ID,即nextBpId为null,则取比上次处理的块池currentBpId高的key作为nextBpId,  
  49.         // 如果还不能取出的话,那么取第一个块池ID,作为nextBpId  
  50.         if (nextBpId == null) {  
  51.           nextBpId = blockPoolScannerMap.higherKey(currentBpId);  
  52.           if (nextBpId == null) {  
  53.             nextBpId = blockPoolScannerMap.firstKey();  
  54.           }  
  55.         }  
  56.           
  57.         // 如果nextBpId不为空,那么从blockPoolScannerMap中获取其对应BlockPoolSliceScanner实例返回  
  58.         if (nextBpId != null) {  
  59.           return getBPScanner(nextBpId);  
  60.         }  
  61.       }  
  62.     }  
  63.       
  64.     // 记录warn日志,No block pool is up, going to wait,然后等待  
  65.     LOG.warn("No block pool is up, going to wait");  
  66.       
  67.     try {  
  68.     // 线程休眠5s  
  69.       Thread.sleep(5000);  
  70.     } catch (InterruptedException ex) {  
  71.       LOG.warn("Received exception: " + ex);  
  72.       blockScannerThread.interrupt();  
  73.       return null;  
  74.     }  
  75.   }  
  76.   return null;  
  77. }  

        它的主要作用就是寻找下一个块池ID以进行scan,其存在一个整体的while循环,循环的条件为如果所属数据节点DataNode实例datanode正常运行,且当前blockScannerThread线程没有被中断,循环内做以下处理:

        1、调用waitForInit()方法等待初始化;

        2、当前对象上使用synchronized进行同步,当blockPoolScannerMap大小大于0,即存在BlockPoolSliceScanner实例时,做以下处理:

               2.1、设定lastScanTime用于记录上次浏览时间,默认值为0;

               2.2、遍历blockPoolScannerMap集合,取出每个块池ID,即bpid,计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId:

                        2.2.1、根据块池ID,即bpid,取出其对应BlockPoolSliceScanner实例的上次浏览时间t;

                        2.2.2、如果t不为0,且如果块池ID为null,或者t小于lastScanTime,则将t赋值给lastScanTime,bpid赋值给nextBpId,也就是计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId;

               2.3、如果对应块池ID,即nextBpId为null,则取比上次处理的块池currentBpId高的key作为nextBpId,如果还不能取出的话,那么取第一个块池ID,作为nextBpId;

               2.4、如果nextBpId不为空,那么从blockPoolScannerMap中获取其对应BlockPoolSliceScanner实例返回;

        3、如果blockPoolScannerMap大小等于0,或者上述2找不到的话,记录warn日志,No block pool is up, going to wait,然后等待5s后继续下一轮循环;

        最后,实在找不到就返回null。

        可见,getNextBPScanner()方法优先选取最早处理过的块池,找不到的话再按照之前处理过的块池ID增长的顺序,找下一个块池ID,按照块池ID大小顺序到尾部的话,再折回取第一个。

        其中等待初始化的waitForInit()方法比较简单,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  // Wait for at least one block pool to be up  
  2.  private void waitForInit() {  
  3.     
  4. // 如果BlockPoolSliceScanner的个数小于数据节点所有BpOS个数,或者BlockPoolSliceScanner的个数小于1,一直等待  
  5. // BpOS你可以理解为DataNode上每个块池或命名空间对应的一个实例,它处理该命名空间到对应活跃或备份状态NameNode的心跳。  
  6.    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)  
  7.        || (getBlockPoolSetSize() < 1)) {  
  8.      try {  
  9.         
  10.     // 线程休眠5s  
  11.        Thread.sleep(SLEEP_PERIOD_MS);  
  12.      } catch (InterruptedException e) {  
  13.         
  14.     // 如果发生InterruptedException异常,中断blockScannerThread线程,然后返回  
  15.        blockScannerThread.interrupt();  
  16.        return;  
  17.      }  
  18.    }  
  19.  }  

        它本质上是等所有块池都被上报至blockPoolScannerMap集合后,才认为已完成初始化,然后再挑选块池ID,否则线程休眠5s,继续等待。代码注释比较详细,这里不再赘述!

        获取到块池ID,并获取到其对应的块池切片扫描器BlockPoolSliceScanner实例bpScanner了,接下来就是调用bpScanner的scanBlockPoolSlice()方法,扫描该块池的数据块,并做数据块校验工作了。这方面的内容,请阅读《HDFS源码分析数据块校验之BlockPoolSliceScanner》一文,这里不再做介绍。

        到了这里,各位看官可能有个疑问,选取块池所依赖的blockPoolScannerMap集合中的数据是哪里来的呢?答案就在处理数据节点心跳的BPServiceActor线程中,在完成数据块汇报、处理来自名字节点NameNode的相关命令等操作后,有如下代码被执行:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // Now safe to start scanning the block pool.  
  2. // If it has already been started, this is a no-op.  
  3. // 现在可以安全地扫描块池,如果它已经启动,这是一个空操作。  
  4. if (dn.blockScanner != null) {  
  5.   dn.blockScanner.addBlockPool(bpos.getBlockPoolId());  
  6. }  

        很简单,数据节点汇报数据块给名字节点,并执行来自名字节点的相关命令后,就可以通过数据节点DataNode中成员变量blockScanner的addBlockPool()方法,添加块池,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  public synchronized void addBlockPool(String blockPoolId) {  
  2.      
  3. // 如果blockPoolScannerMap集合中存在块池blockPoolId,直接返回  
  4. if (blockPoolScannerMap.get(blockPoolId) != null) {  
  5.      return;  
  6.    }  
  7.   
  8. // 根据块池blockPoolId、数据节点datanode、存储dataset、配置信息conf等构造BlockPoolSliceScanner实例bpScanner  
  9.    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,  
  10.        datanode, dataset, conf);  
  11.      
  12.    // 将块池blockPoolId与bpScanner的映射关系存储到blockPoolScannerMap中  
  13.    blockPoolScannerMap.put(blockPoolId, bpScanner);  
  14.      
  15.    // 记录日志信息  
  16.    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="  
  17.        + blockPoolScannerMap.size());  
  18.  }  

        逻辑很简单,首先需要看看blockPoolScannerMap集合中是否存在块池blockPoolId,存在即返回,否则根据块池blockPoolId、数据节点datanode、存储dataset、配置信息conf等构造BlockPoolSliceScanner实例bpScanner,将块池blockPoolId与bpScanner的映射关系存储到blockPoolScannerMap中,最后记录日志信息。

        我们在上面也提到了如果当前块池对应的心跳服务BPOfferService不是活跃的,那么会调用removeBlockPool()方法,移除对应的块池,代码如下:

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public synchronized void removeBlockPool(String blockPoolId) {  
  2.    
  3. / 根据块池blockPoolId,从blockPoolScannerMap中移除数据,并得到对应BlockPoolSliceScanner实例bpss  
  4.   BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);  
  5.     
  6.   // 调用bpss的shutdown()方法,关闭bpss  
  7.   if (bpss != null) {  
  8.     bpss.shutdown();  
  9.   }  
  10.     
  11.   // 记录日志信息  
  12.   LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");  
  13. }  

        代码很简单,不再赘述。

        总结

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程,它负责管理所有块池的数据块扫描工作。当数据节点DataNode发送心跳给名字节点NameNode进行数据块汇报并执行完返回的命令时,会在DataBlockScanner的内部集合blockPoolScannerMap中注册块池ID与为此新创建的BlockPoolSliceScanner对象的关系,然后DataBlockScanner内部线程blockScannerThread周期性的挑选块池currentBpId,并获取块池切片扫描器BlockPoolSliceScanner实例bpScanner,继而调用其scanBlockPoolSlice()方法,扫描对应块池里的数据块,进行数据块校验。块池选择的主要依据就是优先选择扫描时间最早的,也就是自上次扫描以来最长时间没有进行扫描的,按照这一依据选择不成功的话,则默认按照块池ID递增的顺序循环选取块池。

原文地址:https://www.cnblogs.com/jirimutu01/p/5556285.html