MapReduce实战(一)自定义类型

需求:

处理以下流量数据,第1列是手机号,第7列是上行流量,第8列是下行流量。将手机号一样的用户进行合并,上行流量汇总,下行流量也汇总,并相加求得总流量。

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200

思考:

和之前mapreduce讲解的那个统计单词的例子类似。在这里我们主要解决的是怎么处理map和reduce的输入输出。

首先看map,其输入的key是LongWritable,value是Text,绝对固定。为什么呢?我们之前说过,默认情况下,框架传递给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为value。

输出的key我们可以用手机号表示,那么就是Text,输出value我们既想表示出上行流量,又想表示下行流量,发现没有这种数据类型,所以就需要自定义一个类,进行传递。自定义类,就一定需要序列化,才可以传递哦。

然后看reduce,其输入key就是map的输出key,其输入value就是map的输出value。

输出key我们也可以用手机号表示,就是Text,输出value我们也用一个类进行表示。

下面我们编写该程序:

导入必要的包后,建立文件格式如下:

FlowBean.java:

package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private String phoneNum;// 手机号
    private long upFlow;// 上行流量
    private long downFlow;// 下行流量
    private long sumFlow;// 总流量

    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "	" + downFlow + "	" + sumFlow;
    }

    // 从数据流中反序列出对象的数据
    // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // 将对象数据序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

}

FlowMapper.java:

package cn.darrenchan.hadoop.mr.flow;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 将这一行的内容转换成string类型
        String line = value.toString();
        // 对这一行的文本按特定分隔符切分
        String[] words = StringUtils.split(line, "	");
        
        //拿到我们需要的字段
        String phoneNum = words[0];
        long upFlow = Long.parseLong(words[7]);
        long downFlow = Long.parseLong(words[8]);
        
        //封装数据为kv并输出
        context.write(new Text(phoneNum), new FlowBean(phoneNum, upFlow, downFlow));
    }
}

FlowReducer.java:

package cn.darrenchan.hadoop.mr.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // 框架每传递一组数据<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>调用一次我们的reduce方法
    // reduce中的业务逻辑就是遍历values,然后进行累加求和再输出
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowCounter = 0;
        long downFlowCounter = 0;
        for (FlowBean flowBean : values) {
            upFlowCounter += flowBean.getUpFlow();
            downFlowCounter += flowBean.getDownFlow();
        }

        context.write(key, new FlowBean(key.toString(), upFlowCounter,
                downFlowCounter));
    }
}

FlowRunner.java:

package cn.darrenchan.hadoop.mr.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//这是job描述和提交类的规范写法
//执行命令: hadoop jar flow.jar cn.darrenchan.hadoop.mr.flow.FlowRunner /flow/srcdata /flow/output
public class FlowRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 设置整个job所用的那些类在哪个jar包
        job.setJarByClass(FlowRunner.class);

        // 本job使用的mapper和reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 指定要处理的输入数据存放路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定处理结果的输出数据存放路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 将job提交给集群运行 ,将运行状态进行打印
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowRunner(), args);
        System.exit(res);
    }

}

将要处理的文件上传到hdfs上一个目录,我的是/flow/srcdata。将程序打成jar包flow.jar,然后执行命令:

hadoop jar flow.jar cn.darrenchan.hadoop.mr.flow.FlowRunner /flow/srcdata /flow/output

我们会得到如下运行效果:

17/02/26 04:35:23 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 04:35:23 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 04:35:24 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 04:35:24 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 04:35:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0001
17/02/26 04:35:25 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0001
17/02/26 04:35:25 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0001/
17/02/26 04:35:25 INFO mapreduce.Job: Running job: job_1488112052214_0001
17/02/26 04:35:34 INFO mapreduce.Job: Job job_1488112052214_0001 running in uber mode : false
17/02/26 04:35:34 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 04:35:39 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 04:35:44 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 04:35:44 INFO mapreduce.Job: Job job_1488112052214_0001 completed successfully
17/02/26 04:35:44 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=1266
FILE: Number of bytes written=188391
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2338
HDFS: Number of bytes written=623
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3661
Total time spent by all reduces in occupied slots (ms)=2568
Total time spent by all map tasks (ms)=3661
Total time spent by all reduce tasks (ms)=2568
Total vcore-seconds taken by all map tasks=3661
Total vcore-seconds taken by all reduce tasks=2568
Total megabyte-seconds taken by all map tasks=3748864
Total megabyte-seconds taken by all reduce tasks=2629632
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=1216
Map output materialized bytes=1266
Input split bytes=124
Combine input records=0
Combine output records=0
Reduce input groups=22
Reduce shuffle bytes=1266
Reduce input records=22
Reduce output records=22
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=147
CPU time spent (ms)=1400
Physical memory (bytes) snapshot=218402816
Virtual memory (bytes) snapshot=726446080
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=2214
File Output Format Counters
Bytes Written=623

最终的结果如下所示:

1363154400022 0 200 200
1363157973098 27 3659 3686
1363157982040 102 7335 7437
1363157983019 0 200 200
1363157984040 12 1938 1950
1363157984041 9 6960 6969
1363157985069 186852 200 187052
1363157985079 180 200 380
1363157986029 3 1938 1941
1363157986041 180 200 380
1363157986072 18 9531 9549
1363157988072 120 200 320
1363157990043 63 11058 11121
1363157991076 1512 200 1712
1363157992093 4938 200 5138
1363157993044 12 1527 1539
1363157993055 954 200 1154
1363157995033 20 3156 3176
1363157995052 0 200 200
1363157995074 4116 1432 5548
1363157995093 3008 3720 6728
1363157985066 2481 24681 27162

原文地址:https://www.cnblogs.com/DarrenChan/p/6445764.html