HDFS源码分析(六)-----租约

前言

在文章开始,先讲个大家都经历过的事--去图书馆借书,当然,喜欢阅读的朋友也许和我一样比较喜欢借书阅读,借书阅读方便是方便,但是唯一不好的地方在于他又期限,就是deadlline,之前在我们学校有规定,如果超期为归还的书不允许借阅另外的书籍,所以要想使自己能接到新的书,就必须先归怀超期的书籍。当然这个经历本身再寻常不过了,但是我想表达的是在HDFS分布式文件系统中的租约机制与此过程有着极强的吻合性,后面的归还书籍相当于租约恢复的操作,下面详细介绍一下什么是租约。


租约以及租约相关类

租约可简单理解为在短期时间内对于租约持有者也就是客户端一定的权限,例如写文件的凭证。在每次HDFS中进行块的添加,删除操作时候,都会进行租约的核查和更新,以此维护各个文件操作情况。一下列出租约的相关类:

1.LeaseManager--租约管理类,可以理解为是一个租约大管家,里面维护了多种映射关系的租约集合列表。

2.LeaseManager.Lease--租约实体类,就是租约具体的表现形式类。在下面会详细介绍,此类中的变量和方法。

3.FSNamesystem--名字系统类,因为在这个大杂烩的大类中会用到租约相关的方法,也就加入进来。


Lease

首先从小类开始分析,也就是租约类Lease,他是一个内部类,存在于LeaseManager中。对于租约,首先有明白这样一个概念,租约是凭证,对客户端写操作文件的一种凭证,首先肯定得包含租约持有者变量,其次有租约记录的操作文件列表,当然租约还需要有时间,来记录租约超时的情况,所以类的变量结构如下

