Spark算子---实战应用

Spark算子实战应用

数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase

相关数据文件 :

users.dat ---UserID::Gender::Age::Occupation::Zip-code

movies.dat --- MovieID::Title::Genres

ratings.dat ---UserID::MovieID::Rating::Timestamp

SogouQ.mini

完成以下业务需求:

1. 年龄段在“18-24”的男性年轻人,最喜欢看哪10部

2.得分最高的10部电影;看过电影最多的前10个人;女性看多最多的10部电影;男性看过最多 的10部电影

3.利用数据集SogouQ2012.mini.tar.gz 将数据按照访问次数进行排序,求访问量前10的网站

scala实现代码如下:

package hw3
import org.apache.spark._


import scala.collection.immutable.HashSet
import org.apache.spark.rdd.RDD
/**
 * @author BIGDATA
 */
object spark_hw3{
  var sc:SparkContext=null
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("MovieDemo")
              .setMaster("local")
    sc=new SparkContext(conf)
  //准备数据 val rating=sc.textFile("C:\Users\BIGDATA\Desktop\文件\BigData\Spark\3.SparkCore_2\data\data\ratings.dat") .map(_.split("::")).map {x => (x(0),x(1),x(2))}
  //年龄段在“18-24”的男性年轻人,最喜欢看哪10部 top10LookeMovie

  //得分最高的10部电影 val topKScoreMostMovie = rating.map{x => (x._2, (x._3.toInt, 1)) }.reduceByKey { (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2) }.map { x => (x._2._1.toDouble / x._2._2.toDouble, x._1) }.sortByKey(false). take(10). foreach(println)
   //女性看最多的10部电影 top10FaleLookMovie
  //男性看最多的10部电影

top10MaleLookMovie

  
//看过电影最多的前10个人 val topKmostPerson = rating.map{ x => (x._1, 1) }.reduceByKey(_ + _). map(x =>(x._2, x._1)). sortByKey(false). take(10). foreach(println) val brower = sc.textFile("C:\Users\BIGDATA\Desktop\文件\BigData\Spark\3.SparkCore_2\SogouQ2012.mini\SogouQ.mini") val brs=brower.map(_.split(" ")).map { x => x(5) }.cache //访问量前10的网站 val topKBrower = brs.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) .sortBy(_._2, false) .take(10) .foreach(println) } /** * @param sc SparkContext对象 * @return 返回用户信息 */ def getUsers(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val users=scobj.textFile("C:\Users\BIGDATA\Desktop\文件\BigData\Spark\3.SparkCore_2\data\data\users.dat") .map(_.split("::")) users } /** * @param sc * @return 返回电影信息 */ def getMovies(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val movies=scobj.textFile("C:\Users\BIGDATA\Desktop\文件\BigData\Spark\3.SparkCore_2\data\data\movies.dat") .map(_.split("::")) movies } /** * * @param sc * @return 电影评分信息 */ def getRatings(sc:SparkContext):RDD[Array[String]]={ val scobj=sc val ratings=scobj.textFile("C:\Users\BIGDATA\Desktop\文件\BigData\Spark\3.SparkCore_2\data\data\ratings.dat") .map(_.split("::")) ratings }
def top10LookeMovie: Unit ={ //获取年龄段在“18-24”的男性年轻人的userid val users=getUsers(sc) val userList=users.filter(x=>x(1).equals("M") && x(2).toInt>=18 && x(2).toInt<=24) .map(x=>x(0)).collect() //注意:HashSet()后面要带小括号 val userSet=HashSet() ++ userList //创建广播变量 val broadcastUserSet=sc.broadcast(userSet) //统计出18-24岁男性喜欢看的前10名电影的movieid和次数 val ratings=getRatings(sc) val topNMovies=ratings.map(x=>(x(0),x(1))) //ratings中所有的(userid,movieid) //从rating数据过滤出“18-24”的男性年轻人的观影信息 .filter(x=>broadcastUserSet.value.contains(x._1)) .map(x=>(x._2,1)) .reduceByKey(_+_) //(movieid,次数) .sortBy(_._2,false) .take(10) //(movieid,次数) val movies=getMovies(sc) //获取所有电影的(movieid,title) val movieTitle=movies.map(x=>(x(0),x(1))).collect().toMap topNMovies.map(x=>(movieTitle.getOrElse(x._1,null),x._2)) .foreach(x=>println(x._1+" "+x._2)) } /** * 女性看过最多的10部电影 */ def top10FaleLookMovie: Unit ={ val users = getUsers(sc) //获取所有女性的userid val faleUserId = users.filter(x => x(1).equals("F")) .map(x => x(0)).collect() val faleUserSet = HashSet() ++ faleUserId //创建广播变量,里面存储所有女性的userid val broadcastFaleSet = sc.broadcast(faleUserSet) val ratings = getRatings(sc) //统计出女性看过最多的10部电影的(movieid,观看次数) val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid) //过滤出女性观影数据 .filter(x => broadcastFaleSet.value.contains(x._1)) .map(x => (x._2, 1)) //(movieid,1) .reduceByKey(_ + _) .sortBy(_._2, false) .take(10) val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数) val movies=getMovies(sc) val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title) //对两个RDD进行join操作,取二者的共同匹配项 allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数)) .map(x=>(x._1,x._2._1,x._2._2)) .foreach(x=>println(x._1+" "+x._2+" "+x._3)) } /** * 男性看过最多的10部电影 */ def top10MaleLookMovie: Unit ={ val users = getUsers(sc) //获取所有男性的userid val faleUserId = users.filter(x => x(1).equals("M")) .map(x => x(0)).collect() val faleUserSet = HashSet() ++ faleUserId //创建广播变量,里面存储所有男性的userid val broadcastFaleSet = sc.broadcast(faleUserSet) val ratings = getRatings(sc) //统计出男性看过最多的10部电影的(movieid,观看次数) val top10moiveid = ratings.map(x => (x(0), x(1))) //(userid,movieid) //过滤出男性观影数据 .filter(x => broadcastFaleSet.value.contains(x._1)) .map(x => (x._2, 1)) //(movieid,1) .reduceByKey(_ + _) .sortBy(_._2, false) .take(10) val top10movieRDD=sc.parallelize(top10moiveid) //(movieid,次数) val movies=getMovies(sc) val allmoviesRDD=movies.map(x=>(x(0),x(1))) //(movieid,title) //对两个RDD进行join操作,取二者的共同匹配项 allmoviesRDD.join(top10movieRDD) //(movieid,(title,次数)) .map(x=>(x._1,x._2._1,x._2._2)) .foreach(x=>println(x._1+" "+x._2+" "+x._3)) } }

  

原文地址:https://www.cnblogs.com/yjd_hycf_space/p/7105789.html