Spark ---RDD

RDD的弹性表现:

1、弹性之一:自动的进行内存和磁盘数据存储的切换; 
2、弹性之二:基于Lineage的高效容错(第n个节点出错,会从第n-1个节点恢复,血统容错); 

  Lineage由spark的依赖关系确定。
3、弹性之三:Task如果失败会自动进行特定次数的重试(默认4次); 
4、弹性之四:Stage如果失败会自动进行特定次数的重试(可以只运行计算失败的阶段);只计算失败的数据分片; 
5、checkpoint和persist 
6、数据调度弹性:DAG TASK 和资源 管理无关 
7、数据分片的高度弹性(人工自由设置分片函数),repartition 

RDD容错:

1.checkpoint(本质将RDD写入disk进行做检查点)

  checkpoint是为了lineage做辅助,血统过长会造成容错成本过高,这样的话就不如去中间做先checkpoint然后血统从checkpoint开始算起

2.记录更新

RDD的5大特点

     1)有一个分片列表,就是能被切分,和Hadoop一样,能够切分的数据才能并行计算。

  一组分片(partition),即数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。

     2)由一个函数计算每一个分片,这里指的是下面会提到的compute函数。

        Spark中的RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

     3)对其他RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。

        RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

     4)可选:key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。

    一个partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

     5)可选:每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。

    一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

  

RDD的特点:
  • 它是在集群节点上的不可变的、已分区的集合对象。
  • 通过并行转换的方式来创建如(map, filter, join, etc)。
  • 失败自动重建。
  • 可以控制存储级别(内存、磁盘等)来进行重用。
  • 必须是可序列化的。
  • 是静态类型的。

进一步,说:

  worker里有很多Excutor,真正完成计算的是Excutor,Excutor计算都是在内存进行计算,
  Excutor里面有partitioner,partitioner里面的数据如果内存足够大的话放到内存中,它是一点一点读的。
  RDD是分布式数据集,所说RDD就是这个。


  RDD有5个特点:
    1.a list of partiotioner有很多个partiotioner(这里有3个partiotioner),可以明确的说,一个分区在一台机器上,一个分区其实就是放在一台机器的内存上,一台机器上可以有多个分区。

    2.a function for partiotioner一个函数作用在一个分区上。比如说一个分区有1,2,3 在rdd1.map(_*10),把RDD里面的每一个元素取出来乘以10,每个分片都应用这个map的函数。

    3.RDD之间有一系列的依赖rdd1.map(_*10).flatMap(..).map(..).reduceByKey(...),构建成为DAG,这个DAG会构造成很多个阶段,这些阶段叫做stage,RDDstage之间会有依赖关系,后面根据前面的依赖关系来构建,如果前面的数据丢了,它会记住前面的依赖,从前面进行重新恢复。每一个算子都会产生新的RDD。textFile 与flatMap会产生两个RDD.

    4.分区器hash & Integer.Max % partiotioner 决定数据到哪个分区里面,可选,这个RDD是key-value 的时候才能有

    5.最佳位置。数据在哪台机器上,任务就启在哪个机器上,数据在本地上,不用走网络。不过数据进行最后汇总的时候就要走网络。(hdfs file的block块)

  RDD有5个特点:

    1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。


    2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)


    3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。


    4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。


    5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

推荐,阅读源码来进一步学习。可见,知识来自于源码

* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)

Spark RDD来源

1,使用程序中的集合创建RDD(用于小量测试); 
2,使用本地文件系统创建RDD(测试大量数据); 
3,使用HDFS创建RDD(生产环境最常用的RDD创建方式); 
4,基于DB创建RDD; 
5,基于NoSQL,例如HBase; 
6,基于S3创建RDD; 
7,基于数据流创建RDD;

RDD API

map

    def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U] 
    Return a new RDD by applying a function to all elements of this RDD.

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

flatmap

    def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U] 
    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

 

原文地址:https://www.cnblogs.com/Dhouse/p/7516090.html