HDFS nnTop统计功能

前言


在HDFS的使用过程中,有的时候集群维护者可能想要知道哪些用户使用他们集群的资源比较多,以此有一个全面的了解。在YARN中,衡量用户使用资源的一个指标是container数,而在HDFS中我们 可以用什么指标呢?答案是请求数。当然你可能会说,为什么不能用写入写出的总数据量作为指标呢?没错,这的确也是一个可选指标,但是显然它们不易收集以及统计,相比于请求数而言。那么问题又来了,是否我们有必要自己写程序做这样的一套分析功能呢?HDFS是否已经帮我们做了这样的事情呢?带着以上的几个问题,我们继续下文的阅读,答案将会一点点的揭晓。

HDFS 请求数Top统计的设计


先抛开前面所说的内容,如果单纯考虑HDFS请求数的Top统计功能,我们会采用什么样的办法呢?可能我们的实现思路会是这样的:

  • 首先,找到HDFS请求处理的一个“入口”,客户端的所有请求将会从这个“入口”经过。
  • 然后,在这个“入口”处,我们需要截获经过此“入口”的每个请求,解析此请求的类型,请求用户等信息,然后做metric统计。
  • 最后,做定时的metric统计信息的top排名查询,就实现了我们的Top统计功能。

在以上3步中,第一步的实现是最难的,因为这个“入口”不好找,在这里我列举了3种办法:

  • 第一种方案,因为要得到每次请求,我们可以分析editlog文件,editlog中的每条事务记录清楚地交待了请求操作的信息。
  • 第二种方案,分析editlog日志要自己写另外的解析程序,不够简便,而且editlog是存在于NameNode和JournalNode上的,如果这2类节点的地址变了,我的程序还要重新改变editlog文件地址,所以一种更优的方案是直接解析hdfs的audit日志,在hdfs-audit的日志中已经格式化输出每次请求的详细信息了。
  • 第三种方案,分析hdfs-audit文件毕竟还是要做进一步的文件解析工作,是否我们能在hdfs-audit数据记录产生的那个时刻做一个拦截,将这些数据统计到我们自定义的Metric中呢,这个显然会比第二种方案更好。

上面的第三种方案无疑是最佳的方案,重新回到刚刚前言中提过的一个问题,现有HDFS中是否已经实现这样的概念呢?答案是肯定的,在目前的Hadoop 2.7以及以上版本中已经实现了这样的功能,相关JIRA HDFS-6982(nntop: top­-like tool for name node users)。此JIRA的实现者是来自twitter的一个hadoop工程师,此特性已经在twitter内部用了几个月的时间,然后此工程师将其贡献到了社区。在HDFS-6982的设计中,还添加了Top统计值的获取方式,如下图所示:



图 1-1 HDFS nnTop的原理设计

从上面的设计中,我们可以看到,这里新定义了一个叫TopAuditLogger来做这样的拦截解析,将统计值存入TopMetrics中,然后通过web和jmx的方式进行Top用户数据的获取。在后面代码的实现分析中,大家会更能体会此架构的设计思路。在HDFS-6982中,将此HDFS Top请求数的用户统计简称为HDFS nnTop,所以在后续的文字描述中也将会以这样的简称来称呼。

HDFS nnTop细节设计


在本节中,我们将会聊聊HDFS nnTop细节上的一些实现。其中最大的一个问题就是Metric统计值的细节设计,我们到底以怎么样的形式来存这些统计值呢?因为不同的用户在不同时间段内访问的请求数是各不相同的,所以一个比较合适的办法是每次统计阶段时间内的Top user统计。然后对于这个区间时间,我们是可以根据配置进行配置的,5分钟,1分钟,或30s等等。基于这个核心设计思想,在HDFS-6982中,那位twitter的工程师使用了类似滑动窗口的机制,入下图所示:



图 1-2 RollingWindow的设计

比如上面显示的RollingWindow表示的整个区间是假设是最近1分钟,就是60s,然后其内部分成了6个bucket,每个bucket就是代表10s。然后根据动作的发生时间,计算出其中所在的bucket,在此bucket对象内进行计数累加。但是在这里得要提一点,bucket如果配的越多,代表统计的精度就会越准,同样内存的开销也将会加大,这里会有一个内存空间与准确度之间的博弈比较

