0010.MapReduce编程案例1


05-20-Shuffle的过程

shuffle.png


05-21-数据去重

职位去重.png

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//                                                             k2 职位job
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		//数据:7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
		String data = value1.toString();
		
		//分词
		String[] words = data.split(",");
		
		//输出:把职位job作为key2
		context.write(new Text(words[2]), NullWritable.get());
	}
}
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text k3, Iterable<NullWritable> v3,Context context) throws IOException, InterruptedException {
		// 直接把k3输出即可
		context.write(k3, NullWritable.get());
	}

}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctMain {

	public static void main(String[] args) throws Exception {
		//1、创建一个任务
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(DistinctMain.class); //任务的入口		
		
		//2、指定任务的map和map输出的数据类型
		job.setMapperClass(DistinctMapper.class);
		job.setMapOutputKeyClass(Text.class);  //k2的数据类型
		job.setMapOutputValueClass(NullWritable.class);  //v2的类型
	
		//3、指定任务的reduce和reduce的输出数据的类型
		job.setReducerClass(DistinctReducer.class);
		job.setOutputKeyClass(Text.class); //k4的类型
		job.setOutputValueClass(NullWritable.class); //v4的类型
		
		//4、指定任务的输入路径、任务的输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//5、执行任务
		job.waitForCompletion(true);
	}

}


05-22-复习SQL的多表查询

笛卡尔积.png


05-23-分析等值连接的处理过程

分析等值连接的处理过程.png


05-24-使用MR实现等值连接

等值连接
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EqualJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		//数据可能是部门,也可能是员工
		String data = value1.toString();
		
		//分词
		String[] words = data.split(",");
		
		//判断数组的长度
		if(words.length == 3){
			//得到是部门数据:部门号 部门名称
			context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
		}else{
			//员工数据 : 员工的部门号 员工的姓名
			context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1]));
		}
	
	}
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EqualJoinReducer extends Reducer<IntWritable, Text, Text, Text> {

	@Override
	protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
			throws IOException, InterruptedException {
		// 处理v3:可能是部门名称、也可能是员工的姓名
		String dname = "";
		String empNameList = "";
		
		for(Text value:v3){
			String str = value.toString();
			//判断是否存在*
			int index = str.indexOf("*");
			if(index >= 0){
				//代表是部门的名称
				dname = str.substring(1);
			}else{
				//代表是员工的名称
				empNameList = str + ";" + empNameList;
			}
		}
		
		//输出
		context.write(new Text(dname), new Text(empNameList));
	}

}


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctMain {

	public static void main(String[] args) throws Exception {
		//1、创建一个任务
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(DistinctMain.class); //任务的入口		
		
		//2、指定任务的map和map输出的数据类型
		job.setMapperClass(DistinctMapper.class);
		job.setMapOutputKeyClass(Text.class);  //k2的数据类型
		job.setMapOutputValueClass(NullWritable.class);  //v2的类型
	
		//3、指定任务的reduce和reduce的输出数据的类型
		job.setReducerClass(DistinctReducer.class);
		job.setOutputKeyClass(Text.class); //k4的类型
		job.setOutputValueClass(NullWritable.class); //v4的类型
		
		//4、指定任务的输入路径、任务的输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//5、执行任务
		job.waitForCompletion(true);
	}

}


05-25-分析自连接的处理过程

分析自连接的处理过程.png

查询员工信息,要求显示:员工老板的名字 员工的名字
			select b.ename,e.ename
			from emp b,emp e
			where b.empno=e.mgr;
			
			在oracle中,当查询的数据满足是一棵树的时候,可以使用层次查询来取代自连接

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

	@Override
	protected void map(LongWritable key1, Text value1, Context context)
			throws IOException, InterruptedException {
		// 数据: 7566,JONES,MANAGER,7839,1981/4/2,2975,0,20
		String data = value1.toString();
		
		//分词操作
		String[] words = data.split(",");
		
		//输出数据
		//1、作为老板表                                         员工号
		context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));
				
		//2、作为员工表                                         老板的员工号
		context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1]));
		/*
		 * 注意一个问题:如果数据存在非法数据,一定处理一下(数据清洗)
		 * 如果产生例外,一定捕获
		 */
	}
}


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> {

	@Override
	protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
			throws IOException, InterruptedException {
		//定义变量保存:老板的姓名、员工的姓名
		String bossName = "";
		String empNameList = "";
		
		for(Text t:v3){
			String str = t.toString();
			
			//判断是否存在*号
			int index = str.indexOf("*");
			if(index >= 0 ){
				//老板的姓名
				bossName = str.substring(1);
			}else{
				//员工的姓名
				empNameList = str + ";" + empNameList;
			}
		}
		
		//输出:如果存在老板,也存在员工,才进行输出
		if(bossName.length() > 0 && empNameList.length() > 0)
			context.write(new Text(bossName), new Text(empNameList));
	}
}


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SelfJoinMain {

	public static void main(String[] args) throws Exception {
		//1、创建一个任务
		Job job = Job.getInstance(new Configuration());
		job.setJarByClass(SelfJoinMain.class); //任务的入口		
		
		//2、指定任务的map和map输出的数据类型
		job.setMapperClass(SelfJoinMapper.class);
		job.setMapOutputKeyClass(IntWritable.class);  //k2的数据类型
		job.setMapOutputValueClass(Text.class);  //v2的类型
	
		//3、指定任务的reduce和reduce的输出数据的类型
		job.setReducerClass(SelfJoinReducer.class);
		job.setOutputKeyClass(Text.class); //k4的类型
		job.setOutputValueClass(Text.class); //v4的类型
		
		//4、指定任务的输入路径、任务的输出路径
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//5、执行任务
		job.waitForCompletion(true);

	}

}

原文地址:https://www.cnblogs.com/RoyalGuardsTomCat/p/13861865.html