MapReduce编程--1.统计用户上网流量DataCount

数据原型:
这里写图片描述

行数据原型格式释义:

(访问日期)(手机号)(mac地址)(ip地址)(网站名称)(网站类型)(上行流量)(下行流量)(运行状态码)

需求:
将以上数据进行抽取统计,统计每个用户一天内上网数据的上行流量、下行流量和总流量(注意:用户一天之内很可能有多条上网记录)

1.1 使用自定义Writable数据类型—DataBean作为统一的数据类型对数据进行封装


为什么要进行封装?
  因为要统计的项目很多,而mapreduce一次只能输出一个类型的数据,所以我们需要将(手机号、上行流量、下行流量、总流量)封装起来。
其中要注意的细节:
  1.自定义数据类型,需要像 LongWritable、Text一样实现Writable接口
  2.定义成员变量,生成 getter setter方法
  3.添加一个有参构造函数,目的是为了方便对象的初始化
  4.同时别忘了添加默认的无参构造方法
  5.*重写序列化方法 write(DataOutput out )
  6.*重写反序列化方法 readFields(DataInput in )
  !!!注意:
    a. 序列化与反序列化的输出顺序一定要一致
    b. 参数个数一定要一致,有多少输出成员变量,就有多少输入成员变量
  7.如果有需要,重写该自定义类的toString()方法,便于输出到文件中去
  

1.2 自定义数据类型的具体定义过程 举例如下:

  定义DataBean类,属性:用户手机,上行流量,下行流量,总流量 产生getter和setter方法,其中总流量的有参构造方法中定义为 上行+下行

public class FlowBean implements Writable {

    /*
     *成员变量
     */
    private String phoneNB;//手机号
    private long up_flow;//上行流量
    private long down_flow;//下行流量
    private long sum_flow;//总流量

    //在反序列化时,反射机制需要调用无参构造方法,所以显式定义了一个无参构造方法
    public FlowBean() {
        super();
    }
    //为了对象数据的初始化方便,加入一个带参的构造函数
    public FlowBean(String phoneNB, long up_flow, long down_flow) {
        super();
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.sum_flow = up_flow+down_flow;
    }

    // getters and setters
    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getDn_flow() {
        return down_flow;
    }

    public void setDn_flow(long dn_flow) {
        this.down_flow = dn_flow;
    }

    public long getSum_flow() {
        return sum_flow;
    }

    public void setSum_flow(long sum_flow) {
        this.sum_flow = sum_flow;
    }

    //重写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(sum_flow);
    }
    //重写反序列化方法
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        sum_flow = in.readLong();
    }
    //重写toString方法
    @Override
    public String toString() {
        return ""+ up_flow+"	"+down_flow+"	"+sum_flow;
    }
}

1.3 Map-Reduce程序编写:

mapper类

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    //拿到日志中的一行数据,并切分成各个字段,,抽取出我们需要的字段
        //:手机号,上行流量、下行流量  接着封装成k-v发送出去
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //拿一行数据出来
        String line = value.toString();
        //切分
        String[] fields = StringUtils.split(line, '	');

        //取出需要的字段
        String phoneNB = fields[1];
        long up_flow = Long.parseLong(fields[7]);
        long down_flow = Long.parseLong(fields[8]);

        //封装数据并输出
        context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,down_flow));

    }
}

reducer类

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    //框架每传递一组数据<1387788654,{FlowBean,FlowBean,FlowBean...}>调用一次reduce方法
        //reduce中的业务逻辑是遍历values,然后进行累加求和后输出
    protected void reduce(Text key, Iterable<FlowBean> value, Context context) 
            throws java.io.IOException ,InterruptedException {
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        for(FlowBean b : value){
            up_flow_counter += b.getUp_flow();
            down_flow_counter += b.getDn_flow();
        }
        //写出时,value是一个FlowBean对象,因为要写到文件中去,所以要重写其toString()方法
        context.write(key, new FlowBean(key.toString(),up_flow_counter,down_flow_counter));
    };
}

mapreduce运行类:

public class FlowSumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception {
        //官方推荐的写法
        ToolRunner.run(new Configuration(),new FlowSumRunner(),args);
        System.exit(0);
    }
}

1.4 运行该任务

打包项目,导出到/root/examples.jar
上传源数据文件,到/data.doc目录下

在 /root !!目录下运行如下命令:
这里写图片描述

注意事项:
运行前先检查是否已经存在该输出目录,如果有的话,删除后再运行MR例程

执行结果:
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548

和源数据进行对比,以上结果是正确的

原文地址:https://www.cnblogs.com/shiguangmanbu2016/p/5932886.html