Spark--RDD

RDD  Resilient Distributed Datasets
弹性分布式数据集


Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

可被并行处理的容错元素集合

a single RDD has one or more partitions scattered across multiple nodes,
a single partition is processed on a single node,
a single node can handle multiple partitions (with optimum 2-4 partitions per CPU according to the official documentation)
Since Spark supports pluggable resource management details of the distribution will depend on the one you use (Standalone, Yarn, Messos).

一个RDD有一个或多个partitions在不同的nodes上

一个partition只能在一个节点上运行(不能在多节点上运行)

一个节点可以处理多个partitions

RDD支持两种操作

transformations and actions

Transformations

创建新的数据集在已经存在的数据集中。通过函数处理数据集元素,然后返回一个新的RDD。所有Transformations操作都是懒操作。所有transformations操作都是通过actions操作触发的。

Actions

对数据集进行计算,然后返回结果给driver program.

 

持久化

cache()

  使用默认的存储级别:StorageLevel.MEMORY_ONLY

persist()

  存储方式如下所示:

checkpoint()

调用rdd 的checkpoint()方法来设置检查点,将数据存放到硬盘中,使用SparkContext.setCheckpointDir()来设置文件的存储目录,文件存储在hdfs中;

在调用checkpoint()之前,需要进行persist()操作,因为在进行checkpoint过程中需要获取rdd信息,避免重新计算,最好进行persist()操作。

RDD宽依赖

RDD窄依赖

Shuffle(洗牌)操作

理解闭包

打印元素

打印RDD元素可以考虑使用如下方法:

rdd.foreach(println) or rdd.map(println)

但是只能应用在local模式,如果运行在cluster模式,输出会在不同的executor节点。

在cluster模式可以考虑此方法:

rdd.collect().foreach(println)

但此方式是将所有RDD元素都取回到driver节点。如果数据量过大可能导致内存溢出。

如果只是打印部分数据,进行查看,可以考虑如下方法:

rdd.take(100).foreach(println)

广播变量 Broadcast Variables

广播变量值被分发到各个节点

    val data = (1 to 10)
    val data1 = (1 to 10)
    
    val v = sc.broadcast(data)
    val rdd = sc.parallelize(data1)

    def func(x:Int)={
      println(x)
      for(i<- v.value) print(i)  //打印 broadcast 变量值
      println()
      x+1
    }
    val info = rdd.map(x=>func(x)) //调用函数

    info.foreach(println(_))

累加器 Accumulators

SparkContext.accumulator(v),在tasks的集群中使用+=来对值进行累加, 只有driver program才能读累加器的值,使用累加器的value方法来读取值。

    val data = (1 to 10)
    var counter = sc.accumulator(0,"Counter")
    var rdd = sc.parallelize(data)

    rdd.foreach(x => counter += x)
    println("Counter value: " + counter.value)

这里注意Accumnulator作用域

    val data = (1 to 10)
    var counter = sc.accumulator(0,"Counter")
    var rdd = sc.parallelize(data)

    def func(i:Int)={
      counter+=i
      i
    }

    //val info = rdd.map(x => func(x))            //这里调用函数实现累加
    val info = rdd.map{x=>println(x);counter+=x;x}

    println("Counter value: " + counter.value)  //这里counter.value的值为0,因为还没有actions执行,

    info.foreach(println(_))    //data数据在每个node节点上,打印输出
    info.collect().foreach(println(_)) //将data数据取回到 driver program 打印输出

    println("Counter value: " + counter.value) //这里counter.value的值为想要的结果
原文地址:https://www.cnblogs.com/one--way/p/5816955.html