Spark编程指南(Python版)

Spark编程指南

译者说在前面:近期在学习Spark相关的知识,在网上没有找到比較详细的中文教程,仅仅找到了官网的教程。出于自己学习同一时候也造福其它刚開始学习的人的目的,把这篇指南翻译成了中文。笔者水平有限,文章中难免有很多谬误,请高手指教。

本文翻译自Spark Programming Guide,由于笔者比較喜欢Python,在日常中使用也比較多,所以仅仅翻译了Python部分。只是Java和Scala大同小异。

文章出处:http://cholerae.com/2015/04/11/-%E7%BF%BB%E8%AF%91-Spark%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97-Python%E7%89%88/

概述

从高层次上来看,每个Spark应用都包括一个驱动程序。用于执行用户的main函数以及在集群上执行各种并行操作。

Spark提供的主要抽象是弹性分布式数据集(RDD)。这是一个包括诸多元素、被划分到不同节点上进行并行处理的数据集合。RDD通过打开HDFS(或其它hadoop支持的文件系统)上的一个文件、在驱动程序中打开一个已有的Scala集合或由其它RDD转换操作得到。

用户能够要求Spark将RDD持久化到内存中。这样就能够有效地在并行操作中复用。另外,在节点错误发生时RDD能够自己主动恢复。

Spark提供的还有一个抽象是能够在并行操作中使用的共享变量。在默认情况下,当Spark将一个函数转化成很多任务在不同的节点上执行的时候。对于全部在函数中使用的变量,每个任务都会得到一个副本。有时,某一个变量须要在任务之间或任务与驱动程序之间共享。Spark支持两种共享变量:广播变量,用来将一个值缓存到全部节点的内存中。累加器,仅仅能用于累加,比方计数器和求和。

这篇指南将展示这些特性在Spark支持的语言中是怎样使用的(本文仅仅翻译了Python部分)。

假设你打开了Spark的交互命令行——bin/spark-shell的Scala命令行或bin/pyspark的Python命令行都能够——那么这篇文章你学习起来将是非常easy的。

连接Spark

Spark1.3.0仅仅支持Python2.6或更高的版本号(但不支持Python3)。

它使用了标准的CPython解释器,所以诸如NumPy一类的C库也是能够使用的。

通过Spark文件夹下的bin/spark-submit脚本你能够在Python中执行Spark应用。这个脚本会加载Spark的Java/Scala库然后让你将应用提交到集群中。你能够执行bin/pyspark来打开Python的交互命令行。

假设你希望訪问HDFS上的数据。你须要为你使用的HDFS版本号建立一个PySpark连接。常见的HDFS版本号标签都已经列在了这个第三方发行版页面。

最后,你须要将一些Spark的类import到你的程序中。加入例如以下这行:

from pyspark import SparkContext, SparkConf

初始化Spark

在一个Spark程序中要做的第一件事就是创建一个SparkContext对象来告诉Spark怎样连接一个集群。为了创建SparkContext,你首先须要创建一个SparkConf对象,这个对象会包括你的应用的一些相关信息:

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName參数是在集群UI上显示的你的应用的名称。master是一个Spark、Mesos或YARN集群的URL,假设你在本地执行那么这个參数应该是特殊的”local”字符串。在实际使用中,当你在集群中执行你的程序。你一般不会把master參数写死在代码中。而是通过用spark-submit执行程序来获得这个參数。可是,在本地測试以及单元測试时。你仍须要自行传入”local”来执行Spark程序。

使用命令行

在PySpark命令行中,一个特殊的集成在解释器里的SparkContext变量已经建立好了,变量名叫做sc。

创建你自己的SparkContext不会起作用。你能够通过使用—master命令行參数来设置这个上下文连接的master主机,你也能够通过—py-files參数传递一个用逗号隔开的列表来将Python的.zip、.egg或.py文件加入到执行时路径中。

你还能够通过—package參数传递一个用逗号隔开的maven列表来给这个命令行会话加入依赖(比方Spark的包)。不论什么额外的包括依赖包的仓库(比方SonaType)都能够通过传给—repositorys參数来加入进去。Spark包的全部Python依赖(列在这个包的requirements.txt文件里)在必要时都必须通过pip手动安装。

