第八章 Spark安装及配置

 

第八章 Spark安装及配置

 

一、Spark简介

官方文档:http://spark.apache.org/

Lightning-fast cluster computing:快如闪电的集群计算。

Apache Spark™ is a unified analytics engine for large-scale data processing:大规模快速通用的计算引擎。

1、速度Speed

Run workloads 100x faster:比hadoop 100x

 

2、使用Ease of Use

Write applications quickly in Java, Scala, Python, R, and SQL:支持java / Scala /R /python快速构建应用

3、通用Generality

Combine SQL, streaming, and complex analytics:组合SQL ,流计算 + 复杂分析。

 

spark模块:
Spark core //核心模块
Spark SQL //SQL
Spark Streaming //流计算
Spark MLlib //机器学习
Spark graph //图计算

4、运行在任何地方Runs Everywhere

Spark runs on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud. It can access diverse data sourcesSpark运行在Hadoop、Apache Mesos、Kubernetes独立模式或云中。它可以访问不同的数据源.

二、 开发工具环境搭建

软件要求

1) Windows 7操作系统以上

2) JDK 8

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

 

3) hadoop-common-2.2.0-bin

https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip

 

4) maven

http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip

 

5) eclipse for scala

http://scala-ide.org/download/sdk.html

JDK环境配置

见第七章

Hadoop环境配置

spark虽然支持standalone模式,并且不依赖hadoop。但是在windows环境下需要hadoop的这个winutils.exe。因此需要下载版本匹配的winutils.exe

 

2.1 下载hadoop-common-2.2.0-bin

https://github.com/srccodes/hadoop-common-2.2.0-bin/archive/master.zip

 

2.2 将压缩包解压到本地磁盘

D:BaiduNetdiskDownloadhadoop-common-2.2.0-bin-master

2.3 查看是否兼容

进入命令行模式,在D:BaiduNetdiskDownloadhadoop-common-2.2.0-bin-masterin目录下运行 winutils.exe, 查看是否与当前操作系统兼容.

2.4 设置HADOOP_HOME

在操作系统的环境变量里设置

HDAOOP_HOME=D:BaiduNetdiskDownloadhadoop-common-2.2.0-bin-master

Maven环境配置

3.1 下载maven-3.6.3

http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zip

 

3.2 解压maven的压缩文件

解压到D:spark的目录下,maven主目录 D:sparkapache-maven-3.6.3

 

3.3 配置环境变量

新增MAVEN_HOME=D:sparkapache-maven-3.6.3

在系统变量Path中追加 %MAVEN_HOME%bin

打开cmd验证mvn  -version是否安装成功

ScalaIDE for eclipse环境配置

见第七章

三、Spark开发环境搭建(Windows

需求:使用Spark进行单词次数统计

创建Scala Maven工程

1.1 打开Eclipse

 

1.2 选择工作区

工作区可根据个人爱好设置

 

1.3 创建maven工程

1)点击菜单“File” —>”New”—>”Other”, 打开Maven新工程“创建”向导窗口如下图所示,

2)选择‘Maven Project’,进入如下所示的弹窗。

3)点击上图中的Next”按钮,进入下一步,弹窗下图所示:

输入上图所指内容。

4)完成Maven工程创建,如下图所示,

为工程增加scala属性

在工程上点击鼠标右键,增加scala属性

如果scala版本不一致,请更换,同时更换JDK版本,最终项目如下所示:

 

设置maven工程的配置内容

3.1 修改pom.xml

增加如下内容:

<!-- 引入spark核心包,2.11是scala版本,2.1.0是spark版本 -->    
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>

编写Scala 代码

4.1 创建scala代码目录

1) 选择创建 scala代码目录

2) 在弹出的窗口中指定scala代码目录

 

4.2 创建package

1)选择创建Scala package

 

 

2) 在弹出的窗口中输入指定Scala package包名

4.3 创建Scala Object

1)选择创建Scala Object

 

2)在弹出的窗口中输入Scala Object名

