Spark中的编程模型

1. Spark中的基本概念

Application:基于Spark的用户程序,包含了一个driver program和集群中多个executor。

Driver Program:运行Application的main()函数并创建SparkContext。通常SparkContext代表driver program。

Executor:为某Application运行在worker node上的一个进程。该进程负责运行Task,并负责将数据存在内存或者磁盘

上。每个Application都有自己独立的executors。

Cluster Manager: 在集群上获得资源的外部服务(例如 Spark Standalon,Mesos、Yarn)。

Worker Node: 集群中任何可运行Application代码的节点。

Task:被送到executor上执行的工作单元。

Job:可以被拆分成Task并行计算的工作单元,一般由Spark Action触发的一次执行作业。

Stage:每个Job会被拆分成很多组Task,每组任务被称为stage,也可称TaskSet。该术语可以经常在日志中看到。

RDD:Spark的基本计算单元,通过Scala集合转化、读取数据集生成或者由其他RDD经过算子操作得到。

2. Spark应用框架

客户Spark程序(Driver Program)来操作Spark集群是通过SparkContext对象来进行,SparkContext作为一个操作和调度的总入口,在初始化过程中集群管理器会创建DAGScheduler作业调度和TaskScheduler任务调度。
DAGScheduler作业调度模块是基于Stage的高层调度模块(参考:Spark分析之DAGScheduler),DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图中,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。它为每个Spark Job计算具有依赖关系的多个Stage任务阶段(通常根据Shuffle来划分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就会产生新的stage),然后将每个Stage划分为具体的一组任务,以TaskSets的形式提交给底层的任务调度模块来具体执行。其中,不同stage之前的RDD为宽依赖关系。 TaskScheduler任务调度模块负责具体启动任务,监控和汇报任务运行情况。
创建SparkContext一般要经过下面几个步骤:

a). 导入Spark的类和隐式转换

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

b). 构建Spark应用程序的应用信息对象SparkConf

val conf = new SparkConf().setAppName(appName).setMaster(master_url)

c). 利用SparkConf对象来初始化SparkContext

val sc = new SparkContext(conf)

d). 创建RDD、并执行相应的Transformation和action并得到最终结果。

e). 关闭Context

在完成应用的设计和编写后,使用spark-submit来提交应用的jar包。spark-submit的命令行参考如下:
Submitting Applications

./bin/spark-submit 
  --class <main-class>
  --master <master-url> 
  --deploy-mode <deploy-mode> 
  ... # other options
  <application-jar> 
  [application-arguments]

Spark的运行模式取决于传递给SparkContext的MASTER环境变量的值。master URL可以是以下任一种形式:
Master URL 含义
local 使用一个Worker线程本地化运行SPARK(完全不并行)
local[*] 使用逻辑CPU个数数量的线程来本地化运行Spark
local[K] 使用K个Worker线程本地化运行Spark(理想情况下,K应该根据运行机器的CPU核数设定)
spark://HOST:PORT 连接到指定的Spark standalone master。默认端口是7077.
yarn-client 以客户端模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。
yarn-cluster 以集群模式连接YARN集群。集群的位置可以在HADOOP_CONF_DIR 环境变量中找到。
mesos://HOST:PORT 连接到指定的Mesos集群。默认接口是5050.
而spark-shell会在启动的时候自动构建SparkContext,名称为sc。

3. RDD的创造

Spark所有的操作都围绕弹性分布式数据集(RDD)进行,这是一个有容错机制并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征。目前有两种类型的基础RDD:并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集(Hadoop Datasets):在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。 这两种类型的RDD都可以通过相同的方式进行操作,从而获得子RDD等一系列拓展,形成lineage血统关系图。

1). 并行化集合
并行化集合是通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组创建一个并行集合。
例如:val rdd = sc.parallelize(Array(1 to 10)) 根据能启动的executor的数量来进行切分多个slice,每一个slice启动一个Task来进行处理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的数量

