Spark简介

spark环境配置###

本地执行spark程序:
1、安装jdk和scala,配置环境变量
(由于环境不一样,我的操作系统是win8.1,scala必须是Scala 2.10.x版本,否则会报出Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;)
2、在环境变量中设置HADOOP-HOME的路径,然后需要下载winutils.exe,将其放在 %HADOOP-HOME%in目录下
3、在http://spark.apache.org/downloads.html中下载spark依赖包

4、新建scala工程,将依赖包导入

5、新建scala文件,执行程序,在程序中需要指定Master和AppName

或者在spark shell中执行,也相当于在本机上运行
在eb170上可以直接spark-shell打开shell命令窗口

集群执行spark程序:
1、安装jdk、scala和sbt,配置环境变量
2、在sbt目录下conf/sbtconfig.txt的最后增加下面两行,可以指定sbt下载的jar包存放的位置
-Dsbt.boot.directory=D:/sbt/boot/
-Dsbt.ivy.home=D:/sbt/ivy/
3、在IDEA中下载scala插件

4、新建sbt工程,在build.sbt中添加以下依赖:

5、新建scala文件,写一个简单的wordcount程序

6、点击refresh project 然后会自动下载sbt所需要的spark-core组件,下载完成后程序将不再报错
7、打包:在build.sbt文件所在目录下执行sbt package打包
8、将jar包放到集群上运行

spark 1.0生态圈

spark运行架构

在分布式环境下,spark集群采用主/从结构,在一个spark集群中,有一个节点负责中央协调,调度各个分布式节点,这个中央协调节点被称为驱动器节点,与之对应的节点被称为执行器节点,驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的java进程运行,驱动器节点和所有的执行器节点一起被称为一个spark应用。
spark应用通过一个叫做集群管理器的外部服务在集群中的机器上启动,spark自带的集群管理器被称为独立集群管理器。spark也能运行在Hadoop YARN和Apache Mesos这两大开源集群管理器上。

驱动器节点

当启动spark shell,就启动了一个spark驱动器程序,它执行程序中的main()方法,驱动器程序一旦终止,spark应用也就结束了。驱动器程序在spark应用中有两个职责:

  1. 把用户程序转化成任务
    spark驱动器程序负责把用户程序转为多个物理执行的单元,这些单元也被称为任务。从上层来看,所有spark程序都遵循同样的结构:创建RDD--转化派生RDD--行动操作存储结果,实际上即使隐式创建出了一个由操作组成的逻辑上的有向无环图,当驱动器程序运行时,它会把这个逻辑图转为物理执行计划。 这会在下面进行详细介绍。
    spark会对逻辑执行计划作一些优化,比如将连续的映射转化为流水线化执行,将多个操作合并到一个步骤等,这样spark将逻辑计划转为一系列步骤,每个步骤又由多个任务组成,任务是spark中最小的工作单元,用户程序通常要启动成百上千的独立任务。
  2. 为执行器节点调度任务
    有了物理执行计划之后,spark驱动器程序必须在各个执行器进程间协调任务的调度,执行器进程启动后,会向驱动器进程注册自己。因此,驱动器进程始终对应用中所有的执行器节点有完整的记录,每个执行器节点代表一个能够处理任务和存储RDD数据的进程。
    spark驱动器程序会根据当前得执行器节点集合,尝试把所有的任务基于数据所在的位置分配给合适的执行器进程。当任务执行时,执行器进程会把缓存数据存储起来,而驱动器进程同样会跟踪这些缓存数据的位置,并且利用这些位置信息来调度以后的任务,以尽量减少数据的网络传输。
执行器节点

spark应用启动时,执行器节点就被同时启动,并且伴随着整个spark应用的生命周期而存在,如果有执行器节点发生异常或崩溃,spark应用也可以正常执行。
执行器节点有两大作用:

  1. 负责运行组成spark应用的任务,并将结果返回给驱动器进程;
  2. 通过自身的块管理器,为用户程序中要求的缓存RDD提供内存式存储。因此任务可以在运行时充分利用缓存数据加速运算。
集群管理器

spark依赖集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。

