hadoop-04

1. Traffic analysis case study

  • Compute the total traffic for each phone number
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Flow {
	static class FlowMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
		Text k = new Text();
		LongWritable v = new LongWritable();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// Split the line into fields
			String[] split = line.split("\\s+");
			String tel = split[0];
			String url = split[1];
			long upFlow = Long.parseLong(split[2]);
			long downFlow = Long.parseLong(split[3]);
			long sumFlow = upFlow+downFlow;
			k.set(tel);
			v.set(sumFlow);
			// Emit the map output
			context.write(k, v);
		}
	}
	static class FlowReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
		// Reusable value holder for the total traffic of a key
		LongWritable v = new LongWritable();
		@Override
		protected void reduce(Text key, Iterable<LongWritable> value,
				Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
			long sumFlow = 0;
			for (LongWritable longWritable : value) {
				sumFlow += longWritable.get();
			}
			v.set(sumFlow);
			context.write(key, v);
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
		Job job = Job.getInstance(conf);
		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/http.txt"));
		FileOutputFormat.setOutputPath(job, new Path("d:/data/flow/"));
		
		boolean b = job.waitForCompletion(true);
		
		// Exit code: 0 on success, non-zero on failure
		System.exit(b?0:-1);
	}
}

2. Traffic analysis with a custom bean object

  • FlowBean.java
package com.xjk.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
	private String tel;
	private String url;
	private long upFlow;
	private long downFlow;
	@Override
	public String toString() {
		return "FlowBean [tel=" + tel + ", url=" + url + ", upFlow=" + upFlow + ", downFlow=" + downFlow + "]";
	}
	public String getTel() {
		return tel;
	}
	public void setTel(String tel) {
		this.tel = tel;
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public long getUpFlow() {
		return upFlow;
	}
	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}
	public long getDownFlow() {
		return downFlow;
	}
	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}
	// Serialization: write the fields
	public void write(DataOutput out) throws IOException {
		out.writeUTF(tel);
		out.writeUTF(url);
		out.writeLong(upFlow);
		out.writeLong(downFlow);
	}
	// Deserialization: read the fields
	public void readFields(DataInput in) throws IOException {
		// Fields must be read in the same order they were written
		this.tel = in.readUTF() ;
		this.url = in.readUTF() ;
		this.upFlow = in.readLong() ;
		this.downFlow = in.readLong() ;
	}
	
}

  • Flow2.java
package com.xjk.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Flow2 {
	static class FlowMapper extends Mapper<LongWritable,Text,Text,FlowBean>{
		Text k = new Text();
		FlowBean v = new FlowBean();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// Split the line into fields
			String[] split = line.split("\\s+");
			String tel = split[0];
			String url = split[1];
			long upFlow = Long.parseLong(split[2]);
			long downFlow = Long.parseLong(split[3]);
			// long sumFlow = upFlow+downFlow;
			k.set(tel);
			v.setUrl(url);
			v.setTel(tel);
			v.setUpFlow(upFlow);
			v.setDownFlow(downFlow);
			// Emit the map output
			context.write(k, v);
		}
	}
	static class FlowReducer extends Reducer<Text,FlowBean,Text,LongWritable>{
		// Reusable value holder for the total traffic of a key
		LongWritable v = new LongWritable();
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values,
				Reducer<Text, FlowBean, Text, LongWritable>.Context context) throws IOException, InterruptedException {
			long sum = 0;
			for (FlowBean flowBean : values) {
				long upFlow = flowBean.getUpFlow();
				long downFlow = flowBean.getDownFlow();
				sum += (upFlow + downFlow);
			}
			v.set(sum);
			context.write(key, v);
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		conf.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache"); 
		Job job = Job.getInstance(conf);
		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/http.txt"));
		FileOutputFormat.setOutputPath(job, new Path("d:/data/flow2/"));
		
		boolean b = job.waitForCompletion(true);
		
		// Exit code: 0 on success, non-zero on failure
		System.exit(b?0:-1);
	}
}

3. Movie case study: the N highest-rated movies

  • RateTopN.java
package MovieDemo;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.google.gson.Gson;

public class RateTopN {
	/*
	 * Read the data and emit the movie id as the key
	 * and the whole movie record as the value.
	 * */
	static class RateTopNMapper extends Mapper<LongWritable, Text, Text, MoiveBean>{
		Gson gs = new Gson();
		Text k = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, MoiveBean>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// Parse the JSON line into a MoiveBean
			MoiveBean mb = gs.fromJson(line, MoiveBean.class);
			// Key by movie id: records with the same id go to the same group and the same reduce task
			k.set(mb.getMovie());
			context.write(k, mb);
		}
	}
	static class RateTopNReducer extends Reducer<Text, MoiveBean, MoiveBean, NullWritable>{
		@Override
		protected void reduce(Text arg0, Iterable<MoiveBean> values,
				Reducer<Text, MoiveBean, MoiveBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			// Collect the beans into a list, sort by rating, and keep the top N
			try {
				List<MoiveBean> list = new ArrayList<>();
				for (MoiveBean moiveBean : values) {
					
					MoiveBean m = new MoiveBean();
					// Copy moiveBean's fields into m; the values iterator reuses one object, so we must copy
					BeanUtils.copyProperties(m, moiveBean);
					list.add(m);
				}
				// Sort the list
				Collections.sort(list, new Comparator<MoiveBean>() {

					@Override
					public int compare(MoiveBean o1, MoiveBean o2) {
						// Descending order, comparing double ratings
						return Double.compare(o2.getRate(), o1.getRate());
					}
				});
				// Math.min guards against groups with fewer than 3 ratings
				for (int i = 0; i < Math.min(3, list.size()); i++) {
					MoiveBean moiveBean = list.get(i);
					// Write out the record
					context.write(moiveBean, NullWritable.get());
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
			
		}
	}
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration configuration = new Configuration();
		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
		Job job = Job.getInstance(configuration);
		// Set the mapper and reducer classes
		job.setMapperClass(RateTopNMapper.class);
		job.setReducerClass(RateTopNReducer.class);
		// Map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(MoiveBean.class);
		// Reduce output key/value types (the reducer emits MoiveBean keys)
		job.setOutputKeyClass(MoiveBean.class);
		job.setOutputValueClass(NullWritable.class);
		// Input path
		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/simple"));
		// Output path
		FileOutputFormat.setOutputPath(job, new Path("d:/data/ratetopn"));
		// Number of reduce tasks
		job.setNumReduceTasks(2);
		// Submit the job; true prints progress to the console (runs locally by default here)
		job.waitForCompletion(true);
	}
	
}

  • MoiveBean.java
package MovieDemo;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MoiveBean implements Writable {
	private String movie;
	private double rate;
	private String timeStamp;
	private int uid;
	@Override
	public String toString() {
		return "MoiveBean [movie=" + movie + ", rate=" + rate + ", timeStamp=" + timeStamp + ", uid=" + uid + "]";
	}
	public String getMovie() {
		return movie;
	}
	public void setMovie(String movie) {
		this.movie = movie;
	}
	public double getRate() {
		return rate;
	}
	public void setRate(double rate) {
		this.rate = rate;
	}
	public String getTimeStamp() {
		return timeStamp;
	}
	public void setTimeStamp(String timeStamp) {
		this.timeStamp = timeStamp;
	}
	public int getUid() {
		return uid;
	}
	public void setUid(int uid) {
		this.uid = uid;
	}
	// Serialization: write the fields
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(movie);
		out.writeDouble(rate);
		out.writeUTF(timeStamp);
		out.writeInt(uid);
	}
	// Deserialization: read the fields in the same order they were written
	@Override
	public void readFields(DataInput in) throws IOException {
		this.movie = in.readUTF();
		this.rate = in.readDouble();
		this.timeStamp = in.readUTF();
		this.uid = in.readInt();
		
	}
	
}

4. The 5 users with the most ratings

Use uid as the key and the movie record as the value.
  • Besides reduce, a Reducer has a cleanup method, whose distinguishing feature is that it runs exactly once, after the task has processed all of its data; likewise, setup runs once when the task starts. That makes cleanup the place to do a global sort over everything this reduce task has accumulated (see the sketch below).
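
  • As a rough illustration of that call order (not from the original post; the class name LifecycleReducer is made up for this sketch), overriding run() shows exactly when setup, reduce, and cleanup fire. The body below mirrors what the default Reducer.run() already does:

    package MovieDemo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class LifecycleReducer extends Reducer<Text, MoiveBean, Text, IntWritable> {
    	@Override
    	public void run(Context context) throws IOException, InterruptedException {
    		setup(context);                // called once, when the task starts
    		while (context.nextKey()) {    // one iteration per key group
    			reduce(context.getCurrentKey(), context.getValues(), context);
    		}
    		cleanup(context);              // called once, after the last group
    	}
    }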

  • Code

    package MovieDemo;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import com.google.gson.Gson;
    
    public class HitTopN {
    	/*
    	 * uid key
    	 * movie value
    	 * */
    	
    	static class HitTopNMapper extends Mapper<LongWritable,Text,Text,MoiveBean>{
    		Gson gs = new Gson();
    		Text k = new Text();
    		MoiveBean v = new MoiveBean();
    		protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,MoiveBean>.Context context) throws IOException, InterruptedException {
    			try {
    				String line = value.toString();
    				// Parse the JSON line
    				v = gs.fromJson(line, MoiveBean.class);
    				k.set(String.valueOf(v.getUid()));
    				context.write(k, v);
    			} catch (Exception e) {
    				// Skip lines that are not valid JSON
    				e.printStackTrace();
    			}
    		}
    	}
    	static class HitTopNReducer extends Reducer<Text, MoiveBean, Text, IntWritable>{
    		Map<String,Integer> map = new HashMap<>();
    		@Override
    		protected void reduce(Text key, Iterable<MoiveBean> values,
    				Reducer<Text, MoiveBean, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    			int count = 0;
    			for (MoiveBean moiveBean : values) {
    				count++;
    			}
    			map.put(key.toString(), count);
    		}
    		// cleanup is called only once, after all reduce() calls
    		@Override
    		protected void cleanup(Reducer<Text, MoiveBean, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			// Convert the map to an entry set...
    			Set<Entry<String,Integer>> entrySet = map.entrySet();
    			// ...then to a list so it can be sorted
    			ArrayList<Entry<String,Integer>> list = new ArrayList<>(entrySet);
    			// Sort with a comparator: descending by count
    			Collections.sort(list, new Comparator<Entry<String,Integer>>(){
    				@Override
    				public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
    					return o2.getValue() - o1.getValue();
    				}
    			});
    			// Emit the top 5
    			for (int i = 0; i < Math.min(5, list.size()); i++) {
    				Entry<String,Integer> entry = list.get(i);
    				context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    			}
    		}
    	}
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		Configuration configuration = new Configuration();
    		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    		Job job = Job.getInstance(configuration);
    		// Set the mapper and reducer classes
    		job.setMapperClass(HitTopNMapper.class);
    		job.setReducerClass(HitTopNReducer.class);
    		// Map output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(MoiveBean.class);
    		// Reduce output key/value types
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		// Input path
    		FileInputFormat.setInputPaths(job, new Path("d:/data/sourcedata/rating.json"));
    		// Output path
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/hittopn"));
    		// Number of reduce tasks
    		// job.setNumReduceTasks(2);
    		// Submit the job; true prints progress to the console (runs locally by default here)
    		job.waitForCompletion(true);
    	}
    }
    
    
  • This run used a single reducer. If you set two reducers instead, you get two output files, each holding a top 5 ranked only over that reducer's own data, because cleanup sorts only what its own reduce task has seen. To get the overall top 5, the outputs of those reducers have to be merged and sorted in another pass, as sketched below.
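
  • As a rough sketch of that second pass (not part of the original post; the class name HitTopNMerge and the output path d:/data/hittopn_merge are assumptions made for illustration), a follow-up job can read the uid/count lines written by HitTopN and rank them in a single reducer, so that the cleanup sort covers all of the data:

    package MovieDemo;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class HitTopNMerge {
    	// Re-parse the "uid<TAB>count" lines produced by the first job
    	static class MergeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    		Text k = new Text();
    		IntWritable v = new IntWritable();
    		@Override
    		protected void map(LongWritable key, Text value, Context context)
    				throws IOException, InterruptedException {
    			String[] split = value.toString().split("\t");
    			k.set(split[0].trim());
    			v.set(Integer.parseInt(split[1].trim()));
    			context.write(k, v);
    		}
    	}
    	// With a single reducer, cleanup sees every uid, so the top 5 is global
    	static class MergeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    		Map<String, Integer> map = new HashMap<>();
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
    				throws IOException, InterruptedException {
    			int count = 0;
    			for (IntWritable value : values) {
    				count += value.get();
    			}
    			map.put(key.toString(), count);
    		}
    		@Override
    		protected void cleanup(Context context) throws IOException, InterruptedException {
    			ArrayList<Entry<String, Integer>> list = new ArrayList<>(map.entrySet());
    			Collections.sort(list, new Comparator<Entry<String, Integer>>() {
    				@Override
    				public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
    					return o2.getValue() - o1.getValue();
    				}
    			});
    			for (int i = 0; i < Math.min(5, list.size()); i++) {
    				Entry<String, Integer> entry = list.get(i);
    				context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
    			}
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration configuration = new Configuration();
    		Job job = Job.getInstance(configuration);
    		job.setMapperClass(MergeMapper.class);
    		job.setReducerClass(MergeReducer.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		// Input: the part files written by HitTopN; the output path is an assumption
    		FileInputFormat.setInputPaths(job, new Path("d:/data/hittopn"));
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/hittopn_merge"));
    		// One reducer, so the top-5 ranking covers all of the data
    		job.setNumReduceTasks(1);
    		job.waitForCompletion(true);
    	}
    }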

5. Counting how many times each word appears in each file

  • Suppose we have three files: a.html, b.html, c.html

    a.html ---> hello 1, hello 1 ---> hello 2
    b.html ---> hello 1, hello 1 ---> hello 2
    c.html ---> hello 1 ---> hello 1
    
  • The first MapReduce job counts how many times each word occurs in each file; the second MapReduce job then takes the first job's output as its input and aggregates the counts per word:

    • index1.java
    package index;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class index1 {
    	static class Index1Mapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    		String fileName = null;
    		// Determine which file this task is processing
    		@Override
    		protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			// The input split carries the file name
    			FileSplit f = (FileSplit)context.getInputSplit();
    			fileName = f.getPath().getName();
    		}
    		Text k = new Text();
    		IntWritable v = new IntWritable();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			String[] words = line.split(" ");
    			for (String word:words) {
    				k.set(word + "-" + fileName);
    				v.set(1);
    				context.write(k, v);// Produces e.g.: hello-a.html 1
    			}
    		}
    	}
    	static class Index1Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    		IntWritable v = new IntWritable();
    		@Override
    		protected void reduce(Text key, Iterable<IntWritable> values,
    				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    			int count = 0;
    			for (IntWritable intWritable : values) {
    				count += intWritable.get();
    			}
    			v.set(count);
    			context.write(key, v);
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration configuration = new Configuration();
    		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    		Job job = Job.getInstance(configuration);
    		// Set the mapper and reducer classes
    		job.setMapperClass(Index1Mapper.class);
    		job.setReducerClass(Index1Reducer.class);
    		// Map output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		// Reduce output key/value types
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    		// Input path
    		FileInputFormat.setInputPaths(job, new Path("d:/data/index/input"));
    		// Output path
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/index/output"));
    		// Number of reduce tasks
    		job.setNumReduceTasks(2);
    		// Submit the job; true prints progress to the console (runs locally by default here)
    		job.waitForCompletion(true);
    	}
    	
    }
    
    
    • index2.java
    package index;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    // Input from job 1: hello-a.html	4  ---> output: hello  b.html-->2
    public class index2 {
    	static class Index2Mapper extends Mapper<LongWritable,Text,Text,Text>{
    		Text k = new Text();
    		Text v = new Text();
    		@Override
    		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			String line = value.toString();
    			// The first job's output separates key and value with a tab
    			String[] split = line.split("\t");
    			String word = split[0].split("-")[0];
    			String fileName = split[0].split("-")[1];
    			String count = split[1];
    			k.set(word);
    			v.set(fileName + "-->" + count);
    			context.write(k, v);
    		}
    	}
    	
    	// Receives records like: hello  b.html-->2
    	static class Index2Reducer extends Reducer<Text, Text, Text, Text>{
    		Text v = new Text();
    		@Override
    		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			StringBuilder sb = new StringBuilder();
    			for (Text text : values) {
    				String v = text.toString();
    				sb.append(v + " ");
    			}
    			v.set(sb.toString().trim());
    			context.write(key, v);
    		}
    	}
    	public static void main(String[] args) throws Exception {
    		Configuration configuration = new Configuration();
    		configuration.set("hadoop.tmp.dir", "E:/hdfs_tmp_cache");
    		Job job = Job.getInstance(configuration);
    		// Set the mapper and reducer classes
    		job.setMapperClass(Index2Mapper.class);
    		job.setReducerClass(Index2Reducer.class);
    		// Map output key/value types
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    		// Reduce output key/value types
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    		// Input path: the first job's output is this job's input
    		FileInputFormat.setInputPaths(job, new Path("d:/data/index/output"));
    		// Output path
    		FileOutputFormat.setOutputPath(job, new Path("d:/data/index/output2"));
    		// Number of reduce tasks
    		job.setNumReduceTasks(2);
    		// Submit the job; true prints progress to the console (runs locally by default here)
    		job.waitForCompletion(true);
    	}
    }
    
    

6. How MapReduce processes data internally

  • Before any records are read, the input is divided into task splits based on the input paths and the file sizes.

Map side:
FileInputFormat
	TextInputFormat
		LineRecordReader
			getCurrentKey -> LongWritable (byte offset of the line)
			getCurrentValue -> Text (the line itself)

The framework loops on nextKeyValue() to check whether there is another key/value pair. Inside that loop it calls map(key, value, context); when map() calls context.write(ko, vo), the record is written into an in-memory buffer (MapOutputBuffer) and collect(k, v, p) assigns it a partition. The buffer is a circular byte array (100 MB); once it fills up, no more records are accepted and its contents are spilled. Before each spill is written out, the buffered records are hash-partitioned (hashPartition) and sorted within each partition (quicksort). The spill files are then merged and sorted (Merger); after merging, the records sit in one file, separated only by partition markers, and are written to local disk in binary form as temporary intermediate files, one region per partition.

Reduce side:
	During shuffle, the Fetcher downloads the data for this reducer's partition from every map task, then merges and sorts it (Merger, sort). A grouping comparator compares the previous key with the next key (comparator(prevKey, nextKey)); if they are equal, the records are handed to reduce() in the same Iterable. reduce(key, values) iterates over each group and aggregates it, and context.write(ko, vo) writes the result through TextOutputFormat into the output directory (files named part-r-00000, ...), for example:
	a	6
	b	4
	c	3
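
  • For reference, here is a minimal sketch of the hash-partitioning step mentioned above (the class name TelPartitioner is just an illustrative choice). It mirrors the behaviour of Hadoop's default HashPartitioner, which is used whenever no partitioner is set explicitly: the key's hash code, masked to be non-negative, modulo the number of reduce tasks decides which partition (and therefore which reduce task) a map output record goes to.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Same logic as the default HashPartitioner, written out for the Flow job's map output types
public class TelPartitioner extends Partitioner<Text, LongWritable> {
	@Override
	public int getPartition(Text key, LongWritable value, int numReduceTasks) {
		// Non-negative hash of the key, spread evenly over the reduce tasks
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

  • A custom partitioner like this would be registered in the driver with job.setPartitionerClass(TelPartitioner.class). Because equal keys always hash to the same partition, one phone number (or one movie id) is guaranteed to end up in a single reduce task, which is what the grouping described above relies on.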

Original article: https://www.cnblogs.com/xujunkai/p/14058835.html