Per-User Mobile Traffic Statistics with MapReduce

The previous post on word counting already covered the package imports and execution steps in detail,

so refer to it if needed. This post focuses on the code that aggregates per-user traffic and partitions the results by phone-number segment.
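For reference, the mapper further down assumes tab-separated usage records: the third field is the phone number, and the fourth and fifth fields are the upstream and downstream byte counts. A made-up sample record (the real log layout may differ):

1363157985066	192.168.100.1	13700130000	2481	24681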

Code:

The underlying FlowWritable class:

package flowstat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowWritable implements Writable {
    private int upFlow;   // upstream traffic in bytes
    private int downFlow; // downstream traffic in bytes
    private int sumFlow;  // total traffic in bytes

    /** No-arg constructor, required by Hadoop for deserialization. */
    public FlowWritable() {

    }

    /**
     * Create a traffic object from the up and down byte counts;
     * the total is computed automatically.
     * @param up   upstream bytes
     * @param down downstream bytes
     */
    public FlowWritable(int up, int down) {
        this.upFlow = up;
        this.downFlow = down;
        this.sumFlow = up + down;
    }

    public int getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }
    public int getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(int downFlow) {
        this.downFlow = downFlow;
    }
    public int getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(int sumFlow) {
        this.sumFlow = sumFlow;
    }
    /** Deserialize the fields in exactly the order write() emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.sumFlow = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(sumFlow);
    }

    /** This string is what TextOutputFormat writes as the value column. */
    @Override
    public String toString() {
        return "FlowWritable [upFlow=" + upFlow + ", downFlow=" + downFlow + ", sumFlow=" + sumFlow + "]";
    }
    

}
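As a quick sanity check (an addition, not from the original post): a Writable must read its fields back in exactly the order write() emitted them. A minimal round-trip sketch, assuming the FlowWritable class above:

package flowstat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowWritableDemo {
    public static void main(String[] args) throws IOException {
        FlowWritable original = new FlowWritable(100, 200);

        // Serialize into an in-memory buffer.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize into a fresh object and check the fields survived.
        FlowWritable copy = new FlowWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy); // FlowWritable [upFlow=100, downFlow=200, sumFlow=300]
    }
}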

The partitioner class:

package flowstat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


// Note: the value type must match the map output value (FlowWritable),
// not IntWritable.
public class NumsegPartitioner extends Partitioner<Text, FlowWritable> {

    @Override
    public int getPartition(Text key, FlowWritable value, int numPartitions) {
        // Route each record by the four-digit segment at positions 3-6
        // of the phone number.
        String phone = key.toString();
        String mseg = phone.substring(3, 7);

        if (mseg.equals("0013")) {
            return 1;
        }
        if (mseg.equals("0023")) {
            return 2;
        }
        // Everything else lands in partition 0.
        return 0;
    }

}
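Every index this method returns must stay below the reduce-task count set in the driver (3 here), and each index maps to one output file (part-r-00000 through part-r-00002). A quick local sketch with made-up phone numbers (the value argument is unused, so null is passed):

package flowstat;

import org.apache.hadoop.io.Text;

public class PartitionerDemo {
    public static void main(String[] args) {
        NumsegPartitioner p = new NumsegPartitioner();
        // substring(3, 7) of "13700130000" is "0013" -> partition 1
        System.out.println(p.getPartition(new Text("13700130000"), null, 3)); // 1
        System.out.println(p.getPartition(new Text("13700230000"), null, 3)); // 2
        System.out.println(p.getPartition(new Text("13800000000"), null, 3)); // 0
    }
}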

The mapper class:

package flowstat;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of the usage log.
        String line = value.toString();
        // Split the tab-separated fields.
        String[] fields = line.split("\t");
        // The phone number is the third field.
        Text phone = new Text(fields[2]);
        // Upstream and downstream byte counts.
        int up = Integer.parseInt(fields[3]);
        int down = Integer.parseInt(fields[4]);
        // Wrap them in the serializable traffic value.
        FlowWritable fw = new FlowWritable(up, down);
        // Emit the map output pair.
        context.write(phone, fw);
    }

}

The reducer class:

package flowstat;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReduce extends Reducer<Text, FlowWritable, Text, FlowWritable> {

    @Override
    protected void reduce(Text key, Iterable<FlowWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the traffic over all FlowWritable values for this phone number.
        int sumUp = 0;
        int sumDn = 0;
        for (FlowWritable value : values) {
            sumUp += value.getUpFlow();
            sumDn += value.getDownFlow();
        }
        // Wrap the totals in a new serializable value and emit it.
        FlowWritable v = new FlowWritable(sumUp, sumDn);
        context.write(key, v);
    }

}

The job submission class:

package flowstat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;



public class FlowMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Create the job.
        Job job = Job.getInstance(conf, "FlowStatMR");

        // Locate the jar by its driver class.
        job.setJarByClass(FlowMain.class);

        // Set the mapper and reducer classes.
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);

        // Plug in the custom partitioner; the reduce-task count must cover
        // every partition index it can return (0, 1 and 2 here).
        job.setPartitionerClass(NumsegPartitioner.class);
        job.setNumReduceTasks(3);

        // Plain-text input and output formats.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Output key/value classes (these also serve as the map output
        // classes, since no separate map output classes are set).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowWritable.class);

        // Input and output paths come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job once and wait for the result.
        boolean succ = job.waitForCompletion(true);
        System.out.println(succ ? "Job succeeded" : "Job failed");
    }
}
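One optional tweak, not in the original post: because the reduce step is a pure sum, the same reducer class can also serve as a combiner to cut down shuffle traffic. If desired, add this line before submitting the job:

        job.setCombinerClass(FlowReduce.class);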

Next, build the jar, copy it to the virtual machine, and run it to compute the partitioned statistics. The detailed steps are the same as in the previous word-count MapReduce post.
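Assuming the jar is named flowstat.jar and the input data already sits in HDFS (both names are hypothetical), the submission looks roughly like:

hadoop jar flowstat.jar flowstat.FlowMain /flow/input /flow/output

With three reduce tasks, the output directory will contain one file per partition: part-r-00000, part-r-00001, and part-r-00002.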

Original post: https://www.cnblogs.com/qianshuixianyu/p/9366595.html