Spark案例----影评分析(年份,电影id,电影名字,平均评分)

题目:

现有如此三份数据:(这里只需用后两份)
1、users.dat    数据格式为:  2::M::56::16::70072
对应字段为:UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
对应字段中文解释:用户id,性别,年龄,职业,邮政编码
 
2、movies.dat        数据格式为:1::Toy Story (1995)::Animation|Children's|Comedy  ;  2::Jumanji (1995)::Adventure|Children's|Fantasy  ;  3::Grumpier Old Men (1995)::Comedy|Romance
对应字段为:MovieID BigInt, Title String, Genres String
对应字段中文解释:电影ID,电影名字,电影类型
 
3、ratings.dat        数据格式为:  1::1193::5::978300760  ;  1::661::3::978302109  ;  1::914::3::978301968
对应字段为:UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
对应字段中文解释:用户ID,电影ID,评分,评分时间戳
 
用户ID,电影ID,评分,评分时间戳,性别,年龄,职业,邮政编码,电影名字,电影类型
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

需求说明:
关联两张表。
按照年份进行分组。计算每部电影的平均评分,平均评分保留小数点后一位,并按评分大小进行排序。
评分一样,按照电影名排序。相同年份的输出到一个文件中。
结果展示形式:
    年份,电影id,电影名字,平均分。
尝试使用自定义分区、自定义排序和缓冲。

思路:

 * 整体实现思路
 * 1、先处理评分数据,计算出电影id、平均评分
 * 2、再处理电影数据,提取出电影id、电影年份、电影名
 * 3、将上两步计算出的数据根据电影id进行关联join
 * 4、提取出年份、电影id、电影名、平均评分封装到对象里,并将年份作为key、对象作为value进行分区
 * 5、自定义分区规则,将相同年份的放到一个分区中,实现相同年份的写入到一个结果文件中
 * 6、对象实现排序规则,对分区内的数据进行排序

撸代码:

package com.lhb.demo;

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Test1AvgRate {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("GroupFavTeacher")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    //先读取电影id和评分
    val rdd2: RDD[String] = sc.textFile("目录\ratings.dat")
    //按指定分割符切分并过滤脏数据
    val filterData = rdd2.map(_.split("::")).filter(_.length >= 4)
    //去除掉不必要的字段,并按电影id分组,计算每部电影的平均分
    val movieavg = filterData.map(arr => {
      (arr(1), arr(2).toDouble)
    })
      .groupBy(_._1).mapValues(iter => {
      val num: Int = iter.size
      val sum = iter.map(_._2).sum
      (sum / num).formatted("%.1f")
    })
    //截取电影id、电影名字、年份
    val movies = sc.textFile("目录\movies.dat")
    val sp1 = movies.map(_.split("::")).filter(_.length >= 3)
    val unit = sp1.map(arr => {
      val movieId = arr(0)
      val movieName = arr(1)
      val year = arr(1).substring(arr(1).length - 5, arr(1).length - 1)
      (movieId, (year, movieName))
    })
    //将平均评分和电影数据进行join,将数据封装到对象中
    //RDD[(String, ((String, String), String))]
    val allData = unit.join(movieavg)
      .map(x => {
        val year = x._2._1._1
        val movieId = x._1
        val movieName = x._2._1._2
        val avg = x._2._2.toFloat
        (year, MovieBean(year, movieId, movieName, avg))
      })
    val years: Array[String] = allData.map(_._1).distinct().collect()
    allData.partitionBy(new MyPatitioner(years))
      .mapPartitions(iter => {
        iter.toList.sortBy(_._2).toIterator
      }).map(t => t._2.year + "," + t._2.movieId + "," + t._2.movieName + "," + t._2.avg)
      .saveAsTextFile("输出目录\output")
    sc.stop()
  }
}

//自定义分区器,通过年份进行分区
class MyPatitioner(years: Array[String]) extends Partitioner {
  override def numPartitions: Int = years.length
  override def getPartition(key: Any): Int = {
    val year = key.asInstanceOf[String]
    years.indexOf(year)
  }
}

//自定义对象封装数据,并实现对应的比较逻辑
case class MovieBean(year: String, movieId: String, movieName: String, avg: Float) extends Ordered[MovieBean] {
  override def compare(that: MovieBean): Int = {
    if (that.avg == this.avg) {
      if (that.movieName.compareTo(this.movieName) > 0)
        -1
      else
        1
    } else {
      if (that.avg > this.avg)
        1
      else
        -1
    }
  }
}

运行部分结果如下:

数据如下:

链接: https://pan.baidu.com/s/1hc84MTWm5xosl4o_LrGoSw 提取码: z59t 

MapReduce案例----影评分析(年份,电影id,电影名字,平均评分)

原文地址:https://www.cnblogs.com/hong-bo/p/11730396.html