比方,使用四核来执行bin/pyspark应当输入这个命令:

$ ./bin/pyspark –master local[4]

又比方。把code.py文件加入到搜索路径中(为了能够import在程序中),应当使用这条命令:

$ ./bin/pyspark –master local[4] –py-files code.py

想要了解命令行选项的完整信息请执行pyspark --help命令。在这些场景下,pyspark会触发一个更通用的spark-submit脚本。

在IPython这个加强的Python解释器中执行PySpark也是可行的。PySpark能够在1.0.0或更高版本号的IPython上执行。为了使用IPython。必须在执行bin/pyspark时将PYSPARK_DRIVER_PYTHON变量设置为ipython,就像这样:

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你还能够通过设置PYSPARK_DRIVER_PYTHON_OPTS来自省定制ipython。比方。在执行IPython Notebook
时开启PyLab图形支持应该使用这条命令:

$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=“notebook –pylab inline” ./bin/pyspark

弹性分布式数据集(RDD)

Spark是以RDD概念为中心执行的。RDD是一个容错的、能够被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集,这个存储系统能够是一个共享文件系统,比方HDFS、HBase或随意提供了Hadoop输入格式的数据来源。

并行化集合

并行化集合是通过在驱动程序中一个现有的迭代器或集合上调用SparkContext的parallelize方法建立的。为了创建一个能够并行操作的分布数据集,集合中的元素都会被拷贝。比方。下面语句创建了一个包括1到5的并行化集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

分布数据集(distData)被建立起来之后,就能够进行并行操作了。比方。我们能够调用disData.reduce(lambda a, b: a+b)来对元素进行叠加。在后文中我们会描写叙述分布数据集上支持的操作。

并行集合的一个重要參数是将数据集划分成分片的数量。

对每个分片,Spark会在集群中执行一个相应的任务。

典型情况下,集群中的每个CPU将相应执行2-4个分片。普通情况下。Spark会依据当前集群的情况自行设定分片数量。可是,你也能够通过将第二个參数传递给parallelize方法(比方sc.parallelize(data, 10))来手动确定分片数量。注意:有些代码中会使用切片(slice。分片的同义词)这个术语来保持向下兼容性。

外部数据集

PySpark能够通过Hadoop支持的外部数据源(包括本地文件系统、HDFS、 Cassandra、HBase、亚马逊S3等等)建立分布数据集。Spark支持文本文件、序列文件以及其它不论什么Hadoop输入格式文件。

