Spark程序设计

 一、Spark编程模型

Spark常规步骤:

  1. 创建SparkContext对象:连接集群,提供创建RDD和广播变量等接口
  2. 输入数据:从Hadoop等外部数据源或Scala数据集创建RDD
    • 创建RDD-Scala集合/本地文件
      • 并行度3,创建3个partition
    •  HDFS

    •  文件被切分成block分布在多个节点上,通过textFile读入机器内存,转为RDD的partition对象。action触发之后,才真物理上去执行。

    • 总结
    • 外部的数据,比如HDFS上从外部来,变为RDD,不断转换也是RDD,
    • 内部的数据,比如一些变量,是Scala集合或容器,存在Driver当前APP单机的数据集,包括Broadcast变量,在单机有副本。而外部数据,最后通过collect等返回的数据,也返回到Driver单机数据
    • RDD上的数据,被每一台节点上spark的block manager管理的,里面存着。Driver节点上则知道全局RDD到底有哪些partition,究竟被每台机器怎么管理
    • broadcast变量声明初始化在Driver,可以分发到不同的节点上,节点上就有副本,供Task共享
  3. 处理:RDD上执行操作API
    • 核心API 按语义分类
  4. 输出:Action触发执行,保存结果到外部数据源或回收到Driver

其他[Optional]

  1. “共享变量”:Driver分发的全局变量:广播变量等
    • 声明初始化共享变量
    • 闭包传递,每个Task里都会有该变量,是和任务打包一起分发到节点上的
    • 使用broadcast,则每个Excutor只有一个,Task共享该变量,减少了数据传输,提高空间使用率

       2.Checkpoint:检查点备份

       3.Cache:缓存复用数据

       4.采样(sample):小数据集验证

       5.调试:take,foreach等

二、Spark优化

 通过程序或配置参数控制:

  • 控制任务并行度
  • 降低单条数据处理开销
  • 数据倾斜问题
  • RDD缓存复用
  • 操作符的选择
  • 为作业设置合理的资源

1.控制任务并行度

问题与影响

  • 数目过小:运行过慢,容易OOM
  • 数目过大:产生过多小任务,启动和调度开销较大

推荐数量

  • 每个CPU core对应2~3个Task

控制并行任务(Task)数量

  • Map任务并行度
    • 输入的Stage的Task并行度
    • 默认值:输入文件的数据块数量一致(HDFS block)
    • 通过API控制sc.textFile("input.txt", 100)
  • Reduce任务并行度
    • 默认值:使用parent RDD的partition数量
    • 通过设置配置spark.default.parallelism更改默认配置
    • 通过API控制groupByKey,reduceByKey等提供了相关参数
      • rdd.reduceByKey(_+_, 100)

2.降低单条记录开销

 mapPartitions

3.数据倾斜或任务倾斜问题

问题:某个任务T负载过大,造成拖慢整体Stage进度或Task出现异常

原因:任务T数据过多或任务T所在节点有问题

解决:

  • 数据:选择合适的Partition Key
  • 调度:spark.speculation设置为true
  • 节点:剔除所有问题节点

Stage0中下面Task数据量过大

可以将该Task根据随机数再切分为两个Task,三个Task处理的数据量基本就一致了

可以自定义方法解决该问题

 4.缓存重复使用RDD

 两个action

5.操作符的选择

6.为作业设置合理资源

7.监控与诊断

8.Spark参数配置方式

监控实例

作业提交到了Yarn

打开resourcemanager的端口,可看到所有正在运行或运行结束的Application

 点开正在运行的UserClick

 点击TracingUrl,进入Spark的监控界面

 如果是standalone模式,直接打开4040端口即可

三、案例

 电影受众分析

1.电影受众分析背景

 2.电影受众分析数据

用户数据:用户ID,性别,年龄,职业, 编码

电影数据:电影ID,电影名,风格

评价数据:用户ID,电影ID,评分,时间戳

3.电影受众分析任务

  • 看过“Sixteen Candles”用户年龄和性别分布
    • 电影受众分析数据:过滤
    • 连接
    • 分布:聚合运算,年龄和性别为key统计数量

创建Object

package org.sparkcourse.movie

import org.apache.spark._