/************************************************************
   * A Lease governs all the locks held by a single client.
   * For each client there's a corresponding lease, whose
   * timestamp is updated when the client periodically
   * checks in.  If the client dies and allows its lease to
   * expire, all the corresponding locks can be released.
   *************************************************************/
   //每条租约记录信息,只能被单一的客户端占有
  class Lease implements Comparable<Lease> {
    //租约信息客户持有者
    private final String holder;
    //租约最后更新时间
    private long lastUpdate;
    //此租约内所打开的文件,维护一个客户端打开的所有文件
    private final Collection<String> paths = new TreeSet<String>();
  
    /** Only LeaseManager object can create a lease */
    private Lease(String holder) {
      this.holder = holder;
      renew();
    }
.....
在这里,Lease类将客户端打开的所有文件维护在了paths类中,然后通过租约持有者的名字进行初始构造函数的构造。然后注意这里有一个renew()方法,他是做租约时间更新的

/** Only LeaseManager object can renew a lease */
    //根据租约最后的检测时间
    private void renew() {
      this.lastUpdate = FSNamesystem.now();
    }
OK,租约类暂时先了解到这里,跳到下一个租约管理者类LeaseManager.


LeaseManager

身为一个管理者,内部变量肯定会稍稍多一些

/**
 * LeaseManager does the lease housekeeping for writing on files.   
 * This class also provides useful static methods for lease recovery.
 * 
 * Lease Recovery Algorithm
 * 1) Namenode retrieves lease information
 * 2) For each file f in the lease, consider the last block b of f
 * 2.1) Get the datanodes which contains b
 * 2.2) Assign one of the datanodes as the primary datanode p

 * 2.3) p obtains a new generation stamp form the namenode
 * 2.4) p get the block info from each datanode
 * 2.5) p computes the minimum block length
 * 2.6) p updates the datanodes, which have a valid generation stamp,
 *      with the new generation stamp and the minimum block length 
 * 2.7) p acknowledges the namenode the update results

 * 2.8) Namenode updates the BlockInfo
 * 2.9) Namenode removes f from the lease
 *      and removes the lease once all files have been removed
 * 2.10) Namenode commit changes to edit log
 * 租约管理器,包含了与文件租约相关的许多方法
 */
public class LeaseManager {
  public static final Log LOG = LogFactory.getLog(LeaseManager.class);

  private final FSNamesystem fsnamesystem;
  
  //租约软超时时间
  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
  //租约硬超时时间
  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;

  //
  // Used for handling lock-leases
  // Mapping: leaseHolder -> Lease
  //租约持有者到租约的映射图,保存在treeMap图中
  private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
  // Set of: Lease
  //全部租约图
  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();

  // 
  // Map path names to leases. It is protected by the sortedLeases lock.
  // The map stores pathnames in lexicographical order.
  //路径租约图映射关系
  private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
.....
从上往下看,首先是2个超时时间,软超时时间和硬超时时间,这2个超时时间分别运用在了不同的租约场景检测环境中,在后面会做分析。其次,管理者类在这里维护了3种租约映射关系对

1.租约持有者到所属租约

2.所有租约集合类

3.文件路径就是打开文件到租约的映射集合

初步分析,作者这么设计的目的是为了方便快速的找出目标租约,以便进行后续操作。这里用到了SortedMap也是为了超找的快速。毕竟如此庞大的分布式系统,租约记录将会非常多,因为实时的操作文件数数目也一定是非常多的。OK,下面再回到类,看看几个与租约操作相关的几个典型方法

/** @return the lease containing src */
  //根据路径获取租约
  public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
从其中一个集合中直接获取,这个很好理解。下面是一个添加新的租约记录方法

/**
   * Adds (or re-adds) the lease for the specified file.
   * 添加指定文件的租约信息
   */
  synchronized Lease addLease(String holder, String src) {
    //根据用户名获取其租约
    Lease lease = getLease(holder);
    if (lease == null) {
      //如果租约为空
      lease = new Lease(holder);
      //加入租约集合中
      leases.put(holder, lease);
      sortedLeases.add(lease);
    } else {
      //如果存在此用户的租约,则进行租约更新
      renewLease(lease);
    }
    //加入一条新的路径到租约的映射信息
    sortedLeasesByPath.put(src, lease);
    //在此租约路径映射信息中加入新路径
    lease.paths.add(src);
    return lease;
  }
在加入新的租约记录时,要同时同步相应集合的数据。对应的租约移除方法

/**
   * Remove the specified lease and src.
   * 移除值指定路径以及租约
   */
  synchronized void removeLease(Lease lease, String src) {
    //移动掉指定路径的映射信息
    sortedLeasesByPath.remove(src);
    //租约内部移除此路径
    if (!lease.removePath(src)) {
      LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
    }
    
    if (!lease.hasPath()) {
      //根据租约持有者移除指定租约
      leases.remove(lease.holder);
      if (!sortedLeases.remove(lease)) {
        LOG.error(lease + " not found in sortedLeases");
      }
    }
  }



实际租约样例

下面通过实际的操作文件方法,看看租约在这个过程中发挥的作用。比如在FSNamesystem的一个打开文件操作

/**
   * Create a new file entry in the namespace.
   * 
   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
   * 
   * @throws IOException if file name is invalid
   *         {@link FSDirectory#isValidToCreate(String)}.
   * 命名系统打开一个新的文件
   */
  void startFile(String src, PermissionStatus permissions,
                 String holder, String clientMachine,
                 boolean overwrite, boolean createParent, short replication, long blockSize
                ) throws IOException {
    //调用startFileInternal方法
    startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
                      createParent, replication, blockSize);
    getEditLog().logSync();
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      final HdfsFileStatus stat = dir.getFileInfo(src);
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "create", src, null, stat);
    }
  }
继续追踪

private synchronized void startFileInternal(String src,
                                              PermissionStatus permissions,
                                              String holder, 
                                              String clientMachine, 
                                              boolean overwrite,
                                              boolean append,
                                              boolean createParent,
                                              short replication,
                                              long blockSize
                                              ) throws IOException {
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
          + ", holder=" + holder
          + ", clientMachine=" + clientMachine
          + ", createParent=" + createParent
          + ", replication=" + replication
          + ", overwrite=" + overwrite
          + ", append=" + append);
    }

    if (isInSafeMode())
      throw new SafeModeException("Cannot create file" + src, safeMode);
    if (!DFSUtil.isValidName(src)) {
      throw new IOException("Invalid file name: " + src);
    }
