spark1.6.2 Transformations 测试

1.定义两个rdd,分别为rdd1和rdd2

val rdd1 = sc.parallelize(List(("a",1),("b",2)))

 val rdd2 = sc.parallelize(List(("a",3),("b",4),("c",5)))

2.测试各项算子计算结果

(1)sample 

  运行: rdd1.sample(true,0.1).collect

  结果:res3: Array[(String, Int)] = Array()      

(2)intersection 计算两个rdd的交集

   运行:rdd1.intersection(rdd2).collect

  结果: Array[(String, Int)] = Array()

(3)fullOuterJoin

  运行:rdd1.fullOuterJoin(rdd2).collect

  结果:Array[(String, (Option[Int], Option[Int]))] = Array((a,(Some(1),Some(3))), (b,(Some(2),Some(4))), (c,(None,Some(5))))

(4)cogroup

  运行:rdd1.cogroup(rdd2).collect

  结果:Array[(String, (Iterable[Int], Iterable[Int]))] = Array((a,(CompactBuffer(1),CompactBuffer(3))), (b,(CompactBuffer(2),CompactBuffer(4))), (c,(CompactBuffer(),CompactBuffer(5))))

(5)cartesian

  运行: rdd1.cartesian(rdd2).collect

  结果:Array[((String, Int), (String, Int))] = Array(((a,1),(a,3)), ((a,1),(b,4)), ((a,1),(c,5)), ((b,2),(a,3)), ((b,2),(b,4)), ((b,2),(c,5)))

(6)pipe 管道操作

spark在RDD上提供了 pipe() 方法。通过pipe(),你可以使用任意语言将RDD中的各元素从标准输入流中以字符串形式读出,并将这些元素执行任何你需要的操作,然后把结果以字符串形式写入标准输出,这个过程就是RDD的转化操作过程。

使用pipe()的方法很简单,假如我们有一个用其他语言写成的从标准输入接收数据并将处理结果写入标准输出的可执行脚本,我们只需要将该脚本分发到各个节点相同路径下,并将其路径作为pipe()的参数传入即可。

(7)coalesce和repartition

他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)

1)、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。

2)如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。

3)如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。

总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDDde分区数变多的。

  

原文地址:https://www.cnblogs.com/native-hadoop/p/8482644.html