Spark GraphX 之基础操作

https://www.bookstack.cn/read/spark-graphx-source-analysis/vertex-edge-triple.md

一、基本操作

1,当顶点和边的属性全都具备,直接构建Graph

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

/**
 * Vertex attribute describing a single user.
 *
 * @param userid   numeric id of the user
 * @param username name of the user
 * @param age      age of the user
 */
case class Users(
  userid: Long,
  username: String,
  age: Int
)
/**
 * Builds a GraphX property graph from two comma-separated text files:
 * one listing the vertices and one listing the edges.
 */
object MyGraphx01 {
  /**
   * Reads the vertex and edge files, parses each line and assembles a cached [[Graph]].
   *
   * Vertex lines are split on "," into id, name and age; edge lines are split on ","
   * into source id, destination id and an edge attribute kept as a String.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("app").master("local[2]").getOrCreate()
    // Release the session even if loading or parsing fails.
    try {
      // Read all vertices and all edges.
      val edge = spark.sparkContext.textFile("file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/e.txt")
      val vects = spark.sparkContext.textFile("file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/v.txt")

      // Every vertex becomes (id, Users), e.g. (1L, Users(...)), (2L, Users(...)).
      val vectorSeq = vects.map { line =>
        val arr = line.split(",")
        val key = arr(0).toLong
        (key, Users(key, arr(1), arr(2).toInt))
      }

      // Every edge becomes Edge(srcId, dstId, attr), e.g. Edge(1L, 2L, "colleague").
      val edgeSeq = edge.map { line =>
        val arr = line.split(",")
        Edge(arr(0).toLong, arr(1).toLong, arr(2))
      }

      // Combine vertices and edges into the graph structure.
      val graph = Graph(vectorSeq, edgeSeq).cache()

      // The graph is not inspected here; append an action to look at it, e.g.
      // graph.vertices.foreach(println)
    } finally {
      spark.stop()
    }
  }
}

2,当顶点和边的属性不需要时(默认为1)

    // Alternative way to build a Graph: GraphLoader.edgeListFile.
    // Suited to input where vertex and edge attributes are unknown (they default
    // to the Int value 1) and only "vertex - vertex" edges are listed.
    // Each line must have the form: 1<space>2
    val graph = GraphLoader.edgeListFile(sc, "src/main/resources/graphx01/test02.txt").cache()
    for (edge <- graph.edges.collect()) println(edge)
    for (vertex <- graph.vertices.collect()) println(vertex)

二、应用

import org.apache.spark.graphx.{Edge, Graph}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Small in-memory social graph used to demonstrate basic GraphX queries:
 * vertex filtering, triplets, in/out degrees and mapEdges.
 */
object MyLove {
  /**
   * Builds a six-vertex graph and prints the result of each query to stdout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()
    // Stop the session on every exit path, matching MyGraphx01.
    try {
      val sc = spark.sparkContext

      // All vertices: (id, (name, age)).
      val points = Seq((1L, ("Alice", 28)), (2L, ("bob", 43)), (3L, ("charlie", 64)),
        (4L, ("David", 42)), (5L, ("Ed", 53)), (6L, ("Fran", 50)))
      // All edges; the Int attribute is the relationship weight.
      val eds = Seq(Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(4L, 1L, 1), Edge(5L, 2L, 2),
        Edge(5L, 3L, 8), Edge(5L, 6L, 3), Edge(3L, 2L, 4), Edge(3L, 6L, 3))
      val edges = sc.makeRDD(eds)
      val vertices: RDD[(Long, (String, Int))] = sc.makeRDD(points)

      // Build the graph.
      val graph = Graph(vertices, edges)

      // People aged 60 or over (the threshold the code actually applies).
      graph.vertices
        .filter { case (_, (_, age)) => age >= 60 }
        .foreach(println)

      // triplets expose vertex and edge attributes together:
      //   srcAttr / dstAttr => vertex attributes, attr => edge weight.
      graph.triplets
        .foreach(t => println(s"${t.srcAttr}==>${t.dstAttr}==${t.attr}"))

      // "True love": pairs whose weight (number of calls) is greater than 5.
      graph.triplets
        .filter(_.attr > 5)
        .foreach(t => println((t.srcAttr._1, t.dstAttr._1)))

      // inDegrees => (VertexId, number of incoming edges).
      graph.inDegrees
        .foreach(println(_))
      println("====outdegrees====")
      graph.outDegrees
        .foreach(println(_))

      // Collect the degree RDDs first; printing an RDD directly only shows its
      // object description, not its contents.
      val inDegreeText = graph.inDegrees.collect().mkString("[", ", ", "]")
      val outDegreeText = graph.outDegrees.collect().mkString("[", ", ", "]")
      println((graph.numVertices, graph.numEdges, inDegreeText, outDegreeText))

      // Operator example: add 2 to every edge weight, then re-run the "> 5" query.
      graph.mapEdges(e => e.attr + 2)
        .triplets
        .filter(_.attr > 5)
        .foreach(t => println((t.srcAttr._1, t.dstAttr._1)))
    } finally {
      spark.stop()
    }
  }
}

三、图的算子

1)属性算子 —— 只改变属性

2)结构算子 —— 对图结构改变

 PS: EdgeTriplet[VD,ED]的属性接口有:srcId、srcAttr、dstId、dstAttr、attr

3)JOIN算子 —— 用外部RDD修改顶点属性:

joinVertices && outerJoinVertices

// joinVertices: can change the value of a vertex attribute, but not its type.
    //   VD (vertex attribute), e.g. ("Alice", 28)
    //   U  (value coming from the joined RDD): newVal
    //   resulting attribute: (VD._1 + "@" + newVal, VD._2)
    // Shape: joinVertices(RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
    // Vertices that have a match in newPoints are rewritten; the rest are left untouched.
    graph.joinVertices(newPoints) { (_, attr, extra) =>
      (s"${attr._1}@$extra", attr._2)
    }.vertices
      //.foreach(println(_))
    // outerJoinVertices: matched vertices are rewritten the same way; unmatched
    // vertices receive None and fall back to "XXX".
    graph.outerJoinVertices(newPoints) { (_, attr, extra) =>
      val suffix = extra.getOrElse("XXX")
      (s"${attr._1}@$suffix", attr._2)
    }.vertices
      .foreach(println)

<=  结果对比 =>  

 

原文地址:https://www.cnblogs.com/sabertobih/p/13781411.html