// SparkGraphXTest.scala

/**
 * Created by root on 9/8/15.
 */
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

/**
 * Standalone GraphX demo: builds two small property graphs, runs a few
 * vertex/edge filters, and prints human-readable facts from the triplets.
 *
 * Fixes vs. original:
 *  - `main` now uses explicit `: Unit =` (procedure syntax is deprecated).
 *  - `SparkContext` is stopped in a `finally` block (was leaked).
 *  - String building uses `s""` interpolation instead of `+` concatenation.
 */
object SparkGraphXTest {
  def main(args: Array[String]): Unit = {
    // Local-mode context: single JVM, no cluster required.
    val conf = new SparkConf().setAppName("graphx app").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Vertex RDD: (id, (name, position)).
      val users: RDD[(VertexId, (String, String))] = sc.parallelize(
        Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
      // Edge RDD: directed edges labeled with the relationship kind.
      val relationships: RDD[Edge[String]] = sc.parallelize(
        Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
      // Attribute used for any vertex referenced by an edge but absent from `users`.
      val defaultUser = ("John Doe", "Missing")
      val graph = Graph(users, relationships, defaultUser)

      // Count vertices whose position is "postdoc".
      val count1 = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count()
      // Two equivalent ways to count edges pointing from a higher to a lower id.
      val count2 = graph.edges.filter(e => e.srcId > e.dstId).count()
      val count3 = graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count()
      println(count1)
      println(count2)
      println(count3)

      // NOTE(review): `_2` is the *position* field, so this prints e.g.
      // "student is the collab of postdoc" — matches the original output;
      // use `_1` if names were intended.
      val facts: RDD[String] = graph.triplets.map(triplet =>
        s"${triplet.srcAttr._2} is the ${triplet.attr} of ${triplet.dstAttr._2}")
      facts.collect().foreach(println)

      // Second graph: vertices carry a third attribute (age as a String).
      val users2: RDD[(VertexId, (String, String, String))] = sc.parallelize(
        Array((3L, ("rxin", "student", "20")), (7L, ("jgonzal", "postdoc", "22")), (5L, ("franklin", "prof", "24")), (2L, ("istoica", "prof", "26"))))
      val relationships2: RDD[Edge[String]] = sc.parallelize(
        Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
      val defaultUser2 = ("Amy Sun", "aaa", "18")
      val graph2 = Graph(users2, relationships2, defaultUser2)
      // Print the full source/destination attribute tuples this time.
      val facts2: RDD[String] = graph2.triplets.map(triplet =>
        s"${triplet.srcAttr.toString()} is the ${triplet.attr} of ${triplet.dstAttr.toString()}")
      facts2.collect().foreach(println)
    } finally {
      sc.stop() // always release the context, even if a job above fails
    }
  }
}
// Original article: https://www.cnblogs.com/sunflower627/p/4795435.html