Hadoop源码分析26 JobTracker主要容器和线程

2.  JobTracker中的容器

2.1 nodesAtMaxLevel、hostnameToNodeMap

  // (hostname --> Node (NetworkTopology))

  Map<String, Node> hostnameToNodeMap =

    Collections.synchronizedMap(new TreeMap<String, Node>());

private Set<Node> nodesAtMaxLevel =

    Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());

 

 addHostToNodeMapping(String host, String networkLoc) 中:

node = new NodeBase(host,networkLoc);

。。。。。。

hostnameToNodeMap.put(host,node);

nodesAtMaxLevel.add(getParentNode(node,getNumTaskCacheLevels() - 1));

 

2.2 jobInProgressListeners

private final List<JobInProgressListener> jobInProgressListeners =

    new CopyOnWriteArrayList<JobInProgressListener>();

调用:

public void addJobInProgressListener(JobInProgressListener listener) {

    jobInProgressListeners.add(listener);

  }

public void removeJobInProgressListener(JobInProgressListener listener) {

    jobInProgressListeners.remove(listener);

}

private void updateJobInProgressListeners(JobChangeEvent event) {

    for (JobInProgressListener listener : jobInProgressListeners) {

      listener.jobUpdated(event);

    }

}

2.3 jobs

// All the known jobs.  (jobid -> JobInProgress)

  Map<JobID, JobInProgress> jobs =

    Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());

 addJob(JobID jobId, JobInProgress job) 中填充:

    synchronized (jobs){

      synchronized (taskScheduler){

        jobs.put(job.getProfile().getJobID(),job);

        for (JobInProgressListener listener : jobInProgressListeners) {

          listener.jobAdded(job);

        }

      }

    }

2.4 userToJobsMap

// (user -> list of JobInProgress)

  TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =

    new TreeMap<String, ArrayList<JobInProgress>>();

finalizeJob(JobInProgress job) 填充:

String jobUser = job.getProfile().getUser();

synchronized (userToJobsMap){

      ArrayList<JobInProgress> userJobs = userToJobsMap.get(jobUser);

      if (userJobs == null) {

        userJobs = new ArrayList<JobInProgress>();

        userToJobsMap.put(jobUser,userJobs);

      }

      userJobs.add(job);

    }

2.5 trackerToJobsToCleanup

// (trackerID --> list of jobs to cleanup)

  Map<String, Set<JobID>> trackerToJobsToCleanup =

    new HashMap<String, Set<JobID>>();

addJobForCleanup(JobID id) 填充:

synchronized (trackerToJobsToCleanup){

        Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);

        if (jobsToKill == null) {

          jobsToKill = new HashSet<JobID>();

          trackerToJobsToCleanup.put(taskTracker,jobsToKill);

        }

        jobsToKill.add(id);

      }

2.6 trackerToTasksToCleanup

// (trackerID --> list of tasks to cleanup)

  Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup =

    new HashMap<String, Set<TaskAttemptID>>();

 

updateTaskStatuses(TaskTrackerStatus status) 填充:

if (!job.inited()){

        //if job is not yet initialized ... kill the attempt

        synchronized (trackerToTasksToCleanup){

          Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);

          if (tasks == null) {

            tasks = new HashSet<TaskAttemptID>();

            trackerToTasksToCleanup.put(trackerName,tasks);

          }

          tasks.add(taskId);

        }

        continue;

      }

 

2.7 taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap

// All the known TaskInProgress items, mapped to by taskids (taskid -> TIP)

Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =

    new TreeMap<TaskAttemptID, TaskInProgress>();

 

   // (taskid --> trackerID)

  TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>();

  // (trackerID -> TreeSet of taskids running at that tracker)

  TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap =

    new TreeMap<String, Set<TaskAttemptID>>();

 

createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) 填充:

    taskidToTrackerMap.put(taskid,taskTracker);

    taskidToTIPMap.put(taskid,tip);

    Set<TaskAttemptID> taskset = trackerToTaskMap.get(taskTracker);

    if (taskset == null) {

      taskset = new TreeSet<TaskAttemptID>();

      trackerToTaskMap.put(taskTracker,taskset);

    }

2.8 hostnameToTaskTracker、trackerExpiryQueue

