RDD转换算子(transformantion)

RDD算子

作用于RDD上的Operation分为转换(transformantion)和动作(action)。 Spark中的所有“转换”都是惰性的,在执行“转换”操作,并不会提交Job,只有在执行“动作”操作,所有operation才会被提交到cluster中真正的被执行。这样可以大大提升系统的性能。

RDD的转换算子

  • map(func)

    返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

      rdd1 = sc.parallelize(['b','a','c'])
      rdd1.map(lambda x: (x,1)).collect()
      #结果
      [('b',1),('a',1),('c',1)]
    
  • filter(func)

    返回包含指定过滤条件的元素。RDD是一个分布式数据集,filter转换操作针对RDD所有分区的每一个元素进行过滤,filter方法将满足条件的元素返回,不满足条件的元素被忽略

      def fun(x):
          return x % 2 == 0
      rdd1 = sc.parallelize([1,2,3,4,5,6,7])
      rdd2 = rdd1.filter(fun)
      rdd2.collect()
      #结果
      [2,4,6]
    
  • flatMap(func)

    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

      rdd1 = sc.parallelize(['hello world','hello china'])
      rdd2 = rdd1.flatMap(lambda x:x.split(' '))
      rdd2.collect()
      #结果
      ['hello','world','hello','china']
    
  • glom

    该函数是将RDD中每一个分区中元素转为数组

      rdd1 = sc.parallelize([1,2,3,4,5,6],3)
      rdd2 = rdd1.glom()
      rdd2.collect()
      #结果
      [[1, 2], [3, 4], [5, 6]]
    
  • mapPartitions(funs)

    类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

      x = sc.parallelize([1,2,3,4,5,6], 2)
      def f(x):
      	li = list(x)
      	result = []
      	for i in li:
      		result.append(i+10)
      	return result
      y = x.mapPartitions(f)
      print('x原来分区信息:{}'.format(x.glom().collect()))
      print('x经过f计算后的结果:{}'.format(y.glom().collect()))
      #结果
      x原来分区信息:[[1, 2, 3], [4, 5, 6]]
      x经过f计算后的结果:[[11, 12, 13], [14, 15, 16]]
    
  • mapPartitionsWithIndex(func)

    类似于mapPartitions,但func带有一个整数参数表示分片的索引值

      x = sc.parallelize([1,2,3,4,5,6], 2)
      def f(idx,x):
      	li = list(x)
      	result = []
      	for i in li:
      		result.append(i+10)
      	return (idx,result)
      	y = x.mapPartitionsWithIndex(f)
    
      print('x原来分区信息:{}'.format(x.glom().collect()))
      print('x经过f计算后的结果:{}'.format(y.glom().collect())
    
      #结果
      x原来分区信息:[[1, 2, 3], [4, 5, 6]]
      x经过f计算后的结果:[[0, [11, 12, 13]], [1, [14, 15, 16]]]
    
  • distinct()

    返回包含不同元素的新RDD

      rdd = sc.parallelize([1,1,1,2,3,4,5,5,6])
      rdd.distinct().collect()
      #结果
      [1, 2, 3, 4, 5, 6]   
    
  • union() (并集)

    源RDD和参数RDD求并集后返回一个新的RDD

      rdd = sc.parallelize([1, 1, 2, 3])
      rdd1 = sc.parallelize([5, 3, 4, 6])
      rdd.union(rdd1).collect()
      #结果
      [1,1,2,3,5,3,4,6]
    
  • intersection() (交集)

    对源RDD和参数RDD求交集后返回一个新的RDD

      rdd1 = sc.parallelize([1, 1, 2, 3])
      rdd2 = sc.parallelize([5, 3, 4, 6])
      rdd1.intersection(rdd2).collect()
      #结果
      [3]
    
  • groupBy

    通过函数f对RDD进行分组。

      rdd1 = sc.parallelize([1, 1, 2, 3, 5, 8])
      rdd2 = rdd1.groupBy(lambda x: x % 2)
      result = rdd2.collect()
      for (key,value) in result:
      	li = list(value)
      	print("key:{},数据:{}".format(key,li))
      #结果
      key:0,数据:[2, 8]
      key:1,数据:[1, 1, 3, 5]
    
  • sortBy(keyfunc, ascending=True, numPartitions=None)

    按给定的函数,对RDD进行排序

      x = sc.parallelize(['wills', 'kris', 'april', 'chang'])
      def sortByFirstLetter(s): return s[0]
      def sortBySecondLetter(s): return s[1]
    
      y = x.sortBy(sortByFirstLetter).collect()
      yy = x.sortBy(sortBySecondLetter).collect()
      
      print ('按第一个字母排序结果: {}'.format(y))
      print ('按第二个字母排序结果:{}'.format(yy))
      #结果
      按第一个字母排序结果: ['april', 'chang', 'kris', 'wills']
      按第二个字母排序结果:['chang', 'wills', 'april', 'kris']
    
  • cogroup(other, numPartitions=None)

    在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2)])
      result = x.cogroup(y).collect()
    
      for (x,y) in result:
      	tmp = []
      	for i in y:
      		tmp.append(list(i))
      	print('key是{},value是:{}'.format(x,tmp))
      #结果
      key是a,value是:[[1], [2]]
      key是b,value是:[[4], []]
    
  • partitionBy

    该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区

      pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
      sets = pairs.partitionBy(2).glom().collect()
      #结果
      sets
      [[(2,2),(4,4),(2,2),(4,4)],[(1,1),(3,3),(1,1)]]
    
  • coalesce(numPartitions)

    cartesian 返回一个RDD和另一个RDD的笛卡尔乘积,即所有元素对(a,b)的RDD,其中a在自身中,b在另一个中。

    假设集合A={a, b},集合B={0, 1, 2},则两个集合的笛卡尔积为{(a, 0), (a, 1), (a, 2), (b, 0), (b, 1), (b, 2)}。

      rdd = sc.parallelize([2,3])
      sorted(rdd.cartesian(rdd).collect())
      #结果
      [(2, 2), (2, 3), (3, 2), (3, 3)]
原文地址:https://www.cnblogs.com/jiajiaba/p/10621775.html