034 二次排序

一:准备

1.二测排序

  其中1说明了自定义类型

  2与3说明了shuffle阶段的分区与分组,同时说明了程序的写法。

  

2.RawComparator class

  

3.二次排序的要点

  组合key,key是一个组合的字段,自定义数据类型

    实现WritableComparable

  保证原来的分区不变,自定义分区规则

    继承Patitioner

  保证原来的分组不变,自定义分组规则

    继承RawComparator

4.输入的数据

  

5.需求

  平时的只有一次排序,就是第一个会排序,但是输出的结果中第二个没有排序处理。

  现在希望。在第一个key排序之后,后面的key也可以排序出来。

  

二:第一次排序

3.输出第一排序的程序

  MAPPER--------------

  

  REDUCER------------

  

4.结果

  

三:二次排序

5.map和reduce程序

  

6.自定义类型的程序

  需要实现接口WritableComparable

  输入String,int。

  

7.自定义分组比较器

  需要实现RawComparator

  两个函数都是相同的意思,都是在返回first的比较结果。

  

8.定义分区规则

  继承Patitioner

  

9.运行结果

  

四:优化点

  例如分区就属于优化,但是这里说的是正负数的优化。

  

五:重新整理

1.项目结构

  

2.程序代码

RealSecondSort.class

package com.senior.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.senior.network.WebPvCount;
import com.senior.network.WebPvCount.WebPvCountMapper;
import com.senior.network.WebPvCount.WebPvCountReducer;

public class RealSecondSort extends Configured implements Tool{
	//Mapper
	public static class SortMapper extends Mapper<LongWritable,Text,PariWritable,IntWritable>{
		private PariWritable mapoutkey=new PariWritable();
		private IntWritable mapoutvalue=new IntWritable();
		@Override
		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
			String valueStr=value.toString();
			String strs[]=valueStr.split(",");
			mapoutkey.set(strs[0],Integer.valueOf(strs[1]));
			mapoutvalue.set(Integer.valueOf(strs[1]));
			context.write(mapoutkey, mapoutvalue);
		
		}
		
	}
	
	//Reducer
	public static class SortReducer extends Reducer<PariWritable,IntWritable,Text,IntWritable>{
		private Text outkey=new Text();
		@Override
		protected void reduce(PariWritable key, Iterable<IntWritable> value,Context context)throws IOException, InterruptedException {
			for(IntWritable str : value){
				outkey.set(key.getFirst());
				context.write(outkey, str);
			}
		}
		
	}
	
	//Driver
	public int run(String[] args) throws Exception {
		Configuration conf=this.getConf();
		Job job=Job.getInstance(conf,this.getClass().getSimpleName());
		job.setJarByClass(RealSecondSort.class);
		//input
		Path inpath=new Path(args[0]);
		FileInputFormat.addInputPath(job, inpath);
		
		//output
		Path outpath=new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outpath);
		
		//map
		job.setMapperClass(SortMapper.class);
		job.setMapOutputKeyClass(PariWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//shuffle*************************************
		job.setPartitionerClass(PartitionNum.class);
		job.setGroupingComparatorClass(GroupingComparator.class);	
		
		//shuffle*************************************
		
		//reduce
		job.setReducerClass(SortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//submit
		boolean isSucess=job.waitForCompletion(true);
		return isSucess?0:1;

	}
	
	//Main
	public static void main(String[] args) throws Exception{
		Configuration conf=new Configuration();
		args=new String[]{
				"hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputSortData",
				"hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputSortData2"
		};
		int status=ToolRunner.run(new RealSecondSort(), args);
		System.exit(status);
	}

}

 

PariWritable.java

这个地方使用的接口可以看看下面的说明。

