Spark GraphX Core Graph Computation Operators in Practice [aggregateMessages]

I. Introduction

  Reference blog: https://www.cnblogs.com/yszd/p/10186556.html

II. Implementation

package graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession

/**
  * Created by Administrator on 2019/10/22.
  */
object AggregateMessage {
  /**
    * Set the log level to WARN
    */
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    /**
      * Create the Spark entry point
      */
    val spark = SparkSession.builder().appName("AggregateMessage").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    /**
      * Generate a random graph with 100 vertices whose out-degrees follow a log-normal
      * distribution (defaults: mu = 4.0, sigma = 1.3). The number of edge partitions
      * defaults to sc's default parallelism. Each vertex attribute is then set to the
      * vertex ID as a Double.
      */
    val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices((id, _) => id.toDouble)
    graph.vertices.take(5).foreach(println)

    /**
      * Apply the user-defined sendMsg function to every edge triplet in the graph, then use
      * the mergeMsg function to combine the messages at each destination vertex.
      * Each message is a pair (count, sum of source attributes).
      */
    val olderFollowers = graph.aggregateMessages[(Int, Double)](
      triplet => {
        if (triplet.srcAttr > triplet.dstAttr) {
          // Pass the message as an explicit tuple instead of relying on argument auto-tupling
          triplet.sendToDst((1, triplet.srcAttr))
        }
      },
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    /**
      * Compute the average
      */
    val avgAgeOfOlderFollowers = olderFollowers.mapValues((id, value) => value match { case (count, totalAge) => totalAge / count })

    /**
      * Print the first 5 results (take(5) avoids collecting the whole RDD to the driver)
      */
    avgAgeOfOlderFollowers.take(5).foreach(println)

    spark.stop()
  }
}
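
To make the roles of sendMsg and mergeMsg in the listing above easier to see, here is a minimal sketch of the same send-then-merge semantics using plain Scala collections. It is not how GraphX implements aggregateMessages internally. The names `Triplet`, `aggregate` and `averageOlderFollowerAge`, and the sample ages, are hypothetical and exist only for illustration.

```scala
object AggregateMessagesSketch {
  // Hypothetical stand-in for an edge triplet: both endpoint ids and their attributes.
  final case class Triplet(srcId: Long, srcAttr: Double, dstId: Long, dstAttr: Double)

  // sendMsg optionally emits (destination id, message) for one triplet;
  // mergeMsg combines two messages that arrive at the same destination.
  def aggregate[A](triplets: Seq[Triplet])(
      sendMsg: Triplet => Option[(Long, A)],
      mergeMsg: (A, A) => A): Map[Long, A] =
    triplets
      .flatMap(t => sendMsg(t))
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }

  // Same logic as the listing: count and sum the attributes of "older" sources, then average.
  def averageOlderFollowerAge(triplets: Seq[Triplet]): Map[Long, Double] =
    aggregate[(Int, Double)](triplets)(
      t => if (t.srcAttr > t.dstAttr) Some((t.dstId, (1, t.srcAttr))) else None,
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).map { case (id, (count, total)) => id -> total / count }

  def main(args: Array[String]): Unit = {
    val triplets = Seq(
      Triplet(1L, 30.0, 2L, 20.0), // 30 > 20: message (1, 30.0) goes to vertex 2
      Triplet(3L, 40.0, 2L, 20.0), // 40 > 20: message (1, 40.0) goes to vertex 2
      Triplet(4L, 10.0, 2L, 20.0), // 10 < 20: no message
      Triplet(2L, 20.0, 1L, 30.0)  // 20 < 30: no message, so vertex 1 is absent from the result
    )
    println(averageOlderFollowerAge(triplets)) // Map(2 -> 35.0)
  }
}
```

Vertex 2 receives two messages that merge to (2, 70.0), giving an average of 35.0. Vertices that receive no message do not appear in the result at all, which matches what aggregateMessages returns in GraphX.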

III. Results

  Randomly generated vertex data:

    

  Aggregation result:

    
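
  Because logNormalGraph produces a random graph, the numbers differ from run to run. As a reproducible cross-check, the following is a minimal sketch that runs the same aggregation on a small hand-built graph, assuming the same Spark GraphX dependency as the program above. The object name `SmallGraphExample` and the vertex and edge values are hypothetical. It also uses outerJoinVertices to show which vertices received no message.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object SmallGraphExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SmallGraphExample").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical data: the vertex attribute plays the role of an "age".
    val vertices = sc.parallelize(Seq((1L, 30.0), (2L, 20.0), (3L, 40.0), (4L, 10.0)))
    // An edge src -> dst is read as "src follows dst"; the edge attribute is unused.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(4L, 2L, 1), Edge(2L, 1L, 1), Edge(3L, 4L, 1)))
    val graph = Graph(vertices, edges)

    // Same sendMsg / mergeMsg pair as in the main program.
    val olderFollowers = graph.aggregateMessages[(Int, Double)](
      triplet => if (triplet.srcAttr > triplet.dstAttr) triplet.sendToDst((1, triplet.srcAttr)),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    val avgAge = olderFollowers.mapValues((id, value) => value._2 / value._1)

    // Only vertices that received at least one message appear here:
    // (2,35.0)
    // (4,40.0)
    avgAge.collect().sortBy(_._1).foreach(println)

    // Join back onto the graph so every vertex is listed; None marks "no older follower":
    // (1,None)
    // (2,Some(35.0))
    // (3,None)
    // (4,Some(40.0))
    val withOptionalAvg = graph.outerJoinVertices(avgAge)((id, age, avgOpt) => avgOpt)
    withOptionalAvg.vertices.collect().sortBy(_._1).foreach(println)

    spark.stop()
  }
}
```

  Vertex 2 (age 20) is followed by vertices 1 (30), 3 (40) and 4 (10), so only the first two send a message and the average is (30 + 40) / 2 = 35. Vertex 4 (age 10) is followed only by vertex 3 (40), giving 40.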

Original article: https://www.cnblogs.com/yszd/p/11726921.html