【Hadoop】Hadoop MR 自定义序列化类

1、基本概念

 

2、Mapper代码

package com.ares.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.wordcount.MRTest;

// Long, String, String, FlowBean --> LongWritable, Text, Text, FlowBean
/**
 * Mapper that turns one tab-separated input line into a
 * (phone number, {@link FlowBean}) pair.
 *
 * <p>The phone number is read from the second field, the upstream flow from
 * the third-to-last field and the downstream flow from the second-to-last
 * field. Lines that do not have exactly {@value #EXPECTED_FIELD_COUNT} fields,
 * or whose flow fields are not valid longs, are logged and skipped so that
 * they never contribute bogus or stale values to the output.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private static final Logger LOGGER = Logger.getLogger(FlowSumMapper.class);

    /** Field delimiter of the input lines. */
    private static final char SEPARATOR = '\t';
    /** Number of fields a well-formed input line must contain. */
    private static final int EXPECTED_FIELD_COUNT = 11;

    // Output objects are reused across map() calls; every field that is
    // written out is overwritten before each context.write().
    private final Text text = new Text();
    private final FlowBean flowBean = new FlowBean();

    /**
     * Parses a single input line and emits its phone number with the
     * upstream/downstream flow values.
     *
     * @param key     offset key of the input record, used only for logging
     * @param value   the raw input line
     * @param context context used to emit the output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = StringUtils.split(line, SEPARATOR);
        int length = fields.length;
        if (length != EXPECTED_FIELD_COUNT) {
            LOGGER.error(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
            // Actually ignore the record: indexing into a short array would
            // throw, and a long one would read the wrong columns.
            return;
        }

        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[length - 3]);
            downFlow = Long.parseLong(fields[length - 2]);
        } catch (NumberFormatException e) {
            LOGGER.error(key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...", e);
            // Skip the record instead of emitting values from a previous line.
            return;
        }

        String phoneNum = fields[1];
        flowBean.setPhoneNum(phoneNum);
        flowBean.setUpFlow(upFlow);
        flowBean.setDownFlow(downFlow);

        text.set(phoneNum);
        context.write(text, flowBean);
    }
}

3、Reducer代码

package com.ares.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that adds up the upstream and downstream flow of every
 * {@link FlowBean} received for one key and emits a single bean holding the
 * totals, including their sum.
 */
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    // Output bean, refilled on every reduce() call before being written.
    private FlowBean result = new FlowBean();

    /**
     * Aggregates all flow values of one key.
     *
     * @param key     the key shared by all values; also stored in the output
     *                bean as its phone number
     * @param values  the beans whose up/down flow are summed
     * @param context context used to emit the aggregated bean
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        long totalUp = 0;
        long totalDown = 0;

        for (FlowBean current : values) {
            totalUp = totalUp + current.getUpFlow();
            totalDown = totalDown + current.getDownFlow();
        }

        final long totalFlow = totalUp + totalDown;

        result.setPhoneNum(key.toString());
        result.setUpFlow(totalUp);
        result.setDownFlow(totalDown);
        result.setSumFlow(totalFlow);

        context.write(key, result);
    }
}

4、序列化Bean代码

package com.ares.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value type carrying a phone number together with its upstream, downstream
 * and total flow.
 *
 * <p>Fields are serialized in the order phone number, up flow, down flow,
 * sum flow; {@link #readFields(DataInput)} reads them back in the same order.
 */
public class FlowBean implements Writable {
    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /** Creates an empty bean; a public no-argument constructor is kept for callers that instantiate it directly. */
    public FlowBean() {
    }

    /** @return the phone number, or {@code null} if it was never set */
    public String getPhoneNum() {
        return phoneNum;
    }

    /** @param phoneNum the phone number to store */
    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    /** @return the upstream flow */
    public long getUpFlow() {
        return upFlow;
    }

    /** @param upFlow the upstream flow to store */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream flow */
    public long getDownFlow() {
        return downFlow;
    }

    /** @param downFlow the downstream flow to store */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return the stored total flow; it is not derived automatically from up/down flow */
    public long getSumFlow() {
        return sumFlow;
    }

    /** @param sumFlow the total flow to store */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Restores all fields from {@code in}, in the order written by
     * {@link #write(DataOutput)}.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Writes all fields to {@code out}.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        // writeUTF(null) throws NullPointerException; a bean whose phone
        // number was never set is serialized with an empty string instead.
        out.writeUTF(phoneNum == null ? "" : phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Returns up flow, down flow and sum flow separated by tabs. The phone
     * number is not included.
     */
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

}

5、TestRunner代码

package com.ares.hadoop.mr.flowsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * Driver that configures and submits the "flowsum" job using
 * {@link FlowSumMapper} and {@link FlowSumReducer}.
 *
 * <p>Expects exactly two arguments: the input path(s) and the output path.
 */
public class FlowSumRunner extends Configured implements Tool {
    private static final Logger LOGGER = Logger.getLogger(FlowSumRunner.class);

    /**
     * Builds the job from the given arguments and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path(s), {@code args[1]} the
     *             output path
     * @return 0 if the job succeeded, 1 if it failed, -1 if the argument
     *         count is wrong
     * @throws Exception if the job cannot be created, submitted or awaited
     */
    @Override
    public int run(String[] args) throws Exception {
        LOGGER.debug("MRTest: MRTest STARTED...");

        if (args.length != 2) {
            LOGGER.error("MRTest: ARGUMENTS ERROR");
            // Report the error through the return value rather than killing
            // the JVM; main() still exits with this code.
            return -1;
        }

        // Use the configuration injected via setConf() (ToolRunner does this)
        // so that settings supplied by the caller are honoured. Fall back to
        // a fresh one only when run() is invoked without a configuration.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("flowsum");

        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // MAP & REDUCE
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // MAP
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // JOB INPUT & OUTPUT PATH
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            LOGGER.debug("MRTest: MRTest SUCCESSFULLY...");
            return 0;
        }
        // A failed job is an error condition, not debug noise.
        LOGGER.error("MRTest: MRTest FAILED...");
        return 1;
    }

    /**
     * Command-line entry point; runs the tool through {@link ToolRunner} and
     * exits with the code returned by {@link #run(String[])}.
     *
     * @param args command-line arguments passed on to {@link ToolRunner}
     * @throws Exception if the tool throws
     */
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(result);
    }

}

参考资料:

http://www.cnblogs.com/robert-blue/p/4157768.html

http://www.cnblogs.com/qlee/archive/2011/05/18/2049610.html

http://blog.163.com/lzm07@126/blog/static/25705468201331611857190/

http://blog.csdn.net/lastsweetop/article/details/9193907

原文地址:https://www.cnblogs.com/junneyang/p/5846195.html