Spark:常用transformation及action,spark算子详解

一、常用transformation介绍

操作 介绍
map 将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD
filter 对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除
flatMap 与map类似,但是对每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个Key对应的value进行reduce操作
sortByKey 对每个key对应的value进行排序操作
join 对两个包含<key,value>的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理
cogroup 同join,但是每个key对应的Itreable<value>都会传入自定义函数进行处理

1.1 transformation操作实例

Spark练习之Transformation操作开发

二、常用action介绍

操作 介绍
reduce 将RDD中的所有元素进行聚合操作,第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推
collect 将RDD中所有元素获取到本地客户端
count 获取RDD元素总数
take(n) 获取RDD中前n个元素
saveAsTextFile 将RDD元素保存到文件中,对每个元素调用toString方法
countByKey 对每个key对应的值进行count计数
foreach 遍历RDD中的每个元素

2.1 action操作实例

Spark练习之action操作开发

三、spark算子详解

3.1弹性分布式数据集 (RDD)

spark中很重要的一个概念RDD(弹性分布式数据集)它是可以执行并行操作且跨集群节点的元素的集合。RDD 可以从一个 Hadoop 文件系统(或者任何其它 Hadoop 支持的文件系统),或者一个在 driver program(驱动程序)中已存在的 Scala 集合,以及通过 transforming(转换)来创建一个 RDD。用户为了让它在整个并行操作中更高效的重用,也许会让 Spark persist(持久化)一个 RDD 到内存中。最后,RDD 会自动的从节点故障中恢复。

RDDs support 两种类型的操作: transformations(转换), 它会在一个已存在的 dataset 上创建一个新的 dataset, 和 actions(动作), 将在 dataset 上运行的计算后返回到 driver 程序. 例如, map 是一个通过让每个数据集元素都执行一个函数,并返回的新 RDD 结果的 transformation, reducereduce 通过执行一些函数,聚合 RDD 中所有元素,并将最终结果给返回驱动程序(虽然也有一个并行 reduceByKey 返回一个分布式数据集)的 action.

Spark 中所有的 transformations 都是 lazy(懒加载的), 因此它不会立刻计算出结果. 相反, 他们只记得应用于一些基本数据集的转换 (例如. 文件). 只有当需要返回结果给驱动程序时,transformations 才开始计算. 这种设计使 Spark 的运行更高效. 例如, 我们可以了解到,map 所创建的数据集将被用在 reduce 中,并且只有 reduce 的计算结果返回给驱动程序,而不是映射一个更大的数据集.

默认情况下,每次你在 RDD 运行一个 action 的时, 每个 transformed RDD 都会被重新计算。但是,您也可用 persist (或 cache) 方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。

3.2Spark 算子大致可以分为以下两类

3.2.1Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理

Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,可以理解为懒加载,需要等到有 Action 操作的时候才会真正触发运算。

3.2.2Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业

Action 算子会触发 Spark 提交作业(Job),spark job的划分就是依据action算子.

很多spark的初学者都会对这样的设计感到疑惑,为什么要这么呢,这样岂不是很麻烦,很复杂吗?,其实,这正是spark设计的精髓之处.

比如我要对一个RDD进行count统计里面有多少条记录,然后我又想filter里面包含jason的信息有多少条,实际上在第一次的时候不需要把所有的数据都加载到RDD里面,而在filter的时候只加载符合条件的那一部分即可,这样就会快很多。

3.2.3区分transformation算子和action算子

transformation算子一定会返回一个rdd,action大多没有返回值,也可能有返回值,但是一定不是rdd.

package test
 
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
 
/**
  * spark的RDD的算子详解;
  */
object rddDemo {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("D:\test.txt",2)
    //因为一些算子只能对PairRDD操作,所以在这我就直接转成PairRDD了
    val pair_rdd = rdd.flatMap(_.split(" ")).map((_,1))
    //---------------------------------------------transformation算子----------------------------------------
 
    /**
      * 1.map(func):返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数
      * func 来生成对RDD每个元素转换
      */
    val map_rdd = pair_rdd.map(x=>(x._1,x._2*2))
 
    /**
      * 2.filter(func):返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func
      * 且返回值为 true 的元素来生成.
      */
    val filter_rdd = pair_rdd.filter(x=> x._1.equals("hello"))
 
    /**
      * 3.flatMap(func):与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq
      * 而不是一个单独的 item)对RDD每个元素转换, 然后再扁平化(即将所有对象合并为一个对象)
      * 相当于先map然后flat.
      */
    val flatMap_rdd = pair_rdd.flatMap(_._1.split(" "))
 
    /**
      * 4.mapPartitions(func):与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,
      * 所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型.
      */
    val mapPartitions_rdd = pair_rdd.mapPartitions(testMapPartition)
 
    /**
      * 5.mapPartitionsWithIndex(func):与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的
      * interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型.
      */
    val mapPartitionsWithIndex_rdd = pair_rdd.mapPartitionsWithIndex((x,it)=>{
      var result = List[Int]()
      var i = 0
      while(it.hasNext){
        i += it.next()._2
      }
      result.::(x + "|" + i).iterator
    })
 
    /**
      * 6.sample(withReplacement, fraction, seed):样本数据,设置是否放回(withReplacement), 采样的百分比(fraction)
      * 使用指定的随机数生成器的种子(seed).
      * withReplacement:元素可以多次抽样(在抽样时替换)
      * fraction:期望样本的大小作为RDD大小的一部分,
      * 当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
      * 当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
      * seed:随机数生成器的种子
      */
    val sample_rdd = pair_rdd.sample(true,0.5)
 