.....

    if (!createParent) {
      verifyParentDir(src);
    }

    try {
      INode myFile = dir.getFileINode(src);
      //在这里进行租约的恢复操作
      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
     ....
因为重新操作了此文件,所以要进行租约的恢复操作

//租约恢复操作
  private void recoverLeaseInternal(INode fileInode, 
      String src, String holder, String clientMachine, boolean force)
  throws IOException {
    if (fileInode != null && fileInode.isUnderConstruction()) {
      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
      //
      // If the file is under construction , then it must be in our
      // leases. Find the appropriate lease record.
      //
      //根据客户端名称,取出其租约
      Lease lease = leaseManager.getLease(holder);
      //
      // We found the lease for this file. And surprisingly the original
      // holder is trying to recreate this file. This should never occur.
      //
      if (!force && lease != null) {
        //如果租约记录中已经存在此文件路径,不允许重复创建记录操作
        Lease leaseFile = leaseManager.getLeaseByPath(src);
        if (leaseFile != null && leaseFile.equals(lease)) { 
          throw new AlreadyBeingCreatedException(
                    "failed to create file " + src + " for " + holder +
                    " on client " + clientMachine + 
                    " because current leaseholder is trying to recreate file.");
        }
      }
      //
      // Find the original holder.
      //取出客户端的租约记录
      lease = leaseManager.getLease(pendingFile.clientName);
      if (lease == null) {
        throw new AlreadyBeingCreatedException(
                    "failed to create file " + src + " for " + holder +
                    " on client " + clientMachine + 
                    " because pendingCreates is non-null but no leases found.");
      }
      if (force) {
        // close now: no need to wait for soft lease expiration and 
        // close only the file src
        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
                 " from client " + pendingFile.clientName);
        //如果设置了强制执行参数,直接进行租约恢复操作
        internalReleaseLeaseOne(lease, src);
      } else {
        //
        // If the original holder has not renewed in the last SOFTLIMIT 
        // period, then start lease recovery.
        //
        //如果没有设置,判断是否软超时,来进行租约恢复
        if (lease.expiredSoftLimit()) {
          LOG.info("startFile: recover lease " + lease + ", src=" + src +
              " from client " + pendingFile.clientName);
          internalReleaseLease(lease, src);
        }
        throw new AlreadyBeingCreatedException(
            "failed to create file " + src + " for " + holder +
            " on client " + clientMachine + 
            ", because this file is already being created by " +
            pendingFile.getClientName() + 
            " on " + pendingFile.getClientMachine());
      }
    }

  }
这里就用到了软超时时间。继续调用恢复租约操作

/**
   * This is invoked when a lease expires. On lease expiry, 
   * all the files that were written from that dfsclient should be
   * recovered.
   * 进行租约恢复操作
   */
  void internalReleaseLease(Lease lease, String src) throws IOException {
    if (lease.hasPath()) {
      // make a copy of the paths because internalReleaseLeaseOne removes
      // pathnames from the lease record.
      String[] leasePaths = new String[lease.getPaths().size()];
      lease.getPaths().toArray(leasePaths);
      for (String p: leasePaths) {
        internalReleaseLeaseOne(lease, p);
      }
    } else {
      internalReleaseLeaseOne(lease, src);
    }
  }
根据租约维护的打开文件列表一条条的恢复

/**
   * Move a file that is being written to be immutable.
   * @param src The filename
   * @param lease The lease for the client creating the file
   */
  void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
    assert Thread.holdsLock(this);

    LOG.info("Recovering lease=" + lease + ", src=" + src);

    INodeFile iFile = dir.getFileINode(src);
    if (iFile == null) {
      final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " file does not exist.";
      NameNode.stateChangeLog.warn(message);
      throw new IOException(message);
    }
    if (!iFile.isUnderConstruction()) {
      final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " but file is already closed.";
      NameNode.stateChangeLog.warn(message);
      throw new IOException(message);
    }

    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
    .....
    // start lease recovery of the last block for this file.
    pendingFile.assignPrimaryDatanode();
    //在末尾进行租约的重分配
    Lease reassignedLease = reassignLease(
      lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
    leaseManager.renewLease(reassignedLease);
  }
进行租约重分配,重分配操作很简单,就是先移除老租约,再添加新的租约,然后更新一下时间

/**
   * Reassign lease for file src to the new holder.
   * 租约重分配方法,等价于先移除后添加的方法
   */
  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
    assert newHolder != null : "new lease holder is null";
    if (lease != null) {
      removeLease(lease, src);
    }
    return addLease(newHolder, src);
  }

synchronized void renewLease(Lease lease) {
    if (lease != null) {
      //首先进行列表租约移除
      sortedLeases.remove(lease);
      //更新时间
      lease.renew();
      //再进行添加
      sortedLeases.add(lease);
    }
  }

当你要具体操作block块的时候,还会经历租约检测工作,比如下面abandon块操作的时候

