Hadoop异常Task发现分析

前言

Hadoop作为一个大型的分布式系统,当他的规模不断的扩大,扩增到一定程度的时候,所使用的业务方自热而然的也会变多,不同的业务方会提交各种各样类型的任务,有人提交hive的查询任务,有人会写MapReduce解析程序的job.于是这就慢慢产生了一个叫"多租户"的概念.多租户最简单直接的理解就是一个大的公共自行车场,被一波人共同使用,自行车被人借光了,你就不能使用了,你就得等.但是,当这个用户越来越多的时候,一个很棘手的问题就会发生,某些不良"用户"独占大部分资源,导致其他的用户根本无法正常使用工作.今天本文所讨论的问题就是这个主题.对于此类问题,一般有2种解决方案,一个是分析管理,人工分析,然后手动操作管理,手动更改配置限定一下个别用户的资源使用上限,第二种则是用户的资源隔离,每位用户都固定分配好多少多少资源,只能用这么多.论难度而言,后者比前者更有技术难度,因为要改核心代码,今天就分析前面1种,就是分析找出这些"大户".


Hadoop现有监控的不足

把上面这个问题对应到Hadoop系统中,就是找出哪个user用的container,memory,cpu-vcores最多.单纯从Hadoop自身提供的一些工具来看,这些其实对于我们的帮助不大,不管说是ResourceManager的后台页还是JobHistory的历史job信息展示页来看,这些信息详细程序还是有的,就是太散,缺乏一个汇总这些数据信息的地方.例如1个finished job,我想要知道他是不是异常的job,那么我当然得需要知道里面的task异常的多不多,于是我就得在页面上继续往里点.这个数量小一点尚可接受,但是对于集群规模1天可达数万个job的集群时,根本难以想象.所以我们的初步目标就是2点,1个是汇总一些数据.2分析汇总后的数据,并做一些处理并展示到页面上,达到最直观的效果.


JobHistory的Task分析

在Hadoop层面,要想做到细粒度层面的分析,task级别的分析是一个不错的切入点.而Task的数据都是存在与JobHistory的.jhist文件中.于是我们可以在JobHistory的job解析的层面做一些加工.首先要知道JobHistory的页面是怎么生成的,首先他是在hadoop-mapredce-client-hs工程下的webapp包下.如下图所示的位置:


然后下面的Hs打头的类就是负责显示页面数据的逻辑代码.这部分的代码逻辑大致相同,读者可自行研究学习.然后我们找到一个与Job信息显示相关的一个类,HsJobBlock,分析一下里面的代码:

