-- 筛选
val rdd = sc.parallelize(List("ABC","BCD","DEF"))
val filtered = rdd.filter(_.contains("C"))
filtered.collect()
Result:
Array[String] = Array(ABC, BCD)
-- 相乘
val rdd=sc.parallelize(List(1,2,3,4,5))
val times2 = rdd.map(_*2)
times2.collect()
Result:
Array[Int] = Array(2, 4, 6, 8, 10)
-- 分割
val rdd=sc.parallelize(List("Spark is awesome","It is fun"))
val fm=rdd.flatMap(str=>str.split(" "))
fm.collect()
Result:
Array[String] = Array(Spark, is, awesome, It, is, fun)
-- 频数
val word1=fm.map(word=>(word,1))
val wrdCnt=word1.reduceByKey(_+_)
wrdCnt.collect()
Result:
Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))
-- 交换
val cntWrd = wrdCnt.map{case (word, count) => (count, word)}
cntWrd.groupByKey().collect()
Result:
Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))
-- 排重
fm.distinct().collect()
Result:
Array[String] = Array(is, It, awesome, Spark, fun)
-- 并集
val rdd1=sc.parallelize(List('A','B'))
val rdd2=sc.parallelize(List('B','C'))
rdd1.union(rdd2).collect()
-- 交集
rdd1.intersection(rdd2).collect()
-- 笛卡尔积
rdd1.cartesian(rdd2).collect()
-- 相减
rdd1.subtract(rdd2).collect()
-- 连接
val personFruit = sc.parallelize(Seq(("Andy", "Apple"), ("Bob", "Banana"), ("Charlie", "Cherry"), ("Andy","Apricot")))
val personSE = sc.parallelize(Seq(("Andy", "Google"), ("Bob", "Bing"), ("Charlie", "Yahoo"), ("Bob","AltaVista")))
personFruit.join(personSE).collect()
Result:
Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
-- 计数
val rdd = sc.parallelize(list('A','B','c'))
rdd.count()
Result:
long = 3
-- 展示数组
val rdd = sc.parallelize(list('A','B','c'))
rdd.collect()
Result:
Array[char] = Array(A, B, c)
-- 求和
val rdd = sc.parallelize(list(1,2,3,4))
rdd.reduce(_+_)
Result:
Int = 10
-- 截取
val rdd = sc.parallelize(list(1,2,3,4))
rdd.take(2)
Result:
Array[Int] = Array(1, 2)
-- 分别格式化
val rdd = sc.parallelize(list(1,2,3,4))
rdd.foreach(x=>println("%s*10=%s".format(x,x*10))) Result:
1*10=10 4*10=40 3*10=30 2*10=20
val rdd = sc.parallelize(list(1,2,3,4))
-- 首项
rdd.first()
Result:
Int = 1
-- 另存为
val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
-- 针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3, 9)})
-- subtractByKey 删掉RDD 中键与other RDD 中的键相同的元素
rdd.subtractByKey(other) {(1, 2)}
-- join 对两个RDD 进行内连接
rdd.join(other) {(3, (4, 9)), (3,(6, 9))}
-- rightOuterJoin 对两个RDD 进行连接操作,确保第一个RDD 的键必须存在(右外连接)
rdd.rightOuterJoin(other) {(3,(Some(4),9)),(3,(Some(6),9))}
-- leftOuterJoin 对两个RDD 进行连接操作,确保第二个RDD 的键必须存在(左外连接)
rdd.leftOuterJoin(other) {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))}
-- cogroup 将两个RDD 中拥有相同键的数据分组到一起
rdd.cogroup(other) {(1,([2],[])), (3,([4, 6],[9]))}
-- 返回RDD 中的所有元素
rdd.collect() {1, 2, 3, 3}
-- RDD 中的元素个数
rdd.count() 4
-- 各元素在RDD 中出现的次数
rdd.countByValue() {(1, 1),(2, 1),(3, 2)}
-- 从RDD 中返回num 个元素
rdd.take(2) {1, 2} top(num)
-- 从RDD 中返回最前面的num个元素
rdd.top(2) {3, 3}
-- 从RDD 中按照提供的顺序返回最前面的num 个元素
rdd.takeOrdered(2)(myOrdering) {3, 3}
-- 从RDD 中返回任意一些元素
rdd.takeSample(false, 1)
-- 并行整合RDD 中所有数据(例如sum)
rdd.reduce((x, y) => x + y) 9
-- 和reduce() 一样, 但是需要提供初始值
rdd.fold(0)((x, y) => x + y) 9
-- 和reduce() 相似, 但是通常返回不同类型的函数
rdd.aggregate((0, 0))
((x, y) =>(x._1 + y, x._2 + 1),
(x, y) =>(x._1 + y._1, x._2 + y._2))
(9,4)
-- 对RDD 中的每个元素使用给定的函数
rdd.foreach(func)