这里的RollingWindow是对应到具体用户的具体请求动作了。所在HDFS中,你会看到针对不同用户,操作的许许多多的RollingWindow,它的总的结构组织图如:



图 1-3 TopMetric的设计

上面的结构图用一句话概括如下:

TopMetric根据不同period时间创建不同的RollingWindowManager,在每个RollingWindowManager中,又包含了数个不同操作请求类型的RollingWindowMap,最后在每个RollingWindowMap中,按照用户又划分出了许多个RolingWindow对象。

HDFS nnTop的核心代码实现


下面是我们非常关注的nnTop的代码实现部分,我尽量会挑选其中核心的代码进行分析,这样看起来会更加的简洁。

用户请求的拦截


我们首先来看请求数据的拦截获取,下面是TopMetric的定义,

public class TopMetrics {
  ...

  // RollingWindowManagers对象图,每个对象映射到一个周期时间
  final Map<Integer, RollingWindowManager> rollingWindowManagers =
      new HashMap<Integer, RollingWindowManager>();

  public TopMetrics(Configuration conf, int[] reportingPeriods) {
    logConf(conf);
    for (int i = 0; i < reportingPeriods.length; i++) {
      // 根据配置传入的period,创建不同的RollingWindowManager,并加入变量rollingWindowManagers中
      rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
          conf, reportingPeriods[i]));
    }
  }

然后TopMetric会在FSNamesystem的initAuditLoggers方法中被构造,

  private List<AuditLogger> initAuditLoggers(Configuration conf) {
    // Initialize the custom access loggers if configured.
    Collection<String> alClasses =
        conf.getTrimmedStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
    List<AuditLogger> auditLoggers = Lists.newArrayList();
    ...

    // Add audit logger to calculate top users
    // 判断是否开启nnTop统计功能
    if (topConf.isEnabled) {
      // 创建TopMetric对象进行nnTop的统计,传入配置的周期时间   
      topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
      // 新建TopAuditLogger对象进行audit log记录的拦截处理
      auditLoggers.add(new TopAuditLogger(topMetrics));
    }

    return Collections.unmodifiableList(auditLoggers);
  }

随后在logAuditEvent方法中,topAuditLogger会被调用到,

  private void logAuditEvent(boolean succeeded,
      UserGroupInformation ugi, InetAddress addr, String cmd, String src,
      String dst, HdfsFileStatus stat) {
    FileStatus status = null;
    ...
    final String ugiStr = ugi.toString();
    // 在每次的logAuditEvent方法中,会得到每次的请求事件,给到AuditLogger处理,包括之前的TopAuditLogger
    for (AuditLogger logger : auditLoggers) {
      if (logger instanceof HdfsAuditLogger) {
        HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
        hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
            status, CallerContext.getCurrent(), ugi, dtSecretManager);
      } else {
        logger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst, status);
      }
    }
  }

在这里topAuditLogger的logAuditEvent同样会被调用到,我们进入此方法,

  public void logAuditEvent(boolean succeeded, String userName,
      InetAddress addr, String cmd, String src, String dst, FileStatus status) {
    try {
      // 这里会调用topMetrics的report统计方法,进行计数的累加
      topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
    } catch (Throwable t) {
      LOG.error("An error occurred while reflecting the event in top service, "
          + "event: (cmd={},userName={})", cmd, userName);
    }
    ...
  }

用户请求数据的统计


每次的数据我们已经拿到了,然后我们怎么统计到之前的RollingWindow对象中呢?我们继续刚才的topMetric的report统计方法,

  public void report(long currTime, String userName, String cmd) {
    LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
    userName = UserGroupInformation.trimLoginMethod(userName);
    for (RollingWindowManager rollingWindowManager : rollingWindowManagers
        .values()) {
      // 遍历不同period的rollingWindowManager对象,传入当前时间,操作类型,用户,增加的计数值
      rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
      // 额外增加总数的统计类型
      rollingWindowManager.recordMetric(currTime,
          TopConf.ALL_CMDS, userName, 1);
    }
  }

Ok,上面的过程似乎与之前我们所设计的过程十分的吻合了,我们再次进入RollWindowManafer的方法,

  public void recordMetric(long time, String command,
      String user, long delta) {
    // 传入操作类型,用户名称,获取目标的RollingWindow对象
    RollingWindow window = getRollingWindow(command, user);
    // 在此rollingWindow中进行计数累加
    window.incAt(time, delta);
  }