package com.senior.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PariWritable implements WritableComparable<PariWritable>{
	private String first;
	private Integer second;
	public PariWritable(){}
	public PariWritable(String first,Integer second){
		set(first,second);
	}
	//set get
	public String getFirst() {
		return first;
	}
	public void setFirst(String first) {
		this.first = first;
	}
	public Integer getSecond() {
		return second-Integer.MAX_VALUE;
	}
	public void setSecond(Integer second) {
		this.second = second+Integer.MAX_VALUE;
	}
	public void set(String first, Integer second) {
		this.first=first;
		this.second=second;	
	}
	//
	public void readFields(DataInput input) throws IOException {
		this.first=input.readUTF();
		this.second=input.readInt();		
	}
	
	public void write(DataOutput output) throws IOException {
		output.writeUTF(first);
		output.writeInt(second);
		
	}
	public int compareTo(PariWritable o) {
		int comp=this.first.compareTo(o.getFirst());
		if(0!=comp){
			return comp;
		}
		return Integer.valueOf(getSecond()).compareTo(Integer.valueOf(o.getSecond()));
	}
	@Override
	public String toString() {
		return "PariWritable [first=" + first + ", second=" + second + "]";
	}
	
}

  

PartitionNum.java

package com.senior.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionNum extends Partitioner<PariWritable, IntWritable> {

	@Override
	public int getPartition(PariWritable key, IntWritable value, int num) {
		
		return (key.getFirst().hashCode()&Integer.MAX_VALUE)%num;
	}

}

  

GroupingComparator.java

关于程序中的一点仔细看下面的一个部分,就可以很好的理解了。

package com.senior.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class GroupingComparator implements RawComparator<PariWritable> {

	public int compare(PariWritable o1, PariWritable o2) {
		return o1.getFirst().compareTo(o2.getFirst());
	}

	public int compare(byte[] b1, int arg1, int l1, byte[] b2, int arg4,int l2) {
		
		return WritableComparator.compareBytes(b1, 0, l1-4, b2, 0, l2-4);
	}

}

  

3.效果

  

六:Hadoop的序列化

1.说明

  在上面的程序中使用到了序列化,在整理的过程中对这一块进行整理一番。

  

2.序列化的功能  

  对于需要保存和处理大规模数据的Hadoop来说,其序列化机制要达到以下目的:
  • 排列紧凑:尽量减少带宽,加快数据交换速度
  • 处理快速:进程间通信需要大量的数据交互,使用大量的序列化机制,必须减少序列化和反序列的开支
  • 跨语言:可以支持不同语言间的数据交互啊,如C++
  • 可扩展:当系统协议升级,类定义发生变化,序列化机制需要支持这些升级和变化

3.Writable

  为了支持以上特性,引用了Writable接口。和说明性Serializable接口不一样,它要求实现两个方法。
  这个功能已经在前面的章节中说明过。
public interface Writable {  
  void write(DataOutput out) throws IOException;  
  void readFields(DataInput in) throws IOException;  
}  

  

4.其他接口

  A。WritableComparable:它不仅提供序列化功能,而且还提供比较的功能。这种比较式基于反序列后的对象成员的值,速度较慢。
  B。RawComparator:由于MapReduce十分依赖基于键的比较排序(自定义键还需要重写hashCode和equals方法),因此提供了一个优化接口 RawComparator。该接口允许直接比较数据流中的记录,无需把数据流反序列化为对象,这样避免了新建对象的额外开销。RawComparator定义如下,compare方法可以从每个字节数组b1和b2中读取给定起始位置(s1和s2)以及长度l1和l2的一个整数直接进行比较。
  这个上面已经说明过。
public interface RawComparator<T> extends Comparator<T> {  
  
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);  
  
}  
  C。WritableComparator: 是 RawComparator 的一个通用实现,提供两个功能:提供了一个 RawComparator的comparea()的默认实现,该默认实现只是反序列化了键然后再比较,没有什么性能优势。其次、充当了 RawComaprator 实例的一个工厂方法。
  当我们要实现自定key排序时(自定义分组),需要指定自己的排序规则。
  如需要以StartEndDate为键且以开始时间分组,则需要自定义分组器:
class MyGrouper implements RawComparator<StartEndDate> {  
    @Override  
    public int compare(StartEndDate o1, StartEndDate o2) {  
        return (int)(o1.getStartDate().getTime()- o2.getEndDate().getTime());  
    }  
    @Override  
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
        int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);  
        return compareBytes;  
    }  
       
}  

  

  

原文地址:https://www.cnblogs.com/juncaoit/p/6016873.html