spark 笔记

RDD是Spark中的核心数据模型,一个RDD代表着一个被分区(partition)的只读数据集。

RDD的生成只有两种途径:

一种是来自于内存集合或外部存储系统;

另一种是通过转换操作来自于其他RDD;

一般需要了解RDD的以下五个接口:

partition 分区,一个RDD会有一个或者多个分区
dependencies() RDD的依赖关系
preferredLocations(p) 对于每个分区而言,返回数据本地化计算的节点
compute(p,context) 对于分区而言,进行迭代计算
partitioner() RDD的分区函数

 

 一个RDD包含一个或多个分区,每个分区都有分区属性,分区的多少决定了对RDD进行并行计算的并行度。

窄依赖:

每一个父RDD的分区最多只被子RDD的一个分区所使用,(只有一个独子)

宽依赖:

  子 RDD 多个分区会依赖同一个 RDD 分区,  父RDD 的一个分区被多次依赖

RDD 分区函数:

  目前Spark中实现了两种类型的分区函数,HashPartitioner(哈希分区)和RangePartitioner(区域分区)。

 

保存到hadoop

saveAsHadoopFile,saveAsHadoopDataset

saveAsTextFile、saveAsSequenceFile、saveAsObjectFile

 

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

 

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

first

def first(): T

first返回RDD中的第一个元素,不排序。

reduce

def reduce(f: (T, T) ⇒ T): T

根据映射函数f,对RDD中的元素进行二元计算,返回计算结果

 

collect

def collect(): Array[T]

collect用于将一个RDD转换成数组。

 

 

 

注册类到 kryo

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

val sc = new SparkContext(conf)

 

 

 

 

任务的并行度由分区数(Partitions)决定,一个Stage有多少分区,就会有多少Task。每个Task默认占用一个Core,一个Executor上的所有core共享Executor上的内存,一次并行运行的Task数等于num_executor*executor_cores,如果分区数超过该值,则需要运行多个轮次,一般来说建议运行3~5轮较为合适,否则考虑增加num_executor或executor_cores。由于一个Executor的所有tasks会共享内存executor_memory,所以建议executor_cores不宜过大。executor_memory的设置则需要综合每个分区的数据量以及是否有缓存等逻辑。

 

 

 

1.num-executors  执行器个数,一般设置在50-100之间,必须设置,不然默认启动的executor非常少,不能充分利用集群资源,运行速度慢

2.executor-memory 线程内存:执行器内存 参考值4g-8g,num-executor乘以executor-memory不能超过队列最大内存,申请的资源最好不要超过最大内存的1/3-1/2

3.executor-cores 线程, 执行器 cpu 个数. CPU core数量:core越多,task线程就能快速的分配,参考值2-4,num-executor*executor-cores的1/3-1/2

 

1.spark-submit spark提交

2.--queue spark 在spark队列

3.--master yarn 在yarn节点提交

4.--deploy-mode client 选择client模型,还是cluster模式;在同一个节点用client,在不同的节点用cluster

5.--executor-memory=4G 线程内存:参考值4g-8g,num-executor乘以executor-memory不能超过队列最大内存,申请的资源最好不要超过最大内存的1/3-1/2

6.--conf spark.dynamicAllocation.enabled=true 是否启动动态资源分配

7.--executor-cores 2 线程CPU core数量:core越多,task线程就能快速的分配,参考值2-4,num-executor*executor-cores的1/3-1/2

8.--conf spark.dynamicAllocation.minExecutors=4 执行器最少数量

9.--conf spark.dynamicAllocation.maxExecutors=10 执行器最大数量

10.--conf spark.dynamicAllocation.initialExecutors=4 若动态分配为true,执行器的初始数量

11.--conf spark.executor.memoryOverhead=2g 堆外内存:处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;此时就去调节这个参数,到至少1G(1024M),甚至说2G、4G)

12.--conf spark.speculation=true 推测执行:在接入kafaka的时候不能使用,需要考虑情景