    /**
      * 7.union(otherDataset):返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集.
      */
    val union_rdd = pair_rdd.union(map_rdd)
 
    /**
      * 8.intersection(otherDataset):返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集.
      */
    val intersection_rdd = pair_rdd.intersection(pair_rdd)
 
    /**
      * 9.distinct([numTasks])):返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素.
      */
    val distinct_rdd = pair_rdd.distinct()
 
    /**
      * 10.groupByKey([numTasks]):在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) .
      * Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好.
      * Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数.
      */
    val groupByKey_rdd = pair_rdd.groupByKey().map(x=>(x._1,x._2.size))
 
    /**
      * 11.reduceByKey(func, [numTasks]):在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset,
      * 其中的 values 是针对每个 key使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样,
      * reduce tasks 的数量是可以通过第二个可选的参数来配置的.
      */
    val reduceByKey_rdd = pair_rdd.reduceByKey(_+_)
 
    /**
      * 12.aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]):在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,
      * 其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 允许聚合值的类型与输入值的类型不一样,
      * 同时避免不必要的配置. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.
      */
    val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
    val rdd1 = sc.parallelize(data)
    val aggregateByKey_rdd : RDD[(Int,Int)] = rdd1.aggregateByKey(0)(
      math.max(_,_),
      _+_
    )
 
    /**
      * 13.sortByKey([ascending], [numTasks]):在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys
      * 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 类型的 ascending 参数来指定.
      */
    val sortByKey_rdd = pair_rdd.sortByKey(true)
 
    /**
      * 14.join(otherDataset, [numTasks]):在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,
      * 它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 来实现.
      */
    val join_rdd = pair_rdd.join(map_rdd)
 
    /**
      * 15.cogroup(otherDataset, [numTasks]):在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples
      * 的 dataset. 这个操作也调用了 groupWith.
      */
    val cogroup_rdd = pair_rdd.cogroup(map_rdd,4)
 
    /**
      * 16.cartesian(otherDataset):在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积).
      */
    val cartesian_rdd = pair_rdd.cartesian(map_rdd)
 
    /**
      * 18.coalesce(numPartitions):Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的.
      */
    val coalesce_rdd = pair_rdd.coalesce(1)
 
    /**
      * 19.repartition(numPartitions):Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀.
      * 该操作总是通过网络来 shuffles 所有的数据.
      */
    val repartition_rdd = pair_rdd.repartition(10)
 
    /**
      * 20.repartitionAndSortWithinPartitions:根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。
      * 这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行.
      */
    val repartitionAndSortWithinPartitions_rdd = pair_rdd.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(4))
    repartitionAndSortWithinPartitions_rdd.foreachPartition(pair=>{
      println("第几个分区-------------" + TaskContext.get.partitionId)
      pair.foreach(p=>{
        println(p._1 +"---------" +p._2)
      })
    })
 
    //--------------------------------------------Action算子---------------------------------------
 
    /**
      * 1.reduce(func):使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )
      * 和关联(associative)的,这样才能保证它可以被并行地正确计算.
      */
    val reduce_rdd = pair_rdd.reduce((a,b) =>{
      (a._1+ "---" +b._1,a._2+b._2)
    })
 
    /**
      * 2.collect():在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)
      * 之后返回足够小(sufficiently small)的数据子集通常是有用的.
      */
    val collect_ = pair_rdd.collect()
 
    /**
      * 3.count():返回 dataset 中元素的个数.
      */
    val count_ = pair_rdd.count()
 
    /**
      * 4.first():返回 dataset 中的第一个元素类似于 take(1).
      */
    val first_ = pair_rdd.first()
 
    /**
      * 5.take(n):将数据集中的前 n 个元素作为一个 array 数组返回.
      */
    val take_ = pair_rdd.take(2)
 
    /**
      * 6.takeSample(withReplacement, num, [seed]):对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,
      * 参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子.
      */
    val takeSample_ = pair_rdd.takeSample(true,5)
 
    /**
      * 7.takeOrdered(n, [ordering]):返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素
      */
    val takeOrdered_ = pair_rdd.takeOrdered(3)
 
    /**
      * 8.saveAsTextFile(path):将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop
      * 支持的文件系统中的给定目录中。
      * Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
      */
 
    /**
      * 9.saveAsTextFile(path):将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop
      * 支持的文件系统中的给定目录中。
      * Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
      */
    val saveAsTextFile_ = pair_rdd.saveAsTextFile("D:\result1.txt")
 
 
    /**
      * 10.saveAsSequenceFile(path):将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS
      * 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)
      * 的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable
      * 的类型(Spark 包括了基本类型的转换,例如 Int, Double, String 等等).
      * (Java and Scala)
      */
    val saveAsSequenceFile_ = pair_rdd.saveAsSequenceFile("D:\result2.txt")
 
    /**
      * 11.saveAsObjectFile(path):使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,
      * 然后使用 SparkContext.objectFile() 进行加载.
      * (Java and Scala)
      */
    val saveAsObjectFile_ = pair_rdd.saveAsObjectFile("D:\result3.txt")
 
    /**
      * 12.countByKey():仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)pairs 的 hashmap.
      */
    val countByKey_ = pair_rdd.countByKey()
 
    /**
      * 13.foreach(func):对 dataset 中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)
      * 或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)
      * 可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分.
      */
    val foreach_ = pair_rdd.foreach(println)
  }
 
  def testMapPartition(it:Iterator[(String,Int)]):Iterator[(String,Int)] = {
    var res = List[(String,Int)]()
    while (it.hasNext){
      val current = it.next()
      res = res .:: (current._1,current._2*2)
    }
    res.iterator
  }
}
原文地址:https://www.cnblogs.com/aixing/p/13327439.html