3)完成的结果

 

4.4 编写代码

在WordCount.scala文件中补充如下代码

  def main(args: Array[String]) {
        //创建Spark配置对象
        val conf = new SparkConf().setAppName("WordCount")
        //设置master属性
        conf.setMaster("local")
        //通过conf创建sc
        val sc = new SparkContext(conf)
        //加载文本文件
        val rdd1 = sc.textFile("file:///D:/spark/words.txt")
        //将数据进行切分,并压扁
        val rdd2 = rdd1.flatMap(line => line.split(" "))
        //遍历当前数据组成二元组w =>(w,1)
        val rdd3 = rdd2.map(word => (word, 1))
        //进行聚合操作,相同key的value进行相加
        val rdd4 = rdd3.reduceByKey(_ + _)
        //collect方法用于将spark的RDD类型转化为我们熟知的java常见类型
        val r = rdd4.collect()
        //循环按行打印结果
        r.foreach(println)
    }

运行Spark 单词次数统计程序 

5.1 准备数据原料

d盘中创建 words.txt(D:/spark/words.txt),输入内容如下

abc cdf bbb
aaa bbb
ddd ghb aaa aaa
aaa
adfs
aaa

5.2 运行WordCount.scala 

1)WordCount.scala文件里点击鼠标右键,选择‘Run As’—>”Scala Application”

2) 运行结果

四、Spark集群环境搭建(Linux)

集群规划

01node 192.168.11.130

02node 192.168.11.131

03node     192.168.11.132

启动Hadoop  请跳过此步骤

安装Scala   请跳过此步骤

1、单机Scala安装

见第七章 二、Scala运行环境安装(Linux)

2、进入/usr/local目录,分发Scala

scp -r scala root@02node:/usr/local

scp -r scala root@03node:/usr/local

3、分发或直接修改配置文件/etc/profile

scp /etc/profile root@02node:/usr/local

scp /etc/profile root@03node:/usr/local

记得刷新配置文件:source /usr/local/profile

安装Spark

1、下载Spark

http://us.mirrors.quenda.co/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz

2、解压

tar -zvxf spark-2.4.5-bin-hadoop2.7.tgz

3、修改名称

mv  spark-2.4.5-bin-hadoop2.7   spark

4.配置slaves

cd spark/conf
cp slaves.template slaves
vi slaves,写入如下内容
02node
03node

5.配置spark-env.sh

cp spark-env.sh.template spark-env.sh
vi spark-env.sh写入如下内容
export JAVA_HOME=/usr/local/jdk

6.将配置好的spark拷贝到02node03node节点上

scp -r spark root@02node:/usr/local
scp -r spark root@03node:/usr/local

7.在三台机器上配置环境变量

vi /etc/profile
export SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
source /etc/profile

8.启动

启动spark/usr/local/spark/sbin/start-all.sh

    http://192.168.11.130:8080/ 查看是否成功

关闭spark/usr/local/spark/sbin/stop-all.sh

s

Spark测试

1)本地文件测试

启动spark-shell,连接spark集群上:spark-shell --master spark://01node:7077

就可以进行spark应用的开发了,比如统计单词个数,还是以上次课堂的文档为例:

words.txt内容如下,将words.txt分发(或通过复制)到01node02node03node三台主机的/usr/local/spark/目录下。

abc cdf bbb
aaa bbb
ddd ghb aaa aaa
aaa
adfs
aaa

注意:为什么要分发到所有节点?文件读取发生在executor节点上。为了让代码工作,应该在所有节点上分发文件。

//单词统计1
$scala>val rdd1 = sc.textFile("/usr/local/spark/words.txt")
$scala>val rdd2 = rdd1.flatMap(line=>line.split(" "))
$scala>val rdd3 = rdd2.map(word => (word,1))
$scala>val rdd4 = rdd3.reduceByKey(_ + _)
$scala>rdd4.collect

