RDD的转换操作

1.map、flatMap、distinct
  map说明:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
       输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。
  flatMap说明:同Map算子一样,最后将所有元素放到同一集合中;
  distinct说明:将RDD中重复元素做去重处理
    注意:针对Array[String]类型,将String对象视为字符串数组
    scala> val rdd =sc.textFile("/worldcount/test1.txt")
    rdd: org.apache.spark.rdd.RDD[String] = /worldcount/test1.txt MapPartitionsRDD[1] at textFile at <console>:24

    scala> val rdd1 = rdd.map(x=>x.split(" "))
    rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26

    scala> rdd1.collect
    res0: Array[Array[String]] = Array(Array(hello, world), Array(how, are, you?), Array(ni, hao), Array(hello, tom))

    scala> val rdd2 = rdd1.flatMap(x=>x)
    rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28

    scala> rdd2.collect
    res1: Array[String ] = Array(hello, world, how, are, you?, ni, hao, hello, tom)

    scala> rdd2.flatMap(x=>x).collect
    res3: Array[Char] = Array(h, e, l, l, o, w, o, r, l, d, h, o, w, a, r, e, y, o, u, ?, n, i, h, a, o, h, e, l, l, o, t, o, m)

    scala> val rdd3 = rdd2.distinct
    rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at distinct at <console>:30

    scala> rdd3.collect
    res4: Array[String] = Array(are, tom, how, you?, hello, hao, world, ni)

2.coalesce和repartition:修改RDD分区数:重分区
  coalesce说明:将RDD的分区数进行修改,并生成新的RDD;有两个参数:第一个参数为分区数,第二个参数为shuffle Booleean类型,默认为false
         如果更改分区数比原有RDD的分区数小,shuffle为false;
         如果更改分区数比原有RDD的分区数大,shuffle必须为true;
  应用说明:一般处理filter或简化操作时,新生成的RDD中分区内数据骤减,可考虑重分区
    scala> val rdd4 = rdd.coalesce(1)
    rdd4: org.apache.spark.rdd.RDD[String] = CoalescedRDD[8] at coalesce at <console>:26

    scala> rdd4.partitions.size
    res10: Int = 1

    scala> val rdd5 = rdd.coalesce(5)
    rdd5: org.apache.spark.rdd.RDD[String] = CoalescedRDD[9] at coalesce at <console>:26

    scala> rdd5.partitions.size
    res12: Int = 2

    scala> val rdd5 = rdd.coalesce(5,true)
    rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at coalesce at <console>:26

    scala> rdd5.partitions.size
    res13: Int = 5

3.randomSplit:
  def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]
  说明:将RDD按照权重(weights)进行随机分配,返回指定个数的RDD集合;
  应用案例:Hadoop全排操作
    scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

    scala> val rdd1 = rdd.randomSplit(Array(0.5,2.5,7))
    rdd1: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[1] at randomSplit at <console>:26, MapPartitionsRDD[2] at randomSplit at <console>:26, MapPartitionsRDD[3] at randomSplit at     <console>:26)

    scala> rdd1(0).collect
    res0: Array[Int] = Array(1, 5)

    scala> rdd1(1).collect
    res1: Array[Int] = Array()

    scala> rdd1(2).collect
    res2: Array[Int] = Array(2, 3, 4, 6, 7)

4.glom
  说明:返回每个分区中的数据项
  scala>val a = sc.parallelize(1 to 100, 3)
  scala>a.glom.collect
5.union:并集
  说明:将两个RDD进行合并,不去重
  scala>val rdd = sc.parallelize(1 to 6)
  scala>val rdd1 = sc.parallelize(7 to 10)
  scala>val rdd2 =rdd.union(rdd1)
6.subtrat:差集
  val a = sc.parallelize(1 to 9, 3)
  val b = sc.parallelize(1 to 3, 3)
  val c = a.subtract(b)
  c.collect
  res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)
7.intersection:交集,去重
  val x = sc.parallelize(1 to 20)
  val y = sc.parallelize(10 to 30)
  val z = x.intersection(y)
  z.collect
  res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)

