Hadoop之猜測运行

近期在測试环境跑任务,有一部分任务出现例如以下情况:


猜測执行(Speculative Execution)是指在集群环境下执行MapReduce,可能是程序Bug,负载不均或者其它的一些问题,导致在一个JOB下的多个TASK速度不一致。比方有的任务已经完毕,可是有些任务可能仅仅跑了10%,依据木桶原理。这些任务将成为整个JOB的短板,假设集群启动了猜測执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同一时候处理一份数据,哪个先执行完。则将谁的结果作为终于结果,而且在执行完毕后Kill掉另外一个任务。



猜測运行(Speculative Execution)是通过利用很多其它的资源来换取时间的一种优化策略。可是在资源非常紧张的情况下,猜測运行也不一定能带来时间上的优化,假设在測试环境中,DataNode总的内存空间是40G。每一个Task可申请的内存设置为1G,如今有一个任务的输入数据为5G。HDFS分片为128M。这样Map Task的个数就40个,基本占满了全部的DataNode节点,假设还由于每些Map Task运行过慢,启动了Speculative Task,这样就可能会影响到Reduce Task的运行了,影响了Reduce的运行,自然而然就使整个JOB的运行时间延长。

所以是否启用猜測运行,假设能依据资源情况来决定,假设在资源本身就不够的情况下,还要跑猜測运行的任务,这样会导致兴许启动的任务无法获取到资源。以导致无法运行。

默认的猜測运行器是:org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator,假设要改变猜測运行的策略。能够依照这个类重写,继承org.apache.hadoop.service.AbstractService。实现org.apache.hadoop.mapreduce.v2.app.speculate.Speculator接口。

DefaultSpeculator构造方法:

  public DefaultSpeculator
      (Configuration conf, AppContext context,
       TaskRuntimeEstimator estimator, Clock clock) {
    super(DefaultSpeculator.class.getName());

    this.conf = conf;
    this.context = context;
    this.estimator = estimator;
    this.clock = clock;
    this.eventHandler = context.getEventHandler();
    this.soonestRetryAfterNoSpeculate =
        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_NO_SPECULATE,
                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_NO_SPECULATE);
    this.soonestRetryAfterSpeculate =
        conf.getLong(MRJobConfig.SPECULATIVE_RETRY_AFTER_SPECULATE,
                MRJobConfig.DEFAULT_SPECULATIVE_RETRY_AFTER_SPECULATE);
    this.proportionRunningTasksSpeculatable =
        conf.getDouble(MRJobConfig.SPECULATIVECAP_RUNNING_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVECAP_RUNNING_TASKS);
    this.proportionTotalTasksSpeculatable =
        conf.getDouble(MRJobConfig.SPECULATIVECAP_TOTAL_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVECAP_TOTAL_TASKS);
    this.minimumAllowedSpeculativeTasks =
        conf.getInt(MRJobConfig.SPECULATIVE_MINIMUM_ALLOWED_TASKS,
                MRJobConfig.DEFAULT_SPECULATIVE_MINIMUM_ALLOWED_TASKS);
  }


mapreduce.map.speculative:假设为true则Map Task能够猜測运行,即一个Map Task能够启动Speculative Task运行并行运行,该Speculative Task与原始Task同一时候处理同一份数据,谁先处理完,则将谁的结果作为终于结果。默觉得true。

mapreduce.reduce.speculative:同上,默认值为true。

mapreduce.job.speculative.speculative-cap-running-tasks:可以猜測重跑正在执行任务(单个JOB)的百分之几,默认是:0.1

mapreduce.job.speculative.speculative-cap-total-tasks:可以猜測重跑所有任务(单个JOB)的百分之几,默认是:0.01

mapreduce.job.speculative.minimum-allowed-tasks:能够猜測又一次运行同意的最小任务数。

默认是:10

首先。mapreduce.job.speculative.minimum-allowed-tasks和mapreduce.job.speculative.speculative-cap-total-tasks * 总任务数,取最大值。

