RDD

1、RDD概念:

  RDD 叫做 弹性分布式数据集,是spark中最基本的数据抽象。代表着一个可分区、元素可并行计算、不可变的数据集合。

RDD特点:自动容错、位置感知性调度、可伸缩性,允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

2、RDD原理:

(1)分区partition:RDD的基本组成单位,每个分区partition都会被一个计算任务task处理,并决定并行计算的粒度。分区partition在创建RDD的时候可以指定,若不指定就采用默认的分区数,默认是程序所分配到的CPU Core的数目。

(2)计算分区数据函数compute:RDD抽象类要求其所有子类都必须实现compute方法,目的是计算该分区中的数据,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。

  RDD抽象类要求其所有子类都必须实现compute方法,该方法介绍的参数之一是一个Partition对象,目的是计算该分区中的数据。以MapPartitionsRDD类为例,其compute方法如下

override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))

  MapPartitionsRDD类的compute方法调用当前RDD内的第一个父RDD的iterator方法,该方法的目的是拉取父RDD对应分区的数据,iterator方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父RDD对应分区内的数据记录。
  RDD的粗粒度转换体现在map方法上,f函数是map转换操作函数,RDD会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。
  其他RDD子类的compute方法与之类似,在需要用用到父RDD的分区数据时,就会调用iterator方法,然后根据需求在得到的数据上执行相应的操作。换句话说,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。

https://blog.csdn.net/jiangpeng59/article/details/53213694

(3)分区函数 Partitioner:spark有两种分区函数,一种是 HashPartitioner、RangePartitioner,只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

 (4)分区位置列表:这个列表存储着每个分区partition所在的hdfs数据块block的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

 3、WordCount粗图解RDD:

 

4、RDD的创建方式

(1)读取文件生成:

val file = sc.textFile("/spark/hello.txt")

val lines: RDD[String] = sc.textFile(args(0))

 

(2) 并行化方式创建RDD

a、parallelize()

scala> val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26

scala> 

将java的list转成scala RDD

val summaries: util.List[S3ObjectSummary] = objectListing.getObjectSummaries
val scalaArray: Array[S3ObjectSummary] = Array()
val newRDD: RDD[S3ObjectSummary] = sc.parallelize(summaries.toArray(scalaArray))

b、makeRDD()

val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))  
val rdd2 = sc.makeRDD(seq) 

  这两种创建方式的区别:

  当调用parallelize()方法的时候,不指定分区数的时候,使用系统给出的分区数,而调用makeRDD()方法的时候,会为每个集合对象创建最佳分区,而这对后续的调用优化很有帮助。

  两种创建方式及区别博客:https://www.iteye.com/blog/xiaotutu365-2379890

(3)其他方式:

读取数据库等等其他的操作。也可以生成RDD。

RDD可以通过其他的RDD转换而来的。

参考博客:https://www.cnblogs.com/frankdeng/p/9301653.html

5、RDD对象,RDD的实现类

1、MapPartitionRDD:是RDD类的一种实现,在word count程序中经常出现。

org.apache.spark.rdd
Class MapPartitionsRDD<U,T>

Object   org.apache.spark.rdd.RDD<U>     org.apache.spark.rdd.MapPartitionsRDD<U,T>
public class MapPartitionsRDD<U,T> extends RDD<U>

All Implemented Interfaces: java.io.Serializable, Logging

2、ParallelCollectionRDD:利用并行化方式,将集合创建为RDD的时候生的的对象

public class ParallelCollectionRDD<T> extends RDD<T>

参考官网MapPartitionsRDDhttp://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/rdd/MapPartitionsRDD.html

各种RDD的实现:http://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/rdd/package-summary.html

6、RDD的Lineage血统

1、RDD血统:数据容错,发生错误,可以进行重算恢复。Lineage记录的是特定数据的 Transformation 转换操作。

  为了保证RDD中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。

  相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升

2、宽窄依赖的Lineage容错

对与Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思)。

Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。


参考博客:https://blog.csdn.net/u013063153/article/details/73865123

原文地址:https://www.cnblogs.com/guoyu1/p/12091197.html