Hadoop自定义分组Group

metadata(输入数据,字段之间以制表符分隔):

hadoop  a  
spark   a  
hive    a  
hbase   a  
tachyon a  
storm   a  
redis   a  



自定义分组

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class MyGroup {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if(otherArgs.length!=2){
			System.err.println("Usage databaseV1 <inputpath> <outputpath>");
		}
		
		Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");
		job.setJarByClass(MyGroup.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setMapperClass(MyMapper1.class);
		job.setGroupingComparatorClass(MyGroupComparator.class);
		job.setReducerClass(MyReducer1.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		job.waitForCompletion(true);
	}
	public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text>{
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] spl=value.toString().split("	");
			context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));
		}
	}
	public static class MyReducer1 extends Reducer<Text, Text, Text, Text>{
		@Override
		protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			Long count=0L;
			for (@SuppressWarnings("unused") Text v2 : v2s) {
				count++;
				context.write(new Text("in--"+k2), new Text(count.toString()));
			}
			context.write(new Text("out--"+k2), new Text(count.toString()));
		}
	}
	public static class MyGroupComparator extends WritableComparator{
		public MyGroupComparator(){
			super(Text.class,true);
		}
		@SuppressWarnings("rawtypes")
		public int compare(WritableComparable a, WritableComparable b) {
			Text p1 = (Text) a;
			Text p2 = (Text) b;
			p1.compareTo(p2);
			return  0;
		  }
	}
}



结果

in--hadoop      1
in--hbase       2
in--hive        3
in--redis       4
in--spark       5
in--storm       6
in--tachyon     7
out--tachyon    7



然后看下默认分组

/**
 * Grouping comparator equivalent to the default behaviour: two keys belong to
 * the same reduce group only when their Text values compare as equal.
 */
public static class MyGroupComparator extends WritableComparator {

	/** Registers Text as the key class handled by this comparator. */
	public MyGroupComparator() {
		super(Text.class, true);
	}

	/**
	 * Compares the two keys by their natural Text ordering.
	 *
	 * @param a first key, expected to be a Text
	 * @param b second key, expected to be a Text
	 * @return the result of {@code a.compareTo(b)} on the Text values
	 */
	@SuppressWarnings("rawtypes")
	public int compare(WritableComparable a, WritableComparable b) {
		return ((Text) a).compareTo((Text) b);
	}
}



结果

in--hadoop      1
out--hadoop     1
in--hbase       1
out--hbase      1
in--hive        1
out--hive       1
in--redis       1
out--redis      1
in--spark       1
out--spark      1
in--storm       1
out--storm      1
in--tachyon     1
out--tachyon    1



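通过对比,自定义分组就很容易理解了:自定义分组比较器的 compare 始终返回 0,所有 key 都被视为同一组,reduce 方法只被调用一次,因此计数从 1 累加到 7,并且只输出一条 out 记录;而默认分组按 key 本身比较,每个不同的 key 各自成为一组,reduce 方法对每个 key 调用一次,每组的计数都是 1。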

原文地址:https://www.cnblogs.com/qiaoyihang/p/6166152.html