// This is used to keep track of all trackers running on one host. While

  // decommissioning the host, all the trackers on the host will be lost.

  Map<String, Set<TaskTracker>> hostnameToTaskTracker =

    Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>());

  TreeSet<TaskTrackerStatus> trackerExpiryQueue =

    new TreeSet<TaskTrackerStatus>(

        new Comparator<TaskTrackerStatus>() {

            public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) {

              if (p1.getLastSeen() < p2.getLastSeen()) {

                           return -1;

              } else if (p1.getLastSeen() > p2.getLastSeen()) {

                   return 1;

              } else {

                   return (p1.getTrackerName().compareTo(p2.getTrackerName()));

              }

         }

     }

 );

 

 addNewTracker(TaskTracker taskTracker) 填充:

    TaskTrackerStatus status = taskTracker.getStatus();

    trackerExpiryQueue.add(status);

 

    //  Register the tracker if its not registered

    String hostname = status.getHost();

    if (getNode(status.getTrackerName())== null){

      //Making the network location resolution inline ..

      resolveAndAddToTopology(hostname);

    }

 

    //add it to the set of tracker per host

    Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname);

    if (trackers == null) {

      trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());

      hostnameToTaskTracker.put(hostname,trackers);

    }

2.9 trackerToMarkedTasksMap

  // (trackerID -> TreeSet of completed taskids running at that tracker)

  TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap =

    new TreeMap<String, Set<TaskAttemptID>>();

 

  markCompletedTaskAttempt(String taskTracker, TaskAttemptID taskid) 调用:

    Set<TaskAttemptID> taskset = trackerToMarkedTasksMap.get(taskTracker);

    if (taskset == null) {

      taskset = new TreeSet<TaskAttemptID>();

      trackerToMarkedTasksMap.put(taskTracker,taskset);

    }

    taskset.add(taskid);

2.10 trackerToHeartbeatResponseMap

// (trackerID --> last sent HeartBeatResponse)

  Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =

    new TreeMap<String, HeartbeatResponse>();

 HeartbeatResponse heartbeat(TaskTrackerStatus status,

                                                  boolean restarted,

                                                  boolean initialContact,

                                                  boolean acceptNewTasks,

                                                  short responseId) 中调用:

//Update the trackerToHeartbeatResponseMap

    trackerToHeartbeatResponseMap.put(trackerName,response);

2.11 taskTrackers

  private HashMap<String, TaskTracker> taskTrackers =

    new HashMap<String, TaskTracker>();

updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) 调用:

 

      TaskTracker taskTracker = taskTrackers.get(trackerName);

      if (taskTracker!= null){

        alreadyPresent= true;

      } else {

        taskTracker= new TaskTracker(trackerName);

      }

     

      taskTracker.setStatus(status);

      taskTrackers.put(trackerName,taskTracker);

2.12 uniqueHostsMap

Map<String, Integer> uniqueHostsMap = new ConcurrentHashMap<String, Integer>();

updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) 调用:

if (status == null) {

        taskTrackers.remove(trackerName);

        Integer numTaskTrackersInHost =

          uniqueHostsMap.get(oldStatus.getHost());

        if (numTaskTrackersInHost != null) {

          numTaskTrackersInHost--;

          if (numTaskTrackersInHost > 0)  {

            uniqueHostsMap.put(oldStatus.getHost(),numTaskTrackersInHost);

          }

          else {

            uniqueHostsMap.remove(oldStatus.getHost());

          }

        }

      }

addHostCapacity(String hostName) 调用:

        int numTrackersOnHost = 0;

        // add the capacity of trackers on the host

        for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {

          int mapSlots= status.getMaxMapSlots();

          totalMapTaskCapacity +=mapSlots;

          int reduceSlots= status.getMaxReduceSlots();

          totalReduceTaskCapacity +=reduceSlots;

          numTrackersOnHost++;

          getInstrumentation().decBlackListedMapSlots(mapSlots);

          getInstrumentation().decBlackListedReduceSlots(reduceSlots);

        }

        uniqueHostsMap.put(hostName,numTrackersOnHost);

        decrBlacklistedTrackers(numTrackersOnHost);

2.13 内部类ExpireLaunchingTasks:launchingTasks

  见 3.1

2.14 内部类FaultInfo:numFaults、blackRfbMap、grayRfbMap

private static class FaultInfo{

    int[] numFaults;      // timeslice buckets

    private HashMap<ReasonForBlackListing, String> blackRfbMap;

    private HashMap<ReasonForBlackListing, String> grayRfbMap;

}

调用:

    void incrFaultCount(long timeStamp){

      checkRotation(timeStamp);

      ++numFaults[bucketIndex(timeStamp)];

    }

    public void addBlacklistedReason(ReasonForBlackListing rfb,

                                     String reason, boolean gray) {

      if (gray){

        grayRfbMap.put(rfb,reason);

      } else {

        blackRfbMap.put(rfb,reason);

      }

    }

    void setBlacklist(ReasonForBlackListing rfb, String trackerFaultReport,

                      boolean gray){

      if (gray){

        graylisted = true;

        this.grayRfbMap.put(rfb,trackerFaultReport);

      } else {

        blacklisted = true;

        this.blackRfbMap.put(rfb,trackerFaultReport);

      }

    }

 