通过文本文件创建RDD要使用SparkContext的textFile方法。这种方法会使用一个文件的URI(或本地文件路径。hdfs://、s3n://这种URI等等)然后读入这个文件建立一个文本行的集合。下面是一个例子:

>>> distFile = sc.textFile(“data.txt”)

建立完毕后distFile上就能够调用数据集操作了。比方,我们能够调用map和reduce操作来叠加全部文本行的长度。代码例如以下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

在Spark中读入文件时有几点要注意:

  1. 假设使用了本地文件路径时。要保证在worker节点上这个文件也能够通过这个路径訪问。这点能够通过将这个文件复制到全部worker上或者使用网络挂载的共享文件系统来解决。
  2. 包括textFile在内的全部基于文件的Spark读入方法,都支持将文件夹、压缩文件、包括通配符的路径作为參数。比方,下面代码都是合法的:
textFile(“/my/directory”)
textFile(“/my/directory/*.txt”)
textFile(“/my/directory/*.gz”)

textFile方法也能够传入第二个可选參数来控制文件的分片数量。默认情况下,Spark会为文件的每个块(在HDFS中块的大小默认是64MB)创建一个分片。

可是你也能够通过传入一个更大的值来要求Spark建立很多其它的分片。

注意。分片的数量绝不能小于文件块的数量。

除了文本文件之外,Spark的Python API还支持多种其它数据格式:

  1. SparkContext.wholeTextFiles能够读入包括多个小文本文件的文件夹,然后为每个文件返回一个(文件名称。内容)对。这是与textFile方法为每个文本行返回一条记录相相应的。

  2. RDD.saveAsPickleFile和SparkContext.pickleFile支持将RDD以串行化的Python对象格式存储起来。串行化的过程中会以默认10个一批的数量批量处理。
  3. 序列文件和其它Hadoop输入输出格式。

(注意,这个特性眼下仍处于试验阶段,被标记为Experimental。眼下仅仅适用于高级用户。这个特性在未来可能会被基于Spark SQL的读写支持所取代。由于Spark SQL是更好的方式。)

可写类型支持

PySpark序列文件支持利用Java作为中介加载一个键值对RDD。将可写类型转化成Java的基本类型,然后使用Pyrolite将java结果对象串行化。当将一个键值对RDD储存到一个序列文件里时PySpark将会执行上述过程的相反过程。首先将Python对象反串行化成Java对象,然后转化成可写类型。下面可写类型会自己主动转换:

| 可写类型 | Python类型 |
| ——————— | ———— |
| Text | unicode str |
| IntWritable | int |
| FloatWritable | float |
| DoubleWritable | float |
| BooleanWritable | bool |
| BytesWritable | bytearray |
| NullWritable | None |
| MapWritable | dict |

数组是不能自己主动转换的。用户须要在读写时指定ArrayWritable的子类型.在读入的时候,默认的转换器会把自己定义的ArrayWritable子类型转化成Java的Object[],之后串行化成Python的元组。为了获得Python的array.array类型来使用主要类型的数组。用户须要自行指定转换器。

保存和读取序列文件

和文本文件相似,序列文件能够通过指定路径来保存与读取。键值类型都能够自行指定,可是对于标准可写类型能够不指定。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, “a” * x ))
>>> rdd.saveAsSequenceFile(“path/to/file”)
>>> sorted(sc.sequenceFile(“path/to/file”).collect())
[(1, u’a’), (2, u’aa’), (3, u’aaa’)]

保存和读取其它Hadoop输入输出格式

PySpark相同支持写入和读出其它Hadoop输入输出格式。包括’新’和’旧’两种Hadoop MapReduce API。

假设有必要。一个Hadoop配置能够以Python字典的形式传入。下面是一个例子,使用了Elasticsearch ESInputFormat:

$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
>>> conf = {“es.resource” : “index/type”} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD(“org.elasticsearch.hadoop.mr.EsInputFormat”,
“org.apache.hadoop.io.NullWritable”, “org.elasticsearch.hadoop.mr.LinkedMapWritable”, conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u’Elasticsearch ID’,
{u’field1′: True,
u’field2′: u’Some Text’,
u’field3′: 12345})

注意,假设这个读入格式仅仅依赖于一个Hadoop配置和/或输入路径。而且键值类型都能够依据前面的表格直接转换,那么刚才提到的这种方法非常合适。

假设你有一些自己定义的序列化二进制数据(比方从Cassandra/HBase中读取数据)。那么你须要首先在Scala/Java端将这些数据转化成能够被Pyrolite的串行化器处理的数据类型。一个转换器特质已经提供好了。简单地拓展这个特质同一时候在convert方法中实现你自己的转换代码就可以。

记住,要确保这个类以及訪问你的输入格式所需的依赖都被打到了Spark作业包中,而且确保这个包已经包括到了PySpark的classpath中。

这里有一些通过自己定义转换器来使用Cassandra/HBase输入输出格式的Python例子和转换器例子。

RDD操作

RDD支持两类操作:转化操作,用于从已有的数据集转化产生新的数据集;启动操作,用于在计算结束后向驱动程序返回结果。举个例子,map是一个转化操作,能够将数据集中每个元素传给一个函数,同一时候将计算结果作为一个新的RDD返回。还有一方面,reduce操作是一个启动操作。能够使用某些函数来聚集计算RDD中全部的元素。而且向驱动程序返回终于结果(同一时候还有一个并行的reduceByKey操作能够返回一个分布数据集)。

在Spark全部的转化操作都是惰性求值的,就是说它们并不会立马真的计算出结果。相反,它们仅仅是记录下了转换操作的操作对象(比方:一个文件)。仅仅有当一个启动操作被执行,要向驱动程序返回结果时。转化操作才会真的開始计算。

这种设计使得Spark执行更加高效——比方,我们会发觉由map操作产生的数据集将会在reduce操作中用到,之后仅仅是返回了reduce的终于的结果而不是map产生的庞大数据集。

在默认情况下,每个由转化操作得到的RDD都会在每次执行启动操作时又一次计算生成。可是,你也能够通过调用persist(或cache)方法来将RDD持久化到内存中。这样Spark就能够在下次使用这个数据集时高速获得。

Spark相同提供了对将RDD持久化到硬盘上或在多个节点间复制的支持。

基本操作

为了演示RDD的基本操作。请看下面的简单程序:

lines = sc.textFile(“data.txt”)
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行定义了一个由外部文件产生的基本RDD。

这个数据集不是从内存中加载的也不是由其它操作产生的;lines仅仅是一个指向文件的指针。第二行将lineLengths定义为map操作的结果。再强调一次,由于惰性求值的缘故,lineLengths并不会被马上计算得到。最后,我们执行了reduce操作。这是一个启动操作。

从这个操作開始,Spark将计算过程划分成很多任务并在多机上执行,每台机器执行自己部分的map操作和reduce操作,终于将自己部分的运算结果返回给驱动程序。

假设我们希望以后反复使用lineLengths,仅仅需在reduce前加入下面这行代码:

lineLengths.persist()

这条代码将使得lineLengths在第一次计算生成之后保存在内存中。

向Spark传递函数

Spark的API严重依赖于向驱动程序传递函数作为參数。有三种推荐的方法来传递函数作为參数。

  1. Lambda表达式,简单的函数能够直接写成一个lambda表达式(lambda表达式不支持多语句函数和无返回值的语句)。
  2. 对于代码非常长的函数。在Spark的函数调用中在本地用def定义。
  3. 模块中的顶级函数。

比方,传递一个无法转化为lambda表达式长函数,能够像下面代码这样:

“MyScript.py”“”
if __name__ == “__main__”:
def myFunc(s):
words = s.split(” “)
return len(words)
sc = SparkContext(…)
sc.textFile(“file.txt”).map(myFunc)

值得指出的是。也能够传递类实例中方法的引用(与单例对象相反),这种传递方法会将整个对象传递过去。比方。考虑下面代码:

class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)

在这里,假设我们创建了一个新的MyClass对象,然后对它调用doStuff方法,map会用到这个对象中func方法的引用,所以整个对象都须要传递到集群中。

还有还有一种相似的写法,訪问外层对象的数据域会传递整个对象的引用:

class MyClass(object):
def __init__(self):
self.field = “Hello”
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + x)

此类问题最简单的避免方法就是,使用一个本地变量缓存一份这个数据域的拷贝,直接訪问这个数据域:

def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + x)

使用键值对

尽管大部分Spark的RDD操作都支持全部种类的对象,可是有少部分特殊的操作仅仅能作用于键值对类型的RDD。这类操作中最常见的就是分布的shuffle操作。比方将元素通过键来分组或聚集计算。

在Python中。这类操作一般都会使用Python内建的元组类型,比方(1, 2)。

它们会先简单地创建相似这种元组,然后调用你想要的操作。

比方,一下代码对键值对调用了reduceByKey操作,来统计每一文本行在文本文件里出现的次数:

lines = sc.textFile(“data.txt”)
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们还能够使用counts.sortByKey(),比方,当我们想将这些键值对依照字母表顺序排序,然后调用counts.collect()方法来将结果以对象列表的形式返回。

转化操作

下面的表格列出了Spark支持的经常使用转化操作。欲知细节。请查阅RDD API文档(Scala, Java, Python)和键值对RDD函数文档(Scala, Java)。

(译者注:这部分翻译比較简略。仅供简单參考。详细细节请看文档)

转化操作 | 作用
————| ——
map(func) | 返回一个新的分布数据集。由原数据集元素经func处理后的结果组成
filter(func) | 返回一个新的数据集,由传给func返回True的原数据集元素组成
flatMap(func) | 与map相似。可是每个传入元素可能有0或多个返回值,func能够返回一个序列而不是一个值
mapParitions(func) | 相似map。可是RDD的每个分片都会分开独立执行,所以func的參数和返回值必须都是迭代器
mapParitionsWithIndex(func) | 相似mapParitions,可是func有两个參数,第一个是分片的序号。第二个是迭代器。返回值还是迭代器
sample(withReplacement, fraction, seed) | 使用提供的随机数种子取样。然后替换或不替换
union(otherDataset) | 返回新的数据集。包括原数据集和參数数据集的全部元素
intersection(otherDataset) | 返回新数据集,是两个集的交集
distinct([numTasks]) | 返回新的集。包括原集中的不反复元素
groupByKey([numTasks]) | 当用于键值对RDD时返回(键。值迭代器)对的数据集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 用于键值对RDD时返回(K,U)对集,对每个Key的value进行聚集计算
sortByKey([ascending], [numTasks])用于键值对RDD时会返回RDD按键的顺序排序。升降序由第一个參数决定
join(otherDataset, [numTasks]) | 用于键值对(K, V)和(K, W)RDD时返回(K, (V, W))对RDD
cogroup(otherDataset, [numTasks]) | 用于两个键值对RDD时返回
(K, (V迭代器。 W迭代器))RDD
cartesian(otherDataset) | 用于T和U类型RDD时返回(T, U)对类型键值对RDD
pipe(command, [envVars]) | 通过shell命令管道处理每个RDD分片
coalesce(numPartitions) | 把RDD的分片数量减少到參数大小
repartition(numPartitions) | 又一次打乱RDD中元素顺序并又一次分片,数量由參数决定
repartitionAndSortWithinPartitions(partitioner) | 依照參数给定的分片器又一次分片。同一时候每个分片内部依照键排序

启动操作

下面的表格列出了Spark支持的部分经常使用启动操作。欲知细节,请查阅RDD API文档(Scala, Java, Python)和键值对RDD函数文档(Scala, Java)。


(译者注:这部分翻译比較简略。仅供简单參考,详细细节请看文档)
启动操作 | 作用
————| ——
reduce(func) | 使用func进行聚集计算,func的參数是两个,返回值一个,两次func执行应当是全然解耦的,这样才干正确地并行运算
collect() | 向驱动程序返回数据集的元素组成的数组
count() | 返回数据集元素的数量
first() | 返回数据集的第一个元素
take(n) | 返回前n个元素组成的数组
takeSample(withReplacement, num, [seed]) | 返回一个由原数据集中随意num个元素的suzuki,而且替换之
takeOrder(n, [ordering]) | 返回排序后的前n个元素
saveAsTextFile(path) | 将数据集的元素写成文本文件
saveAsSequenceFile(path) | 将数据集的元素写成序列文件,这个API仅仅能用于Java和Scala程序
saveAsObjectFile(path) | 将数据集的元素使用Java的序列化特性写到文件里。这个API仅仅能用于Java和Scala程序
countByCount() | 仅仅能用于键值对RDD,返回一个(K, int) hashmap。返回每个key的出现次数
foreach(func) | 对数据集的每个元素执行func, 通经常使用于完毕一些带有副作用的函数,比方更新累加器(见下文)或与外部存储交互等

RDD持久化

Spark的一个重要功能就是在将数据集持久化(或缓存)到内存中以便在多个操作中反复使用。当我们持久化一个RDD是,每个节点将这个RDD的每个分片计算并保存到内存中以便在下次对这个数据集(或者这个数据集衍生的数据集)的计算中能够复用。

这使得接下来的计算过程速度能够加快(经常能加快超过十倍的速度)。缓存是加快迭代算法和高速交互过程速度的关键工具。

你能够通过调用persist或cache方法来标记一个想要持久化的RDD。

在第一次被计算产生之后,它就会始终停留在节点的内存中。

Spark的缓存是具有容错性的——假设RDD的随意一个分片丢失了,Spark就会依照这个RDD产生的转化过程自己主动重算一遍。

另外,每个持久化的RDD都有一个可变的存储级别,这个级别使得用户能够改变RDD持久化的储存位置。比方,你能够将数据集持久化到硬盘上。也能够将它以序列化的Java对象形式(节省空间)持久化到内存中,还能够将这个数据集在节点之间复制,或者使用Tachyon将它储存到堆外。

这些存储级别都是通过向persist()传递一个StorageLevel对象(Scala, Java, Python)来设置的。

存储级别的全部种类请见下表:

注意:在Python中,储存的对象永远是通过Pickle库序列化过的。所以设不设置序列化级别不会产生影响。

Spark还会在shuffle操作(比方reduceByKey)中自己主动储存中间数据,即使用户没有调用persist

这是为了防止在shuffle过程中某个节点出错而导致的全盘重算。只是假设用户打算复用某些结果RDD,我们仍然建议用户对结果RDD手动调用persist,而不是依赖自己主动持久化机制。

应该选择哪个存储级别?

Spark的存储级别是为了提供内存使用与CPU效率之间的不同取舍平衡程度。我们建议用户通过考虑下面流程来选择合适的存储级别:

  1. 假设你的RDD非常适合默认的级别(MEMORY_ONLY),那么久使用默认级别吧。这是CPU最高效执行的选择,能够让RDD上的操作以最高速度执行。
  2. 否则。试试MEMORY_ONLY_SER选项而且选择一个快的序列化库来使对象的空间利用率更高,同一时候尽量保证訪问速度足够快。
  3. 不要往硬盘上持久化,除非重算数据集的过程代价确实非常昂贵,或者这个过程过滤了巨量的数据。否则,又一次计算分片有可能跟读硬盘速度一样快。

  4. 假设你希望高速的错误恢复(比方用Spark来处理web应用的请求),使用复制级别。全部的存储级别都提供了重算丢失数据的完整容错机制。可是复制一份副本能省去等待重算的时间。
  5. 在大内存或多应用的环境中,处于实验中的OFF_HEAP模式有诸多长处:
    (1)这个模式同意多个执行者共享Tachyon中的同一个内存池
    (2)这个模式显著减少了垃圾回收的花销。
    (3)在某一个执行者个体崩溃之后缓存的数据不会丢失。

删除数据

Spark会自己主动监视每个节点的缓存使用同一时候使用LRU算法丢弃旧数据分片。假设你想手动删除某个RDD而不是等待它被自己主动删除。调用RDD.unpersist()方法。

共享变量

通常情况下,当一个函数传递给一个在远程集群节点上执行的Spark操作(比方mapreduce)时,Spark会对涉及到的变量的全部副本执行这个函数。这些变量会被复制到每个机器上,而且这个过程不会被反馈给驱动程序。

通常情况下,在任务之间读写共享变量是非常低效的。可是。Spark仍然提供了有限的两种共享变量类型用于常见的使用场景:广播变量和累加器。

广播变量

广播变量同意程序猿在每台机器上保持一个仅仅读变量的缓存而不是将一个变量的拷贝传递给各个任务。它们能够被使用,比方,给每个节点传递一份大输入数据集的拷贝是非常低效的。Spark试图使用高效的广播算法来分布广播变量。以此来减少通信花销。

能够通过SparkContext.broadcast(v)来从变量v创建一个广播变量。

这个广播变量是v的一个包装,同一时候它的值能够功过调用value方法来获得。下面的代码展示了这一点:

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]

在广播变量被创建之后,在全部函数中都应当使用它来取代原来的变量v,这样就能够保证v在节点之间仅仅被传递一次。另外,v变量在被广播之后不应该再被改动了,这样能够确保每个节点上储存的广播变量的一致性(假设这个变量后来又被传输给一个新的节点)。

累加器

累加器是在一个相关过程中仅仅能被”累加”的变量,对这个变量的操作能够有效地被并行化。它们能够被用于实现计数器(就像在MapReduce过程中)或求和运算。Spark原生支持对数字类型的累加器,程序猿也能够为其它新的类型加入支持。累加器被以一个名字创建之后,会在Spark的UI中显示出来。这有助于了解计算的累进过程(注意:眼下Python中不支持这个特性)。

能够通过SparkContext.accumulator(v)来从变量v创建一个累加器。在集群中执行的任务随后能够使用add方法或+=操作符(在Scala和Python中)来向这个累加器中累加值。可是。他们不能读取累加器中的值。仅仅有驱动程序能够读取累加器中的值。通过累加器的value方法。

下面的代码展示了向一个累加器中累加数组元素的过程:

>>> accum = sc.accumulator(0)
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
…
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
10

这段代码利用了累加器对int类型的内建支持,程序猿能够通过继承AccumulatorParam类来创建自己想要的类型支持。

AccumulatorParam的接口提供了两个方法:zero'用于为你的数据类型提供零值;'addInPlace'用于计算两个值得和

比方。假设我们有一个Vector类表示数学中的向量,我们能够这样写:

class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(…), VectorAccumulatorParam())

累加器的更新操作仅仅会被执行一次,Spark提供了保证。每个任务中对累加器的更新操作都仅仅会被执行一次。

比方,重新启动一个任务不会再次更新累加器。在转化过程中。用户应该留意每个任务的更新操作在任务或作业又一次运算时是否被执行了超过一次。

累加器不会该别Spark的惰性求值模型。假设累加器在对RDD的操作中被更新了,它们的值仅仅会在启动操作中作为RDD计算过程中的一部分被更新。所以,在一个懒惰的转化操作中调用累加器的更新,并没法保证会被及时执行。

下面的代码段展示了这一点:

accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
# Here, acc is still 0 because no actions have cause the `map` to be computed.

在集群上部署

这个应用提交指南描写叙述了一个应用被提交到集群上的过程。简而言之,仅仅要你把你的应用打成了JAR包(Java/Scala应用)或.py文件的集合或.zip压缩包(Python应用)。bin/spark-submit脚本会将应用提交到随意支持的集群管理器上。

单元測试

Spark对单元測试是友好的,能够与不论什么流行的单元測试框架相容。你仅仅须要在測试中创建一个SparkContext,并如前文所述将master的URL设为local。执行你的程序,最后调用SparkContext.stop()来终止执行。请确保你在finally块或測试框架的tearDown方法中终止了上下文,由于Spark不支持两个上下文在一个程序中同一时候执行。

从1.0之前版本号的Spark迁移

Spark1.0冻结了1.X系列Spark的核心API。如今版本号中没有标注”experimental”或是”developer API”的API在未来的版本号中仍会被支持。对Python用户来说唯一的变化就是组管理操作,比方groupByKey, cogroup, join, 它们的返回值都从(键,值列表)对变成了(键。 值迭代器)对。

你还能够阅读Spark Streaming, MLlib和GraphX的迁移指南。

还有什么要做的

你能够在Spark的站点上看到很多其它的Spark例子程序。另外,在examples文件夹下还有很多例子代码(Scala, Java, Python)。

你能够通过将类名称传给Spark的bin/run-example 脚本来执行Java和Scala语言例子,举例说明:

./bin/run-example SparkPi

对于Python例子,使用spark-submit脚本取代:

./bin/spark-submit examples/src/main/python/pi.py

为了给你优化代码提供帮助,配置指南和调优指南提供了关于最佳实践的一些信息。

确保你的数据储存在以高效的格式储存在内存中,这非常重要。为了给你部署应用提供帮助。集群模式概览描写叙述了很多内容,包括分布式操作和支持的集群管理器。

最后,完整的API文档在这里:

Scala版本号:https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.package
Java版本号:https://spark.apache.org/docs/latest/api/java/
Python版本号:https://spark.apache.org/docs/latest/api/python/

原文地址:https://www.cnblogs.com/wgwyanfs/p/7248649.html