然后。拿到上一步的值和mapreduce.job.speculative.speculative-cap-running-tasks * 正在执行的任务数,取最大值。该值就是推測执行的执行的任务数

mapreduce.job.speculative.retry-after-no-speculate:等待时间(毫秒)做下一轮的猜測。假设没有任务,猜測在这一轮。

默认:1000(ms)

mapreduce.job.speculative.retry-after-speculate:等待时间(毫秒)做下一轮的猜測。假设有任务猜測在这一轮。默认:15000(ms)

mapreduce.job.speculative.slowtaskthreshold:标准差,任务的平均进展率必须低于全部正在执行任务的平均值才会被觉得是太慢的任务,默认值:1.0

启动服务:

  @Override
  protected void serviceStart() throws Exception {
    Runnable speculationBackgroundCore
        = new Runnable() {
            @Override
            public void run() {
              while (!stopped && !Thread.currentThread().isInterrupted()) {
                long backgroundRunStartTime = clock.getTime();
                try {
                  //计算猜測,会依据Map和Reduce的任务类型,遍历mapContainerNeeds和reduceContainerNeeds。满足条件则启动猜測任务。
                  int speculations = computeSpeculations(); 
                  long mininumRecomp
                      = speculations > 0 ? soonestRetryAfterSpeculate
                                         : soonestRetryAfterNoSpeculate;

                  long wait = Math.max(mininumRecomp,
                        clock.getTime() - backgroundRunStartTime);

                  if (speculations > 0) {
                    LOG.info("We launched " + speculations
                        + " speculations.  Sleeping " + wait + " milliseconds.");
                  }

                  Object pollResult
                      = scanControl.poll(wait, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                  if (!stopped) {
                    LOG.error("Background thread returning, interrupted", e);
                  }
                  return;
                }
              }
            }
          };
    speculationBackgroundThread = new Thread
        (speculationBackgroundCore, "DefaultSpeculator background processing");
    speculationBackgroundThread.start();

    super.serviceStart();
  }

最后我们看看源代码,是怎样启动一个猜測任务的:

private int maybeScheduleASpeculation(TaskType type) {
    int successes = 0;

    long now = clock.getTime();

    ConcurrentMap<JobId, AtomicInteger> containerNeeds
        = type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;

    for (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {//遍历全部的JOB
      // This race conditon is okay.  If we skip a speculation attempt we
      //  should have tried because the event that lowers the number of
      //  containers needed to zero hasn't come through, it will next time.
      // Also, if we miss the fact that the number of containers needed was
      //  zero but increased due to a failure it's not too bad to launch one
      //  container prematurely.
      if (jobEntry.getValue().get() > 0) {
        continue;
      }

      int numberSpeculationsAlready = 0;
      int numberRunningTasks = 0;

      // loop through the tasks of the kind
      Job job = context.getJob(jobEntry.getKey());

      Map<TaskId, Task> tasks = job.getTasks(type);//获取JOB的task

      int numberAllowedSpeculativeTasks
          = (int) Math.max(MINIMUM_ALLOWED_SPECULATIVE_TASKS,
                           PROPORTION_TOTAL_TASKS_SPECULATABLE * tasks.size());//上面有介绍

      TaskId bestTaskID = null;
      long bestSpeculationValue = -1L;

      // this loop is potentially pricey.
      // TODO track the tasks that are potentially worth looking at
      for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {//遍历全部任务
        long mySpeculationValue = speculationValue(taskEntry.getKey(), now);//获取猜測值

        if (mySpeculationValue == ALREADY_SPECULATING) {
          ++numberSpeculationsAlready;
        }

        if (mySpeculationValue != NOT_RUNNING) {
          ++numberRunningTasks;
        }

        if (mySpeculationValue > bestSpeculationValue) {
          bestTaskID = taskEntry.getKey();
          bestSpeculationValue = mySpeculationValue;
        }
      }
      numberAllowedSpeculativeTasks
          = (int) Math.max(numberAllowedSpeculativeTasks,
                           PROPORTION_RUNNING_TASKS_SPECULATABLE * numberRunningTasks);

      // If we found a speculation target, fire it off
      if (bestTaskID != null
          && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {//同意的个数大于准备猜測执行的个数,就開始创建猜測执行任务
        addSpeculativeAttempt(bestTaskID);//发送一个T_ADD_SPEC_ATTEMPT事件。启动另外一个任务。
        ++successes;
      }
    }

    return successes;
  }



private long speculationValue(TaskId taskID, long now) {
    Job job = context.getJob(taskID.getJobId());
    Task task = job.getTask(taskID);
    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
    long acceptableRuntime = Long.MIN_VALUE;
    long result = Long.MIN_VALUE;

    if (!mayHaveSpeculated.contains(taskID)) {//是否包括在猜測执行的SET中
      acceptableRuntime = estimator.thresholdRuntime(taskID);//执行的阀值
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    TaskAttemptId runningTaskAttemptID = null;

    int numberRunningAttempts = 0;

    for (TaskAttempt taskAttempt : attempts.values()) {
      if (taskAttempt.getState() == TaskAttemptState.RUNNING
          || taskAttempt.getState() == TaskAttemptState.STARTING) {//任务在执行状态下,或開始状态下
        if (++numberRunningAttempts > 1) {//重试超过一次的,直接返回,则numberSpeculationsAlready的值加1
          return ALREADY_SPECULATING;
        }
        runningTaskAttemptID = taskAttempt.getID();

        long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);//估算的执行时间

        long taskAttemptStartTime
            = estimator.attemptEnrolledTime(runningTaskAttemptID);//任务的開始时间
        if (taskAttemptStartTime > now) {
          // This background process ran before we could process the task
          //  attempt status change that chronicles the attempt start
          return TOO_NEW;
        }

        long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;//估算的执行时间+任务的開始时间,等于完毕时间

        long estimatedReplacementEndTime
            = now + estimator.estimatedNewAttemptRuntime(taskID);//新开启一个任务的完毕时间

        float progress = taskAttempt.getProgress();
        TaskAttemptHistoryStatistics data =
            runningTaskAttemptStatistics.get(runningTaskAttemptID);
        if (data == null) {
          runningTaskAttemptStatistics.put(runningTaskAttemptID,
            new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
        } else {
          if (estimatedRunTime == data.getEstimatedRunTime()
              && progress == data.getProgress()) {
            // Previous stats are same as same stats
            if (data.notHeartbeatedInAWhile(now)) {
              // Stats have stagnated for a while, simulate heart-beat.
              TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
              taskAttemptStatus.id = runningTaskAttemptID;
              taskAttemptStatus.progress = progress;
              taskAttemptStatus.taskState = taskAttempt.getState();
              // Now simulate the heart-beat
              handleAttempt(taskAttemptStatus);
            }
          } else {
            // Stats have changed - update our data structure
            data.setEstimatedRunTime(estimatedRunTime);
            data.setProgress(progress);
            data.resetHeartBeatTime(now);
          }
        }

        if (estimatedEndTime < now) {//完毕时间小于当前时间
          return PROGRESS_IS_GOOD;
        }

        if (estimatedReplacementEndTime >= estimatedEndTime) {//新开任务的完毕时间小于或等于当前时间
          return TOO_LATE_TO_SPECULATE;
        }

        result = estimatedEndTime - estimatedReplacementEndTime;
      }
    }

    // If we are here, there's at most one task attempt.
    if (numberRunningAttempts == 0) {//任务没有执行
      return NOT_RUNNING;
    }



    if (acceptableRuntime == Long.MIN_VALUE) {
      acceptableRuntime = estimator.thresholdRuntime(taskID);
      if (acceptableRuntime == Long.MAX_VALUE) {
        return ON_SCHEDULE;
      }
    }

    return result;
  }
DefaultSpeculator依赖于一个执行时间估算器,默认採用了LegacyTaskRuntimeEstimator,此外,MRv2还提供了另外一个实现:ExponentiallySmoothedTaskRuntimeEstimator,该实现採用了平滑算法对结果进行平滑处理。

原文地址:https://www.cnblogs.com/wgwyanfs/p/6826124.html