spark Graphx 之 Pregel

Pregel是Google提出的用于大规模分布式图计算框架

  1. 图遍历(BFS)
  2. 单源最短路径(SSSP)
  3. PageRank计算

Pregel的计算由一系列迭代组成,称为supersteps

Pregel迭代过程

  1. 每个顶点从上一个superstep接收入站消息
  2. 计算顶点新的属性值
  3. 在下一个superstep中向相邻的顶点发送消息
  4. 当没有剩余消息时,迭代结束

应用一、计算单源最短路径

求从0到任意点的最短路径(SSSP)

import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 求从0到任意点的最短路径
 */
object PregelTest03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mytest")
    val sc = SparkContext.getOrCreate(conf)

    val vect = sc.parallelize(Array
      ((0L, ("Alice", 28)),
      (1L, ("Bob", 27)),
      (2L, ("Charlie", 65)),
      (3L, ("David", 42)),
      (4L, ("Ed", 55))
      ))

    val edges = sc.parallelize(Array(
      Edge(0L, 1L, 100),
      Edge(0L, 2L, 30),
      Edge(0L, 4L, 10),
      Edge(4L, 3L, 50),
      Edge(3L, 1L, 10),
      Edge(2L, 1L, 60),
      Edge(2L, 3L, 60)
      ))

    val graphx = Graph(vect,edges)

    // 设置起始顶点
    val srcVectId = 0L
    // 用mapVertices修改属性
    val initialGraph = graphx.mapVertices({case (vid,(name,age))=>if (vid==srcVectId) 0.0 else Double.PositiveInfinity})

    // 调用pregel
    val pregelGraph = initialGraph.pregel(
      Double.PositiveInfinity,                                       //每个点的初始值,无穷大
      Int.MaxValue,                                                  //最大迭代次数
      EdgeDirection.Out                                              //发送信息的方向
    )(
      // 1. sendMsg条件+发送什么消息 2.mergeMsg合并多条消息 3.vprog顶点接受消息
      // 3) vprog:用户定义函数,用于顶点接收消息
      //vprog (接受到的消息和自己的消息进行合并)
      (vid:VertexId,vd:Double,distMsg:Double)=>{
        val minDist = math.min(vd,distMsg)
        println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")
        minDist
      },
      // 1) sendMsg:确定下一个迭代发送的消息及发往何处
      //发送消息,如果自己的消息+权重<目的地的消息,则发送 : Iterator[(VertexId, A)]
      (edgeTriplet:EdgeTriplet[Double,PartitionID])=>{
        // 控制的意义:防止无意义的消息发送(无限发给无限)【详见下图】
        if(edgeTriplet.srcAttr+edgeTriplet.attr < edgeTriplet.dstAttr){
          println(s"顶点${edgeTriplet.srcId} 给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
          
      // Iterator 可以继续迭代
      Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr
+edgeTriplet.attr)) }else{ Iterator.empty } // println(s"顶点${edgeTriplet.srcId} 给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}") // Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr)) }, // 2) mergeMsg:在vprog前,合并到达顶点的多个消息 //多条接收消息,mergeMessage,取小合并多条消息 (msg1:Double,msg2:Double)=>math.min(msg1,msg2) ) println("===============") // 输出结果 pregelGraph.triplets.foreach(println) println(pregelGraph.vertices.collect.mkString(",")) // 关闭资源 sc.stop() } }

应用二、求最小值

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}
import org.apache.spark.rdd.RDD

object PregelTest02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("mytest")
    val sc = SparkContext.getOrCreate(conf)
    // 创建顶点集RDD
    val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)),  (3L, (2,-1)), (4L, (6,-1))))
    // 创建边集RDD
    val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true),  Edge(2L, 4L, true), Edge(3L, 1L, true),  Edge(3L, 4L, true)))
    // 创建图
    val graph = Graph(vertices, relationships)
    val initialMsg = 9999
    def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
      // 第一次全部顶点先执行 message == initialMsg
      // 控制目的:首次不需要操作
      if (message == initialMsg)  value else (message min value._1, value._1)
    }

    // 注意:返回成Iterator的才会继续迭代
    def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
      val sourceVertex = triplet.srcAttr
      if (sourceVertex._1 == sourceVertex._2) Iterator.empty  else  Iterator((triplet.dstId, sourceVertex._1))
    }

    def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
    //Pregel
    val minGraph = graph.pregel(initialMsg, Int.MaxValue,  EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
    // minGraph.vertices.collect.foreach{case (vertexId, (value, original_value)) => println(value)}
    minGraph.vertices.collect.foreach(println(_))
  }
}
原文地址:https://www.cnblogs.com/sabertobih/p/13798259.html