MapReduce Learning Path, Part 4

Sorting in MapReduce

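Overview:

  1. Both MapTask and ReduceTask sort data by key. This is Hadoop's default behavior.

  2. Data in every MapReduce program is sorted, whether or not the application logic needs it.

  3. The default order is lexicographic (dictionary) order, and the sort itself is implemented with quicksort.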
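To make "lexicographic order" concrete: for Text keys the comparison is byte by byte, so numeric strings do not sort numerically. The sketch below uses plain Java `String` comparison as a stand-in for Text's comparator (an assumption that holds for ASCII data) and contrasts it with numeric ordering, as an IntWritable-style key would give. `KeyOrderDemo` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class KeyOrderDemo {

    // Sort keys as strings: character-by-character (lexicographic) comparison,
    // which for ASCII text matches the byte order used for Text keys.
    static List<String> sortAsText(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(String::compareTo);
        return copy;
    }

    // Sort the same keys as numbers, the way an IntWritable-style key is ordered.
    static List<Integer> sortAsNumbers(List<String> keys) {
        List<Integer> copy = new ArrayList<>();
        for (String k : keys) {
            copy.add(Integer.parseInt(k));
        }
        copy.sort(Integer::compare);
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("9", "10", "2", "100");
        System.out.println(sortAsText(keys));    // [10, 100, 2, 9]
        System.out.println(sortAsNumbers(keys)); // [2, 9, 10, 100]
    }
}
```

If numeric order is wanted, the key type (or a custom compareTo) has to provide it; string-typed keys will not.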

 

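Sorting in the copy phase:

In the ReduceTask, the corresponding data file is copied remotely from each MapTask. If a file exceeds a certain size threshold, it is spilled to disk; otherwise it is kept in memory.

If the number of files on disk reaches a certain threshold, they are combined into one larger file with a merge sort.

After all data has been copied, the ReduceTask performs one final merge sort over all the data in memory and on disk.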
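Each merge step above combines files that are already sorted individually. A minimal sketch of that idea is a k-way merge driven by a priority queue. It assumes in-memory `List<Integer>` runs stand in for the copied spill files (Hadoop's real merger works on serialized key/value streams, not lists); `RunMerger` and `mergeRuns` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class RunMerger {

    // Merge k individually sorted runs into one sorted list.
    // The heap holds one cursor per run: {current value, run index, position in run}.
    static List<Integer> mergeRuns(List<List<Integer>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heap.add(new int[]{runs.get(r).get(0), r, 0});
            }
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();            // smallest head among all runs
            merged.add(top[0]);
            int run = top[1];
            int next = top[2] + 1;
            if (next < runs.get(run).size()) {  // advance that run's cursor
                heap.add(new int[]{runs.get(run).get(next), run, next});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Three sorted "files" copied from different MapTasks.
        List<List<Integer>> runs = Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 10),
                Arrays.asList(5, 6));
        System.out.println(mergeRuns(runs)); // [1, 2, 3, 4, 5, 6, 9, 10]
    }
}
```

Because every input run is already sorted, the merge only ever compares the current heads, which is why merging is cheaper than re-sorting everything from scratch.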

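Types of sorting:

  1. Partial sort: MapReduce sorts the data set by the keys of the input records, which guarantees that each output file (one per ReduceTask) is sorted internally; the files are not ordered relative to each other.

  2. Total sort: the final output is a single file that is sorted internally, i.e. the number of ReduceTasks is set to one. This is inefficient for large inputs, because a single ReduceTask must process all of the data.

  3. Auxiliary sort (grouping): keys are grouped on the reduce side. When the key is a bean object and you want keys that share one or more fields to enter the same reduce() call, use a grouping comparator.

  4. Secondary sort: in a custom sort, if compareTo checks two conditions (compare by one field, then by a second field when the first is equal), it is called a secondary sort.

  5. Custom sort with WritableComparable

    5.1 How it works: when a bean object is transmitted as the key, it only needs to implement the WritableComparable interface and override compareTo.

    5.2 Implementation: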

    @Override
    public int compareTo(FlowBean bean) {
        int result;
        // Sort by total flow in descending order
        if (sumFlow > bean.getSumFlow()) {
            result = -1;
        } else if (sumFlow < bean.getSumFlow()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }
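Items 1 and 2 differ mainly in the number of ReduceTasks. The sketch below imitates that in plain Java: each key is routed to a partition with the same formula as Hadoop's default HashPartitioner, `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`, and each partition is then sorted on its own. With several partitions every "output file" is sorted but their concatenation is not; with one partition the result is totally sorted. `PartitionSortDemo` and `partitionAndSort` are hypothetical names, and `String.hashCode` stands in for the key's hash (Text computes its own hash, so real partition assignments can differ).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionSortDemo {

    // Route each key to a partition the way HashPartitioner does,
    // then sort every partition independently (one list per ReduceTask output file).
    static List<List<String>> partitionAndSort(List<String> keys, int numReduceTasks) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            int p = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            partitions.get(p).add(key);
        }
        for (List<String> part : partitions) {
            part.sort(String::compareTo); // each output file is sorted on its own
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("pear", "apple", "fig", "kiwi", "banana", "cherry");
        // Partial sort: three lists, each sorted internally
        System.out.println(partitionAndSort(keys, 3));
        // Total sort: one list, [[apple, banana, cherry, fig, kiwi, pear]]
        System.out.println(partitionAndSort(keys, 1));
    }
}
```

The single-partition case is exactly why a total sort with one ReduceTask does not scale: all keys end up in one list handled by one task.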

Example:

  Requirement: find the most expensive product in each order.

  Input data (columns: order id, product id, price):

00000001    Pdt_01    222.8
00000002    Pdt_05    722.4
00000001    Pdt_02    33.8
00000003    Pdt_06    232.8
00000003    Pdt_02    33.8
00000002    Pdt_03    522.8
00000002    Pdt_04    122.4

  Design the order JavaBean with two fields: order id and order price.

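import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean> {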

    private int order_id; // order id
    private double price; // order price

    public OrderBean() {
    }

    public OrderBean(int order_id, double price) {
        this.order_id = order_id;
        this.price = price;
    }

    @Override
    public int compareTo(OrderBean bean) {
        // Sort by order id ascending; for equal ids, sort by price descending
        int result;
        if (order_id > bean.getOrder_id()){
            result = 1;
        }else if (order_id < bean.getOrder_id()){
            result = -1;
        }else {
            if (price > bean.getPrice()){
                result = -1;
            }else if (price < bean.getPrice()){
                result = 1;
            }else {
                result = 0;
            }
        }
        return result;
    }

    // Serialization
    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(order_id);
        output.writeDouble(price);
    }

    // Deserialization
    @Override
    public void readFields(DataInput input) throws IOException {
        order_id = input.readInt();
        price = input.readDouble();
    }

    @Override
    public String toString() {
        return order_id + "\t" + price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
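To see the order this compareTo produces (order id ascending, then price descending, i.e. a secondary sort), the sketch below applies the same two conditions to the sample records in plain Java. `OrderSortDemo` and its nested `Order` class are hypothetical stand-ins for OrderBean without the Hadoop serialization methods, and `Integer.compare`/`Double.compare` replace the if/else chain (same signs for ordinary values). Sorting a list here only imitates the result of the shuffle's sort, not how Hadoop performs it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderSortDemo {

    // Stand-in for OrderBean: same fields, same ordering, no Writable methods.
    static final class Order implements Comparable<Order> {
        final int orderId;
        final double price;

        Order(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public int compareTo(Order o) {
            int byId = Integer.compare(orderId, o.orderId); // 1st condition: id ascending
            if (byId != 0) {
                return byId;
            }
            return Double.compare(o.price, price);           // 2nd condition: price descending
        }

        @Override
        public String toString() {
            return orderId + "\t" + price;
        }
    }

    // The sample records from the input data above.
    static List<Order> sampleOrders() {
        return Arrays.asList(
                new Order(1, 222.8), new Order(2, 722.4), new Order(1, 33.8),
                new Order(3, 232.8), new Order(3, 33.8), new Order(2, 522.8),
                new Order(2, 122.4));
    }

    static List<Order> sorted(List<Order> orders) {
        List<Order> copy = new ArrayList<>(orders);
        Collections.sort(copy); // natural ordering, i.e. compareTo above
        return copy;
    }

    public static void main(String[] args) {
        // Prints, tab-separated: 1 222.8, 1 33.8, 2 722.4, 2 522.8, 2 122.4, 3 232.8, 3 33.8
        for (Order o : sorted(sampleOrders())) {
            System.out.println(o);
        }
    }
}
```

After this sort, the most expensive item of each order is always the first record of that order, which is what the grouping step relies on.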

  Mapper class: split each line and populate the bean.

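import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderSortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    // One output key object reused across map() calls (a common Hadoop idiom)
    private final OrderBean k = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line, e.g. "00000001\tPdt_01\t222.8"
        String line = value.toString();
        // Split on tabs: order id, product id, price
        String[] fields = line.split("\t");
        // Populate the key bean (the product id in fields[1] is not used)
        k.setOrder_id(Integer.parseInt(fields[0]));
        k.setPrice(Double.parseDouble(fields[2]));
        // Emit the bean as the key; the value carries no data
        context.write(k, NullWritable.get());
    }
}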

  Reducer class: write out the data.

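import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderSortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // With the grouping comparator, one call receives all records of one order.
        // Without iterating the values, key holds the group's first record (the highest price).
        context.write(key, NullWritable.get());
    }
}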

  OrderGroupingComparator: grouping comparator

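import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    protected OrderGroupingComparator() {
        // true: create key instances so compare() receives deserialized OrderBeans
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Treat keys with the same order id as the same key (one reduce group)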
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int result;
        if (aBean.getOrder_id() > bBean.getOrder_id()) {
            result = 1;
        } else if (aBean.getOrder_id() < bBean.getOrder_id()) {
            result = -1;
        } else {
            result = 0;
        }

        return result;
    }
}
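Why does this yield the most expensive product? After the sort, records of the same order are adjacent with the highest price first; the grouping comparator then treats them as one key, so reduce() runs once per order and the key it writes is that group's first record. The plain-Java sketch below imitates only that grouping step over an already sorted list, comparing order ids alone as OrderGroupingComparator does. `GroupingDemo`, `Rec`, and `firstOfEachGroup` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class GroupingDemo {

    static final class Rec {
        final int orderId;
        final double price;

        Rec(int orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public String toString() {
            return orderId + "\t" + price;
        }
    }

    // Input must already be sorted by (orderId asc, price desc), as the shuffle guarantees.
    // Consecutive records with the same orderId form one group (one reduce() call);
    // only the first record of each group is emitted, like OrderSortReducer writing key.
    static List<Rec> firstOfEachGroup(List<Rec> sorted) {
        List<Rec> out = new ArrayList<>();
        Rec groupStart = null;
        for (Rec r : sorted) {
            boolean sameGroup = groupStart != null && groupStart.orderId == r.orderId;
            if (!sameGroup) {
                groupStart = r;
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> sorted = Arrays.asList(
                new Rec(1, 222.8), new Rec(1, 33.8),
                new Rec(2, 722.4), new Rec(2, 522.8), new Rec(2, 122.4),
                new Rec(3, 232.8), new Rec(3, 33.8));
        // Prints one tab-separated line per order: 1 222.8, 2 722.4, 3 232.8
        firstOfEachGroup(sorted).forEach(System.out::println);
    }
}
```

Applied to the sample data, the job's output should likewise contain one line per order with its highest price (1 222.8, 2 722.4, 3 232.8), since the Mapper parses "00000001" as the int 1.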

  Driver class:

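import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for testing; the output directory must not already exist
        args = new String[]{"E:/input/group", "E:/output"};
        // Load configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrderDriver.class);
        // Mapper and Reducer classes
        job.setMapperClass(OrderSortMapper.class);
        job.setReducerClass(OrderSortReducer.class);
        // Map output key/value types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Final output key/value types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Register the grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}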
Original article: https://www.cnblogs.com/joey-413/p/13997972.html