spark从入门开始01

spark从入门开始01-介绍、集群安装

1.开始:spark是什么?

  • 什么是spark?Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群。Spark得到了众多大数据公司的支持,这些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、

    腾讯、京东、携程、优酷土豆。当前百度的Spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用GraphX构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯Spark集群达到8000台的规模,是当前已知的世界上最大的Spark集群。

2.为什么要学Spark

  • Spark是一个开源的类似于Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Spark中的Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。它仅仅涉及数据计算,并没有涉及到数据存储,后期需要使用spark对接外部的数据源。比如hdfs。

3.Spark特性

  • 速度快:与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。

    比MapReduce快的原因:基于内存计算,在mapreduce任务后期计算每个job输出结果需要保存磁盘,后续其他job需要依赖于前面job输出结果。这样大量磁盘IO操作,性能比较低。而spark在进行计算时候,job可以保存在内存中,后续其他job需要依赖于前面job输出结果。这个时候就直接从内存中获得到。避免磁盘io操作,性能比较高。
    
    • 进程与线程
    mapreduce任务以进程 的方式运行在yarn集群中,比如程序由100个mapTask,一个task就需要一个进程,这些task要运行就需要开启100个进程。
    
    
    spark任务以线程的方式运行在进程中,比如程序中有100个mapTask,后期一个task就对应一个线程,这里就不在是进程,这些task需要运行,这里可以极端一点:
    	只需要开启一个进程,在这个进程中启动100个线程就可以了。进程中可以启动很多个线程,而开启一个进程与开启一个线程需要的时间和调度代价是不一样。开启一个进程需要的时间远远大于开启一个线程。
    
  • 易用性:Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

  • 兼容性:Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

  • 通用性:Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

    sparksql	通过sql去开发spark程序做离线分析
    sparkStreaming	用来解决公司实时计算场景
    Mlib	封装了一些机器学习的算法库
    Graphx	图计算
    

4.Spark集群安装

4.1下载安装包

4.2解压:

tar -zxvf spark-2.2.0-bin-hadoop2.7.tgz -C /opt/spk/
# 重命名
mv spark-2.2.0-bin-hadoop2.7 spark

4.3配置conf

  • 进入配置文件:
[root@linux01 conf]# mv spark-env.sh.template spark-env.sh
[root@linux01 conf]# vim spark-env.sh
  • 配置spark-env.sh
# 配置java环境变量
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/
# 配置Spark集群的Master地址
export SPARK_MASTER_HOST=linux01
# 配置Spark集群的Master端口
export SPARK_MASTER_PORT=7077
  • slaves更改mv slaves.template slaves,并配置slaves
linux02
linux03
  • 将第一台spark文件传送到其他台服务器(linux02,linux03)上
scp -r spark linux02:$PWD
scp -r spark linux03:$PWD

4.4环境变量配置

vim /etc/profle
export SPARK_HOME=/opt/spk/spark
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin

5.Spark集群的启动和停止

  • 启停相关shell脚本都在sbin目录下
  • 启动:
[root@linux01 sbin]# ./start-all.sh
  • 停止
/stop-all.sh

6.Spark集群web界面

  • 通过访问http://linux01:8080/ 查看web界面:

可以查看 版本、work数量、CPU使用情况,内存、运行程序和完成程序等信息