...
/*
   * (non-Javadoc)
   * @see org.apache.hadoop.yarn.webapp.view.HtmlBlock#render(org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block)
   */
  @Override protected void render(Block html) {
    String jid = $(JOB_ID);
    if (jid.isEmpty()) {
      html.
        p()._("Sorry, can't do anything without a JobID.")._();
      return;
    }
    JobId jobID = MRApps.toJobID(jid);
    Job j = appContext.getJob(jobID);
    if (j == null) {
      html.
        p()._("Sorry, ", jid, " not found.")._();
      return;
    }
    List<AMInfo> amInfos = j.getAMInfos();
    JobInfo job = new JobInfo(j);
    ResponseInfo infoBlock = info("Job Overview").
        _("Job Name:", job.getName()).
        _("User Name:", job.getUserName()).
        _("Queue:", job.getQueueName()).
        _("State:", job.getState()).
        _("Uberized:", job.isUber()).
        _("Submitted:", new Date(job.getSubmitTime())).
        _("Started:", new Date(job.getStartTime())).
        _("Finished:", new Date(job.getFinishTime())).
        _("Elapsed:", StringUtils.formatTime(
            Times.elapsed(job.getStartTime(), job.getFinishTime(), false)));
...
大致意思就是获取页面上传入的jobid,然后从appContext对象中获取此id对应的job信息.appContext这个对象是一个父类,他的继承关系图如下:


从图中基本可以看出,他的实现类是JobHistory类对象.job信息从AppContext中获取完毕之后,他是如何获取更多的关于内部的task信息的呢,答案在下面这几行代码中.

...
JobId jobID = MRApps.toJobID(jid);
    Job j = appContext.getJob(jobID);
    if (j == null) {
      html.
        p()._("Sorry, ", jid, " not found.")._();
      return;
    }
    List<AMInfo> amInfos = j.getAMInfos();
    JobInfo job = new JobInfo(j);
    ResponseInfo infoBlock = info("Job Overview").
        _("Job Name:", job.getName()).
...
这一切都来自于中间的一个转换,最终得到JobInfo.JobInfo中保留了大量的关于job的信息变量,下面是其中的一部分.

@XmlRootElement(name = "job")
@XmlAccessorType(XmlAccessType.FIELD)
public class JobInfo {

  protected long submitTime;
  protected long startTime;
  protected long finishTime;
  protected String id;
  protected String name;
  protected String queue;
  protected String user;
  protected String state;
  protected int mapsTotal;
  protected int mapsCompleted;
  protected int reducesTotal;
  protected int reducesCompleted;
...
在构造函数中,会经过1步Task信息的load加载

public JobInfo(Job job) {
    this.id = MRApps.toString(job.getID());
    JobReport report = job.getReport();
    ....
    this.name = job.getName().toString();
    this.queue = job.getQueueName();
    this.user = job.getUserName();
    this.state = job.getState().toString();

    this.acls = new ArrayList<ConfEntryInfo>();
    
    if (job instanceof CompletedJob) {
      avgMapTime = 0l;
      avgReduceTime = 0l;
      avgShuffleTime = 0l;
      avgMergeTime = 0l;
      avgMapGcTime = 0L;
      avgMapElapsedTime = 0L;
      avgReduceElapsedTime = 0L;
      ...
      countTasksAndAttempts(job);
在countTaskAndAttemptes就会能拿到task的信息,然后做一些统计分析,jobInfo默认在这里只做了很浅的统计,只是一些平均运行时间和成功失败次数的统计.

/**
   * Go through a job and update the member variables with counts for
   * information to output in the page.
   *
   * @param job
   *          the job to get counts for.
   */
  private void countTasksAndAttempts(Job job) {
    numReduces = 0;
    numMaps = 0;
    final Map<TaskId, Task> tasks = job.getTasks();
    if (tasks == null) {
      return;
    }
    for (Task task : tasks.values()) {
      // Attempts counts
      Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
      int successful, failed, killed;
      for (TaskAttempt attempt : attempts.values()) {

        successful = 0;
        failed = 0;
        killed = 0;
        if (TaskAttemptStateUI.NEW.correspondsTo(attempt.getState())) {
          // Do Nothing
        } else if (TaskAttemptStateUI.RUNNING.correspondsTo(attempt.getState())) {
          // Do Nothing
        } else if (TaskAttemptStateUI.SUCCESSFUL.correspondsTo(attempt
            .getState())) {
          ++successful;
        } else if (TaskAttemptStateUI.FAILED.correspondsTo(attempt.getState())) {
          ++failed;
        } else if (TaskAttemptStateUI.KILLED.correspondsTo(attempt.getState())) {
          ++killed;
        }
...
前面铺垫了这么久,终于找到了一个切入点,就是这里.


自定义慢任务,异常任务筛选

我以目前我们公司对这方面的改造为1个例子,让大家真实的了解如何做到自定义的task筛选.分为2大主题,1个是slow taks,慢任务.第二个是error task,异常任务,这个主要针对的是task attempt.OK,下面仔细的描述下.

慢任务

在这里所指的慢任务当然不是仅仅指的是执行时间的长短,只要涉及到时间值的,其他的维度也都是OK的,比如我们做了GC时间慢的,还有1个很关键的是启动时间明显偏慢的,尤其是启动时间偏慢的任务,对于我们发现一些异常的问题将会非常有用.而对于这些慢的任务,我们会在页面上设置一个文本输入框,用户可以传入自己想要设置的时间阈值.然后在JobInfo中进行过滤处理.下面给出一个按照执行时间的慢任务筛选:

public Map<TaskId, Task> getElapsedSlowTasks(Job job, double ratio) {
    long tmpElapsedTime;
    double mapElapsedThresholdTime;
    double reduceElapsedThresholdTime;
    Map<TaskId, Task> slowTasks;
    
    slowTasks = new HashMap<TaskId, Task>();

    mapElapsedThresholdTime = avgMapElapsedTime * (1 + ratio);
    reduceElapsedThresholdTime = avgReduceElapsedTime * (1 + ratio);

    final Map<TaskId, Task> tasks = job.getTasks();
    if (tasks == null) {
      return slowTasks;
    }

    for (Task task : tasks.values()) {
      tmpElapsedTime =
          task.getReport().getFinishTime() - task.getReport().getStartTime();

      if (task.getType() == TaskType.MAP
          && tmpElapsedTime >= mapElapsedThresholdTime) {
        slowTasks.put(task.getID(), task);
      } else if (task.getType() == TaskType.REDUCE
          && tmpElapsedTime >= reduceElapsedThresholdTime) {
        slowTasks.put(task.getID(), task);
      }
    }

    return slowTasks;
  }
里面的许多平均值将会在前面的countTaskAndAttemptes()方法中计算好,这个方法会在前前台页面中被调用.

...
StringBuilder jobsTableData = new StringBuilder("[
");
	    for (Job j : appContext.getAllJobs().values()) {
	    	JobId jobId = j.getID();
	    	Job jb = appContext.getJob(jobId);
	    	JobInfo job = new JobInfo(jb);
	    	
	    	taskMap = null;
	    	slowRatio = Double.parseDouble($(SLOW_TASKS_RATIO));
	    	if($(SLOW_TASKS_TYPE).equals("elapsedtime")){
	    	  taskMap = job.getElapsedSlowTasks(jb, slowRatio);
	    	}else if($(SLOW_TASKS_TYPE).equals("gctime")){
	    	  taskMap = job.getGcSlowTasks(jb, slowRatio);
	    	}else if ($(SLOW_TASKS_TYPE).equals("slowstart")) {
	    	  taskMap = job.getSlowStartTasks(jb, slowRatio);
	    	}else if ($(SLOW_TASKS_TYPE).equals("readdatalean")) {
	    	  taskMap = job.getReadDataLeanTasks(jb, slowRatio);
        }else if ($(SLOW_TASKS_TYPE).equals("writedatalean")) {
          taskMap = job.getWriteDataLeanTasks(jb, slowRatio);
        }
...
然后就是task信息的展示了.


Error TaskAttempt

异常Task尝试的信息的过滤就比较简单一些,我们可以直接在前台页中进行简单判断,过滤掉状态为SUCCEED状态的记录,一般剩下的就会是KILLED和FAILED,里面会包含了note信息,这个对于帮助我们分析问题非常有用.

...
 for(Entry<TaskAttemptId, TaskAttempt> ta: taskAttempts.entrySet()){
	          taskAttempt = ta.getValue();
	          
	          if(taskAttempt.getState() == TaskAttemptState.SUCCEEDED){
	            continue;
	          }
	          
          if (type == TaskType.MAP) {
            mapTime = taskAttempt.getFinishTime() - taskAttempt.getLaunchTime();
            shuffleTime = 0;
            mergeTime = 0;
            reduceTime = 0;
          } else {
            mapTime = 0;
            shuffleTime =
                taskAttempt.getShuffleFinishTime()
                    - taskAttempt.getLaunchTime();
            mergeTime =
                taskAttempt.getSortFinishTime()
                    - taskAttempt.getShuffleFinishTime();
            reduceTime =
                taskAttempt.getFinishTime() - taskAttempt.getSortFinishTime();
          }
	          
	          jobsTableData.append("["")
	          .append(jobPrefixInfo)
	          .append(dateFormat.format(new Date(taskAttempt.getLaunchTime()))).append("","")
	          .append(dateFormat.format(new Date(taskAttempt.getFinishTime()))).append("","")
	          .append(taskAttempt.getID()).append("","")
	          .append(type).append("","")
	          .append(taskAttempt.getState()).append("","")
...

改造的JobHistory效果图展示

新增导航栏:


新增SlowTask执行时间等指标:


新增Error TaskAttempt任务尝试:


实现此逻辑的代码链接如下,大家可以仔细阅读这部分的代码,从如何把功能加到导航栏上再到设置链接地址,都在下面的代码链接中:

https://github.com/linyiqun/yarn-jobhistory-crawler/tree/master/slowTask


原文地址:https://www.cnblogs.com/bianqi/p/12183840.html