hadoop二次排序

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
//import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import com.hadoop.compression.lzo.LzoCodec;

// 002484 18.29
// 600879 12.89
public class SecondSotrStr {
public static class StrPair implements WritableComparable<StrPair> {
private Text first;
private Text second;
private Text third;
private Text fourth;

public StrPair() {
set(new Text(), new Text(), new Text(), new Text());
}

public void set(Text left, Text right, Text third, Text fourth) {
this.first = left;
this.second = right;
this.third = third;
this.fourth = fourth;
}

public Text getFirst() {
return first;
}

public Text getSecond() {
return second;
}

public Text getThird() {
return third;
}

public Text getFourth() {
return fourth;
}

@Override
public String toString() {
return first + "	" + second + "	" + third + "	" + fourth;
}

@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
third.readFields(in);
fourth.readFields(in);
}

@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
third.write(out);
fourth.write(out);
}

@Override
public int hashCode() {
return first.hashCode() * 157 + second.hashCode() * 10
+ third.hashCode();
}

@Override
public boolean equals(Object right) {
if (right instanceof StrPair) {
StrPair r = (StrPair) right;
return first.equals(r.first) && second.equals(r.second)
&& third.equals(r.third) && fourth.equals(r.fourth);
} else {
return false;
}
}

/** A Comparator that compares serialized StrPair. */
public static class Comparator extends WritableComparator {
private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public Comparator() {
super(StrPair.class);
}

// 排序比较器,数据全部存在byte数组
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
// 二进制数组读取
/*
* try { //System.out.println("--" + b1[s1]); Integer firstL1 =
* WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
* //String str = readSt // System.out.println("firstL1 = " +
* firstL1); } catch (IOException e) { // TODO Auto-generated
* catch block e.printStackTrace(); }
*/
// int intvalue = readInt(b1, s1);
/*
* int third = 0; for(int i =s1 + 9; i<= s1+ 12; i++){ third +=
* (b1[i]&0xff) << (24-8*i); } System.out.println("third = " +
* third);
*/
System.out.println("l1 = " + l1);
return compareBytes(b1, s1, l1, b2, s2, l2);
/*
try {
int firstl1 = WritableUtils.decodeVIntSize(b1[s1])
+ readVInt(b1, s1);
int firstl2 = WritableUtils.decodeVIntSize(b2[s2])
+ readVInt(b2, s2);
int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2,
firstl2);
if (cmp != 0)
return cmp;


int firstl12 = WritableUtils.decodeVIntSize(b1[s1 + firstl1])
+ readVInt(b1 , s1 + firstl1);
int firstl22 = WritableUtils.decodeVIntSize(b2[s2 + firstl2])
+ readVInt(b2, s2 + firstl2);
cmp = TEXT_COMPARATOR.compare(b1, s1 + firstl1, firstl12, b2, s2 + firstl2,
firstl22);
if (cmp != 0) 
return cmp;




int firstl13 = WritableUtils.decodeVIntSize(b1[s1+ firstl1 + firstl12])
+ readVInt(b1 , s1 + firstl1 + firstl22);
int firstl23 = WritableUtils.decodeVIntSize(b2[s2 + firstl2 + firstl22])
+ readVInt(b2, s2 + firstl2 + firstl22);
cmp = TEXT_COMPARATOR.compare(b1, s1+ firstl1 + firstl12, firstl13, b2, s2 + firstl2 + firstl22,
firstl23);
//if (cmp != 0)
return cmp;





return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1
- firstl1, b2, s2 + firstl2, l1 - firstl2);


} catch (IOException e) {
throw new IllegalArgumentException(e);
}
*/

}
}

static { // register this comparator
WritableComparator.define(StrPair.class, new Comparator());

}

// @Override
public int compareTo(StrPair o) {/*
* if (first != o.first) { return first
* < o.first ? -1 : 1; } else if (second
* != o.second) { return second <
* o.second ? -1 : 1; }// else if (third
* != o.third) { // return third <
* o.third ? -1 : 1;}
* 
* return 0;
*/
return 0;
}

}

/**
* Partition based on the first part of the pair.
*/
public static class FirstPartitioner extends Partitioner<StrPair, Text> {
@Override
//
public int getPartition(StrPair key, Text value, int numPartitions) {
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
}
}

/**
* Compare only the first part of the pair, so that reduce is called once
* for each value of the first part.
*/

// 调用这里
public static class FirstGroupingComparator implements
RawComparator<StrPair> {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
b2, s2, Integer.SIZE / 8);
}

@Override
public int compare(StrPair o1, StrPair o2) {
System.out.println("-----group2-----");
Text l = o1.getFirst();
Text r = o2.getFirst();
return l.equals(r) ? 0 : 1;
// return l == r ? 0 : (l < r ? -1 : 1);
}
}

