Spark RDD

1、RDD stands for Resilient Distributed Dataset. It is the basic abstraction in Spark (an abstract class) and contains the basic operations available on all RDDs, such as map, filter, and persist.
immutable: cannot be changed once created;
implicit conversion: a type conversion the compiler applies automatically;
propagated back to: sent back to;
 
It is a fault-tolerant collection of elements that can be operated on in parallel.
Two ways to create RDDs:
(1) parallelizing an existing collection in your driver program;
(2) referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
 
(1)Parallelized Collections:
Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example:
             val data=Array(1,2,3,4,5)
             val distData=sc.parallelize(data)
Once created, the distributed dataset (distData) can be operated on in parallel.
              distData.reduce((a,b)=>a+b)
One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.
               sc.parallelize(data, 10)   //set it manually
 
(2)External Datasets:
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
                 scala> val distFile=sc.textFile("data.txt")
add up the sizes of all the lines using the map and reduce operations as follows:
                 distFile.map(s => s.length).reduce((a, b) => a + b)
Notice:
A: If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
B: All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
C: The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
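For example, a minimal sketch of passing that second argument (distFile10 is just an illustrative name and 10 an illustrative value):
                 val distFile10 = sc.textFile("data.txt", 10)   // ask for at least 10 partitions
                 distFile10.partitions.length                   // inspect how many partitions were actually created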
 
(3)RDD Operations:
Transformations: create a new dataset from an existing one, e.g. map.
Actions: return a value to the driver program after running a computation on the dataset, e.g. reduce.
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
                 val lines = sc.textFile("data.txt")      //not loaded in memory or otherwise acted on
                 val lineLengths = lines.map(s => s.length)       //lineLengths is not immediately computed
                 lineLengths.persist()    //use lineLengths again later
                 val totalLength = lineLengths.reduce((a, b) => a + b)
Passing Functions to Spark:
One: Anonymous function syntax, which can be used for short pieces of code.
Two: Static methods in a global singleton object.
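A minimal sketch of both styles, reusing distFile from above (the MyFunctions object and its lineLength method are made-up names for illustration):
                 // One: anonymous function syntax
                 distFile.map(s => s.length)

                 // Two: a static method in a global singleton object
                 object MyFunctions {
                   def lineLength(s: String): Int = s.length
                 }
                 distFile.map(MyFunctions.lineLength)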
Understanding closures:
                 var counter = 0
                 var rdd = sc.parallelize(data)
                 rdd.foreach(x => counter += x)      // Wrong: don't do this! counter is updated inside executor closures, not on the driver; use an Accumulator instead (see the sketch below)
                 println("Counter value: " + counter)
                
Printing elements of an RDD:
                 rdd.foreach(println) or rdd.map(println)   // Wrong on a cluster: output goes to the executors' stdout, not to the driver
                 rdd.collect().foreach(println)          // Right, but may exhaust driver memory if the RDD is large
                 rdd.take(100).foreach(println)         // print only the first 100 elements
Working with Key-Value Pairs: distributed "shuffle" operations on RDDs of tuples, such as reduceByKey() and groupByKey(); see the sketch below.
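A minimal sketch, reusing the lines RDD from the data.txt example above, that counts how many times each line occurs:
                 val pairs = lines.map(s => (s, 1))                 // RDD of (key, value) tuples
                 val counts = pairs.reduceByKey((a, b) => a + b)    // combine values that share a key
                 counts.collect()                                   // bring the per-key results to the driver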
Transformations: ……
Actions: ……
Shuffle operations:
For an operation such as reduceByKey, Spark must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.
        Performance Impact:
The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.
Shuffle also generates a large number of intermediate files on disk. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.
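For example, a sketch of setting that directory for a standalone application (the app name and /mnt/spark-tmp path are placeholders; in the shell, sc is already created for you):
                 import org.apache.spark.{SparkConf, SparkContext}
                 val conf = new SparkConf()
                   .setAppName("shuffle-demo")
                   .set("spark.local.dir", "/mnt/spark-tmp")   // where shuffle intermediate files are written
                 val sc = new SparkContext(conf)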
 
(4) RDD Persistence:
A: One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. 
This allows future actions to be much faster (often by more than 10x). 
B: You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. 
C: Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
Storage Level : MEMORY_ONLY|MEMORY_AND_DISK|MEMORY_ONLY_SER|MEMORY_AND_DISK_SER|DISK_ONLY|MEMORY_ONLY_2 etc|OFF_HEAP (experimental)
       MEMORY_ONLY > MEMORY_ONLY_SER: prefer MEMORY_ONLY when the RDD fits comfortably in memory; the serialized level is more space-efficient but costs extra CPU to read.
Removing Data : RDD.unpersist() 
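A sketch of picking a storage level explicitly and later removing the cached data (the cached val is an illustrative name; MEMORY_AND_DISK is just one of the levels listed above):
                 import org.apache.spark.storage.StorageLevel
                 val cached = sc.textFile("data.txt").map(s => s.length)
                 cached.persist(StorageLevel.MEMORY_AND_DISK)   // spill partitions to disk if they do not fit in memory
                 cached.count()                                 // the first action materializes and caches the RDD
                 cached.unpersist()                             // drop its blocks from memory and disk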
 
2、Shared Variables
Broadcast Variables : Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. 
                 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
                 scala> broadcastVar.value
Accumulators:
                 scala> val accum = sc.accumulator(0, "My Accumulator")
                 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
                 scala> accum.value
Declaring your own Accumulator type (by extending AccumulatorParam):
                 object VectorAccumulatorParam extends AccumulatorParam[Vector] {
                    def zero(initialValue: Vector): Vector = {
                        Vector.zeros(initialValue.size)
                     }
                    def addInPlace(v1: Vector, v2: Vector): Vector = {
                        v1 += v2
                     }
                 }
                 // Then, create an Accumulator of this type:
                 val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
Accumulators do not change the lazy evaluation model of Spark.
                 val accum = sc.accumulator(0)
                 data.map { x => accum += x; f(x) }
                 // Here, accum is still 0 because no actions have caused the `map` to be computed.
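To actually see the updates, the mapped RDD has to be run by an action; a sketch continuing the snippet above (data and f as before):
                 data.map { x => accum += x; f(x) }.count()   // the action forces the map to execute
                 accum.value                                  // now reflects the task updates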
 
 
 
Original source: https://www.cnblogs.com/sunflower627/p/4997648.html