7.基于Zookepeer构建高可用spark集群

  • 基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)。

    ​ 用于生产模式。其基本原理是通过zookeeper来选举一个Master,其他的Master处于Standby状态。将spark集群连接到同一个ZooKeeper实例并启动多个Master,利用zookeeper提供的选举和状态保存功能,可以使一个Master被选举成活着的master,而其他Master处于Standby状态。如果现任Master死去,另一个Master会通过选举产生,并恢复到旧的Master状态,然后恢复调度。整个恢复过程可能要1-2分钟。

  • 首先事先搭建好Zookepeer

  • 配置spark-env.sh

    # 配置Spark集群的Master端口
    export SPARK_MASTER_PORT=7077
    # 配置java环境变量
    export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/
    # 配置zk相关信息
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=linux01:2181,linux02:2181,linux03:2181  -Dspark.deploy.zookeeper.dir=/spark_save_zk"
    
    参数说明 
    spark.deploy.recoveryMode:恢复模式(Master重新启动的模式)
    有三种:(1)ZooKeeper (2) FileSystem (3)NONE
    spark.deploy.zookeeper.url:ZooKeeper的Server地址
    spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。
    包括Worker,Driver和Application。
    

    注意

    ​ 在普通模式下启动spark集群,只需要在主机上面执行start-all.sh 就可以了。

    ​ 在高可用模式下启动spark集群,先需要在任意一台节点上启动start-all.sh命令。然后在另外一台节点上单独启动master。命令start-master.sh。

  • 同步spark-env.sh

    scp spark-env.sh linux03:$PWD
    scp spark-env.sh linux02:$PWD
    
  • 三台机器启动zookepeer

    /opt/zookeeper-3.4.6/bin/zkServer.sh start /opt/zookeeper-3.4.6/conf/zoo.cfg
    

    或执行启动zookepeer脚本

    #! /bin/sh
    for host in linux01 linux02 linux03
    do
        ssh $host "source /etc/profile;nohub /opt/zookeeper-3.4.6/bin/zkServer.sh start /opt/zookeeper-3.4.6/conf/zoo.cfg > /dev/null 2>&1 &"
    done
    

    nohub后台启动

  • 启动Spark集群

    # 在linux01机器/opt/spk/spark/sbin上启动Spark:
    	./start-all.sh
    # jps查看
    	# 此时linux01是Master, linux02,linux03是worker
    # 此时要启动第二个Master,只需在linux02 或 linux03 上 sbin目录下执行
    	./start-master.sh
    

    启动Spark集群应满足:可以在任意一台服务器来执行(互相免密登录)。

  • 此时进入ZkCli.sh 执行ls / 会出现 spark_save_zk节点

  • 验证:

    kill -9 Spark的master进程id
    # 等待1-2分钟。
    
  • 恢复过程详解:

    在高可用模式下,整个spark集群就有很多个master,其中只有一个master被zk选举成活着的master,其他的多个master都处于standby,同时把整个spark集群的元数据信息通过zk中节点进行保存。
    后期如果活着的master挂掉。首先zk会感知到活着的master挂掉,下面开始在多个处于standby中的master进行选举,再次产生一个活着的master,这个活着的master会读取保存在zk节点中的spark集群元数据信息,恢复到上一次master的状态。整个过程在恢复的时候经历过了很多个不同的阶段,每个阶段都需要一定时间,最终恢复到上个活着的master的转态,整个恢复过程一般需要1-2分钟。
    
  • 在master的恢复阶段对任务的影响

    针对于正在运行的任务,由于它已经分配到了资源,它是不受任何影响.
    受影响的就是在当前这个挂掉的阶段,后面提交的新的任务,由于没有活着的master分配资源,该任务是无法运行。
    

