MapReduce使用Partitioner分区案例

Mapper:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class EmployeeMapper  extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String str = value.toString();
        //分词
        String[] words = str.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));
        e.setEname(words[1]);
        e.setJob(words[2]);
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception e2) {
            e.setMgr(0);
        }
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5]));
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception e2) {
            e.setComm(0);
        }		
        e.setDeptno(Integer.parseInt(words[7]));

        //将这个员工输出
        context.write(new LongWritable(e.getDeptno()),e);
    }
}
 
 
 
40
 
 
 
 
1
import java.io.IOException;
2
import org.apache.hadoop.io.LongWritable;
3
import org.apache.hadoop.io.NullWritable;
4
import org.apache.hadoop.io.Text;
5
import org.apache.hadoop.mapreduce.Mapper;
6
import org.apache.hadoop.mapreduce.Mapper.Context;
7

8
public class EmployeeMapper  extends Mapper<LongWritable, Text, LongWritable, Employee> {
9

10
    @Override
11
    protected void map(LongWritable key, Text value,Context context)
12
            throws IOException, InterruptedException {
13
        //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
14
        String str = value.toString();
15
        //分词
16
        String[] words = str.split(",");
17

18
        Employee e = new Employee();
19
        e.setEmpno(Integer.parseInt(words[0]));
20
        e.setEname(words[1]);
21
        e.setJob(words[2]);
22
        try {
23
            e.setMgr(Integer.parseInt(words[3]));
24
        } catch (Exception e2) {
25
            e.setMgr(0);
26
        }
27
        e.setHiredate(words[4]);
28
        e.setSal(Integer.parseInt(words[5]));
29
        try {
30
            e.setComm(Integer.parseInt(words[6]));
31
        } catch (Exception e2) {
32
            e.setComm(0);
33
        }
34
        e.setDeptno(Integer.parseInt(words[7]));
35

36
        //将这个员工输出
37
        context.write(new LongWritable(e.getDeptno()),e);
38
    }
39
}
 
 

Reducer:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    @Override
    protected void reduce(LongWritable deptno, Iterable<Employee> values,Context context)
            throws IOException, InterruptedException {
        for(Employee e:values){
            context.write(deptno, e);
        }
    }
}
 
 
 
16
 
 
 
 
 
1
import java.io.IOException;
2
import org.apache.hadoop.io.LongWritable;
3
import org.apache.hadoop.mapreduce.Reducer;
4

5
public class EmployeeReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {
6

7
    @Override
8
    protected void reduce(LongWritable deptno, Iterable<Employee> values,Context context)
9
            throws IOException, InterruptedException {
10
        for(Employee e:values){
11
            context.write(deptno, e);
12
        }
13
    }
14
}
 
 

