Yarn的服务库和事件库使用方法

事件类型定义:

package org.apache.hadoop.event;
public enum JobEventType {
        JOB_KILL,
        JOB_INIT,
        JOB_START
}
package org.apache.hadoop.event;

public enum TaskEventType {
    T_KILL,
    T_SCHEDULE
}

事件定义:

package org.apache.hadoop.event;

import org.apache.hadoop.yarn.event.AbstractEvent;
public class JobEvent extends AbstractEvent<JobEventType> {
    
    private String jobID;
    
    public JobEvent(JobEventType type,String jobID) {
        super(type);
        this.jobID=jobID;
    }
    public String getJobID() {
        return jobID;
    }
        
}
package org.apache.hadoop.event;

import org.apache.hadoop.yarn.event.AbstractEvent;

public class TaskEvent extends AbstractEvent<TaskEventType> {
    
    private String taskID; //TASkID

    public TaskEvent(TaskEventType type,String taskID) {
        super(type);
        this.taskID=taskID;
    }

    public String getTaskID() {
        return taskID;
    }


}

简单服务定义:

package org.apache.hadoop.event;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

public class SimpleMRAppMaster extends CompositeService {

    private Dispatcher dispatcher;   //中央异步调度器
    private String jobID;
    private int taskNumber;  //作业中包含的任务数
    private String[] taskIDS; //该作业中包含的所有任务
    
    public SimpleMRAppMaster(String name,String jobID,int taskNumber) {
        super(name);
        this.jobID=jobID;
        this.taskNumber=taskNumber;
        this.taskIDS=new String[taskNumber];
        for(int i=0;i<taskNumber;i++){
            this.taskIDS[i]=new String(jobID+"_task_"+i);
        }
    }

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        
        dispatcher=new AsyncDispatcher();
        
        dispatcher.register(JobEventType.class, new JobEventHandller());
        dispatcher.register(TaskEventType.class, new TaskEventHandller());
        addService((Service)dispatcher);
        
        super.serviceInit(conf);
    }
    
    
    public Dispatcher getDispatcher(){
        return dispatcher;
    }
    
    private class JobEventHandller implements EventHandler<JobEvent>{

        @Override
        public void handle(JobEvent event) {
            
            //若收到 杀死  作业 事件
            if(event.getType() == JobEventType.JOB_KILL){
                System.out.println("收到 杀死作业事件   ,要 杀掉作业"+event.getJobID()+"下的所有任务");
                
                for(int i=0;i<=taskNumber;i++){
                    dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_KILL, taskIDS[i]));
                }
                
            }else if(event.getType()== JobEventType.JOB_INIT){
                System.out.println("收到 启动作业事件   ,要启动 作业"+event.getJobID()+"下的所有任务");
                for(int i=0;i<=taskNumber;i++){
                    dispatcher.getEventHandler().handle(new TaskEvent(TaskEventType.T_SCHEDULE, taskIDS[i]));
                }
            }
        }
    }
    
    private class TaskEventHandller implements EventHandler<TaskEvent>{

        @Override
        public void handle(TaskEvent event) {
            if(event.getType()==TaskEventType.T_KILL){
                System.out.println("收到杀死任务命令,开始杀死任务"+event.getTaskID());
            }else if(event.getType()==TaskEventType.T_SCHEDULE){
                System.out.println("收到启动任务命令,开始启动任务"+event.getTaskID());
            }
        }
    }
    
    
}

测试程序定义:

package org.apache.hadoop.event;

import static org.junit.Assert.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

/**
 * 自己写的 关于事件库 和 服务 库的使用
 * @author joqk
 *
 */
public class SimpleMRAppMasterTest {

    @Test
    public void test() throws Exception {
    
            String jobID="job_20140912_01";
            SimpleMRAppMaster appMaster=new SimpleMRAppMaster("作业测试", jobID, 10);
            YarnConfiguration conf = new YarnConfiguration(new Configuration());
            
            appMaster.serviceInit(conf);
            
            appMaster.start();
            
            appMaster.getDispatcher().getEventHandler().handle(new JobEvent(JobEventType.JOB_INIT, jobID));
            
    }

}

 

原文地址:https://www.cnblogs.com/joqk/p/3968912.html