Spark 实现共同好友

核心代码如下

object Sprk {
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
def main(args: Array[String]): Unit = {

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)
System.setProperty("hadoop.home.dir", "E:\hadoop-2.6.0-cdh5.15.0")
val ss = SparkSession
.builder()
.appName(" spark 2.0")
.master("local")
.getOrCreate()
val sc = ss.sparkContext
val rdd=sc.parallelize(List(("A","BCDEKL"),("B","EKL"),("C","EKLBCD")))
val rdd1=rdd.flatMapValues(x=>x.split("")).map(x=>(x._2,x._1))
rdd1
.join(rdd1)
.filter(x=>x._2._1<x._2._2)
.map(x=>(x._2,x._1))
.groupByKey()
.foreach(println)
}

}

第一步: 这是开始的数据 , ID  好友.

 第二步: 展开value , 使用到了 mapValues()  算子

 第三步: 将 PairRDD 互调位置

因为我们是要以共同好友来聚合 , 所以好友在左边 , 用户ID 在右边

.map(x=>(x._2,x._1))

第四步 ,很重要 , 我们用join , 

得到的就是 好友 , ID1 ,ID2  ,但是会有1/4符合条件 , 需要过滤

.filter(x=>x._2._1<x._2._2)

让 字母比大小 , 小的在前面 ,这样就只会有一个数据

 这样得到的是 ID的组合 , 以及他们的共同好友 , 然后反转 K V ,再reduce就好了

原文地址:https://www.cnblogs.com/alpha-cat/p/13090651.html