Hadoop 多表关联

一、实例描述

  多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息。下面进入这个实例。

  输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址列,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出工厂名-地址名表。

  样例输入:

  factory:

  factoryname addressed
  Beijing Red Star 1
  Shenzhen Thunder 3
  Guangzhou Honda 2
  Beijing Rising 1
  Guangzhou Development Bank 2
  Tencent 3
  Bank of Beijing 1

  address:

  addressID addressname
  1 Beijing
  2 Guangzhou
  3 Shenzhen
  4 Xian

  样例输出:

二、设计思路

  多表关联和单表关联类似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更清楚,因此可以采用和单表关联相同的处理方式。Map识别出输入的行属于哪个表之后,对其进行分割,将连接的值保存在key中,另一列和左右表标志保存在value中,然后输出。Reduce拿到连接结果后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。

  这个实例的具体分析参考Hadoop 单表关联博客,下面贴出代码。

三、程序代码

  程序代码如下:

  1 import java.io.IOException;
  2 import java.util.Iterator;
  3 
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.Path;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.Job;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Reducer;
 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 12 import org.apache.hadoop.util.GenericOptionsParser;
 13 
 14 
 15 public class MTjoin {
 16     
 17     public static int time = 0;
 18     
 19     public static class Map extends Mapper<Object, Text, Text, Text>{
 20         // 在Map中先区分输入行属于左表还是右表,然后对两列值进行分割,
 21         // 连接列保存在key值,剩余列和左右表标志保存在value中,最后输出
 22         @Override
 23         protected void map(Object key, Text value,Mapper<Object, Text, Text, Text>.Context context)
 24                 throws IOException, InterruptedException {
 25             // super.map(key, value, context);
 26             String line = value.toString();
 27             int i=0;
 28             // 输入文件首行,不处理
 29             if(line.contains("factoryname")==true || line.contains("addressID")==true){
 30                 return ;
 31             }
 32             // 找出数据中的分割点
 33             while(line.charAt(i)>='9' || line.charAt(i)<='0'){
 34                 i++;
 35             }
 36             if (line.charAt(0)>='9'||line.charAt(0)<='0') {
 37                 // 左表
 38                 int j = i-1;
 39                 while(line.charAt(j)!=' ') j--;
 40                 String [] values = {line.substring(0,j),line.substring(i)};
 41                 context.write(new Text(values[1]), new Text("1+"+values[0]));
 42             }else {
 43                 // 右表
 44                 int j = i+1;
 45                 while(line.charAt(j)!=' ') j++;
 46                 String[] values = {line.substring(0,i+1),line.substring(j)};
 47                 context.write(new Text(values[0]), new Text("2"+values[1]));
 48             }
 49         }
 50     }
 51 
 52     public static class Reduce extends Reducer<Text, Text, Text, Text>{
 53         // Reduce解析Map输出,将value中数据按照左右表分别保存,然后求 // 笛卡尔积,输出
 54         @Override
 55         protected void reduce(Text key, Iterable<Text> values,Reducer<Text, Text, Text, Text>.Context context)
 56                 throws IOException, InterruptedException {
 57             // super.reduce(arg0, arg1, arg2);
 58             if (time==0) {
 59                 //  输出文件第一行
 60                 context.write(new Text("factoryname"), new Text("addressname"));
 61                 time++;
 62             }
 63             int factorynum = 0;
 64             String[] factory = new String[10];
 65             int addressnum = 0;
 66             String[] address = new String[10];
 67             Iterator ite = values.iterator();
 68             while (ite.hasNext()) {
 69                 String record = ite.next().toString();
 70                 int len = record.length();
 71                 int i = 2;
 72                 char type = record.charAt(0);
 73                 String factoryname = new String();
 74                 String addressname = new String();
 75                 if (type=='1') {
 76                     // 左表
 77                     factory[factorynum] = record.substring(2);
 78                     factorynum++;
 79                 }else {
 80                     // 右表
 81                     address[addressnum] = record.substring(2);
 82                     addressnum++;
 83                 }
 84             }
 85             if (factorynum != 0 && addressnum !=0) {
 86                 // 求笛卡尔积
 87                 for(int m=0;m<factorynum;m++){
 88                     for(int n=0;n<addressnum;n++){
 89                         context.write(new Text(factory[m]), new Text(address[n]));
 90                     }
 91                 }
 92             }
 93         }
 94     }
 95     
 96     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 97         
 98         Configuration conf = new Configuration();
 99         String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
100         if(otherArgs.length!=2){
101             System.out.println("Usage:wordcount <in> <out>");
102             System.exit(2);
103         }
104         Job job = new Job(conf,"multiple table join");
105         job.setJarByClass(MTjoin.class);
106         job.setMapperClass(Map.class);
107         job.setReducerClass(Reduce.class);
108         job.setOutputKeyClass(Text.class);
109         job.setOutputValueClass(Text.class);
110         FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
111         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
112         System.exit(job.waitForCompletion(true)?0:1);
113     }
114 
115 }
原文地址:https://www.cnblogs.com/xiaoyh/p/9343425.html