Hadoop Source Code Analysis, Part 1: Client Job Submission

1. The WordCount test program; its source is as follows:

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {
          ............
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
          ...............
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
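The elided mapper and reducer bodies tokenize each input line and sum the per-word ones. The end-to-end counting logic they compute can be sketched in plain Java, stripped of the Hadoop types (a sketch for illustration only; the class and method names here are mine, not from the example):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountSketch {
  // Mirrors what TokenizerMapper + IntSumReducer compute together:
  // split the text into tokens, then sum the occurrences of each token.
  static Map<String, Integer> countWords(String text) {
    Map<String, Integer> counts = new HashMap<>();
    // StringTokenizer is the same tokenizer the example mapper uses
    StringTokenizer itr = new StringTokenizer(text);
    while (itr.hasMoreTokens()) {
      String word = itr.nextToken();
      // corresponds to the reducer's "sum the ones" step
      counts.merge(word, 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    Map<String, Integer> c = countWords("hello world hello hadoop");
    System.out.println(c.get("hello")); // prints 2
  }
}
```

In the real job the tokenizing and the summing run in separate processes, with the framework grouping equal keys between map and reduce; the sketch only shows the arithmetic.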

2. Modify bin/hadoop to set the debug options

Change the last line to:

 exec "$JAVA" -Xdebug -Xrunjdwp:transport=dt_socket,address=8888,server=y,suspend=y -Dproc_$COMMAND $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"

Run the command:
hadoop jar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount in out2

This actually executes:
exec /opt/jdk1.7.0_07/bin/java -Xdebug -Xrunjdwp:transport=dt_socket,address=8888,server=y,suspend=y -Dproc_jar -Xmx1000m -Dhadoop.log.dir=/opt/hadoop-1.0.0/libexec/../logs -Dhadoop.log.file=hadoop.log -Dhadoop.home.dir=/opt/hadoop-1.0.0/libexec/.. -Dhadoop.id.str= -Dhadoop.root.logger=INFO,console -Dhadoop.security.logger=INFO,NullAppender -Djava.library.path=/opt/hadoop-1.0.0/libexec/../lib/native/Linux-amd64-64 -Dhadoop.policy.file=hadoop-policy.xml -classpath /opt/hadoop-1.0.0/libexec/../conf:/opt/jdk1.7.0_07/lib/tools.jar:/opt/hadoop-1.0.0/libexec/..:/opt/hadoop-1.0.0/libexec/../hadoop-core-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/asm-3.2.jar:/opt/hadoop-1.0.0/libexec/../lib/aspectjrt-1.6.5.jar:/opt/hadoop-1.0.0/libexec/../lib/aspectjtools-1.6.5.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-beanutils-1.7.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-beanutils-core-1.8.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-cli-1.2.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-codec-1.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-collections-3.2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-configuration-1.6.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-daemon-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-digester-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-el-1.0.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-httpclient-3.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-lang-2.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-logging-1.1.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-logging-api-1.0.4.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-math-2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/commons-net-1.4.1.jar:/opt/hadoop-1.0.0/libexec/../lib/core-3.1.1.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-capacity-scheduler-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-fairscheduler-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hadoop-thriftfs-1.0.0.jar:/opt/hadoop-1.0.0/libexec/../lib/hsqldb-1.8.0.10.jar:/opt/hadoop-1.0.0/libexec/../lib/jackson-core-asl-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jackson-mapper-asl-1.0.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jasper-compiler-5.5.12.jar:/opt/hadoop-1.0.0/libexec/../lib/jasper-runtime-5.5.12.jar:/opt/hadoop-1.0.0/libexec/../lib/jdeb-0.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-core-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-json-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jersey-server-1.8.jar:/opt/hadoop-1.0.0/libexec/../lib/jets3t-0.6.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jetty-6.1.26.jar:/opt/hadoop-1.0.0/libexec/../lib/jetty-util-6.1.26.jar:/opt/hadoop-1.0.0/libexec/../lib/jsch-0.1.42.jar:/opt/hadoop-1.0.0/libexec/../lib/junit-4.5.jar:/opt/hadoop-1.0.0/libexec/../lib/kfs-0.2.2.jar:/opt/hadoop-1.0.0/libexec/../lib/log4j-1.2.15.jar:/opt/hadoop-1.0.0/libexec/../lib/mockito-all-1.8.5.jar:/opt/hadoop-1.0.0/libexec/../lib/oro-2.0.8.jar:/opt/hadoop-1.0.0/libexec/../lib/servlet-api-2.5-20081211.jar:/opt/hadoop-1.0.0/libexec/../lib/slf4j-api-1.4.3.jar:/opt/hadoop-1.0.0/libexec/../lib/slf4j-log4j12-1.4.3.jar:/opt/hadoop-1.0.0/libexec/../lib/xmlenc-0.52.jar:/opt/hadoop-1.0.0/libexec/../lib/jsp-2.1/jsp-2.1.jar:/opt/hadoop-1.0.0/libexec/../lib/jsp-2.1/jsp-api-2.1.jar org.apache.hadoop.util.RunJar /opt/hadoop-1.0.0/hadoop-examples-1.0.0.jar wordcount in out

Prepare the source code in Eclipse, configure the environment, and attach a remote debugger to port 8888 to start debugging.

3. The submission process

(1) Entry point: org.apache.hadoop.util.RunJar.main(String[] args)

  public static void main(String[] args) throws Throwable {
    String usage = "RunJar jarFile [mainClass] args...";

    if (args.length < 1) {
      System.err.println(usage);
      System.exit(-1);
    }

    int firstArg = 0;
    String fileName = args[firstArg++];
    File file = new File(fileName);
    String mainClassName = null;

    // Open the jar file
    JarFile jarFile;
    try {
      jarFile = new JarFile(fileName);
    } catch (IOException io) {
      throw new IOException("Error opening job jar: " + fileName)
        .initCause(io);
    }

    // Read the entry-point class (Main-Class) from the jar's manifest
    Manifest manifest = jarFile.getManifest();
    if (manifest != null) {
      mainClassName = manifest.getMainAttributes().getValue("Main-Class");
    }
    jarFile.close();

    if (mainClassName == null) {
      if (args.length < 2) {
        System.err.println(usage);
        System.exit(-1);
      }
      mainClassName = args[firstArg++];
    }
    mainClassName = mainClassName.replaceAll("/", ".");

    // Create a temporary working directory under hadoop.tmp.dir
    File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
    tmpDir.mkdirs();
    if (!tmpDir.isDirectory()) {
      System.err.println("Mkdirs failed to create " + tmpDir);
      System.exit(-1);
    }
    final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
    workDir.delete();
    workDir.mkdirs();
    if (!workDir.isDirectory()) {
      System.err.println("Mkdirs failed to create " + workDir);
      System.exit(-1);
    }

    // Register a shutdown hook that deletes the temporary directory
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
          try {
            FileUtil.fullyDelete(workDir);
          } catch (IOException e) {
          }
        }
      });

    // Unpack the jar into the temporary directory
    unJar(file, workDir);

    // Put the unpacked classes and lib/ jars on the classpath
    ArrayList<URL> classPath = new ArrayList<URL>();
    classPath.add(new File(workDir + "/").toURL());
    classPath.add(file.toURL());
    classPath.add(new File(workDir, "classes/").toURL());
    File[] libs = new File(workDir, "lib").listFiles();
    if (libs != null) {
      for (int i = 0; i < libs.length; i++) {
        classPath.add(libs[i].toURL());
      }
    }

    // Load the main class and invoke its main(String[]) reflectively.
    // (The snippet was cut off in the original post; the remainder below
    // is restored from the Hadoop 1.0.0 RunJar source.)
    ClassLoader loader =
      new URLClassLoader(classPath.toArray(new URL[0]));

    Thread.currentThread().setContextClassLoader(loader);
    Class<?> mainClass = Class.forName(mainClassName, true, loader);
    Method main = mainClass.getMethod("main", new Class[] {
      Array.newInstance(String.class, 0).getClass()
    });
    String[] newArgs = Arrays.asList(args)
      .subList(firstArg, args.length).toArray(new String[0]);
    try {
      main.invoke(null, new Object[] { newArgs });
    } catch (InvocationTargetException e) {
      throw e.getTargetException();
    }
  }
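RunJar's final step, resolving the entry-point class by name and calling its static main through reflection with the already-consumed arguments stripped off, can be demonstrated in isolation. The following is a minimal sketch; the class and method names (ReflectiveMainDemo, FakeJobMain, runMain) are mine, not Hadoop's, and the demo wraps reflection failures in a RuntimeException instead of rethrowing the raw Throwable as RunJar does:

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ReflectiveMainDemo {
  // A stand-in for the job's Main-Class (WordCount in the example above).
  public static class FakeJobMain {
    static String received;
    public static void main(String[] args) {
      received = String.join(",", args);
    }
  }

  // Mirrors RunJar: look up the class by name, find main(String[]),
  // drop the first `firstArg` arguments, and invoke the method statically.
  static void runMain(String mainClassName, String[] args, int firstArg) {
    try {
      Class<?> mainClass = Class.forName(mainClassName);
      Method main = mainClass.getMethod("main", String[].class);
      String[] newArgs = Arrays.asList(args)
          .subList(firstArg, args.length).toArray(new String[0]);
      main.invoke(null, new Object[] { newArgs });
    } catch (ReflectiveOperationException e) {
      // RunJar unwraps and rethrows the job's own Throwable;
      // for brevity this sketch just wraps the failure.
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    // Simulates "hadoop jar examples.jar wordcount in out": the jar path
    // and the subcommand are consumed, so only "in out" reaches the job.
    runMain(FakeJobMain.class.getName(),
            new String[] { "examples.jar", "wordcount", "in", "out" }, 2);
    System.out.println(FakeJobMain.received); // prints in,out
  }
}
```

The real RunJar additionally builds a URLClassLoader over the unpacked jar and installs it as the thread context class loader before the lookup, so the job and its bundled lib/ dependencies resolve against that loader rather than the system one.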
Original article: https://www.cnblogs.com/leeeee/p/7276543.html