hadoop学习笔记--找到执行hadoop的入口

参与个hadoop项目,之前没搞过,赶紧学习:

照葫芦画瓢,得到代码是hdfs2local.sh脚本和LiaoNingFilter.jar包,迫不及待用jd-gui打开jar包,搜索到main(在MANIFEST.MF中没有找到main,只能search,其实在hdfs2local.sh脚本中写明了main所在的package)。

  1 package cn.com.dtmobile.hadoop.biz.LiaoNingFilter.job;
  2 
  3 import cn.com.dtmobile.hadoop.biz.LiaoNingFilter.mr.DataMap;
  4 import cn.com.dtmobile.hadoop.biz.LiaoNingFilter.mr.DataReduce;
  5 import java.net.URI;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.conf.Configured;
  8 import org.apache.hadoop.filecache.DistributedCache;
  9 import org.apache.hadoop.fs.Path;
 10 import org.apache.hadoop.io.NullWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.io.compress.CompressionCodec;
 13 import org.apache.hadoop.io.compress.GzipCodec;
 14 import org.apache.hadoop.mapreduce.Job;
 15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 19 import org.apache.hadoop.util.GenericOptionsParser;
 20 import org.apache.hadoop.util.Tool;
 21 import org.apache.hadoop.util.ToolRunner;
 22 
 23 public class DataJob
 24   extends Configured
 25   implements Tool
 26 {
 27   public int run(String[] args)
 28     throws Exception
 29   {
        // Builds and submits the "LiaoNingFilter" MapReduce job and blocks until it finishes.
        // Reads eleven application arguments (after generic-option parsing):
        //   otherArgs[0..7]  eight input paths
        //   otherArgs[8]     output path
        //   otherArgs[9]     URI of a file added to the distributed cache
        //   otherArgs[10]    hour string, stored in the job configuration under "sys_hour"
        // Returns 0 when the job completes successfully, 1 otherwise.
 30     Configuration conf = getConf();
 31     conf.setBoolean("mapred.output.compress", true); // compress job output ...
 32     conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); // ... with gzip
 33     // Keep only the application-specific arguments; generic Hadoop options are consumed by the parser.
 34     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 35     // No length check is made: fewer than 11 arguments fails with ArrayIndexOutOfBoundsException.
 36     String sysHour = otherArgs[10];
 37     // The cache file is registered on conf before the Job is constructed from it.
 38     DistributedCache.addCacheFile(new URI(otherArgs[9]), conf);
 39     Job job = new Job(conf, "LiaoNingFilter");
 40     job.getConfiguration().setStrings("sys_hour", new String[] { sysHour });
 41     job.setJarByClass(DataJob.class);
 42     job.setMapperClass(DataMap.class);
 43     job.setReducerClass(DataReduce.class);
 44     job.setNumReduceTasks(100);
 45     // Map output (intermediate) key/value types.
 46     job.setMapOutputKeyClass(Text.class);
 47     job.setMapOutputValueClass(Text.class);
 48     // Job output (final) key/value types.
 49     job.setOutputKeyClass(NullWritable.class);
 50     job.setOutputValueClass(Text.class);
 51     // Twelve named outputs, all TextOutputFormat with NullWritable keys and Text values.
 52     MultipleOutputs.addNamedOutput(job, "volterx", TextOutputFormat.class, NullWritable.class, Text.class);
 53     
 54     MultipleOutputs.addNamedOutput(job, "s1mmeorgn", TextOutputFormat.class, NullWritable.class, Text.class);
 55     
 56     MultipleOutputs.addNamedOutput(job, "voltesv", TextOutputFormat.class, NullWritable.class, Text.class);
 57     
 58     MultipleOutputs.addNamedOutput(job, "sgsorgn", TextOutputFormat.class, NullWritable.class, Text.class);
 59     
 60     MultipleOutputs.addNamedOutput(job, "s1uhttporgn", TextOutputFormat.class, NullWritable.class, Text.class);
 61     
 62     MultipleOutputs.addNamedOutput(job, "volteorgn", TextOutputFormat.class, NullWritable.class, Text.class);
 63     
 64     MultipleOutputs.addNamedOutput(job, "sharevolterx", TextOutputFormat.class, NullWritable.class, Text.class);
 65     
 66     MultipleOutputs.addNamedOutput(job, "shares1mme", TextOutputFormat.class, NullWritable.class, Text.class);
 67     
 68     MultipleOutputs.addNamedOutput(job, "sharehttp", TextOutputFormat.class, NullWritable.class, Text.class);
 69     
 70     MultipleOutputs.addNamedOutput(job, "sharevolteorgn", TextOutputFormat.class, NullWritable.class, Text.class);
 71     
 72     MultipleOutputs.addNamedOutput(job, "voltemossession", TextOutputFormat.class, NullWritable.class, Text.class);
 73     
 74     MultipleOutputs.addNamedOutput(job, "voltemosslice", TextOutputFormat.class, NullWritable.class, Text.class);
 75     // Eight input paths: otherArgs[0] .. otherArgs[7].
 76     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
 77     
 78     FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
 79     
 80     FileInputFormat.addInputPath(job, new Path(otherArgs[2]));
 81     
 82     FileInputFormat.addInputPath(job, new Path(otherArgs[3]));
 83     
 84     FileInputFormat.addInputPath(job, new Path(otherArgs[4]));
 85     
 86     FileInputFormat.addInputPath(job, new Path(otherArgs[5]));
 87     
 88     FileInputFormat.addInputPath(job, new Path(otherArgs[6]));
 89     
 90     FileInputFormat.addInputPath(job, new Path(otherArgs[7]));
 91     // Single output directory: otherArgs[8].
 92     FileOutputFormat.setOutputPath(job, new Path(otherArgs[8]));
 93     // Submit, wait for completion (verbose progress), and map success/failure to 0/1.
 94     return job.waitForCompletion(true) ? 0 : 1;
 95   }
 96   
 97   public static void main(String[] args)
 98     throws Exception
 99   {
100     // Entry point named by JAR_MAIN in hdfs2local.sh. ToolRunner applies the generic
101     // Hadoop options to the configuration and passes the remaining arguments to run().
102     try
103     {
104       int returnCode = ToolRunner.run(new DataJob(), args);
105       System.exit(returnCode);
106     }
107     catch (Exception e)
108     {
109       e.printStackTrace();
110       // Exit non-zero so the calling script can detect the failure; previously the
111       // exception was only printed and the JVM ended with status 0.
112       System.exit(1);
113     }
114   }

class DataJob76~92行:

这是传递参数给hadoop,但args到底是什么,otherArgs从run(String[] args)来的,run(String[] args)从main(String[] args)来的,main(String[] args)需要回到调用jar包的hdfs2local.sh脚本中找:

  1  #!/bin/bash
  2 # Driver script: runs the LiaoNingFilter MapReduce job on the previous hour's HDFS data, then copies the job output to local disk.
  3  export HADOOP_CONF_DIR=/etc/hadoop/conf
  4  # NOTE(review): the commented-out assignments below look like manual overrides for re-running a fixed date/hour - confirm.
  5 #ANALY_DATE=20180619
  6 #ANALY_HOUR=13
  7 ANALY_DATE=`date +%Y%m%d`  # always overwritten by the if/else below
  8 ANALY_HOUR="`date -d ' -1 hour' +%H`"  # hour to analyse = the hour before now
  9 CUR_DATE=`date +%Y%m%d`
 10 CUR_HOUR="`date +%H`"
 11 #CUR_DATE=20180619
 12 #CUR_HOUR=13
 13 if [ $CUR_HOUR = 00 ]  # in hour 00 the previous hour (23) belongs to yesterday
 14 then
 15    ANALY_DATE="`date -d ' -1 day' +%Y%m%d`"
 16 else
 17    ANALY_DATE=$CUR_DATE
 18 fi
 19  mypath="$(cd "$(dirname "$0")"; pwd)"  # absolute path of the directory containing this script
 20  cd $mypath
 21  #SOURCE_SVR="hdfs://nameservice1:8020/datang"
 22  SOURCE_SVR="hdfs://nameservice1:8020/ws/detail"
 23  # Jar file and main class handed to `hadoop jar` further down.
 24  JAR_PACKAGK="/cup/d6/datang/bin/LiaoNingFilter.jar"
 25  JAR_MAIN="cn.com.dtmobile.hadoop.biz.LiaoNingFilter.job.DataJob"
 26  # Names looped over when copying job output to local disk (underscores are stripped to form the file prefix).
 27  HIVE_TABLES="s1mme_orgn sgs_orgn volte_rx volte_sv s1u_http_orgn volte_mos_session volte_mos_slice"
 28  # One HDFS glob per input source: files under p1_day=<date> whose name contains <date><hour>.
 29  GX=${SOURCE_SVR}/volte_rx/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 30  S1=${SOURCE_SVR}/s1mme_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.txt
 31  SV=${SOURCE_SVR}/volte_sv/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 32  SGS=${SOURCE_SVR}/sgs_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 33  MW=${SOURCE_SVR}/volte_sip/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 34  HTTP=${SOURCE_SVR}/s1u_http_orgn/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.txt
 35  SESSION=${SOURCE_SVR}/volte_mos_session/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 36  SLICE=${SOURCE_SVR}/volte_mos_slice/p1_day=${ANALY_DATE}/*${ANALY_DATE}${ANALY_HOUR}*.ok1
 37  OUTPUT=/datang/FILTER/${ANALY_DATE}${ANALY_HOUR}  # job output directory, one per analysed hour
 38  NODATA=/datang/nodata  # substituted below for any input pattern that matches nothing
 39  hdfs dfs -test -e ${GX}  # for each of the eight inputs: if nothing exists for the pattern ...
 40  if [ $? -ne 0 ];then
 41  GX=${NODATA}  # ... pass the NODATA path to the job instead
 42  fi
 43  hdfs dfs -test -e ${S1}
 44  if [ $? -ne 0 ];then
 45  S1=${NODATA}
 46  fi
 47  hdfs dfs -test -e ${SV}
 48  if [ $? -ne 0 ];then
 49  SV=${NODATA}
 50  fi
 51  hdfs dfs -test -e ${SGS}
 52  if [ $? -ne 0 ]; then
 53  SGS=${NODATA}
 54  fi
 55  hdfs dfs -test -e ${MW}
 56  if [ $? -ne 0 ]; then
 57  MW=${NODATA}
 58  fi
 59  hdfs dfs -test -e ${HTTP}
 60  if [ $? -ne 0 ]; then
 61  HTTP=${NODATA}
 62  fi
 63  hdfs dfs -test -e ${SESSION}
 64  if [ $? -ne 0 ];then
 65  SESSION=${NODATA}
 66  fi
 67  hdfs dfs -test -e ${SLICE}
 68  if [ $? -ne 0 ];then
 69  SLICE=${NODATA}
 70  fi
 71 
 72  T_PROCESS=/datang/t_process/t_process.csv  # passed to DataJob as otherArgs[9] (added to the distributed cache)
 73  echo "`date '+/%y/%m/%d %H:%M:%S'` INFO begni,exit..."
 74  HADOOP=`which hadoop`
 75  #${HADOOP} fs -rm -R ${OUTPUT}
     # Usage: hadoop jar <jar> [mainClass] args...
     # The trailing backslashes join lines 76-87 into ONE command carrying eleven arguments:
     # eight input paths, the output path, the cache file and the analysed hour. Without
     # them every ${...} line would be executed as a separate command.
 76  ${HADOOP} jar ${JAR_PACKAGK} ${JAR_MAIN} \
 77  ${GX} \
 78  ${S1} \
 79  ${SV} \
 80  ${SGS} \
 81  ${MW} \
 82  ${HTTP} \
 83  ${SESSION} \
 84  ${SLICE} \
 85  ${OUTPUT} \
 86  ${T_PROCESS} \
 87  ${ANALY_HOUR}
 88  
 89  #exit
 90  
 91  TEMP_DIR=/cup/d6/datang/TEMP  # local root directory that receives the job output
 92  echo "get data begin!------------------------------------------"
 93  # For every name in HIVE_TABLES, copy the matching job output files from HDFS to local disk.
 94  for tableName in ${HIVE_TABLES}
 95   do
 96   # Output files are named without underscores (e.g. s1mme_orgn -> s1mmeorgn).
 97   file_name=`echo ${tableName}|sed 's/_//g'`
 98   # Empty (or create) the local directory for this table/date/hour before downloading.
 99   rm -rf ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/*
100   mkdir -p ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR
101   # Download, then echo the command that was run.
102   hdfs dfs -get ${OUTPUT}/${file_name}* ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/
103   echo "hdfs dfs -get ${OUTPUT}/${file_name}* ${TEMP_DIR}/${tableName}/$ANALY_DATE/$ANALY_HOUR/"
104  done
105  
106  orgn=volteorgn  # the same clear / mkdir / get sequence for four outputs not listed in HIVE_TABLES
107  rm -rf $TEMP_DIR/volte_orgn/$ANALY_DATE/$ANALY_HOUR/*
108  mkdir -p ${TEMP_DIR}/volte_orgn/$ANALY_DATE/$ANALY_HOUR
109  hdfs dfs -get ${OUTPUT}/${orgn}* ${TEMP_DIR}/volte_orgn/$ANALY_DATE/$ANALY_HOUR
110  rm -rf $TEMP_DIR/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR/*
111  mkdir -p ${TEMP_DIR}/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR
112  hdfs dfs -get ${OUTPUT}/sharevolteorgn* ${TEMP_DIR}/share_volte_orgn/$ANALY_DATE/$ANALY_HOUR
113  # share_s1mme
114  rm -rf $TEMP_DIR/share_s1mme/$ANALY_DATE/$ANALY_HOUR/*
115  mkdir -p ${TEMP_DIR}/share_s1mme/$ANALY_DATE/$ANALY_HOUR
116  hdfs dfs -get ${OUTPUT}/shares1mme* ${TEMP_DIR}/share_s1mme/$ANALY_DATE/$ANALY_HOUR
117 # share_volterx
118  rm -rf $TEMP_DIR/share_volterx/$ANALY_DATE/$ANALY_HOUR/*
119  mkdir -p ${TEMP_DIR}/share_volterx/$ANALY_DATE/$ANALY_HOUR
120  hdfs dfs -get ${OUTPUT}/sharevolterx* ${TEMP_DIR}/share_volterx/$ANALY_DATE/$ANALY_HOUR
121 
122  # Remove the HDFS job output directory that belongs to the hour six hours ago.
123  # Date and hour come from a single `date` call so they always describe the same
124  # instant. The former if/elif chain picked yesterday's date for hours 23 and 00-04
125  # (exactly the hours where "six hours ago" is yesterday) but fell back to $1, which
126  # is normally empty, for every other hour, producing a path without a date.
127  DEL_STAMP="`date -d ' -6 hour' +%Y%m%d%H`"
128  hdfs dfs -rm -R -skipTrash /datang/FILTER/${DEL_STAMP}
129  echo "get data end!------------------------------------------"
130  echo "`date '+/%y/%m/%d %H:%M:%S'` INFO done ,exit..."
131  exit
132  

hdfs2local.sh脚本76~87行:

这是hadoop命令调用jar包的命令,格式为:

jar

运行jar文件。用户可以把他们的Map Reduce代码捆绑到jar文件中,使用这个命令执行。

用法:hadoop jar <jar> [mainClass] args...

http://hadoop.apache.org/docs/r1.0.4/cn/commands_manual.html

问题来了,取到的参数是哪些?

${HADOOP} jar ${JAR_PACKAGK} ${JAR_MAIN}  ${GX}  ${S1}  ${SV}  ${SGS}  ${MW}  ${HTTP}  ${SESSION}  ${SLICE}  ${OUTPUT}  ${T_PROCESS}  ${ANALY_HOUR}

按理说${HADOOP} jar后面的都是参数,但是数量与class DataJob76~92行的参数数量对不上,问题出在哪?分析GenericOptionsParser类有蹊跷:

34     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

 找到官方文档:http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html

 String[] getRemainingArgs() 
          Returns an array of Strings containing only application-specific arguments.
还需要说明的是,hadoop的参数分两类:通用参数和命令参数
bin/hadoop command [genericOptions] [commandOptions]

通用参数是:

常规选项

下面的选项被 dfsadmin、fs、fsck 和 job 支持。应用程序要实现 Tool 来支持常规选项。

GENERIC_OPTION    描述
-conf <configuration file> 指定应用程序的配置文件。
-D <property=value> 为指定property指定值value。
-fs <local|namenode:port> 指定namenode。
-jt <local|jobtracker:port> 指定job tracker。只适用于job
-files <逗号分隔的文件列表> 指定要拷贝到map reduce集群的文件的逗号分隔的列表。 只适用于job
-libjars <逗号分隔的jar列表> 指定要包含到classpath中的jar文件的逗号分隔的列表。 只适用于job
-archives <逗号分隔的archive列表> 指定要被解压到计算节点上的档案文件的逗号分割的列表。 只适用于job





除了generic options 通用参数以外的,都是命令的参数(或者叫application-specific arguments命令专属参数)

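需要注意的是,${JAR_PACKAGK}和${JAR_MAIN}并不是被GenericOptionsParser剔除的:hadoop jar命令本身消费了jar包路径和主类名,传给main(String[] args)的args是从${GX}开始的。GenericOptionsParser只负责解析并去掉上面列出的通用参数(-conf、-D、-files等),而本脚本没有传任何通用参数。所以执行代码
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs()后,otherArgs 与args内容相同,就是下面这11个参数(otherArgs[0]~otherArgs[10]):
${GX}  ${S1}  ${SV}  ${SGS}  ${MW}  ${HTTP}  ${SESSION}  ${SLICE}  ${OUTPUT}  ${T_PROCESS}  ${ANALY_HOUR}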

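这就与class DataJob对应上了:otherArgs[0]~otherArgs[7]是76~90行的8个输入路径,otherArgs[8](${OUTPUT})是92行的输出路径,otherArgs[9](${T_PROCESS})是38行加入DistributedCache的文件,otherArgs[10](${ANALY_HOUR})是36行的sysHour。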


相关参考材料:

Hadoop 常见指令

https://blog.csdn.net/u011414200/article/details/50465433


MapReduce处理输出多文件格式(MultipleOutputs)

https://blog.csdn.net/shujuboke/article/details/77199840

Hadoop系列之ToolRunner与GenericOptionsParser用法

https://blog.csdn.net/u011734144/article/details/60769745

Class GenericOptionsParser

http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html

Hadoop 0.18 命令手册

http://hadoop.apache.org/docs/r1.0.4/cn/commands_manual.html

 
原文地址:https://www.cnblogs.com/wangziyi0513/p/10529493.html