继续进入RollingWindow的incr方法,

  public void incAt(long time, long delta) {
    // 计算当前时间对应的bucket下标
    int bi = computeBucketIndex(time);
    Bucket bucket = buckets[bi];
    // 如果此时间已经超过此bucket上次更新时间的周期范围,进行计数重置,
    // 表明此bucket是上一周期内的bucket。
    if (bucket.isStaleNow(time)) {
      bucket.safeReset(time);
    }
    // 增加bucket上的计数统计,此bucket内部用的是AtomicLong数据类型,所以不会有线程安全的问题
    bucket.inc(delta);
  }

上面的统计方法,其实是个滑动技术统计的过程,老的超时的bucket会被重新统计,然后又从0开始,相当于是往后挪了一个窗口,以此保证了最近周期内的统计,所以前面我们说bucket越多,统计的精度将会越准,bucket所控制的时间范围会越短,重置bucket的影响会变小

nnTop数据的获取


前面数据的拦截,统计都完成了,那么我们有什么样的方式拿到呢?这里以jmx的获取方式为例,获取的代码如下,

  public String getTopUserOpCounts() {
    if (!topConf.isEnabled) {
      return null;
    }

    Date now = new Date();
    // 从topMetric对象中获取top user返回结果
    final List<RollingWindowManager.TopWindow> topWindows =
        topMetrics.getTopWindows();
    Map<String, Object> topMap = new TreeMap<String, Object>();
    topMap.put("windows", topWindows);
    topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
    try {
      // 将Top统计结果转为json字符返回
      return JsonUtil.toJsonString(topMap);
    } catch (IOException e) {
      LOG.warn("Failed to fetch TopUser metrics", e);
    }
    return null;
  }

上面的代码意味着通过http://nn-address:50070/jmx的方式能够拿到top user的结果。本人在测试集群中进行测试,拿到的结果如下:

NNTop:
    "NumStaleStorages" : 0,
    "TopUserOpCounts" : "{"timestamp":"2016-09-11T16:31:27+0800","windows":[{"windowLenMs":300000,"ops":
    [{"opType":"delete","topUsers":[{"user":"root","count":3}],"totalCount":3},
    {"opType":"setTimes","topUsers":[{"user":"data","count":2}],"totalCount":2},
    {"opType":"open","topUsers":[{"user":"root","count":22230},{"user":"data","count":4766}],"totalCount":26996},
    {"opType":"create","topUsers":[{"user":"root","count":518},{"user":"data","count":5}],"totalCount":523},
    {"opType":"setPermission","topUsers":[{"user":"root","count":15}],"totalCount":15},
    {"opType":"*","topUsers":[{"user":"root","count":50134},{"user":"data","count":21943},{"user":"bc","count":20}],"totalCount":72097},
    {"opType":"rename","topUsers":[{"user":"root","count":285},{"user":"data","count":2}],"totalCount":287},
    {"opType":"mkdirs","topUsers":[{"user":"root","count":8},{"user":"bc","count":4}],"totalCount":12},
    {"opType":"setReplication","topUsers":[{"user":"root","count":212}],"totalCount":212},
    {"opType":"listStatus","topUsers":[{"user":"data","count":12400},{"user":"root","count":102}],"totalCount":12502},
    {"opType":"getfileinfo","topUsers":[{"user":"root","count":25929},{"user":"data","count":6362},{"user":"bc","count":16}],"totalCount":32307}]},
    {"windowLenMs":1500000,"ops":
    [{"opType":"delete","topUsers":...}"
  }, {

相信上面的统计结果大家都能直接看得懂,大家可以通过拿到jmx的数据,自行解析成object对象即可拿来用了。

总结


OK,HDFS nnTop用户Top请求数统计功能就是如上所描述的,此功能默认是开启的,由配置项dfs.namenode.top.enabled所控制,而且里面top用户的数量以及统计的周期时间都是可配的,大家可以好好把此功能给用起来了。

参考资料


[1].https://issues.apache.org/jira/browse/HDFS-6982
[2].https://issues.apache.org/jira/secure/attachment/12665990/nntop-design-v1.pdf

原文地址:https://www.cnblogs.com/bianqi/p/12183728.html