/**
* Read two integers from each line and generate a key, value pair as
* ((left, right), right).
*/
public static class MapClass extends
Mapper<LongWritable, Text, StrPair, NullWritable> {

private final StrPair key = new StrPair();
private final IntWritable value = new IntWritable();

private Text left = new Text();
private Text right = new Text();
private Text third = new Text();
private Text fourth = new Text();

@Override
public void map(LongWritable inKey, Text inValue, Context context)
throws IOException, InterruptedException {
System.out.println("value" + inValue.toString());
StringTokenizer itr = new StringTokenizer(inValue.toString());
if (itr.hasMoreTokens()) {
left.set((itr.nextToken()));
if (itr.hasMoreTokens()) {
right.set(itr.nextToken());

if (itr.hasMoreTokens()) {
third.set(itr.nextToken());
if (itr.hasMoreTokens()) {
fourth.set(itr.nextToken());
}
}
}
key.set(left, right, third, fourth);
// value.set(right);
context.write(key, NullWritable.get());
}
}
}

/**
* A reducer class that just emits the sum of the input values.
*/
public static class Reduce extends
Reducer<StrPair, NullWritable, Text, NullWritable> {
private static final Text SEPARATOR = new Text(
"------------------------------------------------");
private final Text first = new Text();

@Override
public void reduce(StrPair key, Iterable<NullWritable> values,
Context context) throws IOException, InterruptedException {
// Text outkey = new Text(key.to);
// context.write(SEPARATOR, null);
// first.set(Integer.toString(key.getFirst()));

// System.out.println("key1 " + key );
for (NullWritable value : values) {
System.out.println("key2 " + key);
context.write(new Text(key.toString()), NullWritable.get());
}
}
}

private static boolean flag;

public static boolean deleteFile(String sPath) {
flag = false;
File file = new File(sPath);
// 路径为文件且不为空则进行删除
if (file.isFile() && file.exists()) {
file.delete();
flag = true;
}
return flag;
}

public static boolean deleteDirectory(String sPath) {
// 如果sPath不以文件分隔符结尾,自动添加文件分隔符
if (!sPath.endsWith(File.separator)) {
sPath = sPath + File.separator;
}
File dirFile = new File(sPath);
// 如果dir对应的文件不存在,或者不是一个目录,则退出
if (!dirFile.exists() || !dirFile.isDirectory()) {
return false;
}
flag = true;
// 删除文件夹下的所有文件(包括子目录)
File[] files = dirFile.listFiles();
for (int i = 0; i < files.length; i++) {
// 删除子文件
if (files[i].isFile()) {
flag = deleteFile(files[i].getAbsolutePath());
if (!flag)
break;
} // 删除子目录
else {
flag = deleteDirectory(files[i].getAbsolutePath());
if (!flag)
break;
}
}
if (!flag)
return false;
// 删除当前目录
if (dirFile.delete()) {
return true;
} else {
return false;
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
/*
* conf.setBoolean("mapreduce.map.output.compress", true);
* //conf.setBoolean("mapreduce.output.fileoutputformat.compress",
* false);
* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
* GzipCodec.class, CompressionCodec.class);
*/

// gzip
/*
* conf.setBoolean("mapreduce.map.output.compress", true);
* conf.setClass("mapreduce.map.output.compression.codec",
* GzipCodec.class, CompressionCodec.class);
* conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
* GzipCodec.class, CompressionCodec.class);
*/
conf.set("mapreduce.map.log.level", "DEBUG");

// snappy
/*
* conf.setBoolean("mapreduce.map.output.compress", true);
* conf.setClass("mapreduce.map.output.compression.codec",
* SnappyCodec.class, CompressionCodec.class);
* conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);
* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
* SnappyCodec.class, CompressionCodec.class);
*/

String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: SecondSotrStr <in> <out>");
System.exit(2);
}

Path outputDir = new Path(otherArgs[1]);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputDir)) {
fs.delete(outputDir, true);
}

Job job = new Job(conf, "secondary sort");
job.setJarByClass(SecondSotrStr.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

/*
* conf.setBoolean("mapred.output.compress", true); //
* conf.setClass("mapred.output.compression.codec", GzipCodec.class,
* CompressionCodec.class);
* conf.setClass("mapred.output.compression.codec", SnappyCodec.class,
* CompressionCodec.class);
* 
* conf.setBoolean("reduce.output.compress", true); //
* conf.setClass("mapred.output.compression.codec", GzipCodec.class,
* CompressionCodec.class);
* conf.setClass("reduce.output.compression.codec", SnappyCodec.class,
* CompressionCodec.class);
* 
* /* conf.setBoolean("mapreduce.output.compress", true);
* conf.setClass("mapreduce.output.compression.codec", GzipCodec.class,
* CompressionCodec.class);
*/

// group and partition by the first int in the pair
job.setPartitionerClass(FirstPartitioner.class);
job.setGroupingComparatorClass(FirstGroupingComparator.class);

// the map output is StrPair, IntWritable
job.setMapOutputKeyClass(StrPair.class);
job.setMapOutputValueClass(NullWritable.class);

// the reduce output is Text, IntWritable
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

// lzo
/*
* conf.setBoolean("mapreduce.map.output.compress", true);
* conf.setClass("mapreduce.map.output.compression.codec",
* LzoCodec.class, CompressionCodec.class);
* conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
* conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
* LzoCodec.class, CompressionCodec.class);
*/
// 块压缩
// job.setOutputFormatClass(SequenceFileOutputFormat.class);
conf.set("mapred.output.compression.type", "BLOCK");

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

  

原文地址:https://www.cnblogs.com/chengxin1982/p/4076544.html