Spark(八)【利用广播小表实现join避免Shuffle】

使用场景

大表join小表 只能广播小表

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。

注意:RDD是并不能进行广播的,只能将RDD内部的数据通过collect拉取到Driver内存然后再进行广播

核心思路

将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

代码演示

正常join

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MapJoin")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 2, "key1" -> 10, "key2" -> 20, "key3" -> 30))
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List("key1" -> 5, "key1" -> 20, "key2" -> 40, "key4" -> 30))
	 //join
    rdd1.join(rdd2).collect().foreach(println)

控制台

(key1,(2,5))
(key1,(2,20))
(key1,(10,5))
(key1,(10,20))
(key2,(20,40))

正常left join

//left join
rdd1.leftOuterJoin(rdd2).collect().foreach(println)
(k1,(10,Some(-10)))
(k1,(10,Some(-100)))
(k2,(20,Some(-20)))
(k1,(100,Some(-10)))
(k1,(100,Some(-100)))
(k3,(30,None))

广播:join

    //广播rdd2
    val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
    val result = rdd1.flatMap {
      case (key1, value1) => {
        bd.value
          .filter(key1 == _._1)
          .map {
            case (key2, value2) =>
              (key1, (value1, value2))
          }
      }
    }
    result.collect().foreach(println)

广播:left join

    //广播rdd2
    val bd: Broadcast[Array[(String, Int)]] = sc.broadcast(rdd2.collect())
    val result: RDD[(String, (Int, Option[Int]))] = rdd1.flatMap {
      case (key1, value1) =>
        val arr = bd.value
        val keys = arr.map(_._1)
        if (keys.contains(key1)) {
          bd.value.filter(key1 == _._1).map {
            case (key2, value2) =>
              (key1, (value1, Some(value2)))
          }
        } else {
          Array(key1 -> (value1, None))
        }
    }
    result.collect.foreach(println)

不适用场景

由于Spark的广播变量是在每个Executor中保存一个副本,如果两个RDD数据量都比较大,那么如果将一个数据量比较大的 RDD做成广播变量,那么很有可能会造成内存溢出

原文地址:https://www.cnblogs.com/wh984763176/p/13668240.html