Spark GraphX Application: Who Is the Network Celebrity?

1. Data: users and their follower relationships

Each record is a pair of (userName, userId) tuples; the same pair can appear many times, and the number of repetitions later becomes the edge weight.

((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User42,197134784))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User37,14559570),(User64,24742040))
((User31,63644892),(User10,123004655))
((User10,123004655),(User50,17613979))
((User37,14559570),(User11,14269220))
((User78,3365291),(User30,93905958))
((User14,199097645),(User60,16547411))
((User3,14874480),(User42,197134784))
((User40,813286),(User9,15434432))
((User10,123004655),(User34,10211502))
((User90,34870269),(User53,25566593))
((User12,24741956),(User60,16547411))
((User12,24741956),(User5,18927441))
((User37,14559570),(User39,22831657))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User28,7465732),(User65,90569268))
((User89,74286565),(User9,15434432))
((User26,16112634),(User71,20065583))
((User80,520835497),(User94,15913))
((User75,15919138),(User94,15913))
((User1,849131),(User86,101760242))
((User36,59804598),(User79,30582362))
((User42,197134784),(User4,34236703))
((User48,63433165),(User25,20397258))
((User72,18859819),(User5,18927441))
((User47,86566510),(User24,183967095))
((User9,15434432),(User32,15670515))
((User85,38521400),(User74,811377))
((User28,7465732),(User78,3365291))
((User36,59804598),(User68,29758446))
((User41,15959795),(User36,59804598))
((User88,90961424),(User64,24742040))
((User13,16409781),(User87,24741685))
((User93,174347580),(User92,460486951))
((User93,174347580),(User77,54959836))
((User93,174347580),(User77,54959836))

2. Code

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

object Celebrity {

  /**
   * Approach =>
   * Vertices: read the file, flatMap each line into (id, name) pairs, then distinct
   * Edges: (srcId, dstId, 1) => count repeated pairs as the edge weight
   * Take the id with the highest inDegrees, then look up its property via vertices.filter
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")
    // Vertices
    val vertices = rdd
      // ((User47,86566510),(User83,15647839))
      .flatMap(line => {
        // note the double backslashes: "\(" is not a valid escape in a plain string literal
        val reg = "\\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\\)".r
        val arr: ListBuffer[(Long, String)] = ListBuffer[(Long, String)]()
        for (patternMatch <- reg.findAllMatchIn(line)) {
          // vertex = (numeric id, user name)
          arr.append((patternMatch.group(2).toLong, patternMatch.group(1)))
        }
        arr
      }).distinct()
    //.foreach(println)
    // Edges: ((User47,86566510),(User83,15647839)) => ((86566510,15647839),1)
    val edge = rdd.map(line => {
      // each line comes in as a String, so split it or use a regex
      val reg = "\\([a-zA-Z]+[0-9]{1,2},([0-9]+)\\),\\([a-zA-Z]+[0-9]{1,2},([0-9]+)\\)".r
      val arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
      for (patternMatch <- reg.findAllMatchIn(line)) {
        arr.append((patternMatch.group(1), patternMatch.group(2)))
      }
      val tuple = arr.head
      ((tuple._1.toLong, tuple._2.toLong), 1)
      // alternatively keep the whole ListBuffer here and pick x._1 in a later map
    })
      .reduceByKey(_ + _)                        // count of a repeated pair = edge weight
      .map(x => Edge(x._1._1, x._1._2, x._2))
    // .foreach(println)
    // The "celebrity" is the vertex with the largest inDegrees
    val graph = Graph(vertices, edge)
    // inDegrees => VertexRDD[Int], i.e. pairs of (VertexId, number of incoming edges)
    val id = graph.inDegrees
      // local[*] spreads the RDD over several partitions;
      // sortBy sorts within partitions first, then range-partitions, so take(1) sees the global maximum
      .sortBy(_._2, ascending = false, numPartitions = 1)
      .take(1)(0)._1
    // equivalent alternatives:
    //   .repartition(1).sortBy(-_._2).take(1)(0)._1
    //   .sortBy(-_._2).collect.take(1)(0)._1
    //   .map(x => (x._2, x._1)).sortByKey(false, 1).take(1)(0)._2   // sort locally first, then by key
    // sorting locally before repartitioning is the more efficient route

    // look up the user name of the winning vertex id
    vertices.filter(f => f._1 == id)
      .foreach(println)

    spark.stop()

  }

}
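Since only the single highest in-degree is needed, a full sort is not strictly required: the standard RDD actions max / takeOrdered can pick the top pair directly. A minimal sketch (not part of the original post), assuming the same graph value as in the code above:

// graph.inDegrees is an RDD[(VertexId, Int)], so plain RDD actions apply
val topId: Long = graph.inDegrees
  .max()(Ordering.by[(Long, Int), Int](_._2))._1   // compare pairs by their in-degree

// or: takeOrdered(1) keeps the smallest element under the given ordering,
// so negate the degree to get the largest
val topId2: Long = graph.inDegrees
  .takeOrdered(1)(Ordering.by[(Long, Int), Int](-_._2)).head._1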

Partition-aware sorting with sortBy is covered here: https://www.cnblogs.com/sabertobih/p/13792372.html
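The point made in the code comments — that sortBy range-partitions the data, so take(1) on a descending sort really returns the global maximum even when the RDD has several partitions — can be checked on a toy RDD. A quick sketch, assuming the same sc as above (names are illustrative):

val demo = sc.parallelize(Seq(("a", 3), ("b", 7), ("c", 5), ("d", 1)), numSlices = 3)
// sortBy shuffles into range partitions, so the output is globally ordered
println(demo.sortBy(_._2, ascending = false).take(1).head)   // prints (b,7)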

Original post: https://www.cnblogs.com/sabertobih/p/13792365.html