0009. Advanced Features of MapReduce


05-12-Review: Sorting in SQL
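
In SQL, sorting is expressed with ORDER BY: order by sal sorts on a single column, and order by deptno, sal sorts on several columns in turn. The sections below define the same kinds of ordering rules in Java and then in MapReduce.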


05-13-Sorting Java Objects

To sort Java objects, the class implements the Comparable interface and defines its ordering rule in compareTo.

//Student class: sort students by their age
public class Student implements Comparable<Student>{

	private int stuID;
	private String stuName;
	private int age;
		
	@Override
	public String toString() {
		return "Student [stuID=" + stuID + ", stuName=" + stuName + ", age=" + age + "]";
	}

	@Override
	public int compareTo(Student o) {
		// Define the sort rule: order students by age.
		// Returning 0 on equal ages keeps the compareTo contract consistent.
		if(this.age > o.getAge()){
			return 1;
		}else if(this.age < o.getAge()){
			return -1;
		}else{
			return 0;
		}
	}
	
	public int getStuID() {
		return stuID;
	}
	public void setStuID(int stuID) {
		this.stuID = stuID;
	}
	public String getStuName() {
		return stuName;
	}
	public void setStuName(String stuName) {
		this.stuName = stuName;
	}
	public int getAge() {
		return age;
	}
	public void setAge(int age) {
		this.age = age;
	}

}

import java.util.Arrays;

public class StudentMain {

	public static void main(String[] args) {
		//Create a few Student objects
		Student s1 = new Student();
		s1.setStuID(1);
		s1.setStuName("Tom");
		s1.setAge(24);
		
		Student s2 = new Student();
		s2.setStuID(2);
		s2.setStuName("Mary");
		s2.setAge(26);
		
		Student s3 = new Student();
		s3.setStuID(3);
		s3.setStuName("Mike");
		s3.setAge(25);
		
		//Put them into an array
		Student[] list = {s1,s2,s3};
		
		//Sort the array
		Arrays.sort(list);
		
		//Print the sorted result
		for(Student s:list){
			System.out.println(s);
		}
	}
}
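
Running this program prints the students in ascending age order: Tom (24), Mike (25), Mary (26).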


05-14-Sorting MapReduce Objects

Sorting on multiple columns: in MapReduce the key class implements WritableComparable, which combines Writable (serialization) with Comparable (ordering).

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

//Represents an employee
//Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Emp implements WritableComparable<Emp>{
	
	private int empno;       //employee number
	private String ename;    //employee name
	private String job;      //job title
	private int mgr;         //manager's employee number
	private String hiredate; //hire date
	private int sal;         //monthly salary
	private int comm;        //commission
	private int deptno;      //department number
	
//	@Override
//	public int compareTo(Emp o) {
//		// Our own sort rule: a single-column sort by salary
//		if(this.sal > o.getSal()){
//			return 1;
//		}else if(this.sal < o.getSal()){
//			return -1;
//		}else{
//			return 0;
//		}
//	}
	
	@Override
	public int compareTo(Emp o) {
		// Our own sort rule: a multi-column sort.
		// Order by department number first, then by salary.
		if(this.deptno > o.getDeptno()){
			return 1;
		}else if(this.deptno < o.getDeptno()){
			return -1;
		}
		
		// Same department: order by salary.
		// Returning 0 on a tie keeps the compareTo contract consistent.
		if(this.sal > o.getSal()){
			return 1;
		}else if(this.sal < o.getSal()){
			return -1;
		}else{
			return 0;
		}
	}
		
	@Override
	public String toString() {
		return "Emp [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		//Deserialization: read the fields from the input stream, in write order
		this.empno = input.readInt();
		this.ename = input.readUTF();
		this.job = input.readUTF();
		this.mgr = input.readInt();
		this.hiredate = input.readUTF();
		this.sal = input.readInt();
		this.comm = input.readInt();
		this.deptno = input.readInt();
	}
	
	@Override
	public void write(DataOutput output) throws IOException {
		// Serialization: write the fields to the output stream
		output.writeInt(this.empno);
		output.writeUTF(this.ename);
		output.writeUTF(this.job);
		output.writeInt(this.mgr);
		output.writeUTF(this.hiredate);
		output.writeInt(this.sal);
		output.writeInt(this.comm);
		output.writeInt(this.deptno);
	}
	
	
	public int getEmpno() {
		return empno;
	}
	public void setEmpno(int empno) {
		this.empno = empno;
	}
	public String getEname() {
		return ename;
	}
	public void setEname(String ename) {
		this.ename = ename;
	}
	public String getJob() {
		return job;
	}
	public void setJob(String job) {
		this.job = job;
	}
	public int getMgr() {
		return mgr;
	}
	public void setMgr(int mgr) {
		this.mgr = mgr;
	}
	public String getHiredate() {
		return hiredate;
	}
	public void setHiredate(String hiredate) {
		this.hiredate = hiredate;
	}
	public int getSal() {
		return sal;
	}
	public void setSal(int sal) {
		this.sal = sal;
	}
	public int getComm() {
		return comm;
	}
	public void setComm(int comm) {
		this.comm = comm;
	}
	public int getDeptno() {
		return deptno;
	}
	public void setDeptno(int deptno) {
		this.deptno = deptno;
	}
}


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * Emp must be used as key2, because the shuffle sorts by the key.
 * There is no value2, so NullWritable is emitted instead.
 */
public class EmpSortMapper extends Mapper<LongWritable, Text, Emp, NullWritable> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
		String data = value1.toString();
		
		//Split the line into fields
		String[] words = data.split(",");
		
		//Build the employee object
		Emp emp = new Emp();
		emp.setEmpno(Integer.parseInt(words[0]));
		emp.setEname(words[1]);
		emp.setJob(words[2]);
		emp.setMgr(Integer.parseInt(words[3]));
		emp.setHiredate(words[4]);
		emp.setSal(Integer.parseInt(words[5]));
		emp.setComm(Integer.parseInt(words[6]));
		emp.setDeptno(Integer.parseInt(words[7]));
		
		//Emit: k2 = the employee object, v2 = a null placeholder
		context.write(emp,  NullWritable.get());
	}
}
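
Note that this job defines no Reducer of its own: the sorting happens automatically during the shuffle, which orders the map output by key2 using Emp's compareTo. That is why Emp must be the key and NullWritable the value.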



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpSortMain {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(EmpSortMain.class);
		
		job.setMapperClass(EmpSortMapper.class);
		job.setMapOutputKeyClass(Emp.class); //k2 is the employee object
		job.setMapOutputValueClass(NullWritable.class);  // v2 is a null placeholder
		
		job.setOutputKeyClass(Emp.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);

	}

}
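
Since no Reducer class is set, Hadoop runs the default identity Reducer with a single reduce task, so the shuffle's sort by key2 yields one output file with all employees ordered by department number and then salary.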


05-15-Basic Concepts of Partitioning

Figure: What is a partition (什么是分区.png)
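
A partition splits the map output among the reduce tasks according to some rule; each reduce task processes one partition and writes its own output file (part-r-00000, part-r-00001, and so on).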


05-16-What Is Hash Partitioning
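
Hadoop's default partitioner is HashPartitioner (org.apache.hadoop.mapreduce.lib.partition.HashPartitioner). Conceptually, it takes the key's hashCode, masks off the sign bit so the value is non-negative, and takes the remainder modulo the number of reduce tasks, so equal keys always land in the same partition. A minimal sketch of that logic (the class name here is just for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

//A sketch of the default hash-partitioning logic:
//equal keys always map to the same partition
public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {

	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		// Mask off the sign bit so the result is non-negative,
		// then fold it into the range [0, numReduceTasks)
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}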


05-17-A Partitioning Programming Example

A custom partitioning rule: partition the employees by department number.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//Represents an employee
//Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Emp implements Writable{
	
	private int empno;       //employee number
	private String ename;    //employee name
	private String job;      //job title
	private int mgr;         //manager's employee number
	private String hiredate; //hire date
	private int sal;         //monthly salary
	private int comm;        //commission
	private int deptno;      //department number
	
		
	@Override
	public String toString() {
		return "Emp [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		//Deserialization: read the fields from the input stream, in write order
		this.empno = input.readInt();
		this.ename = input.readUTF();
		this.job = input.readUTF();
		this.mgr = input.readInt();
		this.hiredate = input.readUTF();
		this.sal = input.readInt();
		this.comm = input.readInt();
		this.deptno = input.readInt();
	}
	
	@Override
	public void write(DataOutput output) throws IOException {
		// Serialization: write the fields to the output stream
		output.writeInt(this.empno);
		output.writeUTF(this.ename);
		output.writeUTF(this.job);
		output.writeInt(this.mgr);
		output.writeUTF(this.hiredate);
		output.writeInt(this.sal);
		output.writeInt(this.comm);
		output.writeInt(this.deptno);
	}
	
	
	public int getEmpno() {
		return empno;
	}
	public void setEmpno(int empno) {
		this.empno = empno;
	}
	public String getEname() {
		return ename;
	}
	public void setEname(String ename) {
		this.ename = ename;
	}
	public String getJob() {
		return job;
	}
	public void setJob(String job) {
		this.job = job;
	}
	public int getMgr() {
		return mgr;
	}
	public void setMgr(int mgr) {
		this.mgr = mgr;
	}
	public String getHiredate() {
		return hiredate;
	}
	public void setHiredate(String hiredate) {
		this.hiredate = hiredate;
	}
	public int getSal() {
		return sal;
	}
	public void setSal(int sal) {
		this.sal = sal;
	}
	public int getComm() {
		return comm;
	}
	public void setComm(int comm) {
		this.comm = comm;
	}
	public int getDeptno() {
		return deptno;
	}
	public void setDeptno(int deptno) {
		this.deptno = deptno;
	}
}


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

//A custom partitioning rule: partition by department number
//k2 = department number, v2 = employee object
public class MyPartitioner extends Partitioner<IntWritable, Emp> {

	/**
	 * numTask: the number of partitions (reduce tasks)
	 */
	@Override
	public int getPartition(IntWritable k2, Emp v2, int numTask) {
		// Apply our partitioning rule:
		// get this employee's department number
		int deptno = v2.getDeptno();
		
		if(deptno == 10){
			//department 10 goes to partition 1
			return 1%numTask;
		}else if(deptno == 20){
			//department 20 goes to partition 2
			return 2%numTask;
		}else{
			//department 30 goes to partition 0 (3 % 3 == 0)
			return 3%numTask;
		}
	}

}
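
Note that getPartition must return a value in the range [0, numTask); the modulo here guards against running the job with fewer reduce tasks than the rule assumes. With numTask = 3, the rule sends department 10 to partition 1, department 20 to partition 2, and department 30 to partition 0.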

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

//All values for a given key are employees of the same department
public class MyPartitionerReducer extends Reducer<IntWritable, Emp, IntWritable, Emp> {

	@Override
	protected void reduce(IntWritable k3, Iterable<Emp> v3,Context context) throws IOException, InterruptedException {
		// Pass the records straight through to the output
		for(Emp e:v3){
			context.write(k3, e);
		}
	}

}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//k2 = department number, v2 = employee object
public class MyPartitionerMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
		String data = value1.toString();
		
		//Split the line into fields
		String[] words = data.split(",");
		
		//Build the employee object
		Emp emp = new Emp();
		emp.setEmpno(Integer.parseInt(words[0]));
		emp.setEname(words[1]);
		emp.setJob(words[2]);
		emp.setMgr(Integer.parseInt(words[3]));
		emp.setHiredate(words[4]);
		emp.setSal(Integer.parseInt(words[5]));
		emp.setComm(Integer.parseInt(words[6]));
		emp.setDeptno(Integer.parseInt(words[7]));
		
		//Emit: k2 = department number, v2 = employee object
		context.write(new IntWritable(emp.getDeptno()), emp);
	}
}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyPartitionerMain {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(MyPartitionerMain.class);
		
		job.setMapperClass(MyPartitionerMapper.class);
		job.setMapOutputKeyClass(IntWritable.class); //k2 is the department number
		job.setMapOutputValueClass(Emp.class);  // v2 is the employee object
		
		//Register the custom partitioning rule
		job.setPartitionerClass(MyPartitioner.class);	
		//Set the number of partitions (reduce tasks)
		job.setNumReduceTasks(3);
		
		job.setReducerClass(MyPartitionerReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Emp.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}
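
With job.setNumReduceTasks(3), the job writes three output files: part-r-00000 holds department 30 (partition 0), part-r-00001 department 10, and part-r-00002 department 20.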

Figure: Partitioning by department number (按照部门号进行分区.png)


05-18-What Is a Combiner

Figure: The role of the Combiner (Combiner的作用.png)

Figure: The WordCount data-processing flow (分析WordCount数据处理的过程.png)
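
A Combiner is a "mini reducer" that runs on the map side: it pre-aggregates each mapper's output before the shuffle, so far less data crosses the network. For an aggregation like WordCount, the reducer class itself can usually serve as the combiner. A minimal driver sketch, assuming WordCountMapper and WordCountReducer are the classes from a standard WordCount job (k2 = word, v2 = count):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//A sketch: a WordCount driver with a Combiner enabled.
//WordCountMapper and WordCountReducer are assumed to come from a standard WordCount job.
public class WordCountCombinerMain {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(WordCountCombinerMain.class);
		
		job.setMapperClass(WordCountMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// The combiner pre-sums the counts on the map side before the shuffle.
		// Summing is associative and commutative, so the final result is unchanged.
		job.setCombinerClass(WordCountReducer.class);
		
		job.setReducerClass(WordCountReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}
}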


05-19-Cases Where a Combiner Cannot Be Used

Figure: When a Combiner cannot be used (不能使用Combiner.png)
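
A Combiner can only be applied when the aggregation is associative and commutative, such as a sum, count, max, or min. It cannot be used for an operation like averaging: for the values 1, 2, 3, 4, 5 the true average is 3, but if a combiner first averaged (1, 2) to 1.5 and (3, 4, 5) to 4, the reducer would compute (1.5 + 4) / 2 = 2.75, which is wrong. Because Hadoop may run the combiner zero, one, or several times, a job must produce the same result with or without it.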

Original source: https://www.cnblogs.com/RoyalGuardsTomCat/p/13861663.html