Mapreduce中的join操作

一、背景

    MapReduce提供了表连接操作其中包括Map端join、Reduce端join还有半连接,现在我们要讨论的是Map端join,Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

二、具体join

   1、join的例子

    比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:

    • products.txt:
      //商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
      p0001,xiaomi,001
      p0002,chuizi,001
    • orders.txt:
      //订单号,时间,商品id,购买数量 
      1001,20170710,p0001,1 
      1002,20170710,p0001,3 
      1003,20170710,p0001,3 
      1004,20170710,p0002,1

      我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。

      xiaomi,7
      chuizi,1

      该怎么处理? 我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。

    • 代码如下:
      • Mapper:
        public class joinMapper extends Mapper<LongWritable,Text,Text,Text> {
        
            private Text outKey=new Text();
            private Text outValue=new Text();
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(",");
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String name = inputSplit.getPath().getName();
                //两个文件 在一个 mapper 中处理
                //通过文件名判断是那种数据
                if(name.startsWith("a")){
                    //取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
                    outKey.set(split[0]);
                    outValue.set("product#" + split[1]);
                    context.write(outKey, outValue);
                }else{
                    //取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
                    outKey.set(split[2]);
                    outValue.set("order#" + split[3]);
                    context.write(outKey, outValue);
                }
            }
        }
      • Reducer

        public class joinReducer extends Reducer<Text,Text,Text,Text> {
            private Text outValue = new Text();
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //用来存放:商品ID、商品名称
                List<String> productsList = new ArrayList<String>();
                //用来存放:商品ID、购买数量
                List<Integer> ordersList = new ArrayList<Integer>();
        
                for (Text text:values){
                    String value = text.toString();
                    if(value.startsWith("product#")) {
                        productsList.add(value.split("#")[1]); //取出 商品名称
                    } else if(value.startsWith("order#")){
                        ordersList.add(Integer.parseInt(text.toString().split("#")[1].trim())); //取出商品的销量
                    }
                }
                int totalOrders = 0;
                for (int i=0; i < productsList.size(); i++) {
                    System.out.println(productsList.size());
        
                    for (int j=0; j < ordersList.size(); j++) {
                        System.out.println(ordersList.size());
                        totalOrders += ordersList.get(j);
                    }
                    outValue.set(productsList.get(i) + "	" + totalOrders );
                    //最后的输出是:商品ID、商品名称、购买数量
                    context.write(key, outValue);
                }
        
            }
        }
      • App:

        public class App  {
            public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "file:///");
        
                Path path = new Path("F:\mr\join\out");
                FileSystem fileSystem = path.getFileSystem(conf);
                if(fileSystem.isDirectory(path)){
                    fileSystem.delete(path,true);
                }
                Job job = Job.getInstance(conf);
                //设置job的各种属性
                job.setJobName("App");                        //作业名称
                job.setJarByClass(App.class);                 //搜索类
                job.setInputFormatClass(TextInputFormat.class); //设置输入格式
        
                job.setMapperClass(joinMapper.class);
                job.setReducerClass(joinReducer.class);
                //添加输入路径
                FileInputFormat.addInputPath(job,new Path("F:\mr\join\map"));
                //设置输出路径
                FileOutputFormat.setOutputPath(job,new Path("F:\mr\join\out"));
                //map输出类型
                job.setOutputKeyClass(Text.class);           //
                job.setOutputValueClass(Text.class);        //
                job.waitForCompletion(true);
        
            }
        }
      • 输出结果

        p0001    xiaomi    7
        p0002    chuizi    1
                

2、 Map Join   

    一个数据集很大,另一个数据集很小(能够被完全放进内存中),MAPJION会把小表全部读入内存中,把小表拷贝多份分发到大表数据所在实例上的内存里,在map阶段直接 拿另 外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率会高很多;

    适用于关联表中有小表的情形;可以将小表分发到所有的map节点,这样,map节点就可以在本地对自己所读到的大表数据进行join并输出最终结果,可以大大提高join操作的并发度,加快处理速度。并用distributedcache机制将小表的数据分发到每一个maptask执行节点,从而每一个maptask节点可以从本地加载到小表的数据,进而在本地即可实现join

    • left outer join的左表必须是大表    
    • right outer join的右表必须是大表
    • inner join左表或右表均可以作为大表
    • full outer join不能使用mapjoin;
    •  mapjoin支持小表为子查询,使用mapjoin时需要引用小表或是子查询时,需要引用别名;在mapjoin中,可以使用不等值连接或者使用or连接多个条件;    

  1.2、 Map Join事例

      • product表

        p0001,xiaomi,001
        p0002,chuizi,001

      • orders表

        1001,20170710,p0001,1
        1002,20170710,p0001,3
        1003,20170710,p0001,3
        1004,20170710,p0002,1

      • 期望输出

        xiaomi 1001,20170710,p0001,1
        xiaomi 1002,20170710,p0001,3
        xiaomi 1003,20170710,p0001,3
        chuizi 1004,20170710,p0002,1

      • 代码实现
        • Mapper
          /**
           * 链接操作  map端链接
           */
          public class MapJoinMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
          
              private Map<String,String> pdInfoMap =new HashMap<String,String>();
              private Text keyOut=new Text();
              /**
               * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
               */
              @Override
              protected void setup(Context context) {
                  try {
                      Configuration conf = context.getConfiguration();
                      FileSystem fs= null;
                      fs = FileSystem.get(conf);
                      FSDataInputStream fis = fs.open(new Path("file:/F:/mr/join/map/input/a.txt"));
                      //得到缓冲区阅读器
                      BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                      String line=null;
                      while((line=br.readLine())!=null){
                          String[] fields = line.split(",");
                          pdInfoMap.put(fields[0],fields[1]);
                      }
                      fis.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
              // 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  //订单信息
                  String orderline = value.toString();
                  String[] fields = orderline.split(",");
                  String pName = pdInfoMap.get(fields[2]);
                  keyOut.set(pName+"	"+orderline);
                  context.write(keyOut,NullWritable.get());
              }
          }
        • App
          public class MapJoinApp {
              public static void main(String[] args) throws Exception {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
                  Job job = Job.getInstance(conf);
                  //设置job的各种属性
                  job.setJobName("MapJoinApp");                        //作业名称
                  job.setJarByClass(MapJoinApp.class);                 //搜索类
                  //添加输入路径
                  FileInputFormat.addInputPath(job,new Path("F:/mr/join/map/input/b.txt"));
                  //设置输出路径
                  FileOutputFormat.setOutputPath(job,new Path("F:/mr/join/map/output"));
                  job.setMapperClass(MapJoinMapper.class);             //mapper类
                  //没有reduce
                  job.setNumReduceTasks(0);
                  job.setMapOutputKeyClass(Text.class);           //
                  job.setMapOutputValueClass(NullWritable.class);  //
          
                  job.waitForCompletion(true);
              }
          }
        • 输出和期望输出一致

