030 RDD Join中宽依赖与窄依赖的判断

1.规律

   如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖
  除此之外的,rdd 的join api是宽依赖

2.Join的理解

  

  

3.举例

 1 A表数据:
 2   1 a
 3   2 b
 4   3 c
 5 B表数据:
 6   1 aa1
 7   1 aa2
 8   2 bb1
 9   2 bb2
10   2 bb3
11   4 dd1
12 
13 A inner join B:
14   1    a 1 aa1
15   1    a 1 aa2
16   2   b 2 bb1
17   2   b 2 bb2
18   2   b 2 bb3
19 
20 A left outer join B:
21   1    a 1 aa1
22   1    a 1 aa2
23   2   b 2 bb1
24   2   b 2 bb2
25   2   b 2 bb3
26   3   c null null
27 
28 A right outer join B:
29   1    a 1 aa1
30   1    a 1 aa2
31   2   b 2 bb1
32   2   b 2 bb2
33   2   b 2 bb3
34   null null 4 dd1
35 
36 A full outer join B:
37   1    a 1 aa1
38   1    a 1 aa2
39   2   b 2 bb1
40   2   b 2 bb2
41   2   b 2 bb3
42   3   c null null
43   null null 4 dd1
44 
45 A left semi join B:
46   1 a
47   2 b

  

4.API

  必须是Key/value键值对

  

