spark Graphx 之 PageRank

PageRank(PR)算法

  1. 用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数(Google 工具栏展示的 PR 值范围为0到10;GraphX 计算出的 rank 值是相对分数,并不限定在该区间内)
  2. 从本质上讲,PageRank是找出图中顶点(网页链接)的重要性
  3. GraphX提供了PageRank API用于计算图的PageRank

使用方法: graph.pageRank(tol).vertices  => (id,rank),其中 tol 为收敛容差(迭代停止的阈值),例如 graph.pageRank(0.0001).vertices

案例分析1:

找出用户社交网络中最重要的用户

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Demo: find the most important users in a small social network using
 * GraphX PageRank.
 *
 * Builds a graph of six users (vertex attribute = (name, age)) connected by
 * eight directed "follow" edges (edge attribute = Int weight), runs PageRank
 * until the given tolerance is reached, and prints:
 *   - every (vertexId, rank) pair, highest rank first;
 *   - the in-degree of every vertex that has incoming edges;
 *   - the out-degree of every vertex that has outgoing edges.
 */
object PageRankTest01 {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()

    // Ensure the session is released even if a Spark job fails.
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val tweeters = Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (6L, ("Fran", 50)))
      val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(tweeters)

      // Directed edges: Edge(srcId, dstId, attribute).
      val followRelations = Array(
        Edge[Int](2L, 1L, 7),
        Edge[Int](2L, 4L, 2),
        Edge[Int](3L, 2L, 4),
        Edge[Int](3L, 6L, 3),
        Edge[Int](4L, 1L, 1),
        Edge[Int](5L, 2L, 2),
        Edge[Int](5L, 3L, 8),
        Edge[Int](5L, 6L, 3))
      val edgeRDD: RDD[Edge[Int]] = sc.parallelize(followRelations)

      val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

      // 0.0001 is the convergence tolerance passed to pageRank.
      val ranks = graph.pageRank(0.0001)
      ranks.vertices.sortBy(_._2, ascending = false).collect().foreach(println)

      // Collect to the driver before printing: RDD.foreach runs inside the
      // tasks, so its output order is non-deterministic and, outside local
      // mode, would be written to the executors' stdout instead of here.
      println("===============indegrees==============")
      graph.inDegrees.collect().foreach(println)
      println("===============outDegrees==============")
      graph.outDegrees.collect().foreach(println)
    } finally {
      spark.stop()
    }
  }
}

案例分析2:

找出各个用户的重要性

name.txt

1,Mike
2,Nancy
3,Selina
4,Tom

test02.txt

1 2
2 2
3 2
4 2
3 1
2 2
3 1
2 2
1 4
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

/**
 * Demo: compute the importance of each user from an edge-list file and
 * report it by user name.
 *
 * Loads the graph from `test02.txt` (one "srcId dstId" pair per line), runs
 * PageRank, joins the resulting (id, rank) pairs with the (id, name) pairs
 * parsed from `name.txt`, and prints every (name, rank) pair.
 */
object PageRankTest02 {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()

    // Ensure the session is released even if a Spark job fails.
    try {
      val sc = spark.sparkContext

      // 1. Alternative way to build a Graph: GraphLoader.edgeListFile.
      //    Useful when only the edge pairs are known and there are no vertex
      //    or edge attributes (both default to 1). Each line holds the source
      //    and destination ids separated by whitespace, e.g. "1 2".
      val graph = GraphLoader.edgeListFile(sc, "src/main/resources/graphx01/test02.txt").cache()

      // 2. PageRank over the edge structure: (id, rank).
      val rankVertices = graph.pageRank(0.001).vertices

      // 3. (id, name) pairs parsed from "id,name" lines. Blank lines are
      //    skipped so a trailing newline in the file does not crash parsing.
      val verticeRDD = sc.textFile("src/main/resources/graphx01/name.txt")
        .map(_.trim)
        .filter(_.nonEmpty)
        .map { line =>
          val arr = line.split(",")
          (arr(0).trim.toLong, arr(1))
        }

      // Note: graph.joinVertices could merge verticeRDD into the graph, but
      // its map function is (VertexId, VD, U) => VD, so it can only change a
      // vertex attribute's value, not its type (Int here). A plain RDD join
      // is used instead.

      // 4. RDD join yields (id, (rank, name)); keep (name, rank).
      //    Collect to the driver before printing so the output appears on the
      //    driver rather than inside the tasks that run RDD.foreach.
      rankVertices.join(verticeRDD)
        .map { case (_, (rate, name)) => (name, rate) }
        .collect()
        .foreach(println)
    } finally {
      spark.stop()
    }
  }

}

PS: 

原文地址:https://www.cnblogs.com/sabertobih/p/13795085.html