3、Reduce端Join

    • Reduce端连接比Map端连接更为普遍,因为输入的数据不需要特定的结构,但是效率比较低,因为所有数据都必须经过Shuffle过程。
    • 基本思路:
      1. Map端读取所有的文件,并在输出的内容里加上标示,代表数据是从哪个文件里来的。
      2. 在reduce处理函数中,按照标识对数据进行处理
      3. 然后根据Key去join来求出结果直接输出。
    • 例子
      • 数据如上
      • 计算过程:
        • 在Map阶段,把所有数据标记成<key,value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于products表的记录,value的值为"products#"+name;来源于orders的记录,value的值为"orders#"+score。
        • 在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终的结果。
      • 代码如下:
        • Mapper
          /**
           * map阶段打标记
           */
          public class reduceMapper extends Mapper<LongWritable,Text,Text,Text> {
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  String line = value.toString();
                  String[] fields = line.split(",");
          FileSplit fileSplit
          = (FileSplit)context.getInputSplit(); String pathName = fileSplit.getPath().toString(); pathName=pathName.substring(27); //通过文件名判断是那种数据 if (pathName.startsWith("a")){//product数据 //System.out.println(keyOut+" "+valueOut); context.write(new Text(fields[0]),new Text("product#"+fields[1])); }else if (pathName.startsWith("b")){ context.write(new Text(fields[2]),new Text("order#"+fields[0]+" "+fields[1]+" "+fields[3])); } } }
        • Reducer
          public class reduceReducer extends Reducer<Text,Text,Text,Text> {
              @Override
              protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                  //存放产品信息
                  List<String> proInfo = new ArrayList<String>();
                  //存放订单信息
                  List<String> ordInfo = new ArrayList<String>();
                  for (Text text:values){
                      System.out.println("key="+key+"  value="+text);
                      //将数组中的数据添加到对应的数组中去
                      if (text.toString().startsWith("product")){
                          proInfo.add(text.toString().split("#")[1]);
                      }else if(text.toString().startsWith("order")){
                          ordInfo.add(text.toString().split("#")[1]);
                      }
                  }
                  //获取两个数组的大小
                  int sizePro = proInfo.size();
                  int sizeOrd = ordInfo.size();
                  //遍历两个数组将结果写出去
                  for (int i=0;i<sizePro;i++){
                      for (int j=0;j<sizeOrd;j++){
                          context.write(key,new Text(proInfo.get(i)+" "+ordInfo.get(j)));
                      }
                  }
              }
          }
        • App
          public class ReduceApp {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  Configuration conf = new Configuration();
                  conf.set("fs.defaultFS", "file:///");
                  Job job = Job.getInstance(conf);
          
                  Path path = new Path("F:\mr\join\map/output1");
                  FileSystem fileSystem = path.getFileSystem(conf);
                  if(fileSystem.isDirectory(path)){
                      fileSystem.delete(path,true);
                  }
          
                  //设置job的各种属性
                  job.setJobName("ReduceApp");                        //作业名称
                  job.setJarByClass(ReduceApp.class);                 //搜索类
                  //添加输入路径
                  FileInputFormat.addInputPath(job,new Path("F:\mr\join\map\input"));
                  //设置输出路径
                  FileOutputFormat.setOutputPath(job,new Path("F:\mr\join\map/output1"));
          
                  job.setMapperClass(reduceMapper.class);             //mapper类
                  job.setReducerClass(reduceReducer.class);         //reducer类
          
                  job.setMapOutputKeyClass(Text.class);           //
                  job.setMapOutputValueClass(Text.class);  //
          
                  job.waitForCompletion(true);
              }
          }
        • 输出结果
          p0001    xiaomi 1003    20170710    3 
          p0001    xiaomi 1002    20170710    3 
          p0001    xiaomi 1001    20170710    1 
          p0002    chuizi 1004    20170710    1

         细节:

      • 当map读取源文件时,如何区分出是file1还是file2 
        FileSplit fileSplit = (FileSplit)context.getInputSplit();
        String path =  fileSplit.getPath().toString();

             根据path就可以知道文件的来源咯。

原文地址:https://www.cnblogs.com/tongxupeng/p/10417527.html