5.测试程序

 1 import org.apache.spark.{SparkConf, SparkContext}
 2 
 3 /**
 4   * RDD数据Join相关API讲解
 5   * Created by ibf on 02/09.
 6   */
 7 object RDDJoin {
 8   def main(args: Array[String]): Unit = {
 9     val conf = new SparkConf()
10       .setMaster("local[*]")
11       .setAppName("RDD-Join")
12     val sc = SparkContext.getOrCreate(conf)
13 
14     // ==================具体代码======================
15     // 模拟数据产生
16     val rdd1 = sc.parallelize(Array(
17       (1, "张三1"),
18       (1, "张三2"),
19       (2, "李四"),
20       (3, "王五"),
21       (4, "Tom"),
22       (5, "Gerry"),
23       (6, "莉莉")
24     ), 1)
25 
26     val rdd2 = sc.parallelize(Array(
27       (1, "上海"),
28       (2, "北京1"),
29       (2, "北京2"),
30       (3, "南京"),
31       (4, "纽约"),
32       (6, "深圳"),
33       (7, "香港")
34     ), 1)
35 
36     // 调用RDD API实现内连接
37     val joinResultRDD = rdd1.join(rdd2).map {
38       case (id, (name, address)) => {
39         (id, name, address)
40       }
41     }
42     println("----------------")
43     joinResultRDD.foreachPartition(iter => {
44       iter.foreach(println)
45     })
46     // 调用RDD API实现左外连接
47     val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map {
48       case (id, (name, addressOption)) => {
49         (id, name, addressOption.getOrElse("NULL"))
50       }
51     }
52     println("----------------")
53     leftJoinResultRDd.foreachPartition(iter => {
54       iter.foreach(println)
55     })
56     // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in)
57     println("----------------")
58     rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map {
59       case (id, (name, _)) => (id, name)
60     }.foreachPartition(iter => {
61       iter.foreach(println)
62     })
63 
64     // 右外连接
65     println("----------------")
66     rdd1
67       .rightOuterJoin(rdd2)
68       .map {
69         case (id, (nameOption, address)) => {
70           (id, nameOption.getOrElse("NULL"), address)
71         }
72       }
73       .foreachPartition(iter => iter.foreach(println))
74 
75     // 全外连接
76     println("----------------")
77     rdd1
78       .fullOuterJoin(rdd2)
79       .map {
80         case (id, (nameOption, addressOption)) => {
81           (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL"))
82         }
83       }
84       .foreachPartition(iter => iter.foreach(println))
85 
86     // 休眠为了看4040页面
87         Thread.sleep(1000000)
88   }
89 }

6.说明 

 RDD join API:
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value,
    b的第二个元素是来自参数other这个RDD的value

  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据

  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
    对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据

  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
    返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据

7.缺点

  

8.优化程序

  没有使用API,根据原理写一个。

  减少shufflw算子的使用。

  1 import org.apache.spark.{SparkConf, SparkContext}
  2 
  3 /**
  4   * RDD数据Join相关API讲解
  5   * Created by ibf on 02/09.
  6   */
  7 object RDDJoin {
  8   def main(args: Array[String]): Unit = {
  9     val conf = new SparkConf()
 10       .setMaster("local[*]")
 11       .setAppName("RDD-Join")
 12     val sc = SparkContext.getOrCreate(conf)
 13 
 14     // ==================具体代码======================
 15     // 模拟数据产生
 16     val rdd1 = sc.parallelize(Array(
 17       (1, "张三1"),
 18       (1, "张三2"),
 19       (2, "李四"),
 20       (3, "王五"),
 21       (4, "Tom"),
 22       (5, "Gerry"),
 23       (6, "莉莉")
 24     ), 1)
 25 
 26     val rdd2 = sc.parallelize(Array(
 27       (1, "上海"),
 28       (2, "北京1"),
 29       (2, "北京2"),
 30       (3, "南京"),
 31       (4, "纽约"),
 32       (6, "深圳"),
 33       (7, "香港")
 34     ), 1)
 35 
 36     // 假设rdd2的数据比较少,将rdd2的数据广播出去
 37     val leastRDDCollection = rdd2.collect()
 38     val broadcastRDDCollection = sc.broadcast(leastRDDCollection)
 39 
 40     println("++++++++++++++++++")
 41     // 类似Inner Join的操作,Inner Join的功能:将两个表都出现的数据合并
 42     println("-------------------")
 43     rdd1
 44       // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉
 45       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
 46       // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用flatMap
 47       .flatMap {
 48       case (id, name) => {
 49         broadcastRDDCollection.value.filter(_._1 == id).map {
 50           case (_, address) => {
 51             (id, name, address)
 52           }
 53         }
 54       }
 55     }
 56       .foreachPartition(iter => iter.foreach(println))
 57 
 58     // 左外连接
 59     println("---------------------")
 60     rdd1
 61       .flatMap {
 62         case (id, name) => {
 63           // 从右表所属的广播变量中获取对应id的集合列表
 64           val list = broadcastRDDCollection.value.filter(_._1 == id)
 65           // 对应id的集合可能为空,也可能数据有多个
 66           if (list.nonEmpty) {
 67             // 存在多个
 68             list.map(tuple => (id, name, tuple._2))
 69           } else {
 70             // id在右表中不存在,填默认值
 71             (id, name, "NULL") :: Nil
 72           }
 73         }
 74       }
 75       .foreachPartition(iter => iter.foreach(println))
 76 
 77     // 右外连接
 78     /**
 79       * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下
 80       **/
 81     println("---------------------")
 82     // 将rdd1中符合条件的数据过滤出来保存到driver中
 83     val stage1 = rdd1
 84       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
 85       .collect()
 86     // 将driver中两个集合进行right join
 87     val stage2 = leastRDDCollection.flatMap {
 88       case (id, address) => {
 89         val list = stage1.filter(_._1 == id)
 90         if (list.nonEmpty) {
 91           list.map(tuple => (id, tuple._2, address))
 92         } else {
 93           Iterator.single((id, "NULL", address))
 94         }
 95       }
 96     }
 97     stage2.foreach(println)
 98 
 99     // TODO: 全外连接,不写代码,因为代码比较复杂
100 
101     //====================================
102     // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次
103     println("+++++++++++++++++")
104     println("-----------------------")
105     rdd1
106       .join(rdd2)
107       .map {
108         case (id, (name, _)) => (id, name)
109       }
110       .distinct()
111       .foreachPartition(iter => iter.foreach(println))
112     println("------------------------")
113     rdd1
114       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
115       .foreachPartition(iter => iter.foreach(println))
116 
117     // 休眠为了看4040页面
118         Thread.sleep(1000000)
119   }
120 }

9.Join的窄依赖程序

  使用reduceByKey,里面的程序会给一个分区 

 1 package com.ibeifeng.senior.join
 2 
 3 import org.apache.spark.{SparkConf, SparkContext}
 4 
 5 /**
 6   * RDD数据Join相关API讲解
 7   * Created by ibf on 02/09.
 8   */
 9 object RDDJoin2 {
10   def main(args: Array[String]): Unit = {
11     val conf = new SparkConf()
12       .setMaster("local[*]")
13       .setAppName("RDD-Join")
14     val sc = SparkContext.getOrCreate(conf)
15 
16     // ==================具体代码======================
17     // 模拟数据产生, 添加map、reduceByKey、mapPartitions等api的主要功能是给rdd1和rdd2中添加一个分区器(表示当前rdd是存在shuffle过程的)
18     val rdd1 = sc.parallelize(Array(
19       (1, "张三1"),
20       (1, "张三2"),
21       (2, "李四"),
22       (3, "王五"),
23       (4, "Tom"),
24       (5, "Gerry"),
25       (6, "莉莉")
26     ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
27       iter => iter.map(tuple => tuple._1),
28       true // 使用上一个RDD的分区器,false表示不使用, 设置为None
29     )
30 
31     val rdd2 = sc.parallelize(Array(
32       (1, "上海"),
33       (2, "北京1"),
34       (2, "北京2"),
35       (3, "南京"),
36       (4, "纽约"),
37       (6, "深圳"),
38       (7, "香港")
39     ), 1).map(x => (x, null)).reduceByKey((x,y) => x, 1).mapPartitions(
40       iter => iter.map(tuple => tuple._1),
41       true // 使用上一个RDD的分区器,false表示不使用, 设置为None
42     )
43 
44     // 调用RDD API实现内连接
45     val joinResultRDD = rdd1.join(rdd2).map {
46       case (id, (name, address)) => {
47         (id, name, address)
48       }
49     }
50     println("----------------")
51     joinResultRDD.foreachPartition(iter => {
52       iter.foreach(println)
53     })
54 
55     // 休眠为了看4040页面
56         Thread.sleep(1000000)
57   }
58 }

 

原文地址:https://www.cnblogs.com/juncaoit/p/6528146.html