(2). Hadoop数据集
Spark可以将任何Hadoop所支持的存储资源转化成RDD,如本地文件(需要网络文件系统,所有的节点都必须能访问到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。

a). 使用textFile()方法可以将本地文件或HDFS文件转换成RDD
支持整个文件目录读取,文件可以是文本或者压缩文件(如gzip等,自动执行解压缩并加载数据)。如textFile(”file:///dfs/data”)

支持通配符读取,例如:

val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......

textFile()可选第二个参数slice,默认情况下为每一个block分配一个slice。用户也可以通过slice指定更多的分片,但不能使用少于HDFS block的分片数。

b). 使用wholeTextFiles()读取目录里面的小文件,返回(用户名、内容)对
c). 使用sequenceFile[K,V]()方法可以将SequenceFile转换成RDD。SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。
d). 使用SparkContext.hadoopRDD方法可以将其他任何Hadoop输入类型转化成RDD使用方法。一般来说,HadoopRDD中每一个HDFS block都成为一个RDD分区。
此外,通过Transformation可以将HadoopRDD等转换成FilterRDD(依赖一个父RDD产生)和JoinedRDD(依赖所有父RDD)等。

4. RDD操作

RDD支持两类操作:
转换(transformation)现有的RDD通关转换生成一个新的RDD,转换是延时执行(lazy)的。
动作(actions)在RDD上运行计算后,返回结果给驱动程序或写入文件系统。
例如,map就是一种transformation,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。reduce则是一种action,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。

— Transformations —

(1). map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.
返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成

(2). filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成
Test:

val num=sc.parallelize(1 to 100)
val num2 = num.map(_*2)
val num3 = num2.filter(_ % 3 == 0)
......
num3.collect
//res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)
num3.toDebugString
//res5: String =
//FilteredRDD[20] at filter at <console>:16 (48 partitions)
//  MappedRDD[19] at map at <console>:14 (48 partitions)
//    ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)

(3). flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)
Test:

val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))
kv.flatMap(x=>x.map(_+1)).collect
//res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)

//Word Count
sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)

  

(4). mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。mapPartitions将会被每一个数据集分区调用一次。各个数据集分区的全部内容将作为顺序的数据流传入函数func的参数中,func必须返回另一个Iterator[T]。被合并的结果自动转换成为新的RDD。下面的测试中,元组(3,4)和(6,7)将由于我们选择的分区策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose
Test:

val nums = sc . parallelize (1 to 9 , 3)
def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = {
    var res = List [(T , T) ]()
    var pre = iter.next
    while ( iter.hasNext )
    {
        val cur = iter . next ;
        res .::= ( pre , cur )
        pre = cur ;
    }
    res . iterator
}
//myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
nums.mapPartitions(myfunc).collect
//res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

  

(5). mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T=>) ==> Iterator<U=> when running on an RDD of type T.
类似于mapPartitions, 其函数原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。下面测试中,将分区索引和分区数据一起输出。
Test:

val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)
def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {
iter . toList . map ( x => index + "-" + x ) . iterator
}
//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
x . mapPartitionsWithIndex ( myfunc ) . collect()
res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)

  

(6). sample(withReplacement,fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子。

val a = sc . parallelize (1 to 10000 , 3)
a . sample ( false , 0.1 , 0) . count
res0 : Long = 960
a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count
res1: Long = 7073

  

(7). union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成。

(8). intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.
返回一个包含源数据集中所有不重复元素的新数据集
Test:

(7). union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成。

(8). intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.
返回一个包含源数据集中所有不重复元素的新数据集
Test:

  

(10.)groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集
注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它。如果分组是用来计算聚合操作(如sum或average),那么应该使用reduceByKey 或combineByKey 来提供更好的性能。
groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以这里引出两个概念宽依赖和窄依赖。

窄依赖(narrow dependencies)
子RDD的每个分区依赖于常数个父分区(与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap
输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce
从输入中选择部分元素的算子,如filter、distinct、substract、sample
宽依赖(wide dependencies)
子RDD的每个分区依赖于所有的父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey
对两个RDD基于key进行join和重组,如join
经过大量shuffle生成的RDD,建议进行缓存。这样避免失败后重新计算带来的开销。
注意:reduce是一个action,和reduceByKey完全不同。

(11).reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的

(12).sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定
Test:

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.sortByKey().collect //注意sortByKey的小括号不能省
res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.groupByKey().collect
res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(1, 4)), (B,ArrayBuffer(2, 5)), (C,ArrayBuffer(3)))
kv1.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))

  

(13). join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集

(14).cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable, Iterable) tuples. This operation is also called groupWith.
在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为groupwith
Test:

val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum + = x)
accum.value
val num=sc.parallelize(1 to 100)

JackyKen声明:

本文出自:http://debugo.com, 原文地址:http://debugo.com/spark-programming/, 感谢原作者分享。

谨言慎行,专注思考 , 工作与生活同乐
原文地址:https://www.cnblogs.com/tmeily/p/4210108.html