RDD编程

RDD创建

每个RDD被分为多个分区,这些分区运行在集群的不同节点上。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里driver分发驱动器程序中的对象集合(list和set)。textFile()和parallelize().

RDD支持两种类型的操作:转换操作和行动操作。惰性计算,只有行动操作才真正计算。

RDD持久化,RDD.persist()让Spark把这个RDD缓存在内存中,在之后的行动操作中,可以重用这些数据。也可以把RDD缓存到磁盘中。

两种读取数据的方式
lines = sc.parallelize(["asda","asdsa"])//创建RDD的parallelize方法,python
JavaRDD<String> lines = sc.parallelize(Arrays.asList("",""));//Java的parallelize方法

更常用的是从外部存储中读取数据创建RDD。

lines = sc.textFile("")//python
val lines = sc.textFile("")//scala
JavaRDD<String> lines = sc.textFile("ssa");//Java

RDD操作

//python
errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lamdda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)//两个RDD合并操作

Spark会使用lineage graph(DAG)来表示RDD之间的依赖关系。

用take()来收集RDD的元素,提取badLinesRDD的10个元素。

for line in badLinesRDD.take(10):
    print line

collect()函数获取整个RDD的数据

把数据写入HDFS和Amazon S3,可以使用saveAsTextFile()、saveAsSequenceFile()

转换操作传入函数

在Python中,有三种方式把函数传递给Spark:

  1. lambda表达式,大多使用lambda
  2. 顶层函数
  3. 定义的局部函数
word = rdd.filter(lambda x: "error" in x)//lambda

def containsError(s):
    return "error" in s
word = rdd.filter(containError)//局部函数

在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark。传递的函数及其引用的数据需要是可序列化的。

在Java中,传递org.apache.spark.api.java.function中的函数接口对象。

转换函数

filter() map() flatMap() groupByKey() reduceByKey()

map(),接收一个函数,把这个函数用于RDD中的每个元素,把函数的返回结果作为RDD元素对应的值。返回值的类型和输入值的类型不需要一样。

filter(),接收一个函数,把RDD中满足该函数的元素放入新的RDD中返回。

flatMap(),返回一个包含各个迭代器可访问的所有元素的RDD,相当于执行map()然后flat拍扁。(map返回一行一行,flatMap返回一个词一个词)

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()//返回hello
  • groupByKey(),根据相同的键,分组,返回[key,iterable(value)]
  • reduceByKey(func),相同的键,把value加起来,返回[key,value],函数func,对v操作,相加或相乘等
  • RDD.distinct()生成一个只包含不同元素的新RDD,开销很大,需要shuffle。
  • union()两个RDD合并成一个;intersection()只返回两个RDD都有的元素。
  • subtract(other)返回只存在第一个RDD不存在第二个RDD中所有元素组成的RDD,也要shuffle。
如果缓存数据太多,内存放不下,Spark会用LRU策略把最老的分区从内存中移除。写入磁盘。

行动操作

count() collect() first() take(n) reduce(func) foreach(func)等,遇到行动操作要开始计算。

  • count(),返回数据集中元素个数
  • reduce(func),接收一个函数作为参数,操作RDD的元素数据并返回相同类型的结果元素
rdd=[1,2,3,4,5]
sum = rdd.reduce(lambda x, y : x + y)
//sum = 15
  • collect(),以数组的形式返回数据集的所哟u元素,即把RDD内容返回
  • first(),返回第一个元素
  • take(n)返回RDD的n个元素
  • foreach(func)对RDD中的每个元素进行函数func操作

持久化

避免多次计算同一个RDD,可以对数据进行持久化,计算出RDD的节点会分别保存它们分区数据,在scala和Java中,persist()默认把数据缓存在JVM堆空间。unpersist()持久化RDD从缓存中移除。

  • persist()不会马上持久化,遇到行动操作了,才会持久化。持久化后的RDD会被保留在计算节点的内存中,被后面的行动操作反复使用。
  • 策略为LRU。
rdd.cache()//会调用persist(MEMORY_ONLY)
原文地址:https://www.cnblogs.com/chenshaowei/p/12402250.html