Advanced Programs ==> Secondary Sort

MapReduce automatically sorts keys in dictionary order, but it never sorts values. If you want the values sorted as well, you have to write the logic yourself to implement a secondary sort on the values.

For example, take a file like this:

a 1
b 3
a 5
d 10
a 4
b 8
d 9
c 2
b 5
c 1

After sorting, it should look like this:

a 1
a 4
a 5
b 3
b 5
b 8
c 1
c 2
d 9
d 10

Since MapReduce sorts keys automatically, we want to exploit that behavior by constructing a new key.

We combine the original key and the original value into a composite key, something like this: <(a,1),1> <(b,3),3> ...

//We create a class for this composite key. For MapReduce to be able to use it, the class must implement the WritableComparable<> interface and override its serialization and deserialization methods. Note that write() and readFields() must handle the fields in the same order!

//We also need to override the class's compareTo() method: compare the original keys first, and only if they are equal compare the original values.

package com.rabbit.hadoop.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CustomWritable implements WritableComparable<CustomWritable> {

    private String key;
    private int value;

    public CustomWritable() {
    }

    public CustomWritable(String key, int value) {
        set(key, value);
    }

    public void set(String key, int value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    /*
     * Serialization
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(key);
        out.writeInt(value);
    }

    /*
     * Deserialization
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    public void readFields(DataInput in) throws IOException {
        this.key = in.readUTF();
        this.value = in.readInt();
    }

    public int compareTo(CustomWritable o) {
        int result = this.getKey().compareTo(o.getKey());

        // If the original keys are equal, compare the original values for the secondary sort.
        if (result == 0) {
            return Integer.valueOf(this.value).compareTo(Integer.valueOf(o.getValue()));
        }

        return result;
    }

    @Override
    public String toString() {
        return "CustomWritable [key=" + key + ", value=" + value + "]";
    }

}
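
As a quick sanity check (my addition, not part of the original post or of the job itself), you can sort a few composite keys in plain Java and confirm that compareTo() produces the expected order: key first, then value within the same key. The MapReduce sort phase ends up calling the same compareTo() via the default WritableComparator for this key type.

package com.rabbit.hadoop.secondarysort;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class CompareToCheck {

    public static void main(String[] args) {
        List<CustomWritable> keys = Arrays.asList(
                new CustomWritable("b", 3),
                new CustomWritable("a", 5),
                new CustomWritable("a", 1));

        Collections.sort(keys); // uses CustomWritable.compareTo()

        // Prints (a,1), (a,5), (b,3).
        for (CustomWritable k : keys) {
            System.out.println(k);
        }
    }
}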

With the key redefined like this, the sort order is guaranteed to be correct, but grouping breaks. In the end we still want results of the form (a,1),(a,4),(a,5),(b,3),(b,5)... Under the default grouping, all the a records will not end up in one group, because the keys are now things like (a,1) and (b,3), so records whose original key is a can certainly not be grouped together correctly. We therefore have to override the grouping logic so that it still groups by the original key.

//Implement RawComparator with our custom class as the type parameter, and override the compare() methods.

package com.rabbit.hadoop.secondarysort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class CustomComparator implements RawComparator<CustomWritable> {

    public int compare(CustomWritable o1, CustomWritable o2) {
        // Group only by the original key; ignore the value.
        return o1.getKey().compareTo(o2.getKey());
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // writeUTF() emits a 2-byte length prefix followed by the string bytes, and
        // writeInt() appends 4 bytes. Dropping the last 4 bytes therefore leaves just
        // the serialized original key. Note the s1/s2 offsets: each record does not
        // necessarily start at position 0 of the buffer.
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

}
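
If you would rather not work with raw bytes, an equivalent grouping comparator can extend WritableComparator and compare deserialized objects instead. This variant is a sketch of my own, not from the original post; it trades a little speed (each comparison deserializes both keys) for clarity:

package com.rabbit.hadoop.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator {

    protected CustomGroupingComparator() {
        // true => create key instances so the object-level compare() below can be used.
        super(CustomWritable.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by the original key, exactly like CustomComparator above.
        return ((CustomWritable) a).getKey().compareTo(((CustomWritable) b).getKey());
    }
}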

//Grouping is not the only thing that breaks; partitioning breaks too, for much the same reason: a and (a,1) will almost certainly not produce the same result modulo the number of reducers. If we want to partition correctly by the original key, we have to redefine the partitioning logic as well.

//Create a partitioner class that extends Partitioner<> and overrides the getPartition() method.

package com.rabbit.hadoop.secondarysort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class CustomPartitioner extends Partitioner<CustomWritable, IntWritable> {

    @Override
    public int getPartition(CustomWritable key, IntWritable value, int numPartitions) {
        // Hash only the original key, clear the sign bit with & Integer.MAX_VALUE so the
        // result is non-negative, then take the modulo. The parentheses matter: % binds
        // tighter than &, so without them this expression would compute the wrong thing.
        return (key.getKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}
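
A minimal check (again my addition) that two composite keys sharing an original key always land in the same partition, whatever their values are:

package com.rabbit.hadoop.secondarysort;

import org.apache.hadoop.io.IntWritable;

public class PartitionCheck {

    public static void main(String[] args) {
        CustomPartitioner partitioner = new CustomPartitioner();

        // Same original key "a", different values, 4 hypothetical reducers.
        int p1 = partitioner.getPartition(new CustomWritable("a", 1), new IntWritable(1), 4);
        int p2 = partitioner.getPartition(new CustomWritable("a", 100), new IntWritable(100), 4);

        System.out.println(p1 == p2); // true: only getKey() participates in the hash
    }
}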

At this point the groundwork is complete. What remains is to follow the usual template and write the map() and reduce() methods.

//Override the map() method

package com.rabbit.hadoop.secondarysort;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//The output key is our custom class; the output value is still the original value.

public class SecondarySortMapper extends Mapper<LongWritable, Text, CustomWritable, IntWritable> {

    private CustomWritable outputKey = new CustomWritable();
    private IntWritable outputValue = new IntWritable();

    /**
     * Input lines such as "a 1" and "a 100" are emitted as <(a,1),1> and <(a,100),100>.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        StringTokenizer stringTokenizer = new StringTokenizer(line, " ");

        while (stringTokenizer.hasMoreTokens()) {

            String string = stringTokenizer.nextToken();
            // A small shortcut to avoid an explicit if: try to parse the token as an
            // Integer. If parsing succeeds, the field is the value; if it fails, the
            // field is the key.
            try {
                Integer parseValue = Integer.parseInt(string);
                outputKey.setValue(parseValue);
                outputValue.set(parseValue);

            } catch (NumberFormatException e) {
                outputKey.setKey(string);
            }
        }

        context.write(outputKey, outputValue);

    }

}
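
The try/catch trick works for this two-column input, but using exceptions for control flow is fragile: a line whose key happened to be numeric would be misread as a value. A more explicit variant, offered here as a sketch rather than the original author's code, is a drop-in replacement for the map() method above (it reuses the same outputKey and outputValue fields and assumes every line is exactly "key value"):

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    String[] fields = value.toString().trim().split("\\s+");
    if (fields.length != 2) {
        return; // skip malformed lines
    }

    int parsedValue = Integer.parseInt(fields[1]);
    outputKey.set(fields[0], parsedValue);
    outputValue.set(parsedValue);

    context.write(outputKey, outputValue);
}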

//Override the reduce() method

package com.rabbit.hadoop.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//The map output is the reduce input, so the reduce input key is our custom class and the input value is the original value. Because the sort phase ordered composite keys by value within each original key, and the grouping comparator merges all records sharing an original key into one reduce call, the values arrive in ascending order.

public class CustomReducer extends Reducer<CustomWritable, IntWritable, Text, IntWritable> {

    private Text outputKey = new Text();

    /**
     * <a, [1, 3, 100]>
     */
    @Override
    protected void reduce(CustomWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        String oriKey = key.getKey(); // take the original key

        for (IntWritable value : values) {
            outputKey.set(oriKey);
            context.write(outputKey, value); // emit the original key and original value
        }
    }

}

//Write a Driver class to wire everything together and run the job

package com.rabbit.hadoop.secondarysort;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SecondarySortDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {

        Configuration configuration = getConf();

        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());

        job.setJarByClass(SecondarySortDriver.class);

        job.setPartitionerClass(CustomPartitioner.class);
        job.setGroupingComparatorClass(CustomComparator.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(CustomWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(CustomReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // job.setNumReduceTasks(4); // test whether partitioning works correctly

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {

        // This program runs locally on Windows, with the input and output files on the
        // local file system. To run against an HDFS cluster, set the parameter:
        // configuration.set("fs.defaultFS", "hdfs://bd27-server.ibeifeng.com:8020")

        Configuration configuration = new Configuration();
        args = new String[] {"D:\\input\\sec-sort.txt", "D:\\output5"};

        int status = ToolRunner.run(configuration, new SecondarySortDriver(), args);

        System.exit(status);
    }

}

Original post: https://www.cnblogs.com/rabbit624/p/10553189.html