spark运行机制总结
  1. 用户通过spark-submit脚本提交应用到集群管理器;
  2. spark-submit脚本启动驱动器程序,调用用户定义的main()方法;
  3. 驱动器程序与集群管理器通信,申请资源以启动执行器节点;
  4. 集群管理器为驱动器程序启动执行器节点;
  5. 驱动器进程执行用户应用中的操作,根据程序中定义的对RDD的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程;
  6. 任务在执行器程序中进行计算并保存结果;
  7. 如果驱动器程序中的main()方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

scala实现单词计数

java实现单词计数

spark的优点

  1. spark能够在内存中进行计算,因而更快,即使是必须在硬盘上进行的复杂计算,spark依然比MapReduce更加高效。
  2. spark支持多种计算模式,包括批处理、迭代计算、交互式查询、流处理,通过在一个统一框架下支持这些不同的计算,spark可以简单而低耗的把各种处理流程整合到一起。主要为了交互式查询和迭代算法设计,因其在MapReduce中表现得效率低下;同时还支持内存式存储和高效的容错机制。
  3. spark提供了丰富的API。

spark程序或shell会话的工作方式

  1. 从外部数据创建出输入RDD;
  2. 使用转化操作对RDD进行转化,以定义新的RDD;
  3. 缓存需要被重用的中间结果RDD;
  4. 使用行动操作触发一次并行计算,spark会对计算进行优化后再执行。
    spark在被转化的RDD第一个行动操作中用到时才进行计算,避免浪费存储空间(惰性求值)

数据持久化(缓存)

为了避免多次计算同一个RDD,可以让spark对数据进行持久化,这时,计算出RDD的节点会分别保存它们所求出的分区数据,如果有一个有持久化数据的节点发生故障,spark会在需要用到缓存的数据时重算丢失的数据分区,如果不希望影响执行速度,也可以把数据备份到多个节点上。
spark中有多种持久化级别,默认情况是以序列化后的对象存储在JVM堆空间中。

选择何种存储级别?

Spark的存储级别是为了提供内存使用率和CPU效率的均衡。我们建议您通过以下方式来选择:

  1. 如果你的RDD在默认的存储级别下工作的很好,就不要用其它的级别。这是最有CPU效率的选项,允许RDD上的操作尽快的完成。
  2. 如果不行,试下MEMORY_ONLY_SER并使用一个能快速序列化的库,这样更节省空间,同时访问速度也比较快。
  3. 不要存储数据到磁盘,除非在数据集上的计算操作是昂贵的,或者过滤了大量的数据。否则重新计算可能比从磁盘中读取更快
  4. 使用备份级别,如果你需要更快的恢复。(比如,使用Spark为网络应用程序提供服务)。所有的存储级别都通过重新计算提供了全面的容错性,但是备份级别允许你继续在RDD上执行任务而无需重新计算丢失的分区。
  5. 在拥有大量内存和多应用程序的环境中,实验下OFF_HEAP方式有以下优势:
    允许多个执行体共享同一个内存池。
    显著地减少了垃圾回收……
    如果某个执行体崩溃,缓存数据不会丢失。

如果要缓存的数据太多,内存中放不下,spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就需要重新计算,但是对于使用内存与硬盘的缓存级别的分区来说,被移除的分区都会写入磁盘。另外,RDD的unpersist()方法可以手动把持久化的RDD从缓存中移除。

物理执行计划

上面说到驱动器程序会把逻辑图转化为物理执行计划,这里详细举一个例子:

这一系列命令生成了一个叫做counts的RDD,在shell中执行完这些命令以后,程序没有执行任何行动操作(只有当行动操作被触发,程序才会运行),相反,程序定义了一个RDD对象的有向无环图。每个RDD维护了其指向一个或多个父节点的引用,以及表示其与父节点之间关系的信息。
当调用行动操作counts.collect(),spark调度器会创建出用于计算行动操作的RDD物理执行计划,RDD的每个分区都会被物化出来并发送到驱动器程序中,向上回溯所有必须计算的RDD。调度器为有向图中每个RDD输出计算步骤,步骤中包括RDD上需要应用于每个分区的任务,然后以相反的顺序执行这些步骤,计算得出最终所求的RDD。
当RDD不需要混洗数据就可以从父节点计算出来时,调度器就会自动进行流水线执行,如上例中实际上物理执行只需要两个步骤:

