mapreduce源码过程

1.实现分区的步骤:  1.1先分析一下具体的业务逻辑,确定大概有多少个分区  1.2首先书写一个类,它要继承org.apache.hadoop.mapreduce.Partitioner这个类  1.3重写public int getPartition这个方法,根据具体逻辑,读数据库或者配置返回相同的数字  1.4在main方法中设置Partioner的类,job.setPartitionerClass(DataPartitioner.class);  1.5设置Reducer的数量,job.setNumReduceTasks(6);

2.排序MR默认是按key2进行排序的,如果想自定义排序规则,被排序的对象要实现WritableComparable接口,在compareTo方法中实现排序规则,然后将这个对象当做k2,即可完成排序

3.combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

4.MR启动流程  start-mapred.sh  --> hadoop-daemon.sh --> hadoop --> org.apache.hadoop.mapred.JobTracker      Jobtracker调用顺序:main --> startTracker  --> new JobTracker 在其构造方法中首先创建一个调度器,接着创建一个RPC的server(interTrackerServer)tasktracker会通过PRC机制与其通信 然后调用offerService方法对外提供服务,在offerService方法中启动RPC server,初始化jobtracker,调用taskScheduler的start方法 --> eagerTaskInitializationListener调用start方法, --> 调用jobInitManagerThread的start方法,因为其是一个线程,会调用JobInitManager的run方法 --> jobInitQueue任务队列去取第一个任务,然后把它丢入线程池中,然后调用-->InitJob的run方法 --> jobTracker的initJob方法 --> JobInProgress的initTasks --> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];

 TaskTracker调用顺序:main --> new TaskTracker在其构造方法中调用了initialize方法,在initialize方法中调用RPC.waitForProxy得到一个jobtracker的代理对象 接着TaskTracker调用了本身的run方法,--> offerService方法  --> transmitHeartBeat返回值是(HeartbeatResponse)是jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol调用了heartbeat将tasktracker的状态通过RPC机制发送给jobTracker,返回值就是JobTracker的指令 heartbeatResponse.getActions()得到具体的指令,然后判断指令的具体类型,开始执行任务 addToTaskQueue启动类型的指令加入到队列当中,TaskLauncher又把任务加入到任务队列当中,-->  TaskLauncher的run方法 --> startNewTask方法 --> localizeJob下载资源 --> launchTaskForJob开始加载任务 --> launchTask  --> runner.start()启动线程;  --> TaskRunner调用run方法 --> launchJvmAndWait启动java child进程

原文地址:https://www.cnblogs.com/dandandeyoushangnan/p/4727444.html