Hadoop源码分析27 JobTracker空载处理心跳

JobTracker无任务时处理心跳流程

 

HeartBeat格式{restarted=trueinitialContact=trueacceptNewTasks=trueresponseId=-1

status=TaskTrackerStatus {failures=0trackerName="tracker_server3:localhost.localdomain/127.0.0.1:57441"(id=2249)      taskReports=[], maxReduceTasks=2, lastSeen=0, httpPort=50060, host="server3"(id=2243),

healthStatus=TaskTrackerStatus$TaskTrackerHealthStatus { lastReported=0,isNodeHealthy=true,              healthReport="" (id=2261)}      

resStatus=TaskTrackerStatus$ResourceStatus {availablePhysicalMemory=601034752, availableSpace=32463671296,availableVirtualMemory=2705653760, cpuFrequency=2195079,cpuUsage=-1.0,cumulativeCpuTime=1227000                        , mapSlotMemorySizeOnTT=-1, numProcessors=1,reduceSlotMemorySizeOnTT=-1, totalPhysicalMemory=1044144128,totalVirtualMemory=3158065152}

}

}                                                                                                                     

判断是否应该接受:(inHostsList(status)&& !inExcludedHostsList(status))

主要调用方法:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId)


private synchronized boolean process Heartbeat(TaskTrackerStatus trackerStatus,

boolean initialContact,long timeStamp) throws UnknownHostException

 

private boolean updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status)

 
private void addNewTracker(TaskTracker taskTracker) throws UnknownHostException
 
public Node resolveAndAddToTopology(String name) throws UnknownHostException
 
private Node addHostToNodeMapping(String host, StringnetworkLoc)
 
void  updateTaskStatuses(TaskTrackerStatus status)
 

private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus, long timeStamp)

 

synchronized ListTask> getSetupAndCleanupTasks(TaskTrackerStatus taskTracker) throws IOException

 
public  synchronized  ClusterStatus  getClusterStatus(boolean detailed)
 

private synchronized ListTaskTrackerAction>  getTasksToKill(String taskTracker)

 

private ListTaskTrackerAction> getJobsForCleanup(String taskTracker)

 

private synchronized ListTaskTrackerAction  getTasksToSave(TaskTrackerStatust ts )

 

public int getNextHeartbeatInterval()

 

private void removeMarkedTasks(String taskTracker)

 

void org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.markTrackerHealthy(StringhostName)
 
boolean org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.isBlacklisted(StringhostName)
 
void org.apache.hadoop.mapred.JobTracker.FaultyTrackersInfo.setNodeHealthStatus(StringhostName, boolean isHealthy, Stringreason, long timeStamp)
 
ListString org.apache.hadoop.net.CachedDNSToSwitchMapping.resolve(ListString names)
 
void org.apache.hadoop.net.CachedDNSToSwitchMapping.cacheResolvedHosts(ListString uncachedHosts, ListString resolvedHosts)
ListString org.apache.hadoop.net.CachedDNSToSwitchMapping.getCachedHosts(ListString names)
 
ListTask org.apache.hadoop.mapred.JobQueueTaskScheduler.assignTasks(TaskTrackertaskTracker) throws IOException
 

 

1Server3TaskTracker首次启动后HeartBeat

FaultyTrackersInfo.potentiallyFaultyTrackers移除看是否应该从Graylist Blacklist移除

 

trackerToHeartbeatResponseMap拿出上一次的HeartbeatResponse,为null

 

taskTrackers拿出上一次的TaskTracker.TaskTrackerStatus,null

 

更新JobTracker的成员totalMaps=0totalReduces=0occupiedMapSlots=0occupiedReduceSlots=0

 

FaultyTrackersInfo.potentiallyFaultyTrackers查看是否在黑名单中,更新totalMapTaskCapacity=2totalReduceTaskCapacity=2

 

加入taskTrackers此时内容为:

{tracker_server3:localhost.localdomain/127.0.0.1:43336=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@4fd86469}

 

加入uniqueHostsMap此时其内容为:

{server3=1}

 

加入trackerExpiryQueue,此时其内容为:

[org.apache.hadoop.mapred.TaskTrackerStatus@4e048dc6]

 

加入dnsToSwitchMapping.cache,内容为

{10.1.1.103=/default-rack}

 

加入clusterMap,内容为:

Number of racks: 1

Expected number of leaves:1

/default-rack/server3

 

加入hostnameToNodeMap,内容为

{server3=/default-rack/server3}

 

加入nodesAtMaxLevel,内容为:

[/default-rack]

 

加入hostnameToTaskTracker,内容为

{server3=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@4fd86469]}

 

检查status.getTaskReports(),若不为空,则更新expireLaunchingTaskstrackerToJobsToCleanuptrackerToTasksToCleanuptaskidToTIPMap

 

responseId1,从jobsjobQueueJobInProgressListenertrackerToTaskMaptrackerToTasksToCleanuptrackerToJobsToCleanuptaskidToTIPMap取出JobTask,生成TaskTrackerAction(此时为空)

取得nextInterval

生成HeartbeatResponse,内容:

{actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=0}

 

加入trackerToHeartbeatResponseMap,内容为

{tracker_server3:localhost.localdomain/127.0.0.1:43336=org.apache.hadoop.mapred.HeartbeatResponse@25a78661}

发送HeartbeatResponse给客户端

 

