MapReduce Basics 2 - Traffic Monitoring

3. Traffic monitoring summary (implemented with LongWritable)

HDFS file path: /tmp/flow.txt
File contents:
13770759991	50	100	25	400
13770759991	800	600	500	100
13770759992	400	300	250	1400
13770759992	800	1200	600	900

Field meanings:
phone number	upstream bytes	downstream bytes	upload bytes	download bytes
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes

Code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowTest {

	public static void main(String[] args) {
		// input and output paths come from the command line
		Path fromPath = new Path(args[0]);
		Path toPath = new Path(args[1]);
		
		try {
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf);
			job.setJarByClass(FlowTest.class);
			
			FileInputFormat.addInputPath(job, fromPath);
			FileOutputFormat.setOutputPath(job, toPath);
			
			job.setMapperClass(FlowMapper.class);
			job.setReducerClass(FlowReducer.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			try {
				job.waitForCompletion(true);
			} catch (ClassNotFoundException | InterruptedException e) {
				e.printStackTrace();
			}
			
		} catch (IOException e) {
			e.printStackTrace();
		}
		

	}

}

/*
phone number	upstream	downstream	upload	download
phoneNum	uppackBytes	downpackBytes	uploadBytes	downloadBytes
13770759991	50L	100L	25L	400L
13770759991	800L	600L	500L	100L
13770759992	400L	300L	250L	1400L
13770759992	800L	1200L	600L	900L
*/
class FlowMapper extends Mapper<LongWritable,Text,Text,Text>{

	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// split each line on the tab delimiters: phoneNum, uppackBytes, downpackBytes, uploadBytes, downloadBytes
		String[] line = value.toString().split("\\W+");
		String phoneNum = line[0];
		long uppackBytes = Long.parseLong(line[1]);
		long downpackBytes = Long.parseLong(line[2]);
		long uploadBytes = Long.parseLong(line[3]);
		long downloadBytes = Long.parseLong(line[4]);
		context.write(new Text(phoneNum), new Text(uppackBytes+"-"+downpackBytes+"-"+uploadBytes+"-"+downloadBytes));
	}

	
}

class FlowReducer extends Reducer<Text,Text,Text,Text>{

	 @Override
	protected void reduce(Text phoneNum, Iterable<Text> text, Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		// sum the four counters across all records for this phone number

		long sumUppack = 0L;
		long sumDownpack = 0L;
		long sumUpload = 0L;
		long sumDownload = 0L;
		for(Text t : text){
			String[] line  = t.toString().split("-");
			
			sumUppack += Long.parseLong(line[0]);
			sumDownpack += Long.parseLong(line[1]);
			sumUpload += Long.parseLong(line[2]);
			sumDownload += Long.parseLong(line[3]);
			
		}
		
		context.write(phoneNum, new Text(sumUppack + "-" + sumDownpack + "-" + sumUpload + "-" + sumDownload));
	}
	
}
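
One optional tweak: since FlowReducer's output value has the same dash-joined format as its input values, the same class can double as a combiner, pre-aggregating on the map side and reducing shuffle traffic. If you want this, add one line to the driver before submitting the job:

	job.setCombinerClass(FlowReducer.class);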

Output:

Export the project as flow.jar and upload it to /opt on the server.
Run:
hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"

Then run:
hadoop fs -ls /tmp/flow/out/*   to list the output files.
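
Listing only shows the file names; to inspect the records, cat the reduce output. Assuming the sample input above, a single reducer, and the default TextOutputFormat (tab between key and value), the totals per phone number should be:

hadoop fs -cat /tmp/flow/out/part-r-00000
13770759991	850-700-525-500
13770759992	1200-1500-850-2300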

4. Traffic monitoring summary (using a custom Writable class, NetflowWritable)

Instead of packing the four counters into a dash-delimited string, this version defines a Writable that serializes them as four longs.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class NetflowTest {

	public static void main(String[] args) {
		
		Path fromPath = new Path(args[0]);
		Path toPath = new Path(args[1]);
		
		
		try {
			Configuration conf = new Configuration();
			Job job = Job.getInstance(conf);
			job.setJarByClass(NetflowTest.class);
			
			
			FileInputFormat.addInputPath(job, fromPath);
			FileOutputFormat.setOutputPath(job, toPath);
			
			job.setMapperClass(NetflowMapper.class);
			job.setReducerClass(NetflowReducer.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(NetflowWritable.class);
			
			// NetflowReducer emits a Text key and a NetflowWritable value
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(NetflowWritable.class);
			
			try {
				job.waitForCompletion(true);
			} catch (ClassNotFoundException | InterruptedException e) {
				e.printStackTrace();
			}
			
		} catch (IOException e) {
			e.printStackTrace();
		}

	}

}


class NetflowWritable implements Writable{

	private long uppackBytes;
	private long downpackBytes;
	private long uploadBytes;
	private long downloadBytes;
	
	// a no-argument constructor is required: Hadoop instantiates Writables via reflection, and the job fails at runtime without one
	public NetflowWritable(){}
	
	public NetflowWritable(long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {
		this.uppackBytes = uppackBytes;
		this.downpackBytes = downpackBytes;	
		this.uploadBytes = uploadBytes;
		this.downloadBytes = downloadBytes;
	}
	

	public long getUppackBytes() {
		return uppackBytes;
	}

	public long getDownpackBytes() {
		return downpackBytes;
	}

	public long getUploadBytes() {
		return uploadBytes;
	}

	public long getDownloadBytes() {
		return downloadBytes;
	}
    
	public void set( long uppackBytes,long downpackBytes,long uploadBytes,long downloadBytes) {
		this.uppackBytes = uppackBytes;
		this.downpackBytes = downpackBytes;	
		this.uploadBytes = uploadBytes;
		this.downloadBytes = downloadBytes;
	}
	
	@Override
	public void readFields(DataInput in) throws IOException {
		// deserialize the four counters, in the same order write() emits them
		this.uppackBytes = in.readLong();
		this.downpackBytes = in.readLong();
		this.uploadBytes = in.readLong();
		this.downloadBytes = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// serialize the four counters; the field order must match readFields()
		out.writeLong(uppackBytes);
		out.writeLong(downpackBytes);
		out.writeLong(uploadBytes);
		out.writeLong(downloadBytes);
	}
	
	
	// toString controls how TextOutputFormat renders the value in the output file
	@Override
	public String toString() {
		return "NetflowWritable [uppackBytes=" + uppackBytes + ",downpackBytes=" + downpackBytes
				+ ",uploadBytes=" + uploadBytes + ",downloadBytes=" + downloadBytes + "]";
	}
}
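
The Writable contract requires write() and readFields() to agree on field order; a quick way to check is a serialization round trip through Hadoop's in-memory buffers. A minimal sketch (the class name NetflowWritableRoundTrip and the sample values are illustrative, not from the original post):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class NetflowWritableRoundTrip {

	public static void main(String[] args) throws Exception {
		NetflowWritable original = new NetflowWritable(850L, 700L, 525L, 500L);

		// serialize into an in-memory buffer
		DataOutputBuffer out = new DataOutputBuffer();
		original.write(out);

		// deserialize into a fresh instance
		DataInputBuffer in = new DataInputBuffer();
		in.reset(out.getData(), out.getLength());
		NetflowWritable copy = new NetflowWritable();
		copy.readFields(in);

		// both lines should print the same four counters
		System.out.println(original);
		System.out.println(copy);
	}
}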


class NetflowMapper extends Mapper<LongWritable,Text,Text,NetflowWritable>{
	private String phoneNum;
	private long uppackBytes;
	private long downpackBytes;
	private long uploadBytes;
	private long downloadBytes;
	
	
	// one reusable instance; context.write serializes the value immediately, so reuse across calls is safe
	NetflowWritable nf = new NetflowWritable();
	
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NetflowWritable>.Context context)
			throws IOException, InterruptedException {
		// parse one tab-separated record: phoneNum, uppackBytes, downpackBytes, uploadBytes, downloadBytes
		String[] line = value.toString().split("\t");
		phoneNum = line[0];
		uppackBytes = Long.parseLong(line[1]);
		downpackBytes = Long.parseLong(line[2]);
		uploadBytes = Long.parseLong(line[3]);
		downloadBytes = Long.parseLong(line[4]);
		nf.set(uppackBytes, downpackBytes, uploadBytes, downloadBytes);
		context.write(new Text(phoneNum), nf);
		
	}
	
}



class NetflowReducer extends Reducer<Text,NetflowWritable,Text,NetflowWritable>{
	private NetflowWritable nf;

	@Override
	protected void reduce(Text phoneNum, Iterable<NetflowWritable> values,
			Reducer<Text, NetflowWritable, Text, NetflowWritable>.Context context)
			throws IOException, InterruptedException {
		// sum the four counters across all records for this phone number
		long uppackBytes = 0L;
		long downpackBytes = 0L;
		long uploadBytes = 0L;
		long downloadBytes = 0L;
		
		for (NetflowWritable nw : values) {
			uppackBytes += nw.getUppackBytes();
			downpackBytes += nw.getDownpackBytes();
			uploadBytes  += nw.getUploadBytes();
			downloadBytes += nw.getDownloadBytes();
		}
		
		nf = new NetflowWritable(uppackBytes, downpackBytes, uploadBytes, downloadBytes);
		context.write(phoneNum, nf);
	}
	
}

  

Output:

Export the project as netflow.jar and upload it to /opt on the server.
Run:
hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"

Then run:
hadoop fs -ls /tmp/netflow/out/*   to list the output files.
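
With the same sample input, a single reducer, and the default TextOutputFormat, each line of the part file should be the phone number, a tab, and NetflowWritable's toString():

hadoop fs -cat /tmp/netflow/out/part-r-00000
13770759991	NetflowWritable [uppackBytes=850,downpackBytes=700,uploadBytes=525,downloadBytes=500]
13770759992	NetflowWritable [uppackBytes=1200,downpackBytes=1500,uploadBytes=850,downloadBytes=2300]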

Original post (in Chinese): https://www.cnblogs.com/cangos/p/6422144.html