object MovieUser{
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val master = if(args.length > 0) args(0).toString else "local"
    val datapath = if(args.length > 1) args(1).toString else "data/ml-1m"
    val conf = new SparkConf().setMaster(master).setAppName("MovieUser")
    val sc = new SparkContext(conf)
    //输入数据
    val usersRdd = sc.textFile(datapath+"/users.dat")
    val ratingsRdd = sc.textFile(datapath+"/ratings.dat")
    //抽取数据的属性,过滤符合条件的电影
    //RDD[(userId, (gender, age))]
    val users = usersRdd.map(_.split("::")).map(x=>{
      (x(0), (x(1), x(2)))
    })
    //RDD[(userID, movieID)] split返回数组
    val rating = ratingsRdd.map(_.split("::")).map(x =>
      (x(0), x(1))).filter(x => x._2.equals("2144"))
    //join两个数据集
    val userRating = rating.join(users)
    userRating.take(num=1).foreach(println(_))
    //统计分析
    val userDistribution = userRating.map(x=>{
      (x._2._2, 1)
    }).reduceByKey(_+_)
      .foreach(println(_))
    sc.stop()

  }
}

users  (userID, (gender, age))

rating  (userID, movieID)

userRating (userID, (movieID, (gender, age))) 例如,(4425, (2144, (M, 35)))

userDistribution ((gender, age), 1) 求和

  • 年龄段20-30的男性年轻人,最喜欢看哪10部电影
    • 年龄段:过滤
    • 最喜欢10部电影:聚合,排序
package org.sparkcourse.movie

import org.apache.spark._

import scala.collection.immutable.HashSet

object PopularMovie {
  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local").setAppName("PopularMovie")
    val sc = new SparkContext(conf)
    //输入数据
    val usersRdd = sc.textFile("data/ml-1m/users.dat")
    val ratingsRdd =  sc.textFile("data/ml-1m/ratings.dat")
    //抽取数据和过滤
    val users = usersRdd.map(_.split("::")).map(x=>{
      (x(0), x(2)) //userid, age
    }).filter(x=>x._2.toInt>=20&&x._2.toInt<=30)
      .map(_._1)
      .collect() // 变为了Driver上单机的变量,用广播变量发出去
    val userSet = HashSet() ++ users
    val broadcastUserSet = sc.broadcast(userSet)
    //聚合和排序
    val topKMovies = ratingsRdd.map(_.split("::"))
      .map(x=>{(x(0), x(1))}) //userid, movieid
      .filter(x => {
      broadcastUserSet.value.contains(x._1)
    }).map(x=>{
      (x._2, 1)
    }).reduceByKey(_+_)
      .map(x =>(x._2, x._1))
      .sortByKey(false) // ascending=false
      .map(x=>{(x._2, x._1)})
      .take(3)
      .foreach(println(_))
  }
}
  • 最受欢迎的前3部电影
package org.sparkcourse.movie

import org.apache.spark._

object TopKMovie {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("TopKMovie")
    val sc = new SparkContext(conf)

    // 输入
    val ratingsRdd = sc.textFile("data/ml-1m/ratings.dat")

    // 数据抽取
    val ratings = ratingsRdd.map(_.split("::"))
      .map(x => {
        (x(0), x(1), x(2)) // userid, movieid, rating
      })

    // 分析
    val topKScoreMostMovie = ratings.map(x => {
      (x._2, (x._3.toInt, 1)) // (movieid, (rating, 1))
    }).reduceByKey((v1, v2) => { 
      (v1._1 + v2._1, v1._2 + v2._2) // {movieid: (rating之和,数量之和)}
    }).map(x => {
      (x._2._1.toFloat / x._2._2.toFloat, x._1) // (平均分, movieid)
    }).sortByKey(false)
        .take(3)
        .foreach(println(_))
    sc.stop()
  }
}

 reduceByKey((v1, v2) => { (v1._1 + v2._1, v1._2 + v2._2)})

v1和v2都是value

即对于(movieid, (rating, 1))来说,是(rating, 1)

reducByKey对相同的键的进行value的操作

reduceByKey(binary_function)
reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

那么讲到这里,差不多函数功能已经明了了,而reduceByKey的是如何运行的呢?下面这张图就清楚了揭示了其原理:

 

它会在数据搬移以前,即在reduce之前就进行了reduce操作。

可以实现同样功能的还有GroupByKey函数,但是,groupbykey函数并不能提前进行reduce,也就是说,上面的处理过程会翻译成这样:

 

所以在处理大规模应用的时候,应该使用reduceByKey函数。

原文地址:https://www.cnblogs.com/aidata/p/11541657.html