Spark快速大数据分析_3:第三章

第 3 章 RDD 编程

目录:

3.1 RDD基础

3.2 创建RDD

3.3 RDD操作

3.4 向spark传递函数

3.5 常见的RDD操作

3.6 持久化

3.1 RDD基础

RDD(Resilient Distributed Dataset):弹性分布式数据集,是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

RDD 可以包含 Python、Java、Scala 中任意类型的对象,甚至可以包含用户自定义的对象。

RDD 的转化操作都是惰性求值的,所以我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。

3.2 创建RDD

1、创建RDD的2种方式:

(1)读取一个外部数据集:eg 

lines = sc.textFile("README.md")

(2)SparkContext 的 parallelize() 方法,在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set),eg:

lines = sc.parallelize(["pandas", "i like pandas"])

3.3 RDD操作

1、2种RDD操作:

(1)转化操作:由一个 RDD 生成一个新的 RDD,返回值类型:1个RDD

(2)行动操作:对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。返回值类型:其他数据类型

# 转化操作
>>> pythonLines = lines.filter(lambda line: "Python" in line)
# 行动操作
>>> pythonLines.first() u'## Interactive Python Shell'

2、转化操作

(1)返回一个新的RDD

(2)惰性计算,不会触发实际的计算,比如 map() 和 filter()

3、行动操作

(1)返回其他数据类型数据,会把结果返回给驱动器程序(eg:take()、collect() )或把结果写入外部系统(eg: saveAsTextFile()saveAsSequenceFile())

(2)会触发实际的计算,比如 count() 和 first()

4、惰性求值

(1)RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。

(2)优点:可以把一些操作合并到一起来减少计算数据的步骤。用户可以用更小的操作来组织他们的程序, 这样也使这些操作更容易管理。

5、持久化:RDD.persist()

因为RDD是惰性求值的,所以Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

在第一次对持久化的 RDD 计算之后,Spark 会把 RDD 的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作中,就可以重用这些数据了。我们也可以把 RDD 缓存到磁盘上而不是内存中。

6、每个 Spark 程序或 shell 会话的工作流程

(1) 创建RDD:从外部数据创建出输入 RDD。

(2) 转化操作:使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。

(3) 持久化RDD:告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。

(4) 行动操作:使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。

3.4  向spark传递函数

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。在我们支持的三种主要语言中,向 Spark 传递函数的方式略有区别。

在 Python 中,我们有三种方式来把函数传递给 Spark。

(1)传递比较短的函数时,可以使用 lambda 表达式来传递

(2)传递顶层函数

(3)传递定义的局部函数

word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

3.5 常见的转化操作和行动操作

1、RDD类型及常见基本操作

(1)RDD类型:数字类型、键值对类型

(2)基本操作:适用于任何类型的RDD

(3)特殊类型的特殊操作:例如:数字类型的 RDD 支持统计型函数操作,而键值对形式的 RDD 则支持诸如根据键聚合数据的键值对操作

2、基本操作

(1)基本转化操作

表3-2:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

函数名

目的

示例

结果

map()

将函数应用于 RDD 中的每个元素,将返回值构成新的 RDD

rdd.map(x => x + 1)

{2, 3, 4, 4}

flatMap()

将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD。通常用来切分单词

rdd.flatMap(x => x.to(3))

{1, 2, 3, 2, 3, 3, 3}

filter()

返回一个由通过传给 filter() 的函数的元素组成的 RDD

rdd.filter(x => x != 1)

{2, 3, 3}

distinct()

去重

rdd.distinct()

{1, 2, 3}

sample(withReplacement, fraction, [seed])

对 RDD 采样,以及是否替换

rdd.sample(false, 0.5)

非确定的

表3-3:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作

函数名

目的

示例

结果

union()

生成一个包含两个 RDD 中所有元素的 RDD

rdd.union(other)

{1, 2, 3, 3, 4, 5}

intersection()

求两个 RDD 共同的元素的 RDD

rdd.intersection(other)

{3}

subtract()

移除一个 RDD 中的内容(例如移除训练数据)

rdd.subtract(other)

{1, 2}

cartesian()

与另一个 RDD 的笛卡儿积

rdd.cartesian(other)

{(1, 3), (1, 4), ...(3, 5)}

(2)基本行动操作

表3-4:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

函数名

目的

示例

结果

collect()

返回 RDD 中的所有元素

rdd.collect()

{1, 2, 3, 3}

count()

RDD 中的元素个数

rdd.count()

4

countByValue()

各元素在 RDD 中出现的次数

rdd.countByValue()

{(1, 1),(2, 1),(3, 2)}

take(num)

从 RDD 中返回 num 个元素

rdd.take(2)

{1, 2}

top(num)

从 RDD 中 返回最前面的 num 个元素

rdd.top(2)

{3, 3}

takeOrdered(num)(ordering)

从 RDD 中按照提供的顺序返回最前面的 num 个元素

rdd.takeOrdered(2)(myOrdering)

{3, 3}

takeSample(withReplacement, num, [seed])

从 RDD 中返回任意一些元素

rdd.takeSample(false, 1)

非确定的

reduce(func)

并行整合 RDD 中 所有数据(例如 sum

rdd.reduce((x, y) => x + y)

9

fold(zero)(func)

和 reduce() 一样,但是需要提供初始值

rdd.fold(0)((x, y) => x + y)

9

aggregate(zeroValue)(seqOp, combOp)

和 reduce() 相似,但是通常返回不同类型的函数

 

rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))

(9,4)

foreach(func)

对 RDD 中的每个元素使用给定的函数

rdd.foreach(func)

3、如何转换RDD类型

有些函数只能用于特定类型的 RDD,比如 mean() 和 variance() 只能用在数值 RDD 上,而 join() 只能用在键值对 RDD 上。我们会在第 6 章讨论数值 RDD 的专门函数,在第 4 章讨论键值对 RDD 的专有操作。在 Scala 和 Java 中,这些函数都没有定义在标准的 RDD 类中,所以要访问这些附加功能,必须要确保获得了正确的专用 RDD 类。

Python 的 API 结构与 Java 和 Scala 有所不同。在 Python 中,所有的函数都实现在基本的 RDD 类中,但如果操作对应的 RDD 数据类型不正确,就会导致运行时错误。

3.6 持久化(缓存)

(1) persist() 方法可以让 Spark 对数据进行持久化。

(2)当我们让 Spark 持久化存储一个 RDD 时,计算出 RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。

(3)我们在第一次对这个 RDD 调用行动操作前就调用了 persist() 方法。persist() 调用本身不会触发强制求值。

(4)RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除。

原文地址:https://www.cnblogs.com/hailin2018/p/13902533.html