8.mapPartitions
  说明:针对每个分区进行操作;
  应用:对RDD进行数据库操作时,需采用mapPartitions对每个分区实例化数据库连接conn对象;
  val a = sc.parallelize(1 to 9, 3)
  def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
    var res = List[(T, T)]()
    var pre = iter.next
    while (iter.hasNext)
    {
      val cur = iter.next;
      res .::= (pre, cur)
      pre = cur;
    }
    res.iterator
  }
  a.mapPartitions(myfunc).collect
  res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
9.mapPartitionsWithIndex
  val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
  def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
    iter.map(x => index + "," + x)
  }
  注意:iter: Iterator[Int]:Iterator[T]类型,应和RDD内部数据类型一致
  x.mapPartitionsWithIndex(myfunc).collect()
  res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)

10.zip
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  说明:1.两个RDD之间数据类型可以不同;
     2.要求每个RDD具有相同的分区数
     3.需RDD的每个分区具有相同的数据个数
11.zipParititions
  要求:需每个RDD具有相同的分区数;
12.zipWithIndex
  def zipWithIndex(): RDD[(T, Long)]
  将现有的RDD的每个元素和相对应的Index组合,生成新的RDD[(T,Long)]
13.zipWithUniqueId
  def zipWithUniqueId(): RDD[(T, Long)]

  scala> val rdd = sc.parallelize(List(1,2,3,4,5),2)
  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

  scala> rdd.glom.collect
  res25: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))

  scala> val rdd2 = rdd.zipWithUniqueId()
  rdd2: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[23] at zipWithUniqueId at <console>:26

  scala> rdd2.collect
  res26: Array[(Int, Long)] = Array((1,0), (2,2), (3,1), (4,3), (5,5))
  计算规则:
    step1:第一个分区的第一个元素0,第二个分区的第一个元素1
    step2:第一个分区的第二个元素0+2
    step2:第二个分区的第二个元素1+2=3;第二个分区的第三个元素3+2=5;

14.reduceByKey
  def reduceByKey(func: (V, V) => V): RDD[(K, V)]
  说明:合并具有相同键的值
  val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
  val b = a.map(x => (x.length, x))
  b.reduceByKey(_ + _).collect
  res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

  val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
  val b = a.map(x => (x.length, x))
  b.reduceByKey(_ + _).collect
  res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
15.groupByKey()
  def groupByKey(): RDD[(K, Iterable[V])]
  说明:按照相同的key进行分组,返回值为RDD[(K, Iterable[V])]
  val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
  val b = a.keyBy(_.length)
  b.groupByKey.collect
  res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

16.keyBy
  def keyBy[K](f: T => K): RDD[(K, T)]
  说明:将f函数的返回值作为Key,与RDD的每个元素构成piarRDD{RDD[(K, T)]}
  val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
  val b = a.keyBy(_.length)
  b.collect
  res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
17.keys
  def keys: RDD[K]
  说明:返回具有key的RDD
  val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
  val b = a.map(x => (x.length, x))
  b.keys.collect
  res2: Array[Int] = Array(3, 5, 4, 3, 7, 5)
18.values
  def values: RDD[V]
  说明:返回具有value的RDD
  val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
  val b = a.map(x => (x.length, x))
  b.values.collect
  res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

19.sortByKey
  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P]
  说明:根据key进行排序,默认为ascending: Boolean = true(“升序”)
  val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
  val b = sc.parallelize(1 to a.count.toInt, 2)
  val c = a.zip(b)
  c.sortByKey(true).collect
  res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
  c.sortByKey(false).collect
  res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
20.partitionBy
  def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  说明:通过设置Partitioner对RDD进行重分区
  scala> val rdd = sc.parallelize(List((1,"a"),(2,"b"),(3,"c"),(4,"d")),2)
  rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[26] at parallelize at <console>:24

  scala> rdd.glom.collect
  res28: Array[Array[(Int, String)]] = Array(Array((1,a), (2,b)), Array((3,c), (4,d)))

  scala> val rdd1=rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
  rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[28] at partitionBy at <console>:26

  scala> rdd1.glom.collect
  res29: Array[Array[(Int, String)]] = Array(Array((4,d), (2,b)), Array((1,a), (3,c)))

 

原文地址:https://www.cnblogs.com/lyr999736/p/9556515.html