2.15 内部类FaultyTrackersInfo:potentiallyFaultyTrackers

  private class FaultyTrackersInfo {

    // A map from hostName to its faults

    private Map<String, FaultInfo> potentiallyFaultyTrackers =

              new HashMap<String, FaultInfo>();

}

调用:

//Assumes JobTracker is locked on the entry

private FaultInfo getFaultInfo(String hostName, boolean createIfNecessary) {

      FaultInfo fi = null;

      synchronized (potentiallyFaultyTrackers) {

        fi = potentiallyFaultyTrackers.get(hostName);

        if (fi == null && createIfNecessary) {

          fi = new FaultInfo(clock.getTime(), NUM_FAULT_BUCKETS,

                             TRACKER_FAULT_BUCKET_WIDTH_MSECS);

          potentiallyFaultyTrackers.put(hostName,fi);

        }

      }

      return fi;

}

 

2.16 内部类RecoveryManager:jobsToRecover、recoveredTrackers、hangingAttempts

  class RecoveryManager {

   

    Set<JobID> jobsToRecover; // set of jobs to be recovered

    Set<String> recoveredTrackers =

      Collections.synchronizedSet(new HashSet<String>());

    class JobRecoveryListener implements Listener {

              // Maintains open transactions

         private Map<String, String> hangingAttempts =

                new HashMap<String, String>();

     

    }

}

调用:

    void addJobForRecovery(JobID id) {

      jobsToRecover.add(id);

    }

    private void markTracker(String trackerName) {

      recoveredTrackers.add(trackerName);

    }

    private void processTaskAttempt(String taskAttemptId,

                                      JobHistory.TaskAttempt attempt)

   {   ......

       hangingAttempts.put(id.getTaskID().toString(),taskAttemptId);

       ......

   }

2.17 内部类RetireJobs:jobIDStatusMap、jobRetireInfoQ

  见 3.3

3.JobTracker中的内部类线程

3.1 线程ExpireLaunchingTasks

 

  

private class ExpireLaunchingTasks implements Runnable{

    

    private Map<TaskAttemptID, Long> launchingTasks =

      new LinkedHashMap<TaskAttemptID, Long>();

}

该线程将 launchingTasks 中超时的 TaskAttemptID 设置为失败,主要代码:

TaskAttemptID taskId = pair.getKey();

TaskInProgress tip = taskidToTIPMap.get(taskId);

JobInProgress job = tip.getJob();

.....

job.failedTask(tip, taskId, "Error launching task",

                                     tip.isMapTask() ? TaskStatus.Phase.MAP :

                                     TaskStatus.Phase.STARTING,

                                     TaskStatus.State.FAILED,

                                     trackerName);

 

 launchingTasks 在 JobTracker.ExpireLaunchingTasks.addNewTask(TaskAttemptID taskName)

 

和 JobTracker.ExpireLaunchingTasks.removeTask(TaskAttemptID taskName)

 

中分别被添加、删除。

 

3.2 线程ExpireTrackers

  ///////////////////////////////////////////////////////

  //Used to expire TaskTrackers that have gone down

  /////////////////////////////////////////////////////// 

class ExpireTrackers implements Runnable{

}

trackerExpiryQueue 中超时的 TaskTracker 或者被更新,或者被移出 hostnameToTaskTracker。

trackerExpiryQueue 在方法 JobTracker.addNewTracker(TaskTracker taskTracker)、JobTracker.RecoveryManager.recover() 中被添加/删除。

 

3.3. 线程RetireJobs

  ///////////////////////////////////////////////////////

  // Used to remove old finished Jobs that have been around for too long

  ///////////////////////////////////////////////////////

class RetireJobs implements Runnable{

    private final Map<JobID, RetireJobInfo> jobIDStatusMap =

      new HashMap<JobID, RetireJobInfo>();

    private final LinkedList<RetireJobInfo> jobRetireInfoQ =

      new LinkedList<RetireJobInfo>();

}

该线程将 jobs(All the known jobs,jobid -> JobInProgress)中已完成且超时的 job 移出 jobs、jobInProgressListeners,清理其 JobHistory,并放入 jobIDStatusMap、jobRetireInfoQ。

jobIDStatusMap 存储 RetireJobInfo,会在 JobTracker.getJobCounters(JobID jobid)、JobTracker.getJobProfile(JobID jobid)、JobTracker.getJobStatus(JobID jobid) 中被调用。

jobRetireInfoQ 存储 RetireJobInfo,会在 JobTracker.generateRetiredJobTable(JobTracker tracker, int rowId)、JobTracker.getAllJobs() 中被调用。

 

原文地址:https://www.cnblogs.com/leeeee/p/7276494.html