Using Shared Variables in Spark

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.Arrays;
import java.util.List;

public class BroadcastVariable {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("BroadcastVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(numbers);

        // An ordinary local variable defined on the driver; it is captured by
        // the closure and serialized into every task.
        final int factor = 3;

        JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
            public Integer call(Integer v1) throws Exception {
                return v1 * factor;
            }
        });

        newNumbers.foreach(new VoidFunction<Integer>() {
            public void call(Integer number) throws Exception {
                System.out.println(number);
            }
        });
    }
}

In the code above, the variable factor is defined on the driver and then read inside the function passed to map. What actually happens when this runs? Suppose a node is running 100 tasks: Spark serializes a separate copy of factor into each task and keeps it in memory. But the function only reads the value for its calculation, so there is no need for 100 copies; keeping a single copy per executor, shared by all of that executor's tasks, is enough. Now imagine sharing a very large variable: copying it into every task would waste a huge amount of network bandwidth and node memory, which is clearly unreasonable.

In this situation, we can use a broadcast variable instead:
package com.rabbit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.List;

public class BroadcastVariable {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("BroadcastVariable")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = sc.parallelize(numbers);

        final int factor = 3;
        // Turn factor into a broadcast variable: one read-only copy per executor.
        final Broadcast<Integer> broadcastFactor = sc.broadcast(factor);

        JavaRDD<Integer> newNumbers = rdd.map(new Function<Integer, Integer>() {
            public Integer call(Integer v1) throws Exception {
                // When using a broadcast variable, call value() to get the value it wraps.
                int factor = broadcastFactor.value();
                return v1 * factor;
            }
        });

        newNumbers.foreach(new VoidFunction<Integer>() {
            public void call(Integer number) throws Exception {
                System.out.println(number);
            }
        });
    }
}
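
Multiplying by a single int does not really show the savings. As a rough sketch of the large-variable case described above (the class name BroadcastLookup and the idToName sample map are invented for illustration), the same pattern can broadcast a lookup table once per executor and read it inside a transformation:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class BroadcastLookup {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("BroadcastLookup")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A lookup table built on the driver. In a real job this map could hold
        // millions of entries, which is where broadcasting pays off.
        final Map<Integer, String> idToName = new HashMap<Integer, String>();
        idToName.put(1, "apple");
        idToName.put(2, "banana");
        idToName.put(3, "cherry");

        // Ship one read-only copy of the map to each executor.
        final Broadcast<Map<Integer, String>> broadcastMap = sc.broadcast(idToName);

        JavaRDD<Integer> ids = sc.parallelize(Arrays.asList(1, 2, 3));

        JavaRDD<String> names = ids.map(new Function<Integer, String>() {
            public String call(Integer id) throws Exception {
                // Read from the executor-local copy instead of a per-task copy.
                return broadcastMap.value().get(id);
            }
        });

        System.out.println(names.collect());

        // Release the executor-side copies once the map is no longer needed.
        broadcastMap.unpersist();
    }
}

The unpersist() call at the end asks the executors to drop their cached copies of the broadcast value; it is optional, but worth doing when the broadcast object is large.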

Scala version:
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariable {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BroadcastVariable")
      .setMaster("local")

    val sc = new SparkContext(conf)

    val arr = Array(1, 2, 3, 4, 5)
    val numbers = sc.parallelize(arr)

    val factor = 3
    val broadcastFactor = sc.broadcast(factor)

    val newNumbers = numbers.map(number => number * broadcastFactor.value)

    newNumbers.foreach(number => println(number))
  }
}
 
Original post: https://www.cnblogs.com/rabbit624/p/10664567.html