spark python算子讲解

1:spark的算子分类

  1.   Transformation 称为转换,是一种延迟加载的算法,会记录元数据信息,任务触发action时开始执行
  2.   Action 称为动作 出发就执行  
sc.textFile().map  map是transformation
                 .filter  transformation
                 .collect 是action直接执行

 2:创建rdd的两种方式

  1. 通过hdfs支持的文件系统,rdd里面没有真正要计算的数据,只记录元数据
  2. 通过scala集合或者数据以并行化的方式创建rdd

 2:spark python高级算子

 1.mapPartitions

// 传给mapPartitions的方法中 参数是partitions的迭代器对象,返回值也是一个迭代器对象
// python实现如下
def filterOutFromPartion(list):
    //list是partitioins的迭代器集合
    iterator = []
    //elements是具体的partition中元素的迭代器
    for elements in list:
        iterator.append([x for  x in elements if x !=2 ])
    return iter(iterator)
data = [[1,2,3],[3,2,4],[5,2,7]] conf = SparkConf().setAppName("study") sc = SparkContext(conf=conf) partitions = sc.parallelize(data, 2).mapPartitions(filterOutFromPartion).collect() print(partitions)
//yield为简化版,因为yield本身就是返回一个迭代器
def filterOutFromPartion(list): # iterator = [] for elements in list: yield [x for x in elements if x !=2 ] # iterator.append([x for x in elements if x !=2 ]) # return iter(iterator)

2.mapPartitionsWithIndex

Similar to mapPartitions, but also provides a function with an int value to indicate the index position of the partition.

和mapPartitions类似,但是提供了一个带有整形参数用来表明分区位置的方法

parallel = sc.parallelize(range(1,10),4)
//下面这个方法是传进去的方法对象,
def show(index, iterator): yield 'index: '+str(index)+" values: "+ str(list(iterator))

parallel.mapPartitionsWithIndex(show).collect()

//一下为结果
['index: 0 values: [1, 2, 3]',
 'index: 1 values: [4, 5, 6]',
 'index: 2 values: [7, 8, 9]']

 3.sample 

sample(withReplacement,faction,seed):抽样,withReplacement为true表示有放回;faction表示采样的比例;seed为随机种子

parallel = sc.parallelize(range(1,10))
//表示取50%的数据 种子随机
parallel.sample(True,0.5).count()

 4.union

union(ortherDataset):将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重

one = sc.parallelize(range(1,10))
two = sc.parallelize(range(10,21))
one.union(two).collect()
//output
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

5.intersection

intersection(otherDataset):返回两个RDD的交集

one = sc.parallelize(range(1,10))
two = sc.parallelize(range(5,15))
one.intersection(two).collect()
//output
[5, 6, 7, 8, 9]

6.distinct

distinct([numTasks]):对RDD中的元素进行去重

>>> parallel = sc.parallelize(range(1,9))
>>> par2 = sc.parallelize(range(5,15))
>>> parallel.union(par2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

 7.groupByKey算子

 // 但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型变成Iterable这种集合类型
 // 也就是说,按照了key进行分组,那么每个key可能都会有多个value,此时多个value聚合成了Iterable

8.reducebykey  

reduceByKey(func, [numTasks]) //通过key来进行reduce过程,key相同的值的集合进行reduce操作

sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),
... ("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)

# Applying reduceByKey operation on x>>> y = x.reduceByKey(lambda accum, n: accum + n)
>>> y.collect()
[('b', 5), ('a', 3)]

# Define associative function separately >>>def sumFunc(accum, n):
...     return accum + n
...
>>> y = x.reduceByKey(sumFunc)
>>> y.collect()
[('b', 5), ('a', 3)]

 9.aggregatebykey

aggregateByKey

这个函数可用于完成对groupByKey,reduceByKey的相同的功能,用于对rdd中相同的key的值的聚合操作,主要用于返回一个指定的类型U的RDD的transform,在这个函数中,需要传入三个参数:

参数1:用于在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量,

参数2:用于在每个分区中,相同的key中V类型的值合并到参数1创建的U类型的变量中,

参数3:用于对重新分区后两个分区中传入的U类型数据的合并的函数.

    //合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型
    def comb(a: String, b: String): String = {
      println("comb: " + a + "	 " + b)
      a + b
    }
    //合并在同一个partition中的值, a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型
    def seq(a: String, b: Int): String = {
      println("seq: " + a + "	 " + b)
      a + b
    }

    rdd.foreach(println)
    
    //zeroValue 中立值,定义返回value的类型,并参与运算
    //seqOp 用来在一个partition中合并值的
    //comb 用来在不同partition中合并值的
    val aggregateByKeyRDD: RDD[(Int, String)] = rdd.aggregateByKey("100")(seq,comb)

 10.sortByKey

sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序

原文地址:https://www.cnblogs.com/zhangweilun/p/6530092.html