Spark RDD中两种算子之一:常见Action算子小结

**RDD:**弹性分布式数据集,是一种特殊集合,支持多来源,有容错机制,可以被缓存,支持并行操作,一个RDD代表多个分区里的数据集。

RDD有两种算子:
1.Transformation(转换):属于延迟Lazy计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住数据集的逻辑操作;
2.Action(执行):触发Spark作业运行,真正触发转换算子的计算;
RDD中算子的运行过程:
输入:
在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:
在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业,。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:
程序运行结束,数据会输出Spark运行时的空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)

常见Action算子(代码统一写在最下面):

reduce(func):通过函数func聚集集合中的所有的元素。func函数接收2个同构的元素,返回一个值。这个函数必须是关联性的,确保可以被正确地并发执行。这个算子不像reduceByKey一样通过key进行分组,所以其是一个全量的操作。

collect():在Driver的程序中,以数组的形式,返回数据集的所有元素。但是,请注意,这个只能在返回一个较小的数据子集时才能使用,不然会很容易导致OOM异常。

count():返回数据集的元素个数(Long类型的数)。

take(n):返回数据集中前n(Int类型)个元素组成的一个数组。注意,这个操作并不在多个节点上运行,而是在Driver所在的节点上。如果要拿到的数据量较大,尽量不要使用该算子,会导致Driver所在节点压力过大。

first():返回数据集的第一个元素(类似于take(1))。

saveAsTextFile(path):将数据集中的所有元素以textfile的格式保存到本地,hdfs等文件系统中的指定目录下。Spark会调用toString()方法将每一个元素转换为一行文本保存。

saveAsSequenceFile(path):将数据集中的所有元素以sequencefile的格式保存到本地,hdfs等文件系统中的指定的目录下。但是,这种方法需要RDD的元素必须是key-value对组成,并实现Writable接口或隐性可以转换为Writable(Spark中的基本类型包含了该转换)。

foreach(func):在数据集中的每一个元素,运行函数func。

countByKey:和reduceByKey效果相同,只是reduceByKey是一个Transformation算子。

代码实现:

package com.aura.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ActionTest {

def main(args: Array[String]): Unit = {

//使最后不输出日志,便于看结果
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.spark-project").setLevel(Level.WARN)

/**
* 建立SparkContext
*
* setMaster()参数说明:
* Master URL配置
* 说明:Master URL代表的含义是Spark作业运行的方式
* 本地模式(local):--Spark作业在本地运行(Spark Driver和Executor在本地运行)
* local :为当前的Spark作业分配一个工作线程
* local[M] :为当前的Spark作业分配M个工作线程
* local[*] :为当前的Spark作业分配当前计算集中所有可用的工作线程
* local[M,N]:为当前的Spark作业分配M个工作线程,如果Spark作业提交失败会进行最多N次的重试
*/
val conf = new SparkConf()
.setAppName(s"${ActionTest.getClass.getSimpleName}")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//调用下面的方法
action_t(sc)
sc.stop()
}

def action_t(sc:SparkContext): Unit ={
val list = List(
"1,dashi",
"2,ersha",
"3,sansha",
"4,sisha"
)
val list_rdd:RDD[String] = sc.parallelize(list)
val list_map:RDD[(Int,String)] = list_rdd.map(line =>{
val tup = line.split(",")
(tup(0).toInt,tup(1))
})

//reduce
val list_red:(Int,String) = list_map.reduce((l1,l2) => {
val k = l1._1 + l2._1
val v = l1._2 + "_" + l2._2
(k,v)
})
println("----reduce算子的结果:")
println(list_red)

//collect
//因为如果collect得到的数据量太大会导致OOM异常,所以为更提醒读者,在这里假装先使用filter(func)进行过滤操作
val list_fil:RDD[(Int,String)] = list_map.filter(_._1 > 2)
//其实,我们操作时下面的操作常写成一行代码:
//list_fil.collect().foreach(t => println(t + " "))
val list_col:Array[(Int,String)] = list_fil.collect()
println("----collect算子的结果:")
list_col.foreach(t => println(t + " "))

//count
val coun:Long = list_map.count()
println("----count算子的结果:")
println("list_map中的元素个数为:" + coun)

//take
val list_take:Array[(Int,String)] = list_map.take(2)
println("----take算子的结果:")
print("取list_map中的前两个元素:")
list_take.foreach(t => println(t + " "))

//first(类似于take(1))
val list_fir:(Int, String) = list_map.first()
println("----first算子的结果:")
print("取list_map中的第一个元素:" + list_fir)

//saveAsTextFile和saveAsSequenceFile用法一样,用saveAsTextFile为例
/**
* 保存至本地文件写法为:file:///D:/dasha.txt
* 在项目导入hdfs-site.xml和core-site.xml文件后,为了和hdfs区别必须加file:///
* 若没有,则可直接写D:/dasha.txt
*/
println("----saveAsTextFile算子没有返回值")
//list_map.saveAsTextFile("hdfs://dfs01/data/dasha.txt")

//foreach
//该算子较多的是用于上面的代码,例如: list_take.foreach(t => println(t + " "))
//同样它也可以对集合中的每一个元素进行相应的func运算
var count = 1
val list_for:Unit = list.foreach(line => {
val words = line.split(",")
val tup = (words(1),words(0))
println("将list中的第" + count + "元素互换位置:" + tup)
count += 1
})
}

运行结果:

----reduce算子的结果:
(10,ersha_dashi_sansha_sisha)
----collect算子的结果:
(3,sansha)
(4,sisha)
----count算子的结果:
list_map中的元素个数为:4
----take算子的结果:
取list_map中的前两个元素:(1,dashi)
(2,ersha)
----first算子的结果:
取list_map中的第一个元素:(1,dashi)----saveAsTextFile算子没有返回值
将list中的第1元素互换位置:(dashi,1)
将list中的第2元素互换位置:(ersha,2)
将list中的第3元素互换位置:(sansha,3)
将list中的第4元素互换位置:(sisha,4)
————————————————
版权声明:本文为CSDN博主「Mr.Liq」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_44319333/article/details/88818907

原文地址:https://www.cnblogs.com/javalinux/p/15073871.html