大三寒假学习进度(十三)

今天主要学习了Spark环境的搭建以及一些RDD算子的学习

Spark环境搭建

比起hadoop的环境搭建,要搭建起一个Spark的学习环境不要太简单。我们使用Idea创建一个Maven项目,导入scala的支持,然后导入如下依赖:

 
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.1</version>
        </dependency>
    </dependencies>

编写一

/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark03_WordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    val lines: RDD[String] = sc.textFile("datas")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne = words.map {
      word => (word, 1)
    }
    // Spark框架提供了更多的功能,可以把分组和聚合用一个方法实现
    // reduceByKey:相同的key的数据可以对value进行reduce聚合
    val wordToCount = wordToOne.reduceByKey(_ + _)
    // 5.将转换结果采集到控制台打印出来
    val array = wordToCount.collect()
    array.foreach(println)
    // 关闭连接
    sc.stop()
  }

}

如果控制台能打印出结果,说明我们的本地测试环境就搭建完成了。但其实真正的spark运行环境不是这样的,我这里就不记录了。

常见RDD算子学习

什么是RDD算子

首先要明确的概念是RDD:

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

简单说,就是Spark中最小的单位。而RDD算子指的其实就是RDD的方法(函数)

map && flatmap && groupby && filter

基本和scala中的功能一致,不再细说

mapPartitions

首先先来看代码:

/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark02_RDD_Operator_Transform {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 算法 — mapPartitions
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    // 可以以分区为单位进行数据转换操作
    // 但会将整个分区的数组加载到内存中进行引用
    // 处理完的数据是不会释放的,存在对象的引用
    // 内存较小,数据量较大的场合下容易出现内存溢出
    val mapRDD: RDD[Int] = rdd.mapPartitions(iter => {
      println(">>>>>>>>>>")
      iter.map(_ * 2)
    })
    mapRDD.collect()
    sc.stop()
  }

}

可以看到,mapPartitions相比较于map,其实就是把对每个元素的处理变成了对整个分区的处理。这样的效率会更高,但由于它处理完的数据不会自动释放,在内存不够,数据量大的时候会出现内存溢出,要注意map和mapPartitions的选择。

mapPartitionsWithIndex

 
/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark03_RDD_Operator_Transform {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 算法 — mapPartitions
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    // [1,2], [3,4]
    val mpiRdd: RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {
      if (index == 1) iter else Nil.iterator
    })
    mpiRdd.collect().foreach(println)
    sc.stop()
  }

}

类似的,mapPartitionsWithIndex就是加上了每个分区的编号,让我们可以实现一些特殊的需求。

glom

 
/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark05_RDD_Operator_Transform {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 算法 — glom
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
    // List => Int
    // Int => Array
    val glomRDD: RDD[Array[Int]] = rdd.glom()
    glomRDD.collect().foreach(data => println(data.mkString(",")))
    sc.stop()
  }

}

glom,指的是将同一个分组的数据再转换成Array。

sample

 
/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark08_RDD_Operator_Transform {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 算法 — sample
    val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
    // sample算子需要传递三个参数
    // 1.第一个参数表示,抽取数据后是否将数据返回
    // 2.第二个参数表示 如果是抽取不放回的场合(基准值的概念),抽取放回的场合(条件概率)
    // 3.第三个参数 随机算法的种子 如果不传递时,会使用当前系统时间
//    println(rdd.sample(withReplacement = false, 0.4).collect().mkString(","))
    println(rdd.sample(withReplacement = true, 2).collect().mkString(","))

    sc.stop()
  }

}

sample,指采样方法,可以随机抽取一些数据,主要用来应对数据倾斜的情况。

distinct

 
/**
 * @Description:
 * @author: LiuGe
 * @date: 2021/1/25
 */
object Spark09_RDD_Operator_Transform {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 算法 — distinct
    val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))

    val rdd1: RDD[Int] = rdd.distinct()
    rdd1.collect().foreach(println)
    sc.stop()
  }

}

去重,非常常规的功能。

总结

总的来说,在花费了大量时间学习scala后,发现Spark学习起来十分简单,因为和scala的语法相似度十分高,基本就相当于在写scala的程序。也是没有白学那么久的scala

原文地址:https://www.cnblogs.com/hang-hang/p/14871726.html