0011. MapReduce Programming Cases (2)


05-26 - Implementing a self-join MapReduce program
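No code for this section is included in the notes. As a minimal sketch of the idea (the record layout empno,ename,mgr, the class names and the role tags are assumptions for illustration): the mapper emits every employee under two keys, once as a potential manager and once as a subordinate of his own manager, and the reducer joins the two roles on the employee number.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: emit each employee twice -- once under his own empno (as a potential
// manager), once under his mgr column (as a subordinate of that manager).
public class SelfJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// Assumed record layout: empno,ename,mgr
		String[] cols = value.toString().split(",");
		String empno = cols[0];
		String ename = cols[1];

		// Role 1: this employee viewed as the manager of whoever references his empno
		context.write(new Text(empno), new Text("m:" + ename));

		// Role 2: this employee viewed as a subordinate of his manager (if any)
		if (cols.length > 2 && !cols[2].isEmpty()) {
			context.write(new Text(cols[2]), new Text("e:" + ename));
		}
	}
}

// Reducer: for each empno, pair the manager name with all subordinate names.
class SelfJoinReducer extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		String manager = null;
		List<String> staff = new ArrayList<String>();
		for (Text v : values) {
			String s = v.toString();
			if (s.startsWith("m:")) manager = s.substring(2);
			else staff.add(s.substring(2));
		}
		// Only emit when the key really joins a manager with at least one subordinate
		if (manager != null && !staff.isEmpty()) {
			context.write(new Text(manager), new Text(staff.toString()));
		}
	}
}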


05-27 - Analyzing the inverted index workflow

[Figure: 倒排索引数据处理的过程.png — the inverted index data processing workflow]
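The job builds the index in three stages: map, combine, reduce. As a concrete walk-through, suppose /indexdata contains data01.txt with the line "I love Beijing and love Shanghai" (the sample used in the code below) and a second file data02.txt that also contains "love" once (data02.txt and its content are an assumption for illustration). For the word love the intermediate data would look like:

	Map output      (word:fileName, "1")           love:data01.txt -> 1, love:data01.txt -> 1, love:data02.txt -> 1
	Combine output  (word, fileName:count)         love -> data01.txt:2, love -> data02.txt:1
	Reduce output   (word, "(fileName:count)...")  love  (data02.txt:1)(data01.txt:2)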


05-28 - Implementing an inverted index with MapReduce (1)


05-29 - Implementing an inverted index with MapReduce (2)

Implementing an inverted index with MapReduce

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// Sample input path: /indexdata/data01.txt
		// Get the file this split comes from (the cast assumes a file-based
		// input format such as the default TextInputFormat)
		String path = ((FileSplit)context.getInputSplit()).getPath().toString();
		
		// Parse out the file name: find the position of the last slash
		int index = path.lastIndexOf("/");
		String fileName = path.substring(index+1);
		
		// Sample record: I love Beijing and love Shanghai
		String data = value1.toString();
		String[] words = data.split(" ");
		
		// Emit one pair per word: key = word:fileName, value = "1"
		for(String word:words){
			context.write(new Text(word+":"+fileName), new Text("1"));
		}
	}
}


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text k3, Iterable<Text> v3, Context context)
			throws IOException, InterruptedException {
		// Concatenate every "(fileName:count)" entry for this word into one output line
		String str = "";
		
		for(Text t:v3){
			str = "("+t.toString()+")"+str;
		}
		
		context.write(k3, new Text(str));
	}

}


import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

	@Override
	protected void reduce(Text k21, Iterable<Text> v21, Context context)
			throws IOException, InterruptedException {
		// Sum the occurrences of this word within a single file
		int total = 0;
		for(Text v:v21){
			total = total + Integer.parseInt(v.toString());
		}
		
		// k21 looks like: love:data01.txt
		String data = k21.toString();
		// Find the position of the colon separator
		int index = data.indexOf(":");
		
		String word = data.substring(0, index);        // the word
		String fileName = data.substring(index + 1);   // the file name
		
		// Emit: key = word, value = fileName:count
		context.write(new Text(word), new Text(fileName+":"+total));
	}
}
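Note that Hadoop treats a combiner as an optional optimization: it may run zero, one, or several times per map output. Because this combiner also rewrites the key from word:fileName to word, the job as written effectively depends on the combiner running; in a production job it is safer to design the map, combine, and reduce steps so the result stays correct even if the combiner is skipped.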

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class RevertedIndexMain {

	public static void main(String[] args) throws Exception {
		// 1. Create a job
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(RevertedIndexMain.class); // entry point of the job
		
		// 2. Set the mapper and the data types of the map output
		job.setMapperClass(RevertedIndexMapper.class);
		job.setMapOutputKeyClass(Text.class);    // type of k2
		job.setMapOutputValueClass(Text.class);  // type of v2
	
		// Set the combiner for the job
		job.setCombinerClass(RevertedIndexCombiner.class);
		
		// 3. Set the reducer and the data types of the reduce output
		job.setReducerClass(RevertedIndexReducer.class);
		job.setOutputKeyClass(Text.class);   // type of k4
		job.setOutputValueClass(Text.class); // type of v4
		
		// 4. Set the input and output paths of the job
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 5. Run the job and wait for it to finish
		job.waitForCompletion(true);
	}

}
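To submit the job, the three classes are typically packaged into a jar and run with the hadoop jar command; the jar name and output path below are placeholders:

	hadoop jar invertedindex.jar RevertedIndexMain /indexdata /output/index

The output directory must not already exist, otherwise FileOutputFormat will fail the job.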


05-30 - Using MRUnit
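No MRUnit code is included in the notes. Below is a minimal sketch of what a test for RevertedIndexReducer could look like with MRUnit 1.x (the org.apache.mrunit:mrunit dependency) and JUnit 4; the expected values follow the combiner output format shown above. The mapper is less convenient to test this way because it reads the file name from the input split, which needs extra setup in a unit test.

import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class RevertedIndexReducerTest {

	@Test
	public void testReduce() throws Exception {
		// Drive the reducer with one key and two already-combined values
		ReduceDriver<Text, Text, Text, Text> driver =
				ReduceDriver.newReduceDriver(new RevertedIndexReducer());

		driver.withInput(new Text("love"),
				Arrays.asList(new Text("data01.txt:2"), new Text("data02.txt:1")));

		// The reducer prepends each value, so the later value ends up first
		driver.withOutput(new Text("love"), new Text("(data02.txt:1)(data01.txt:2)"));

		driver.runTest();
	}
}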


05-31 - Summary of the first phase

Original post: https://www.cnblogs.com/RoyalGuardsTomCat/p/13864205.html