13.--conf spark.shuffle.service.enabled=true 提升shuffle计算性能

--total-executor-cores    

 

--conf spark.driver.extraJavaOptions 配置的是Driver的 JVM 参数

--confi spark.executor.extraJavaOptions  配置Driver的 JVM 参数

        此参数用于设置每个stage经TaskScheduler进行调度时生成task的数量,此参数未设置时将会根据读到的RDD的分区生成task,即根据源数据在hdfs中的分区数确定,若此分区数较小,则处理时只有少量task在处理,前述分配的executor中的core大部分无任务可干。通常可将此值设置为num-executors*executor-cores的2-3倍为宜,如果与其相近的话,则对于先完成task的core则无任务可干。2-3倍数量关系的话即不至于太零散,又可是的任务执行更均衡。

 

 

Spark Streaming提供了两类内置的流媒体源。

  • 基本来源:可直接在StreamingContext API中获得的来源。示例:文件系统和套接字连接。
  • 高级资源:可通过额外的实用程序类获得诸如Kafka,Kinesis等资源。如链接部分所述,这些要求针对额外的依赖项进行 链接

 

File Streams

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

 

For text files

streamingContext.textFileStream(dataDirectory)

 

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的所有文件, 忽略更新

 

 

groupBykey yu reduceByKey的区别

 /*

    groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),

    此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。

    同时如果数据量十分大,可能还会造成OutOfMemoryError。

     */

 

    //reduceByValue,先进行局部聚合,再进行全局聚合

    val res14 = res11.reduceByKey(_ + _)

    /*reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,

    有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,

    数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。

     */

 

MapReduce和Spark一个本质的区别:

在MapReduce里,每一个task都在它自己的进程里,map对应maptask,reduce对应reducetask,这些都是进程,当一个task完成(maptask或者reducetask)后,这个task进程就结束了。

但是在Spark里面是不一样的,在Spark里面,它的task能够并发的运行在一个进程里,就是说一个进程里面可以运行多个task,而且这个进程会在Spark Application的整个生命周期一直存在,Spark Application是包含一个driver和多个executor的,即使你的作业不再运行了,job运行完了,没有作业在running,它的executor还是一直在的,

对比MapReduce和Spark可知,MapReduce是基于进程的base-process,Spark是基于线程的base-thread。

这样的话,Spark带来的好处就是:

如果是MR的话,你跑task的进程资源都要去申请,用完之后就销毁;但是Spark的话,只要一开始拿到了这些进程资源,后面所有的作业,不需要申请资源,就可以直接快速的启动,是非常的快。用内存的方式进行计算。

 

 

一些概念:

一个Spark应用程序包含一个driver和多个executor。

Driver program是一个进程,它运行应用程序application里面的main()函数,并在main函数里面创建SparkContext。

在main函数里面创建了一堆RDD,遇到action的时候会触发job,所以程序会有很多job。

job:由Spark action触发的由多个tasks组成的并行计算。当一个Spark action(如save, collect)被触发,一个包含很多个tasks的并行计算的job将会生成。

每个job被切分成小的任务集,这些小的任务集叫做stages。

task是被发送给一个executor的最小工作单元。每个executor上面可以跑多个task。

Executor:在worker node上启动应用程序的进程,这个进程可以运行多个任务并将数据保存在内存或磁盘存储中。每个Spark应用程序都有它自己的一组executors。executor运行在Container里面。

executor是进程级别,一个进程上面可以并行的跑多个线程的,所以每个executor上面可以跑多个task。

 

ApplicationMaster:AM

每一个YARN上面的Application都有一个AM,这个AM进程,是在第一个Container里运行的,就是说第一个Container就是来运行AM的,AM去和RM互相通信请求资源,然后拿到资源后告诉NM,让NM启动其它的Container,给我们的进程使用,比如去跑executor。

是所有executor总共使用的cpu核数 standalone default all cores

 



 

 

 

 

 

 

原文地址:https://www.cnblogs.com/snow-man/p/13166504.html