0008. MapReduce Basics


05-06 Analyzing How to Compute Each Department's Total Salary


The test data, emp.csv (comma-separated; fields: empno, ename, job, mgr, hiredate, sal, comm, deptno):

7369,SMITH,CLERK,7902,1980/12/17,800,0,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,0,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20
7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20
7900,JAMES,CLERK,7698,1981/12/3,950,0,30
7902,FORD,ANALYST,7566,1981/12/3,3000,0,20
7934,MILLER,CLERK,7782,1982/1/23,1300,0,10


Goal: compute the total salary of each department.

Analyze the data-processing flow by analogy with the WordCount example.
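Following the WordCount analysis, the key-value pairs for this job flow as follows:

(k1, v1) = (line offset, one line of emp.csv)
   --map-->     (k2, v2) = (deptno, sal),             e.g. (30, 1250) for MARTIN
   --shuffle--> (k3, v3) = (deptno, [sal, sal, ...]), e.g. (30, [1600, 1250, 1250, 2850, 1500, 950])
   --reduce-->  (k4, v4) = (deptno, total salary),    e.g. (30, 9400)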


05-07 Developing the Program to Compute Each Department's Total Salary

Develop the program:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

	@Override
	protected void map(LongWritable key1, Text value1,Context context)
			throws IOException, InterruptedException {
		// Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
		String data = value1.toString();
		
		// Split the record into fields
		String[] words = data.split(",");
		
		// Emit: k2 = department number, v2 = the employee's salary
		context.write(new IntWritable(Integer.parseInt(words[7])), 
				      new IntWritable(Integer.parseInt(words[5])));
	}
}
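For the sample record in the comment above, this mapper emits the pair (30, 1250): words[7] is the department number and words[5] is the monthly salary.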

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryTotalReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{

	@Override
	protected void reduce(IntWritable k3, Iterable<IntWritable> v3,Context context)
			throws IOException, InterruptedException {
		// Sum the salaries in v3
		int total = 0;
		for(IntWritable v:v3){
			total = total + v.get();
		}
		
		// Emit: k4 = department number, v4 = the department's total salary
		context.write(k3, new IntWritable(total));
	}

}
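For department 10, for example, the reducer receives (10, [2450, 5000, 1300]) and emits (10, 8750).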


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

	public static void main(String[] args) throws Exception {
		// 1. Create the job and set its entry point
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(SalaryTotalMain.class);
		
		// 2. Set the mapper and its output key/value types
		job.setMapperClass(SalaryTotalMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		// 3. Set the reducer and its output key/value types
		job.setReducerClass(SalaryTotalReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 4. Set the job's input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 5. Run the job
		job.waitForCompletion(true);
	}
}
[root@bigdata111 temp]# ls *.csv
dept.csv  emp.csv
[root@bigdata111 temp]# hdfs dfs -mkdir /scott
[root@bigdata111 temp]# hdfs dfs -put *.csv /scott
[root@bigdata111 temp]# hdfs dfs -ls /scott
Found 2 items
-rw-r--r--   1 root supergroup         84 2018-09-10 20:38 /scott/dept.csv
-rw-r--r--   1 root supergroup        629 2018-09-10 20:38 /scott/emp.csv
[root@bigdata111 temp]# hadoop jar s1.jar /scott/emp.csv /output/0910/s1

[root@bigdata111 temp]# hdfs dfs -ls /output/0910/s1
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-09-10 20:41 /output/0910/s1/_SUCCESS
-rw-r--r--   1 root supergroup         25 2018-09-10 20:41 /output/0910/s1/part-r-00000
[root@bigdata111 temp]# hdfs dfs -cat /output/0910/s1/part-r-00000
10	8750
20	10875
30	9400
[root@bigdata111 temp]#
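The totals can be checked by hand against emp.csv:

Dept 10: 2450 + 5000 + 1300 = 8750
Dept 20: 800 + 2975 + 3000 + 1100 + 3000 = 10875
Dept 30: 1600 + 1250 + 1250 + 2850 + 1500 + 950 = 9400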

05-08 Java Serialization

Java serialization:
import java.io.Serializable;

// A simple Java bean; implementing java.io.Serializable is all that
// Java's built-in serialization requires
public class Student implements Serializable {

	private int stuID;
	private String stuName;
	
	public int getStuID() {
		return stuID;
	}
	public void setStuID(int stuID) {
		this.stuID = stuID;
	}
	public String getStuName() {
		return stuName;
	}
	public void setStuName(String stuName) {
		this.stuName = stuName;
	}
	
}
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;

public class TestMain {

	public static void main(String[] args) throws Exception {
		// Create a student object
		Student s = new Student();
		s.setStuID(1);
		s.setStuName("Tom");
		
		// Save the object to a file -----> serialization
		// (note the escaped backslashes in the Windows path)
		OutputStream out = new FileOutputStream("d:\\temp\\student.ooo");
		ObjectOutputStream oos = new ObjectOutputStream(out);
		
		oos.writeObject(s);
		
		oos.close();
		out.close();
	}
}
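For completeness, reading the object back is the mirror image. A minimal sketch (the class name TestRead is made up for illustration, and it assumes the file written above exists):

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;

public class TestRead {

	public static void main(String[] args) throws Exception {
		// Read the object back from the file -----> deserialization
		InputStream in = new FileInputStream("d:\\temp\\student.ooo");
		ObjectInputStream ois = new ObjectInputStream(in);

		Student s = (Student) ois.readObject();
		System.out.println(s.getStuID() + "\t" + s.getStuName());

		ois.close();
		in.close();
	}
}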

05-09 MapReduce Serialization

Read the employee data, build an Emp object for each record, and write the objects straight to HDFS.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Represents an employee
// Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Emp implements Writable{
	
	private int empno;       // employee number
	private String ename;    // employee name
	private String job;      // job title
	private int mgr;         // manager's employee number
	private String hiredate; // hire date
	private int sal;         // monthly salary
	private int comm;        // commission (bonus)
	private int deptno;      // department number
	
		
	@Override
	public String toString() {
		return "Emp [empno=" + empno + ", ename=" + ename + ", sal=" + sal + ", deptno=" + deptno + "]";
	}

	@Override
	public void readFields(DataInput input) throws IOException {
		// Deserialization: read the fields from the input stream,
		// in exactly the same order they were written
		this.empno = input.readInt();
		this.ename = input.readUTF();
		this.job = input.readUTF();
		this.mgr = input.readInt();
		this.hiredate = input.readUTF();
		this.sal = input.readInt();
		this.comm = input.readInt();
		this.deptno = input.readInt();
	}
	
	@Override
	public void write(DataOutput output) throws IOException {
		// Serialization: write the fields to the output stream
		output.writeInt(this.empno);
		output.writeUTF(this.ename);
		output.writeUTF(this.job);
		output.writeInt(this.mgr);
		output.writeUTF(this.hiredate);
		output.writeInt(this.sal);
		output.writeInt(this.comm);
		output.writeInt(this.deptno);
	}
	
	
	public int getEmpno() {
		return empno;
	}
	public void setEmpno(int empno) {
		this.empno = empno;
	}
	public String getEname() {
		return ename;
	}
	public void setEname(String ename) {
		this.ename = ename;
	}
	public String getJob() {
		return job;
	}
	public void setJob(String job) {
		this.job = job;
	}
	public int getMgr() {
		return mgr;
	}
	public void setMgr(int mgr) {
		this.mgr = mgr;
	}
	public String getHiredate() {
		return hiredate;
	}
	public void setHiredate(String hiredate) {
		this.hiredate = hiredate;
	}
	public int getSal() {
		return sal;
	}
	public void setSal(int sal) {
		this.sal = sal;
	}
	public int getComm() {
		return comm;
	}
	public void setComm(int comm) {
		this.comm = comm;
	}
	public int getDeptno() {
		return deptno;
	}
	public void setDeptno(int deptno) {
		this.deptno = deptno;
	}
}
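A quick way to verify that write() and readFields() stay symmetric is to round-trip an Emp through an in-memory byte stream. A local sanity-check sketch (the class name TestEmpWritable is made up for illustration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class TestEmpWritable {

	public static void main(String[] args) throws Exception {
		Emp emp = new Emp();
		emp.setEmpno(7654);
		emp.setEname("MARTIN");
		emp.setJob("SALESMAN");
		emp.setMgr(7698);
		emp.setHiredate("1981/9/28");
		emp.setSal(1250);
		emp.setComm(1400);
		emp.setDeptno(30);

		// Serialize: write the object into an in-memory byte stream
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		emp.write(new DataOutputStream(buffer));

		// Deserialize: read the bytes back into a fresh object
		Emp copy = new Emp();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

		// Should print: Emp [empno=7654, ename=MARTIN, sal=1250, deptno=30]
		System.out.println(copy);
	}
}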


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
//                                                             k2: employee number   v2: Emp object
public class EmpInfoMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
		String data = value1.toString();
		
		// Split the record into fields
		String[] words = data.split(",");
		
		// Build the employee object
		Emp emp = new Emp();
		emp.setEmpno(Integer.parseInt(words[0]));
		emp.setEname(words[1]);
		emp.setJob(words[2]);
		emp.setMgr(Integer.parseInt(words[3]));
		emp.setHiredate(words[4]);
		emp.setSal(Integer.parseInt(words[5]));
		emp.setComm(Integer.parseInt(words[6]));
		emp.setDeptno(Integer.parseInt(words[7]));
		
		// Emit: k2 = employee number, v2 = the Emp object
		context.write(new IntWritable(emp.getEmpno()), emp);		
	}
}


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpInfoMain {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(EmpInfoMain.class);
		
		job.setMapperClass(EmpInfoMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Emp.class);  // the map output value is the Emp object itself
		
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Emp.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}
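Note that this driver never calls job.setReducerClass(...). Hadoop then uses the base Reducer class, which simply forwards every (key, value) pair, and with the default single reduce task the output lands in one part-r-00000 file. TextOutputFormat renders each value by calling its toString(), which is why Emp overrides toString() above.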

05-10 Using Serialization to Compute the Department Salary Totals

Rewrite the "total salary per department" example using MapReduce serialization.

The Emp class (implementing Writable) is identical to the one defined in 05-09 and is reused here unchanged.


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                                k2: department number   v2: Emp object
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// Sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
		String data = value1.toString();
		
		// Split the record into fields
		String[] words = data.split(",");
		
		// Build the employee object
		Emp emp = new Emp();
		emp.setEmpno(Integer.parseInt(words[0]));
		emp.setEname(words[1]);
		emp.setJob(words[2]);
		emp.setMgr(Integer.parseInt(words[3]));
		emp.setHiredate(words[4]);
		emp.setSal(Integer.parseInt(words[5]));
		emp.setComm(Integer.parseInt(words[6]));
		emp.setDeptno(Integer.parseInt(words[7]));
		
		// Emit: k2 = department number, v2 = the Emp object
		context.write(new IntWritable(emp.getDeptno()), emp);	
	}
}


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryTotalReducer extends Reducer<IntWritable, Emp, IntWritable, IntWritable> {

	@Override
	protected void reduce(IntWritable k3, Iterable<Emp> v3,Context context) throws IOException, InterruptedException {
		int total = 0;
		
		// Pull out each employee's salary and add it to the total
		for(Emp e:v3){
			total = total + e.getSal();
		}
		
		context.write(k3, new IntWritable(total));
	}
}
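Compared with 05-07, the value crossing the shuffle is now a whole Emp object rather than a single salary, which costs more serialized bytes per record but gives the reducer access to every field of the employee.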

For reference, this is the output of the EmpInfo job from 05-09: one Emp object per employee, keyed by employee number.

[root@bigdata111 temp]# hdfs dfs -ls /output/0910/s2
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-09-10 21:35 /output/0910/s2/_SUCCESS
-rw-r--r--   1 root supergroup        782 2018-09-10 21:35 /output/0910/s2/part-r-00000
[root@bigdata111 temp]# hdfs dfs -cat /output/0910/s2/part-r-00000
7369	Emp [empno=7369, ename=SMITH, sal=800, deptno=20]
7499	Emp [empno=7499, ename=ALLEN, sal=1600, deptno=30]
7521	Emp [empno=7521, ename=WARD, sal=1250, deptno=30]
7566	Emp [empno=7566, ename=JONES, sal=2975, deptno=20]
7654	Emp [empno=7654, ename=MARTIN, sal=1250, deptno=30]
7698	Emp [empno=7698, ename=BLAKE, sal=2850, deptno=30]
7782	Emp [empno=7782, ename=CLARK, sal=2450, deptno=10]
7788	Emp [empno=7788, ename=SCOTT, sal=3000, deptno=20]
7839	Emp [empno=7839, ename=KING, sal=5000, deptno=10]
7844	Emp [empno=7844, ename=TURNER, sal=1500, deptno=30]
7876	Emp [empno=7876, ename=ADAMS, sal=1100, deptno=20]
7900	Emp [empno=7900, ename=JAMES, sal=950, deptno=30]
7902	Emp [empno=7902, ename=FORD, sal=3000, deptno=20]
7934	Emp [empno=7934, ename=MILLER, sal=1300, deptno=10]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

	public static void main(String[] args) throws Exception {
		// 1. Create the job and set its entry point
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(SalaryTotalMain.class);
		
		// 2. Set the mapper and its output key/value types
		job.setMapperClass(SalaryTotalMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Emp.class);
		
		// 3. Set the reducer and its output key/value types
		job.setReducerClass(SalaryTotalReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 4. Set the job's input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 5. Run the job
		job.waitForCompletion(true);
	}
}
[root@bigdata111 temp]# hdfs dfs -ls /output/0910/s3
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-09-10 21:50 /output/0910/s3/_SUCCESS
-rw-r--r--   1 root supergroup         25 2018-09-10 21:50 /output/0910/s3/part-r-00000
[root@bigdata111 temp]# hdfs dfs -cat /output/0910/s3/part-r-00000
10	8750
20	10875
30	9400

05-11 Sorting Basic Data Types


import org.apache.hadoop.io.IntWritable;

// A custom comparison rule for IntWritable keys that sorts in descending order
public class MyNumberComparator extends IntWritable.Comparator {

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		// Negate the default (ascending) byte-level comparison to get descending order
		return -super.compare(b1, s1, l1, b2, s2, l2);
	}

}
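Because IntWritable.Comparator compares the serialized bytes of the keys, the rule can be sanity-checked without running a full job. A minimal sketch (the class name TestComparator is made up for illustration):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.io.IntWritable;

public class TestComparator {

	public static void main(String[] args) throws Exception {
		// Serialize two keys the way MapReduce sees them: as raw bytes
		ByteArrayOutputStream b1 = new ByteArrayOutputStream();
		new IntWritable(10).write(new DataOutputStream(b1));
		ByteArrayOutputStream b2 = new ByteArrayOutputStream();
		new IntWritable(30).write(new DataOutputStream(b2));

		byte[] k1 = b1.toByteArray();
		byte[] k2 = b2.toByteArray();

		// A positive result means 10 sorts AFTER 30, i.e. descending order
		int r = new MyNumberComparator().compare(k1, 0, k1.length, k2, 0, k2.length);
		System.out.println(r > 0 ? "descending" : "ascending");
	}
}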

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalaryTotalMain {

	public static void main(String[] args) throws Exception {
		// 1. Create the job and set its entry point
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(SalaryTotalMain.class);
		
		// 2. Set the mapper and its output key/value types
		job.setMapperClass(SalaryTotalMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Emp.class);
		
		// Register the custom sort comparator
		job.setSortComparatorClass(MyNumberComparator.class);
		
		
		// 3. Set the reducer and its output key/value types
		job.setReducerClass(SalaryTotalReducer.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		// 4. Set the job's input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 5. Run the job
		job.waitForCompletion(true);
	}
}
[root@bigdata111 temp]# hdfs dfs -cat /output/0910/s4/part-r-00000
30	9400
20	10875
10	8750
[root@bigdata111 temp]#
Original source: https://www.cnblogs.com/RoyalGuardsTomCat/p/13835009.html