//单词统计2
$scala> sc.textFile("/usr/local/spark/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

2hdfs测试

启动hdfs,直接启动hadoop即可,但是我们只使用了hadoop里面的hdfs

创建文件夹:

[root@01node ~]# hdfs dfs -mkdir /spark

将测试文件word.txt上传到hdfs

[root@01node ~]# hdfs dfs -put /usr/local/spark/words.txt  /spark

进行单词统计:

$scala> sc.textFile("hdfs://01node:9000/spark/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

五、理解Spark

Spark是基于hadoopMapReduce,扩展MapReduce模型,高效使用MapReduce模型,内存型集群计算,提高应用程序的处理速度。

Spark核心概念介绍:

SparkContextSpark的核心模块

//创建Spark配置对象
val conf = new SparkConf().setAppName("WordCount")
//设置master属性
conf.setMaster("local")
//通过conf创建sc
val sc = new SparkContext(conf)

通过创建SparkConf对象来配置应用,然后基于这个SparkConf创建一个SparkContext对象。驱动器程序通过SparkContext对象来访问Spark。这个对象代表对计算集群的一个连接,它是程序的主要入口点。一旦有了SparkContext,就可以用它来创建RDD任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数。初始化后,就可以使用SparkContext对象所包含的各种方法来创建和操作RDD和共享变量。

1master(local) 设置要连接的master URL,例如:

“local”在本地运行
“local[4]”以4核在本地运行
“spark://master:7077”在spark独立集群上运行

2appName( ) 设置将在spark Web UI中显示的应用程序的名称。如果未设置应用程序名称,则将使用随机生成的名称。

RDDResilient distributed dataset弹性分布式数据集

//加载文本文件,可以是本地文件,也可以是分布式文件系统
//RDD : Resilient distributed dataset,弹性分布式数据集。
val rdd1 = sc.textFile("file:///D:/spark/words.txt")
//将数据按空格进行切分,并压扁
val rdd2 = rdd1.flatMap(line => line.split(" "))
//遍历当前数据组成二元组w =>(w,1)
val rdd3 = rdd2.map(word => (word, 1))
//进行聚合操作,相同key的value进行相加
val rdd4 = rdd3.reduceByKey(_ + _)
//collect方法用于将spark的RDD类型转化为我们熟知的java常见类型
val r = rdd4.collect()
//循环按行打印结果
r.foreach(println)

RDD是只读的记录分区集合。RDD具有容错机制。RDDSpark的基本数据结构,是不可变数据集。RDD中的数据集进行逻辑分区,每个分区可以单独在集群节点进行计算。可以包含任何类型对象,包括javascalapython等,甚至是自定义的类型。

创建RDD方式有两种:

  1. 对现有集合进行并行处理,如Arraylist

Spark Shell交互模式下,输入如下Scala语言内容:

val array = Array(1,2,3,4,5)
val rdd = sc.parallelize(array)

Spark Shell交互模式下,输入如下Scala语言内容:

val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)

 2.引用外部的数据存储集,如HDFS、HBase

spark shell模式下,输入如下scala语言内容

val lines = sc.textFile("file:///f:/gitee/spark-project/dataset/helloworld")

Spark通过RDD实现了快速而高效的MapReduce操作,下面就让我们来讨论为什么Spark能够替代MapReduce操作?或者为什么MapReduce效率低?

MapReduce里面进行数据共享是比较慢的。MapReduce被广泛采用于大型的数据集处理系统,不幸的是大部分并行框架唯一的方式就是将数据存储到外部磁盘上,导致大部分应用在使用hadoop时,90%时间是在进行数据的读写。

使用Spark RDD进行数据分享。认识到MapReduce的缺陷后研究者开发了Spark,而Spark的理念就是RDD:分布式弹性数据集,它支持内存处理计算,在job间进行数据共享。内存的IO速率高于网络和disk10 ~ 100之间。

每个RDD对象中包含5个主要属性:

1.分区列表

2.针对每个Split的计算函数

3.对其他RDD的依赖列表

4.可选,如果是Key-Value RDD的话,可以带分区类

5.可选,首选块位置列表(HDFS block location)

六、学生成绩分析

需求:得到成绩为100分的学生ID

  1. 准备数据原料

数据内容如下,放入/usr/local/spark目录下

大数据课程成绩

result_bigdata.txt

1001 大数据基础 90

1002 大数据基础 94

1003 大数据基础 100

1004 大数据基础 99

1005 大数据基础 90

1006 大数据基础 93

1007 大数据基础 100

1008 大数据基础 93

1009 大数据基础 89

1010 大数据基础 78

1011 大数据基础 91

1012 大数据基础 84

应用数学课程成绩

result_math.txt

1001 应用数学 90

1002 应用数学 94

1003 应用数学 100

1004 应用数学 100

1005 应用数学 94

1006 应用数学 80

1007 应用数学 90

1008 应用数学 94

1009 应用数学 84

1010 应用数学 86

1011 应用数学 79

1012 应用数学 91

2.获取学生数学成绩的map映射

在Spark Shell模式下,输入如下scala语言内容:

val math = sc.textFile("/usr/local/spark/result_math.txt")
val m_math = math.map{x=> val line=x.split(" ");(line(0),line(1),line(2).toInt)}

3.获取学生大数据课程成绩的map映射

spark shell模式下,输入如下scala语言内容

val bigdata = sc.textFile("/usr/local/spark/result_bigdata.txt")
val m_bigdata = bigdata.map{x=>val line=x.split(" ");(line(0),line(1),line(2).toInt)}

4.获取学生成绩100的学生id

spark shell模式下,输入如下scala语言内容

val student_100 = m_math.filter(_._3 == 100).union(m_bigdata.filter(_._3 == 100))
val result = student_100.map(_._1).distinct
val resultArray = result.collect

使用Spark SQL进行排序

Spark SQL支持三种窗口函数:排名函数、分析函数和聚合函数。

案例数据,/usr/local/spark/score.json,学生名字、课程、分数:

{"name":"A","lesson":"Math","score":100}
{"name":"B","lesson":"Math","score":100}
{"name":"C","lesson":"Math","score":99}
{"name":"D","lesson":"Math","score":98}
{"name":"A","lesson":"E","score":100}
{"name":"B","lesson":"E","score":99}
{"name":"C","lesson":"E","score":99}
{"name":"D","lesson":"E","score":98}

import org.apache.spark.sql.expressions.Windowspark shell模式下,输入如下scala语言内容:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
val df = hiveContext.read.json("/usr/local/spark/score.json")
case class Score(val name: String, val lesson: String, val score: Int)
df.registerTempTable("score") // 注册临时表
sc.setLogLevel("WARN") // 日志级别,可不改
val hiveContext = new HiveContext(sc)
// SQL语句
val stat = "select".
concat(" name,lesson,score, ").
concat(" ntile(2) over (partition by lesson order by score desc ) as ntile_2,").
concat(" ntile(3) over (partition by lesson order by score desc ) as ntile_3,").
concat(" row_number() over (partition by lesson order by score desc ) as row_number,").
concat(" rank() over (partition by lesson order by score desc ) as rank, ").
concat(" dense_rank() over (partition by lesson order by score desc ) as dense_rank, ").
concat(" percent_rank() over (partition by lesson order by score desc ) as percent_rank ").
concat(" from score ").
concat(" order by lesson,name,score")
hiveContext.sql(stat).show // 执行语句得到的结果

rank遇到相同的数据则rank并列,因此rank值可能是不连续的 

  • dense_rank遇到相同的数据则rank并列,但是rank值一定是连续的
  • row_number 很单纯的行号,类似excel的行号,不会因为数据相同而rank的值重复或者有间隔
  • percent_rank = 相同的分组中 (rank -1) / ( count(score) - 1 )
  • ntile(n) 是将同一组数据 循环的往n个 桶中放,返回对应的桶的indexindex1开始。
原文地址:https://www.cnblogs.com/zhouyeqin/p/15516267.html