2021.11.20 MapReduce实验

一、今日学习内容

  单表join

04Mapreduce实例——单表join

实验目的

1.准确理解MapReduce单表连接的设计原理

2.熟练掌握MapReduce单表连接程序的编写

3.了解单表连接的运用场景

4.学会编写MapReduce单表连接程序代码解决问题

实验原理

以本实验的buyer1(buyer_id,friends_id)表为例来阐述单表连接的实验原理。单表连接,连接的是左表的buyer_id列和右表的friends_id列,且左表和右表是同一个表。因此,在map阶段将读入数据分割成buyer_idfriends_id之后,会将buyer_id设置成keyfriends_id设置成value,直接输出并将其作为左表;再将同一对buyer_idfriends_id中的friends_id设置成keybuyer_id设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value中再加上左右表的信息,比如在valueString最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个keyvalue-list就包含了"buyer_idfriends_id--friends_idbuyer_id"关系。取出每个keyvalue-list进行解析,将左表中的buyer_id放入一个数组,右表中的friends_id放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

实验环境

Linux Ubuntu 14.04

jdk-7u75-linux-x64

hadoop-2.6.0-cdh5.4.5

hadoop-2.6.0-eclipse-cdh5.4.5.jar

eclipse-java-juno-SR2-linux-gtk-x86_64

 

实验内容

现有某电商的用户好友数据文件,名为 buyer1buyer1中包含(buyer_id,friends_id)两个字段,内容是以"\t"分隔,编写MapReduce进行单表连接,查询出用户的间接好友关系。例如:10001的好友是10002,而10002的好友是10005,那么1000110005就是间接好友关系。

buyer1(buyer_id,friends_id)

  1. 10001   10002  
  2. 10002   10005  
  3. 10003   10002  
  4. 10004   10006  
  5. 10005   10007  
  6. 10006   10022  
  7. 10007   10032  
  8. 10009   10006  
  9. 10010   10005  
  10. 10011   10013  

统计结果数据如下:

  1. 好友id  用户id  
  2. 10005   10001  
  3. 10005   10003  
  4. 10007   10010  
  5. 10007   10002  
  6. 10022   10004  
  7. 10022   10009  
  8. 10032   10005  

实验步骤

1.切换到/apps/hadoop/sbin目录下,开启hadoop

  1. cd /apps/hadoop/sbin  
  2. ./start-all.sh  

2.Linux本地新建/data/mapreduce7目录。

  1. mkdir -p /data/mapreduce7  

3.Linux中切换到/data/mapreduce7目录下,用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/buyer1网址上下载文本文件buyer1

  1. cd /data/mapreduce7  
  2. wget http://192.168.1.100:60000/allfiles/mapreduce7/buyer1  

然后在当前目录下用wget命令从http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz网址上下载项目用到的依赖包。

  1. wget http://192.168.1.100:60000/allfiles/mapreduce7/hadoop2lib.tar.gz  

hadoop2lib.tar.gz解压到当前目录下。

  1. tar zxvf hadoop2lib.tar.gz  

4.首先在hdfs上新建/mymapreduce7/in目录,然后将Linux本地/data/mapreduce7目录下的buyer1文件导入到hdfs/mymapreduce7/in目录中。

  1. hadoop fs -mkdir -p /mymapreduce7/in  
  2. hadoop fs -put /data/mapreduce7/buyer1 /mymapreduce7/in  

5.新建maven项目,项目名为mapreduce7

导入依赖

6.编写Java代码,并描述其设计思路

Map代码

  1. public static class Map extends Mapper<Object,Text,Text,Text>{  
  2. //实现map函数  
  3. public void map(Object key,Text value,Context context)  
  4. throws IOException,InterruptedException{  
  5. String line = value.toString();  
  6. String[] arr = line.split("\t");   //按行截取  
  7. String mapkey=arr[0];  
  8. String mapvalue=arr[1];  
  9. String relationtype=new String();  //左右表标识  
  10. relationtype="1";  //输出左表  
  11. context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
  12. //System.out.println(relationtype+"+"+mapvalue);  
  13. relationtype="2";  //输出右表  
  14. context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
  15. //System.out.println(relationtype+"+"+mapvalue);  
  16. }  
  17. }  

Map处理的是一个纯文本文件,Mapper处理的数据是由InputFormat将数据集切分成小的数据集InputSplit,并用RecordReader解析成<key/value>对提供给map函数使用。map函数中用split("\t")方法把每行数据进行截取,并把数据存入到数组arr[],把arr[0]赋值给mapkeyarr[1]赋值给mapvalue。用两个contextwrite()方法把数据输出两份,再通过标识符relationtype12对两份输出数据的value打标记。

