一些spark core的使用笔记,供以后参考

Spark总结

Spark配置函数

  • 建立连接

    • 1>SparkConf().setAppName("xxx").setMaster("local") 设置配置文件

    • 2> SparkContext.parallelize(Array(1,2,2,4),4)将数据进行4个分片,分别存在不同的集群中

    • 3> .textFile("path") 加载数据

  • 关闭连接

    • 4> SparkContext.stop()

Transformation类算子

  • 一般的RDD处理类算子,RDD称为弹性分布式数据集(理解:一个容器,可以放不同的类型数组)

  • 1> .map(func) 将RDD[(long,long,long)]转化为.map(x=>x._1.toString)

  • 2> .filter(func) 例如: .filter(x => x.1>0)实现筛选大于0的数据

  • 3> a.union(b) 合并两个相同类型的数据集a.union(b)

  • 4> .flatMap(func) 将数据集(String数组)中的每一个String拆分使用.flatMap(x=>x.split(",")),这个操作,也被叫做压扁操作,目的是将string数组进行分割,分割后的结果放入数组中。这个过程将String的数组内容变多了,形象的叫做压扁。

  • 5> .groupByKey() 将数据进行groupBy,就是将map中的内容放入key:list,同一个相同的key会有一个list,内容也是key

  • 6> .distinct(),将RDD数据内的内容去重

  • 集合运算

  • 7> rdd1.union(rdd2) 将两个数据集求并

  • 8> rdd1.intersection(rdd2) 将两个数据集求交集

  • 9> rdd1.subtract(rdd2) 求数据集rdd1所有,但是rdd2没有

Action类算子

  • 1> collect()算子返回所有rdd的元素
  • 2> count() 计数
  • 3> countByValue() 返回一个map,表示唯一元素出现的次数
  • 4> take(num) 返回几个元素,尝试访问最少的分区,测试使用
  • 5> top(num) 返回前最前面的几个元素
  • 6> takeOrdered(num)(ordering) 返回前几个元素,基于排序算法
  • 7> takeSample(withReplacement,num,[seed]),取样例
  • 8> reduce(func) 合并RDD中的元素
  • 9> fold(zero)(func) 与reduce类似 提供zero value
  • 10> aggregate(zero Value)(seqOp,combOp) 与fold相似,返回值类型不同。
  • 11> foreach(func) 对每一个RDD作用函数,但是什么也不会返回。不返回本地,测试使用foreach(println)
  • 12> saveAsTextfile("path") 一般在单机内存可以容纳下的内容时,可以使用collect,将分布集群的数据收集回来,当单机内存容纳不下的时候,使用saveAsTextFile函数,将数据存于分布式的内存中。

RDD的特性

  • 血统关系:
    • Spark维系RDDs之间的依赖关系和创建关系,Spark使用如下的系统关系图,计算RDD的需求和丢失数据的需求。inputRDD -> filter 得到errorsRDD和warningsRDD,最后将其union
  • 延迟计算:
    • 并不是所有操作都会让Spark对RDDs进行计算,真正的计算发生在第一次action之中的。
    • 好处,减少数据的传输。
    • Spark内部记录了metadata表名transformations操作已经被响应了。
    • 加载数据也是延迟的,数据只有必要的时候才会被加载进去。
  • 数据的持久化:
    • 默认每次都在action操作上进行,Spark每次都重新计算RDDs,如果想重复使用RDD(不重新计算),通过使用RDD.persist()进行保存。unpersist将数据从缓存中移除。
    • Memory Only 空间占用高,CPU消耗低,在内存中,不在硬盘上
    • Memory Ser(序列化) 空间占用低,CPU消耗高,在内存中,不在硬盘上
    • Disk Only 空间占用低,CPU消耗低,不在内存,在硬盘上
    • Memory and Disk 内存满,占用硬盘 cpu中等
    • Memory and Disk ser序列化,空间消耗低,cpu占用高

KeyValue对的RDDs

  • reduceByKey(func) 把相同的key结合,rdd.reduceByKey((x,y)=>x+y)

  • groupByKey()把相同的key Value分组rdd.groupByKey()

  • combineByKey()

  • mapValues(func)函数作用于每一个pairRDD的每一个元素,Key不变,rdd.mapValues(x=>x+1) {(1,3),(3,5),(3,7)}

  • flatMapValues(func) 符号化使用,rdd.flatMapValues(x=>(xto 5)),将每一个value 不到5的键值对列入map中,超过5的键值对移除

  • keys 仅返回keys rdd.keys{1,2,3}

  • values() 仅返回values

  • sortByKey() 按照key的值排序,rdd.sortByKey()

KeyValue对的RDDs中CombineByKey

  • combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) 最常用的聚合函数返回值类型可以与输入类型不一样

  • RDD分区组成,key要不然是见过的,要不然不是。

    • 如果是新元素,使用createConmbiner函数。
    • 如果是分区中存在的key,使用mergeValue()函数。
    • 最后合计每一个partition的结果的时候,使用mergeConmbiners()函数
      例子:求平均值,见IDEA

Spark环境的运行

  • 命令行的运行程序


bin/spark-submit 
--class org.apache.spark.examples.SparkPi 
--master 'local[2]' 
./examples/jars/spark-examples_2.12-3.0.0.jar 
10

  • 官方文档
(1)基本语法
bin/spark-submit 
--class <main-class>
--master <master-url> 
--deploy-mode <deploy-mode> 
--conf <key>=<value> 
... # other options
<application-jar> 
[application-arguments]
(2)参数说明:
--master 指定Master的地址,默认为Local
--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*
--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value” 
application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar
application-arguments: 传给main()方法的参数
--executor-memory 1G 指定每个executor可用内存为1G
--total-executor-cores 2 指定每个executor使用的cup核数为2个

参考:
https://www.imooc.com/video/14395
https://www.imooc.com/article/278738
https://www.bilibili.com/video/BV11A411L7CK

原文地址:https://www.cnblogs.com/kobe961231/p/14535629.html