Spark之常用操作

-- 筛选
val rdd = sc.parallelize(List("ABC","BCD","DEF")) 
val filtered = rdd.filter(_.contains("C")) 
filtered.collect() 
Result:
Array[String] = Array(ABC, BCD)
-- 相乘
val rdd=sc.parallelize(List(1,2,3,4,5)) 
val times2 = rdd.map(_*2) 
times2.collect() 
Result: 
Array[Int] = Array(2, 4, 6, 8, 10)
-- 分割
val rdd=sc.parallelize(List("Spark is awesome","It is fun")) 
val fm=rdd.flatMap(str=>str.split(" ")) 
fm.collect() 
Result: 
Array[String] = Array(Spark, is, awesome, It, is, fun)
-- 频数
val word1=fm.map(word=>(word,1)) 
val wrdCnt=word1.reduceByKey(_+_) 
wrdCnt.collect() 
Result: 
Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))
-- 交换
val cntWrd = wrdCnt.map{case (word, count) => (count, word)} 
cntWrd.groupByKey().collect() 
Result: 
Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))
-- 排重
fm.distinct().collect() 
Result: 
Array[String] = Array(is, It, awesome, Spark, fun)
-- 并集
val rdd1=sc.parallelize(List('A','B')) 
val rdd2=sc.parallelize(List('B','C')) 
rdd1.union(rdd2).collect() 
-- 交集
rdd1.intersection(rdd2).collect()
-- 笛卡尔积
rdd1.cartesian(rdd2).collect()
-- 相减 
rdd1.subtract(rdd2).collect()
-- 连接
val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot"))) 
val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista"))) 
personFruit.join(personSE).collect() 
Result: 
Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
-- 计数
val rdd = sc.parallelize(list('A','B','c')) 
rdd.count() 
Result: 
long = 3
-- 展示数组
val rdd = sc.parallelize(list('A','B','c')) 
rdd.collect() 
Result: 
Array[char] = Array(A, B, c)
-- 求和
val rdd = sc.parallelize(list(1,2,3,4)) 
rdd.reduce(_+_) 
Result: 
Int = 10
-- 截取
val rdd = sc.parallelize(list(1,2,3,4)) 
rdd.take(2) 
Result: 
Array[Int] = Array(1, 2)
-- 分别格式化
val rdd = sc.parallelize(list(1,2,3,4)) 
rdd.foreach(x=>println("%s*10=%s".format(x,x*10))) Result: 
1*10=10 4*10=40 3*10=30 2*10=20
val rdd = sc.parallelize(list(1,2,3,4)) 
-- 首项
rdd.first() 
Result: 
Int = 1
-- 另存为
val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt") 
-- 针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}  other = {(3, 9)})

-- subtractByKey 删掉RDD 中键与other RDD 中的键相同的元素
rdd.subtractByKey(other) {(1, 2)}

-- join 对两个RDD 进行内连接
rdd.join(other) {(3, (4, 9)), (3,(6, 9))}

-- rightOuterJoin 对两个RDD 进行连接操作,确保第一个RDD 的键必须存在(右外连接)
rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}

-- leftOuterJoin 对两个RDD 进行连接操作,确保第二个RDD 的键必须存在(左外连接)
rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}

-- cogroup 将两个RDD 中拥有相同键的数据分组到一起
rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}
-- 返回RDD 中的所有元素
rdd.collect() {1, 2, 3, 3}
-- RDD 中的元素个数
rdd.count() 4
-- 各元素在RDD 中出现的次数
rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
-- 从RDD 中返回num 个元素
rdd.take(2) {1, 2}
top(num)

-- 从RDD 中返回最前面的num个元素
rdd.top(2) {3, 3}


-- 从RDD 中按照提供的顺序返回最前面的num 个元素
rdd.takeOrdered(2)(myOrdering) {3, 3}


-- 从RDD 中返回任意一些元素
rdd.takeSample(false, 1)


-- 并行整合RDD 中所有数据(例如sum)
rdd.reduce((x, y) => x + y) 9


-- 和reduce() 一样, 但是需要提供初始值
rdd.fold(0)((x, y) => x + y) 9


-- 和reduce() 相似, 但是通常返回不同类型的函数
rdd.aggregate((0, 0))
((x, y)
=>(x._1 + y, x._2 + 1),
(x, y)
=>(x._1 + y._1, x._2 + y._2))
(
9,4)
--
对RDD 中的每个元素使用给定的函数
rdd.foreach(func)
原文地址:https://www.cnblogs.com/wangbin2188/p/8252797.html