1、基本概念
2、Mapper代码
package com.ares.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.wordcount.MRTest;

/**
 * Mapper of the flow-sum job.
 *
 * <p>Input: (byte offset, text line). Output: (phone number, {@link FlowBean}).
 * Each line is split on {@link #separator}; field 1 is taken as the phone
 * number, the third-from-last field as the upstream flow and the
 * second-from-last field as the downstream flow.
 *
 * <p>Malformed records (wrong field count, non-numeric flow values) are logged
 * and skipped; nothing is emitted for them.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private static final Logger LOGGER = Logger.getLogger(FlowSumMapper.class);

    /** Number of fields a well-formed input record must have. */
    private static final int EXPECTED_FIELD_COUNT = 11;

    private final static char separator = ' ';

    // Output key/value objects are reused across map() calls to avoid
    // allocating a new pair for every record.
    private Text text = new Text();
    private FlowBean flowBean = new FlowBean();

    /**
     * Parses one input line and emits (phoneNum, FlowBean) for it.
     *
     * @param key     byte offset of the line, used only in log messages
     * @param value   the raw input line
     * @param context sink for the emitted pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Per-record state is kept in locals so that a rejected record can
        // never leak values parsed from an earlier one.
        String line = value.toString();
        String[] fields = StringUtils.split(line, separator);
        int length = fields.length;

        if (length != EXPECTED_FIELD_COUNT) {
            LOGGER.error(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
            // Actually ignore the record: indexing into a short array below
            // would otherwise throw ArrayIndexOutOfBoundsException.
            return;
        }

        String phoneNum = fields[1];
        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[length - 3]);
            downFlow = Long.parseLong(fields[length - 2]);
        } catch (NumberFormatException e) {
            LOGGER.error(key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...");
            // Skip the record instead of emitting it with bogus flow values.
            return;
        }

        // sumFlow is intentionally not set here; the reducer computes it.
        flowBean.setPhoneNum(phoneNum);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);

        text.set(phoneNum);
        context.write(text, flowBean);
    }
}
3、Reducer代码
package com.ares.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer of the flow-sum job.
 *
 * <p>For each key it adds up the upstream and downstream flow of every
 * {@link FlowBean} in the group and emits a single bean carrying the two
 * totals and their sum.
 */
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /** Output value, reused across reduce() calls. */
    private FlowBean outputBean = new FlowBean();

    /**
     * Aggregates all beans of one key into a single output bean.
     *
     * @param key     the grouping key; its string form is stored as the phone number
     * @param values  the beans grouped under {@code key}
     * @param context sink for the emitted (key, bean) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;

        for (FlowBean record : values) {
            totalUp += record.getUpFlow();
            totalDown += record.getDownFlow();
        }

        outputBean.setPhoneNum(key.toString());
        outputBean.setUpFlow(totalUp);
        outputBean.setDownFlow(totalDown);
        outputBean.setSumFlow(totalUp + totalDown);

        context.write(key, outputBean);
    }
}
4、序列化Bean代码
package com.ares.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Serializable value object holding the traffic figures of one phone number:
 * upstream flow, downstream flow and their sum.
 *
 * <p>The wire format written by {@link #write(DataOutput)} and read back by
 * {@link #readFields(DataInput)} is, in order: phone number (modified UTF-8),
 * upFlow, downFlow, sumFlow (each a {@code long}).
 */
public class FlowBean implements Writable {
    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /** No-arg constructor; all fields start at their Java defaults. */
    public FlowBean() {
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Restores all fields from {@code in}, in the order {@link #write} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Writes all fields to {@code out}.
     *
     * <p>A phone number that was never set is written as the empty string,
     * because {@code writeUTF(null)} would throw a NullPointerException.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum == null ? "" : phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Returns "upFlow downFlow sumFlow" separated by single spaces; the phone
     * number is not part of the string.
     */
    @Override
    public String toString() {
        return "" + upFlow + " " + downFlow + " " + sumFlow;
    }
}
5、TestRunner代码
package com.ares.hadoop.mr.flowsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; public class FlowSumRunner extends Configured implements Tool { private static final Logger LOGGER = Logger.getLogger(FlowSumRunner.class); @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub LOGGER.debug("MRTest: MRTest STARTED..."); if (args.length != 2) { LOGGER.error("MRTest: ARGUMENTS ERROR"); System.exit(-1); } Configuration conf = new Configuration(); //FOR Eclipse JVM Debug //conf.set("mapreduce.job.jar", "flowsum.jar"); Job job = Job.getInstance(conf); // JOB NAME job.setJobName("flowsum"); // JOB MAPPER & REDUCER job.setJarByClass(FlowSumRunner.class); job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); // MAP & REDUCE job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // MAP job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // JOB INPUT & OUTPUT PATH //FileInputFormat.addInputPath(job, new Path(args[0])); FileInputFormat.setInputPaths(job, args[0]); FileOutputFormat.setOutputPath(job, new Path(args[1])); // VERBOSE OUTPUT if (job.waitForCompletion(true)) { LOGGER.debug("MRTest: MRTest SUCCESSFULLY..."); return 0; } else { LOGGER.debug("MRTest: MRTest FAILED..."); return 1; } } public static void main(String[] args) throws Exception { int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args); System.exit(result); } }
参考资料:
http://www.cnblogs.com/robert-blue/p/4157768.html
http://www.cnblogs.com/qlee/archive/2011/05/18/2049610.html
http://blog.163.com/lzm07@126/blog/static/25705468201331611857190/
http://blog.csdn.net/lastsweetop/article/details/9193907