spark PageRank

import java.io.{File, PrintWriter}
import java.util
import java.util.regex.Pattern

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.io.Source


object PageRankTest {

  def main(args: Array[String]): Unit = {
    val masterUrl = "local[2]"
    val appName = "PageRank_test"
    val sparkConf = new SparkConf().setMaster(masterUrl).setAppName(appName)
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    //原始数据文件
    val urlSourceFilePath="Peanut/httpFile.txt"
    //预处理后要写入的数据集1 : 边
    val urlFollowerFilePath="Peanut/urlFollower.txt"
    //预处理后要写入的数据集2 : 点
    val urlPointFilePath="Peanut/urlPoint.txt"
    //结果文件
    val PageRankResultPath="Peanut/PageRankResult.txt"

    
    // 每个url 对应一个数值(int)
    val urlToIntMap = mutable.Map[String,Int]()
    var count:Int = 1

    // 将原始数据中url映射成一个int数值
    val sourceFile=Source.fromFile(urlSourceFilePath)
    for(line <- sourceFile.getLines){
      val list = evaluate(line)
      list.map(x=>{
        if (!urlToIntMap.contains(x)) {
          urlToIntMap.put(x,count)
          count += 1
        }
      })
    }

    // 写 点数据集: point.txt
    val writerFollower = new PrintWriter(new File(urlPointFilePath))
    urlToIntMap.foreach(x=>{
      val writeContain = x._2 + "	" + x._1 + "
"
      writerFollower.write(writeContain)
    })
    writerFollower.close()


    //写 边数据集: follower.txt
    val sourceFile2=Source.fromFile(urlSourceFilePath)
    val writerPoint = new PrintWriter(new File(urlFollowerFilePath))
    for(line <- sourceFile2.getLines) {
      val list = evaluate(line).toList
      val firstUrl = list.head
      val firstUrlNum = urlToIntMap(firstUrl)
      val otherUrlList = list.tail
      otherUrlList.foreach(x=>{
        val writeNum = urlToIntMap(x)
        val writeString = firstUrlNum + "	" + writeNum + "
"
        writerPoint.write(writeString)
      })
    }
    writerPoint.close()
    sourceFile.close
    sourceFile2.close




    // 从特定的边列表文件中读取数据生成图框架
    val graph = GraphLoader.edgeListFile(sc, urlFollowerFilePath)

    // 核心api:    pageRank
    // 0.0001为前后两次收敛的误差阈值,小于这个阈值时则结束计算,越小精度越到
    val ranks = graph.pageRank(0.0001).vertices


   // 将上面得到的ranks(顶点属性)和用户进行关系连接
    // 首先也是读取一个包含了用户信息的文件,然后调用了一个map函数,即将文件里的每行数据按 ”,” 切开并返回存储了处理后数据的RDD
    val users = sc.textFile(urlPointFilePath).map { line =>
      val fields = line.split("	")
      (fields(0).toLong, fields(1))
    }
   // println("===users: "+users.collect().toBuffer)

    // 这里具体实现了将ranks和用户列表一一对应起来
    // 从map函数的内容可以看出是按id来进行连接,但返回的结果只含用户名和它的相应rank值
    val ranksByUsername = users.join(ranks).map {
      case (id, (username, rank)) => (username, rank)
    }

    // 结果
    val pageRankResult=ranksByUsername.collect()  //返回 Array[String,Double]
    //打印数据
    println(pageRankResult.mkString("
"))
    val writerPageRankResult = new PrintWriter(new File(PageRankResultPath))
    pageRankResult.toList.foreach(x=>{
      val writeString=x._1+","+x._2+"
"
      writerPageRankResult.write(writeString)
    })
    writerPageRankResult.close()


  }




  /**
    * 预处理数据
    *
    * @param links
    * @return   第一个元素为 原url, 后面是 链接url
    */
  def evaluate(links: String) = {
    val pattern = "(\[){1,2}.*?, "
    val r = Pattern.compile(pattern)
    val m = r.matcher(links)
    val linksList = new util.ArrayList[String]
    val title = links.split("	")(0)
    linksList.add(title)
    while ( {
      m.find
    }) {
      val ret = m.group
      val lastIndex = ret.lastIndexOf("[")
      val re = ret.substring(lastIndex + 1, ret.length - 2)
      linksList.add(re)
    }
    linksList.toArray(new Array[String](0))
  }



}
原文地址:https://www.cnblogs.com/ShyPeanut/p/13628133.html