2Server2TaskTracker首次启动后HeartBeat

同样先从potentiallyFaultyTrackers移除看是否应该从Graylist Blacklist移除

 

FaultyTrackersInfo.trackerToHeartbeatResponseMap拿出上一次的HeartbeatResponse,为null

 

taskTrackers拿出上一次的TaskTracker.TaskTrackerStatus,null

 

更新JobTracker的成员totalMaps=0totalReduces=0occupiedMapSlots=0occupiedReduceSlots=0totalMapTaskCapacity=4totalReduceTaskCapacity=4

 

加入taskTrackers此时内容为:

{tracker_server2:localhost.localdomain/127.0.0.1:34381=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@412eb15f,tracker_server3:localhost.localdomain/127.0.0.1:45605=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@2634d0e2}

 

加入uniqueHostsMap此时其内容为:

{server2=1, server3=1}

 

加入trackerExpiryQueue,此时其内容为:

[org.apache.hadoop.mapred.TaskTrackerStatus@4444ad54,org.apache.hadoop.mapred.TaskTrackerStatus@2ea31991]

 

加入dnsToSwitchMapping.cache,内容为

{10.1.1.102=/default-rack,10.1.1.103=/default-rack}

 

加入clusterMap,内容为:

Number of racks: 1

Expected number of leaves:2

/default-rack/server3

/default-rack/server2

 

 

加入hostnameToNodeMap,内容为

{server2=/default-rack/server2,server3=/default-rack/server3}

 

加入nodesAtMaxLevel,内容为:

[/default-rack]

 

加入hostnameToTaskTracker,内容为

{server2=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@412eb15f],server3=[org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@2634d0e2]}

 

检查status.getTaskReports(),若不为空,则更新expireLaunchingTaskstrackerToJobsToCleanuptrackerToTasksToCleanuptaskidToTIPMap

 

responseId1,从jobsjobQueueJobInProgressListenertrackerToTaskMaptrackerToTasksToCleanuptrackerToJobsToCleanuptaskidToTIPMap取出JobTask,生成TaskTrackerAction(此时为空)

取得nextInterval

生成HeartbeatResponse,内容:

{actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=0}

 

加入trackerToHeartbeatResponseMap,内容为

{tracker_server2:localhost.localdomain/127.0.0.1:34381=org.apache.hadoop.mapred.HeartbeatResponse@2f4dd8ae,tracker_server3:localhost.localdomain/127.0.0.1:45605=org.apache.hadoop.mapred.HeartbeatResponse@16bd1f19}

发送HeartbeatResponse给客户端

 

3. Server3再次HeartBeat

 

potentiallyFaultyTrackers移除看是否应该从Blacklist移除

 

FaultyTrackersInfo.trackerToHeartbeatResponseMap取得上次HeartbeatResponse,为

org.apache.hadoop.mapred.HeartbeatResponse@16bd1f19

 

判断上一次的ResponseId是否与这次接收的ResponseId相同。

 

更新JobTracker的成员totalMapstotalReducesoccupiedMapSlotsoccupiedReduceSlotstotalMapTaskCapacitytotalReduceTaskCapacity先从taskTrackers拿出上一次的TaskTracker.TaskTrackerStatus还原更新,然后用这一次的TaskTrackerStatus更新,其中要FaultyTrackersInfo.potentiallyFaultyTrackers查看是否在黑名单中。

 

更新taskTrackers此时内容为: {tracker_server2:localhost.localdomain/127.0.0.1:52688=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@1cdc471a,tracker_server3:localhost.localdomain/127.0.0.1:40286=org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker@665755f5}

 

检查status.getTaskReports(),若不为空,则更新expireLaunchingTaskstrackerToJobsToCleanuptrackerToTasksToCleanuptaskidToTIPMap

 

responseId1,从jobsjobQueueJobInProgressListenertrackerToTaskMaptrackerToTasksToCleanuptrackerToJobsToCleanuptaskidToTIPMap取出JobTask生成TaskTrackerAction(此时为空)

取得nextInterval

生成HeartbeatResponse,内容:

{actions=[],conf=null,heartbeatInterval=240000,recoveredJobs=[],responseId=1}

 

更新trackerToHeartbeatResponseMap,内容为

{tracker_server2:localhost.localdomain/127.0.0.1:52688=org.apache.hadoop.mapred.HeartbeatResponse@1500df0b,tracker_server3:localhost.localdomain/127.0.0.1:40286=org.apache.hadoop.mapred.HeartbeatResponse@6c3355f2}

发送HeartbeatResponse给客户端

 

4. ExpireTrackers移除过期

 

trackerExpiryQueue取出一个 TaskTrackerStatus,根据LastSeen判断是否清除或更新,

taskTrackers取出TaskTracker.TaskTrackerStatus继续判断LastSeen

 

若不需清除有则更新trackerExpiryQueue

 

若需清除从trackerExpiryQueue清除,trackerToJobsToCleanuptrackerToTasksToCleanuprecoveredTrackerstrackerToTaskMap清除

 

还原更新totalMapstotalReducesoccupiedMapSlotsoccupiedReduceSlotstotalMapTaskCapacitytotalReduceTaskCapacity

 

taskTrackersuniqueHostsMaphostnameToTaskTracker移除

原文地址:https://www.cnblogs.com/leeeee/p/7276492.html