/**
   * The client would like to let go of the given block
   */
  public synchronized boolean abandonBlock(Block b, String src, String holder
      ) throws IOException {
    //
    // Remove the block from the pending creates list
    //
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                  +b+"of file "+src);
    if (isInSafeMode()) {
      throw new SafeModeException("Cannot abandon block " + b +
                                  " for fle" + src, safeMode);
    }
    //移除块操作时进行租约检查,如果出现不符号要求的时候会抛异常
    INodeFileUnderConstruction file = checkLease(src, holder);
    dir.removeBlock(src, file, b);
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                    + b
                                    + " is removed from pendingCreates");
    return true;
  }
// make sure that we still have the lease on this file.
  private INodeFileUnderConstruction checkLease(String src, String holder) 
                                                      throws IOException {
    INodeFile file = dir.getFileINode(src);
    //继续调用同名方法
    checkLease(src, holder, file);
    return (INodeFileUnderConstruction)file;
  }
然后是下面的核心检查方法

//下面是租约检查的核心逻辑方法
  private void checkLease(String src, String holder, INode file) 
                                                     throws IOException {
    //如果正在操作的文件不存在,抛异常
    if (file == null || file.isDirectory()) {
      Lease lease = leaseManager.getLease(holder);
      throw new LeaseExpiredException("No lease on " + src +
                                      " File does not exist. " +
                                      (lease != null ? lease.toString() :
                                       "Holder " + holder + 
                                       " does not have any open files."));
    }
    
    //如果文件没有被打开,说明一定没有对应的租约记录存在,也抛异常
    if (!file.isUnderConstruction()) {
      Lease lease = leaseManager.getLease(holder);
      throw new LeaseExpiredException("No lease on " + src + 
                                      " File is not open for writing. " +
                                      (lease != null ? lease.toString() :
                                       "Holder " + holder + 
                                       " does not have any open files."));
    }

    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
    //判断文件所有者和客户端租约持有者是否一致
    if (holder != null && !pendingFile.getClientName().equals(holder)) {
      throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
          + pendingFile.getClientName() + " but is accessed by " + holder);
    }
  }
主要进行了租约持有者与文件所属者名称进行检查,还有文件打开状态的判断。通过抛出异常的方式阻止用户的操作。


租约监控

在租约管理器内部也有租约线程操作

//租约过期监控检测线程
  class Monitor implements Runnable {
    final String name = getClass().getSimpleName();

    /** Check leases periodically. */
    public void run() {
      for(; fsnamesystem.isRunning(); ) {
        synchronized(fsnamesystem) {
          //执行checkLeases方法
          checkLeases();
        }

        try {
          Thread.sleep(2000);
        } catch(InterruptedException ie) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(name + " is interrupted", ie);
          }
        }
      }
    }
  }
会进行定期的租约检查操作,并对超时租约进行租约恢复操作

  /** Check the leases beginning from the oldest. */
  synchronized void checkLeases() {
    for(; sortedLeases.size() > 0; ) {
      //获取距离目前最晚的租约时间开始
      final Lease oldest = sortedLeases.first();
      //如果最晚的时间是否超过硬超时时间
      if (!oldest.expiredHardLimit()) {
        return;
      }

      //到了这步,说明已经发生租约超时
      LOG.info("Lease " + oldest + " has expired hard limit");

      final List<String> removing = new ArrayList<String>();
      // need to create a copy of the oldest lease paths, becuase 
      // internalReleaseLease() removes paths corresponding to empty files,
      // i.e. it needs to modify the collection being iterated over
      // causing ConcurrentModificationException
      //获取此租约管理的文件路径
      String[] leasePaths = new String[oldest.getPaths().size()];
      oldest.getPaths().toArray(leasePaths);
      for(String p : leasePaths) {
        try {
          //进行租约释放
          fsnamesystem.internalReleaseLeaseOne(oldest, p);
        } catch (IOException e) {
          // 如果是租约释放失败的情况加入移除列表中
          LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
          removing.add(p);
        }
      }
      
      //进行移除租约记录的remove操作
      for(String p : removing) {
        removeLease(oldest, p);
      }
    }
  }
每次从获取最晚的租约记录检测。

全部代码的分析请点击链接https://github.com/linyiqun/hadoop-hdfs,后续将会继续更新HDFS其他方面的代码分析。


参考文献

《Hadoop技术内部–HDFS结构设计与实现原理》.蔡斌等


原文地址:https://www.cnblogs.com/bianqi/p/12183875.html