8.Spark中角色介绍

  • Spark架构使用了分布式计算中master-slave模型,master是集群中含有master进程的节点,slave是集群中含有worker进程的节点。

    1.Driver Program :运⾏main函数并且新建SparkContext对象,该对象是所有程序执行入口。
    2.Application:基于Spark的应用程序,包含了driver(客户端)程序和集群上的executor(任务运行的资源信息)。
    3.Cluster Manager:指的是在集群上获取资源的外部服务。目前有三种类型
    (1)Standalone: spark原生的资源管理,由Master负责资源的分配
    (2)Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架
    (3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
    4.Worker Node: 集群中任何可以运行Application代码的节点,在Standalone模式中指的是通过slaves文件配置的Worker节点,在Spark on Yarn模式下就是NodeManager节点
    5.Executor:是在一个worker node上为某应用启动的⼀个进程,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executor。
    6.Task :被送到某个executor上的工作单元。
    

9.运行Spark

9.1提交到spark集群中运行

  • 官网小例子,通过shell运行:
bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master spark://linux01:7077,linux02:7077 
--executor-memory 1G 
--total-executor-cores 1 
examples/jars/spark-examples_2.11-2.2.0.jar 
10
# 释义:
bin/spark-submit	提交任务shell脚本
--class org.apache.spark.examples.SparkPi	spark内置函数
--master spark://linux01:7077	指定计算的Master, 在不知道哪个Master是activate时候,将所有Master写上,会轮询寻找active的Master
--executor-memory	分配内存
--total-executor-cores	分配CPU数
examples/jars/spark-examples_2.11-2.2.0.jar		指定jar包
args	参数
  • 此时web端可以看到正在运行的应用

    • 正在运行状态:

    • 运行完成状态

10.spark-shell

  • spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

10.1 saprk-shell --master local[N] 读取本地文件

  • --master local[N]

    local  		表示程序本地进行计算,跟spark集群目前没有任何关系
    N	   		它是一个正整数,表示使用N个线程参与任务计算
    local[N]	 表示本地采用N个线程计算任务
    
    • 示例:
    bin目录下执行:spark-shell --master local[2]
    # local[2]表示启动2个线程执行任务
    
    Spark context Web UI available at http://10.0.0.134:4040#Jobs 的web端
    Spark context available as 'sc' (master = local[2], app id = local-1612664746787).# Spark context对象命名sc
    Spark session available as 'spark'.# spark2.0之后新特性
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _ / _ / _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_   version 2.2.0
          /_/
             
    Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_262)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    
  • 示例:本地单词统计

    // 上面通过命令行进入shell命令操作spark, 并生成一个名为sc的上下文。
    sc.textFile("file:///opt/words.txt").flatMap(x => x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
    // 释义:
    	1.textFile读取本地文件。
    	2.flatMap将每行数据读取,用split按照空格切分
    	3.通过map设置单词为键,计数为值。
    	4.通过reduceByKey根据键进行分组聚合。
    
  • 上述单词统计一行便可以实现。

10.2 spark-shell --master local[N] 读取HDFS上文件进行单词统计

  • hdfs查看目录

    hdfs dfs -ls /
    hdfs dfs -cat /a.txt
    
  • 单词统计

    sc.textFile("hdfs://linux01:9000/a.txt").flatMap(x => x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
    
  • 现在默认指定本地路径,我们如何将查找文件指定hdfs文件呢。

    修改spark下conf的spark-env.sh
    vim spark-env.sh
    
    # spark指向hdfs,指定hadoop路径
    export HADOOP_CONF_DIR=/opt/hdp/hadoop-2.8.5/etc/hadoop
    
    # 更改信息。更新到其他机器上:
    scp spark-env.sh linux02:$PWD
    scp spark-env.sh linux03:$PWD
    
    # 重新进入spakr-shell的交互
    spark-shell --master local[2]
    
    # 重新执行单词统计,不用指定全路径也可以进行单词统计
    sc.textFile("/a.txt").flatMap(x => x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).collect
    
    

10.3 spark-shell --master local[N] 指定集群中活着的master去读取HDFS上文件进行单词统计

  • spak-shell 指定master地址。

    spark-shell --master spark://linux01:7077 --executor-memory 1g --total-executor-cores 2
    
    # 指定地址spark://linux01:7077 
    # --executor-memory指定内存资源 1g
    # --total-executor-core 指定cup数
    

10.4 读取hdfs上文件之后,需要把计算的结构保存在hdfs上

  • 只需要saveAsTextFile即可
sc.textFile("/a.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
// /out在hdfs应为不存在文件夹,否则会报错

11.利用idea工具 ,scala语言开发spark程序实现单词统计

  • 在spark-shell仅仅在测试和验证时候用的多,在生产环境下,通常会在IDEA中编写程序,然后打成jar包,最后提交到集群上,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

  • IDEA开发spark程序

    • 创建一个Maven工程

    • 改成scala,右键项目->Add Framework Support->选择scala

    • pom.xml添加相关配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>xjk.sparkdemo1</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.2.0</spark.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
        </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <!--编译插件-->
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <!--打包插件-->
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass/>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
  • 代码示例:

    package com.xjk.skdemo1
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object wordCount {
      def main(args: Array[String]): Unit = {
        // 1。构建SparkConf对象,设置application名称 和 Master地址
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
        // 2.构建SparkContext对象,所有spark程序执行入口
        // 内部构建:DAGScheduler 和TaskScheduler对象
        val sc = new SparkContext(sparkConf)
    
        // 设置日志输出级别
        sc.setLogLevel("warn")
    
        // 3.读取数据
        val data: RDD[String] = sc.textFile("E:\words.txt")
        // 4.切分每一行,获取所有单词
        val words: RDD[String] = data.flatMap(x=>x.split(""))
        // 5. 记录单词个数
        val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
        // 6. 相同单词出现累加 1 操作
        val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
    
        // 按照单词出现次数降序排列
        val sortedRdd: RDD[(String,Int)] = result.sortBy(x => x._2, false)
    
    
        // 7.数据打印
        val finalResult:Array[(String,Int)] = sortedRdd.collect()
        finalResult.foreach(x => println())
        // 8.关闭sc
        sc.stop()
      }
    }
    
    
  • 报错记录:Exception in thread “main” java.lang.NoSuchMethodError: scala.Predef..conforms()Lscala/Predef$$less$

    出现问题原因是由于Spark和scala版本差异,pom.xml的版本是2.11.8。本机scala版本是 2.12.8
    

    解决:file->Project settings ->Global Libraries->删除2.11.8版本的scala即可
    

12.scala语言开发spark程序实现单词统计提交任务到集群

  • 打包jar包运行在服务器上:

    package com.wordCountdemo2
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    
    object WordCountOnSpark {
      def main(args: Array[String]): Unit = {
        // 1。构建SparkConf对象,设置application名称 和 Master地址
        val sparkConf: SparkConf = new SparkConf().setAppName("WordCountOnSpark")
        // 2.构建SparkContext对象,所有spark程序执行入口
        // 内部构建:DAGScheduler 和TaskScheduler对象
        val sc = new SparkContext(sparkConf)
    
        // 设置日志输出级别
        sc.setLogLevel("warn")
    
        // 3.读取数据,动态传入
        val data: RDD[String] = sc.textFile(args(0))
        // 4.切分每一行,获取所有单词
        val words: RDD[String] = data.flatMap(x=>x.split(" "))
        // 5. 记录单词个数
        val wordAndOne: RDD[(String, Int)] = words.map(x => (x, 1))
        // 6. 相同单词出现累加 1 操作
        val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
        // 7.计算结果保存在hdfs上,动态传入
        result.saveAsTextFile(args(1))
        // 8.关闭sc
        sc.stop()
      }
    }
    
    
  • 打包jar包, Maven->Lifecycle->package。放到服务器运行:

    spark-submit --master [Master地址] --class [指定执行类] --executor-memory [分配内存大小] --total-executor-cores [cpu核数] [打包好的jar包名] [hdfs上进行数据处理文件] [hdfs保存路径]
    
    
    spark-submit --master spark://linux01:7077 --class com.wordCountdemo2.WordCountOnSpark --executor-memory 1g --total-executor-cores 1 original-com.xjk.sparkdemo002-1.0-SNAPSHOT.jar /a.txt /sparktest
    
    • 释义
    spark-submit 
    --master spark://linux01:7077  指定master地址
    --class com.wordCountdemo2.WordCountOnSpark 指定jar包主类
    --executor-memory 1g 分配内存
    --total-executor-cores 1 指定处理cpu核数
    original-com.xjk.sparkdemo002-1.0-SNAPSHOT.jar 指定jar包路径 
    /a.txt 指定要处理文件
    /sparktest 统计结果输出哪个位置(这里指的是hdfs的路径)
    
原文地址:https://www.cnblogs.com/xujunkai/p/14515172.html