Reduce代码

  1. public static class Reduce extends Reducer<Text, Text, Text, Text>{  
  2. //实现reduce函数  
  3. public void reduce(Text key,Iterable<Text> values,Context context)  
  4. throws IOException,InterruptedException{  
  5. int buyernum=0;  
  6. String[] buyer=new String[20];  
  7. int friendsnum=0;  
  8. String[] friends=new String[20];  
  9. Iterator ite=values.iterator();  
  10. while(ite.hasNext()){  
  11. String record=ite.next().toString();  
  12. int len=record.length();  
  13. int i=2;  
  14. if(0==len){  
  15. continue;  
  16. }  
  17. //取得左右表标识  
  18. char relationtype=record.charAt(0);  
  19. //取出record,放入buyer  
  20. if('1'==relationtype){  
  21. buyer [buyernum]=record.substring(i);  
  22. buyernum++;  
  23. }  
  24. //取出record,放入friends  
  25. if('2'==relationtype){  
  26. friends[friensnum]=record.substring(i);  
  27. friendsnum++;  
  28. }  
  29. }  
  30. buyernumfriendsnum数组求笛卡尔积  
  31. if(0!=buyernum&&0!=friendsnum){  
  32. for(int m=0;m<buyernum;m++){  
  33. for(int n=0;n<friendsnum;n++){  
  34. if(buyer[m]!=friends[n]){  
  35. //输出结果  
  36. context.write(new Text(buyer[m]),new Text(frinds[n]));  
  37. }  
  38. }  
  39. }  
  40. }  
  41. }  

reduce端在接收map端传来的数据时已经把相同key的所有value都放到一个Iterator容器中valuesreduce函数中,首先新建两数组buyer[]friends[]用来存放map端的两份输出数据。然后Iterator迭代中hasNext()Next()方法加while循环遍历输出values的值并赋值给record,用charAt(0)方法获取record第一个字符赋值给relationtype,用if判断如果relationtype1则把用substring(2)方法从下标为2开始截取record将其存放到buyer[]中,如果relationtype2时将截取的数据放到frindes[]数组中。然后用三个for循环嵌套遍历输出<key,value>,其中key=buyer[m]value=friends[n]

完整代码

package mapreduce;  
import java.io.IOException;  
import java.util.Iterator;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
public class DanJoin {  
public static class Map extends Mapper<Object,Text,Text,Text>{  
public void map(Object key,Text value,Context context)  
throws IOException,InterruptedException{  
String line = value.toString();  
String[] arr = line.split("\t");  
String mapkey=arr[0];  
String mapvalue=arr[1];  
String relationtype=new String();  
relationtype="1";  
context.write(new Text(mapkey),new Text(relationtype+"+"+mapvalue));  
//System.out.println(relationtype+"+"+mapvalue);  
relationtype="2";  
context.write(new Text(mapvalue),new Text(relationtype+"+"+mapkey));  
//System.out.println(relationtype+"+"+mapvalue);  
}  
}  
public static class Reduce extends Reducer<Text, Text, Text, Text>{  
public void reduce(Text key,Iterable<Text> values,Context context)  
throws IOException,InterruptedException{  
int buyernum=0;  
String[] buyer=new String[20];  
int friendsnum=0;  
String[] friends=new String[20];  
Iterator ite=values.iterator();  
while(ite.hasNext()){  
String record=ite.next().toString();  
int len=record.length();  
int i=2;  
if(0==len){  
continue;  
}  
char relationtype=record.charAt(0);  
if('1'==relationtype){  
buyer [buyernum]=record.substring(i);  
buyernum++;  
}  
if('2'==relationtype){  
friends[friendsnum]=record.substring(i);  
friendsnum++;  
}  
}  
if(0!=buyernum&&0!=friendsnum){  
for(int m=0;m<buyernum;m++){  
for(int n=0;n<friendsnum;n++){  
if(buyer[m]!=friends[n]){  
context.write(new Text(buyer[m]),new Text(friends[n]));  
}  
}  
}  
}  
}  
}  
public static void main(String[] args) throws Exception{  
Configuration conf=new Configuration();  
String[] otherArgs=new String[2];  
otherArgs[0]="hdfs://localhost:9000/mymapreduce7/in/buyer1";  
otherArgs[1]="hdfs://localhost:9000/mymapreduce7/out";  
Job job=new Job(conf," Table join");  
job.setJarByClass(DanJoin.class);  
job.setMapperClass(Map.class);  
job.setReducerClass(Reduce.class);  
job.setOutputKeyClass(Text.class);  
job.setOutputValueClass(Text.class);  
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
System.exit(job.waitForCompletion(true)?0:1);  
}  
}  

7.待执行完毕后,进入命令模式下,在hdfs上从Java代码指定的输出路径中查看实验结果。

  1. hadoop fs -ls /mymapreduce7/out  
  2. hadoop fs -cat /mymapreduce7/out/part-r-00000  

 

 

 

原文地址:https://www.cnblogs.com/wmdww/p/15580839.html