除了流水线执行的优化,当一个RDD已经缓存在集群内存或磁盘时,spark的内部调度器也会自动截短RDD谱系图,在这种情况下,spark会“短路”求值,直接基于缓存下来的RDD进行计算。还有一种截短RDD谱系图的情况发生在当RDD已经在之前的数据混洗中作为副产品物化出来时,哪怕该RDD并没有被显式调用persist()方法。这种内部优化是基于spark数据混洗操作的输出均被写入磁盘的特性,同时也充分利用了RDD图的某些部分会被多次计算的事实。

总结一下spark执行时流程

用户代码定义RDD的有向无环图

RDD上的操作会创建出新的RDD,并引用它们的父节点,这样就创建出了一个图。

行动操作把有向无环图强制转译为执行计划

当调用RDD的一个行动操作时,这个RDD就必须计算出来,这也要求计算出该RDD的父节点。spark调度器提交一个作业来计算所有必要的RDD,这个作业会包含一个或多个步骤,每个步骤其实也就是一波并行执行的计算任务,一个步骤对应有向无环图中的一个或多个RDD,对应多个RDD是因为发生了流水线执行。

任务于集群中调度并执行

步骤是按顺序处理的,任务则独立的启动来计算RDD的一部分,一旦作业的最后一个步骤结束,一个行动操作也就执行完毕了。

spark调优

并行度

输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD会为数据在HDFS上的每个文件区块创建一个分区,从数据混洗后的RDD派生下来的RDD则会采用与其父RDD相同的并行度。
spark提供了两种方法来对操作的并行度进行调优。第一种是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度;第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。

内存管理

在各个执行器进程中,内存有一下几种用途:

  1. RDD存储。当调用RDD的persist()或cache()方法时,这个RDD的分区会被存储到缓存区中。
  2. 数据混洗与聚合的缓存区。当进行数据混洗时,spark会创建出一些中间缓存区来存储数据混洗的输出数据。这些缓存区用来存储聚合操作的中间结果,以及数据混洗操作中直接输出的部分缓存数据。
  3. 用户代码。数组或者对象会占用内存,用户代码可以访问JVM堆空间中除分配给RDD存储和数据混洗存储以外的全部剩余空间。
    默认情况下,上面三种比例为6:2:2

除了调整内存各区域比例,我们还可以改进缓存行为:

  1. 将存储等级由MEMORY-ONLY调用cache()改为MEMORY-AND-DISK调用persist(),会有更好的效果,这样多出的分区数据会写入磁盘,避免重算。
  2. 缓存序列化后的对象而非直接缓存。可以通过MEMORY-ONLY-SER或MEMORY-AND-DISK-SER来实现。由于垃圾回收的代价与堆里的对象数目有关,而非数据字节数,因此当需要以对象的形式缓存大量数据或长时间的垃圾回收暂停,应该考虑配置这个选项,这样虽然缓存过程变慢(序列化对象会有消耗),但是会显著减少垃圾回收的时间。

sbt构建spark项目之坑

1. 代码目录结构不对
在linux下使用find .命令查看代码文件目录,必须是以下这样的结构,否则打包运行后会出现Classnotfoundexception

2. scala版本和包版本要对应
笔者使用的scalaSDK是2.10.4,引用的包如下:

maven构建spark项目之坑

除了以上两点代码目录结构和版本一致问题,用maven构建spark项目有更多意想不到的坑
1. 新建的scala文件用find .命令需要看到.scala后缀
笔者在新建文件时没有事先添加scala的SDK,因此新建的时候没有scala文件选项,然后自己强行手动将文件改成了scala文件,看着和scala文件是一样的,但是IDEA还是认为不一样,创建的时候就决定了文件的类型,所以需要在添加完scala的SDK以后再重新新建一遍。
2. pom依赖
笔者用之前项目的经验将pom复制过来打包运行同样也是Classnotfoundexception,用下面的方式就可以,附上我的pom文件

<groupId>com.eb.bi.rs.frame-spark</groupId>
<artifactId>frame-spark</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.10.4</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>dom4j</groupId>
        <artifactId>dom4j</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
    <dependency>
        <groupId>com.eb.bi.rs.frame2</groupId>
        <artifactId>frame2</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.0.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.10</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.1</version>
            <configuration>
                <source>1.6</source>
                <target>1.6</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.10</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

