分区(partition)和全排序(WritableComparable)

一、概念

  1、分区:

    Hadoop默认分区是根据key的hashCode对ReduceTask个数取模得到的,用户无法控制哪个key存储到哪个分区。

   想要控制哪个key存储到哪个分区,需要自定义类继承Partitioner<KEY, VALUE>,
   泛型
KEY, VALUE分别对应Mapper里的输出key,value,因为分区是在map()之后,环形缓冲区溢写时完成的。
    提示:如果ReduceTask的数量大于自定义类中重写的
getPartition()设置的分区数时,会产生空的输出文件part-r-00000
       如果ReduceTask的数量小于自定义类中重写的getPartition()设置的分区数时,有一部分分区数据无处安放,就会报错
       如果ReduceTask的数量等于1,则不会走自定义的分区方法,系统默认分区就是1,最终只会输出一个分区文件
       分区号必须从0开始,逐一增加

  2、全排序:
    
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,
    因为一台机器
处理所有文件,完全丧失了MapReduce所提供的并行架构。
二、项目举例
  1、待处理文本
  
      data.txt
   2、需求:
    分别统计出各市感染人员信息,输出到对应文件中(说明:武汉的人员信息统一输出到一个文件,十堰的人员信息统一输出到一个文件),
    输出结果按照感染人员的年龄做倒叙排列。
输出结果举例:
    地区 姓名 年龄 性别  
   
武汉 张三 70 女

    武汉 李四 50 男
      武汉  王五   60   女 
      武汉  赵六   55   男
  3、
Person2Bean.java    
      
package com.jh.work8;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Person2Bean implements WritableComparable<Person2Bean> {
    private String area; // 感染地区
    private String name; // 感染姓名
    private Integer age; // 感染年龄
    private String sex; // 感染性别

    public Person2Bean() {
    }

    @Override
    public String toString() {
        return area + "	" + name + "	" + age + "	" + sex;
    }

    public String getArea() {
        return area;
    }

    public void setArea(String area) {
        this.area = area;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    // 排序
    @Override
    public int compareTo(Person2Bean o) {
        // 按感染年龄倒序排序(这里主要是区内排序)
        return o.getAge().compareTo(this.age);
    }

    // 序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(area);
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
    }

    // 反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        area = in.readUTF();
        name = in.readUTF();
        sex = in.readUTF();
        age = in.readInt();
    }
}
Person2Bean
  4、Person2Mapper.java
      
package com.jh.work8;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Person2Mapper extends Mapper<LongWritable,Text,Person2Bean,NullWritable> {
    private Person2Bean bean = new Person2Bean();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 取文本每行内容,并分割
        String[] split = value.toString().split("	");

        // 赋值
        bean.setArea(split[0]);
        bean.setAge(Integer.parseInt(split[1]));
        bean.setSex(split[2]);
        bean.setName(split[3]);
        
        context.write(bean,NullWritable.get());
    }
}
Person2Mapper


  5、PersonPartition.java
      
package com.jh.work8;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PersonPartition extends Partitioner<Person2Bean,NullWritable> {
    /*
        自定义分区,继承Partitioner
        泛型对应Mapper端的输出
     */
    @Override
    public int getPartition(Person2Bean person2Bean, NullWritable nullWritable, int numPartitions) {
        // 根据感染地区做三个分区
        switch (person2Bean.getArea()){
            case "武汉市":
                return 0;
            case "黄石市":
                return 1;
            default:
                return 2;
        }
    }
}
PersonPartition


  6、Person2Reduce.java
      
package com.jh.work8;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Person2Reduce extends Reducer<Person2Bean,NullWritable,Person2Bean,NullWritable> {
    @Override
    protected void reduce(Person2Bean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 输出所有
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}
Person2Reduce


  7、Person2Driver.java
      
package com.jh.work8;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Person2Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取配置文件和job对象
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 设置jar的路径
        job.setJarByClass(Person2Driver.class);

        // 设置mapper类和reducer类
        job.setMapperClass(Person2Mapper.class);
        job.setReducerClass(Person2Reduce.class);

        // 设置mapper输出的key和value的数据类型
        job.setMapOutputKeyClass(Person2Bean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 设置reduce输出的key和value的数据类型
        job.setOutputKeyClass(Person2Bean.class);
        job.setOutputValueClass(NullWritable.class);

        // 设置自定义分区的类
        job.setPartitionerClass(PersonPartition.class);
        // 设置ReduceTask的数量
        job.setNumReduceTasks(3);

        // 设置要处理文件的输入路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        // 设置计算完毕后的数据文件的输出路径
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // 提交计算任务(job)
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0:1);
    }
}
Person2Driver


  8、最终输出为三个文件:
        
















 



 
 


 




原文地址:https://www.cnblogs.com/si-137/p/13415188.html