Employee:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class Employee implements Writable{
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    public Employee(){

    }

    @Override
    public String toString() {
        return "Employee [empno=" + empno + ", ename=" + ename + ", job=" + job
                + ", mgr=" + mgr + ", hiredate=" + hiredate + ", sal=" + sal
                + ", comm=" + comm + ", deptno=" + deptno + "]";
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.sal = in.readInt();
        this.comm = in.readInt();
        this.deptno = in.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        ////7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
 
 
 
x
 
 
 
 
1
import java.io.DataInput;
2
import java.io.DataOutput;
3
import java.io.IOException;
4
import org.apache.hadoop.io.Writable;
5
import org.apache.hadoop.io.WritableComparable;
6

7
public class Employee implements Writable{
8
    private int empno;
9
    private String ename;
10
    private String job;
11
    private int mgr;
12
    private String hiredate;
13
    private int sal;
14
    private int comm;
15
    private int deptno;
16

17
    public Employee(){
18

19
    }
20

21
    @Override
22
    public String toString() {
23
        return "Employee [empno=" + empno + ", ename=" + ename + ", job=" + job
24
                + ", mgr=" + mgr + ", hiredate=" + hiredate + ", sal=" + sal
25
                + ", comm=" + comm + ", deptno=" + deptno + "]";
26
    }
27

28
    @Override
29
    public void readFields(DataInput in) throws IOException {
30
        this.empno = in.readInt();
31
        this.ename = in.readUTF();
32
        this.job = in.readUTF();
33
        this.mgr = in.readInt();
34
        this.hiredate = in.readUTF();
35
        this.sal = in.readInt();
36
        this.comm = in.readInt();
37
        this.deptno = in.readInt();
38
    }
39

40
    @Override
41
    public void write(DataOutput output) throws IOException {
42
        ////7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
43
        output.writeInt(empno);
44
        output.writeUTF(ename);
45
        output.writeUTF(job);
46
        output.writeInt(mgr);
47
        output.writeUTF(hiredate);
48
        output.writeInt(sal);
49
        output.writeInt(comm);
50
        output.writeInt(deptno);
51
    }
52

53
    public int getEmpno() {
54
        return empno;
55
    }
56

57
    public void setEmpno(int empno) {
58
        this.empno = empno;
59
    }
60

61
    public String getEname() {
62
        return ename;
63
    }
64

65
    public void setEname(String ename) {
66
        this.ename = ename;
67
    }
68

69
    public String getJob() {
70
        return job;
71
    }
72

73
    public void setJob(String job) {
74
        this.job = job;
75
    }
76

77
    public int getMgr() {
78
        return mgr;
79
    }
80

81
    public void setMgr(int mgr) {
82
        this.mgr = mgr;
83
    }
84

85
    public String getHiredate() {
86
        return hiredate;
87
    }
88

89
    public void setHiredate(String hiredate) {
90
        this.hiredate = hiredate;
91
    }
92

93
    public int getSal() {
94
        return sal;
95
    }
96

97
    public void setSal(int sal) {
98
        this.sal = sal;
99
    }
100

101
    public int getComm() {
102
        return comm;
103
    }
104

105
    public void setComm(int comm) {
106
        this.comm = comm;
107
    }
108

109
    public int getDeptno() {
110
        return deptno;
111
    }
112

113
    public void setDeptno(int deptno) {
114
        this.deptno = deptno;
115
    }
116
}
 
 

Partitioner:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class EmployeePartition extends Partitioner<LongWritable, Employee> {

    @Override
    public int getPartition(LongWritable key2, Employee e, int numPartition) {
        // 分区的规则
        if(e.getDeptno() == 10){
            return 1%numPartition;
        }else if(e.getDeptno() == 20){
            return 2%numPartition;
        }else{
            return 3%numPartition;
        }
    }
}
 
 
 
17
 
 
 
 
 
1
import org.apache.hadoop.io.LongWritable;
2
import org.apache.hadoop.mapreduce.Partitioner;
3

4
public class EmployeePartition extends Partitioner<LongWritable, Employee> {
5

6
    @Override
7
    public int getPartition(LongWritable key2, Employee e, int numPartition) {
8
        // 分区的规则
9
        if(e.getDeptno() == 10){
10
            return 1%numPartition;
11
        }else if(e.getDeptno() == 20){
12
            return 2%numPartition;
13
        }else{
14
            return 3%numPartition;
15
        }
16
    }
17
}
 
 

Driver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PartitionMain {

    public static void main(String[] args) throws Exception {
        // 求员工工资的总额
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //指明程序的入口
        job.setJarByClass(PartitionMain.class);

        //指明任务中的mapper
        job.setMapperClass(EmployeeMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        //设置分区的规则
        job.setPartitionerClass(EmployeePartition.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(EmployeeReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        //指明任务的输入路径和输出路径	---> HDFS的路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //启动任务
        job.waitForCompletion(true);
    }
}
 
 
 
38
 
 
 
 
 
1
import org.apache.hadoop.conf.Configuration;
2
import org.apache.hadoop.fs.Path;
3
import org.apache.hadoop.io.LongWritable;
4
import org.apache.hadoop.io.NullWritable;
5
import org.apache.hadoop.mapreduce.Job;
6
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
7
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
8
public class PartitionMain {
9

10
    public static void main(String[] args) throws Exception {
11
        // 求员工工资的总额
12
        Configuration conf = new Configuration();
13
        Job job = Job.getInstance(conf);
14

15
        //指明程序的入口
16
        job.setJarByClass(PartitionMain.class);
17

18
        //指明任务中的mapper
19
        job.setMapperClass(EmployeeMapper.class);
20
        job.setMapOutputKeyClass(LongWritable.class);
21
        job.setMapOutputValueClass(Employee.class);
22

23
        //设置分区的规则
24
        job.setPartitionerClass(EmployeePartition.class);
25
        job.setNumReduceTasks(3);
26

27
        job.setReducerClass(EmployeeReducer.class);
28
        job.setOutputKeyClass(LongWritable.class);
29
        job.setOutputValueClass(Employee.class);
30

31
        //指明任务的输入路径和输出路径---> HDFS的路径
32
        FileInputFormat.addInputPath(job, new Path(args[0]));
33
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
34

35
        //启动任务
36
        job.waitForCompletion(true);
37
    }
38
}
 
 
 
 



原文地址:https://www.cnblogs.com/TiePiHeTao/p/126c97716f7c505105c1347f0f0e0989.html