3. 用本地模式运行必须指定核的个数不小于两个
spark-submit --class movielens.moviesvd.movieSVD --master local[1] target/frame-spark-1.0-SNAPSHOT.jar /user/ebupt/liyang/spark/svd/data/ratings.dat 50 20 0.01 /user/ebupt/liyang/spark/svd/recommendations
上面命令中的local[1]只指定了一个内核,运行以后将循环报出Getting 1 non-empty blocks out of 1 blocks的信息,表示没有足够的节点处理数据。

Spark基本运行流程

在看基本运行流程之前先对一些名词进行解释:

Application

Spark中的Application和Hadoop MapReduce中的概念是相似的,指的是用户编写的Spark应用程序,内含了一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。

Driver Program

Spark中的Driver即运行上述Application的main()函数并且创建SparkContext,其中创建SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和Cluster Manager通信,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。通常用SparkContext代表Driver。

Executor

Application运行在Worker节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于Hadoop MapReduce中的YarnChild。一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取出一个空闲线程运行Task。每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了。

Cluster Mananger

指的是在集群上获取资源的外部服务,目前有:Standalone:Spark原生的资源管理,由Master负责资源的分配;Hadoop Yarn:由YARN中的ResourceManager负责资源的分配;

Worker

集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点。

Job

包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation。

Starge

每个Job会被拆分很多组Task,每组任务被称为Stage,也可称TaskSet,一个作业分为多个阶段。

Task

被送到某个Executor上的工作任务

流程图如下所示:

  1. 构建Spark Application的运行环境,启动SparkContext
  2. SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend,Executor向SparkContext申请Task
  3. SparkContext将应用程序分发给Executor
  4. SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
  5. Task在Executor上运行,运行完释放所有资源

DAG Scheduler
把一个Spark作业转换成Stage的DAG(Directed Acyclic Graph有向无环图),根据RDD和Stage之间的关系找出开销最小的调度方法,然后把Stage以TaskSet的形式提交给TaskScheduler

TaskScheduler
DAGScheduler决定了Task的理想位置,并把这些信息传递给下层的TaskScheduler。此外,DAGScheduler还处理由于Shuffle数据丢失导致的失败,还有可能需要重新提交运行之前的Stage(非Shuffle数据丢失导致的Task失败由TaskScheduler处理)
TaskScheduler维护所有TaskSet,当Executor向Driver发生心跳时,TaskScheduler会根据资源剩余情况分配相应的Task。另外TaskScheduler还维护着所有Task的运行标签,重试失败的Task。
在不同运行模式中任务调度器具体为:

  1. Spark on Standalone模式为TaskScheduler
  2. YARN-Client模式为YarnClientClusterScheduler
  3. YARN-Cluster模式为YarnClusterScheduler

Spark三种运行模式

1. Spark On Local

此种模式下,我们只需要在安装Spark时不进行hadoop和Yarn的环境配置,只要将Spark包解压即可使用,运行时Spark目录下的bin目录执行bin/spark-shell即可。Local[N]:本地模式,使用N个线程。

2. Spark On Local Cluster(Spark Standalone)

Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclips、IDEA等开发平台上使用”new SparkConf.setManager(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。

其运行过程如下:

  1. SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory)
  2. Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;
  3. StandaloneExecutorBackend向SparkContext注册;
  4. SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生),然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
  5. StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。
  6. 所有Task完成后,SparkContext向Master注销,释放资源。
3. Spark On Yarn

具体利用Yarn作为集群管理器的运行过程(包括Yarn-Client模式和YARN-Cluster模式)见![http://blog.csdn.net/gamer_gyt/article/details/51833681]

Standalone/Spark On Yarn/Spark On Mesos比较

![http://wenku.baidu.com/link?url=Z-WDXcbRmzZX_AxDJsHMC6WnluqGFr0WUGl4GA9KGTSnrtY7npWDGTgmkSmnfKAEEX8zPaQR8nSlMaBA_E3f9UjJLRd6Viuz6KDpwqzbyNW]
从对比上看,mesos似乎是Spark更好的选择,也是被官方推荐的
但如果你同时运行hadoop和Spark,从兼容性上考虑,Yarn是更好的选择
如果你不仅运行了hadoop,spark。还在资源管理上运行了docker,Mesos更加通用
Standalone对于小规模计算集群更适合

原文地址:https://www.cnblogs.com/LeonNew/p/5848166.html