Spark Core

The Spark ecosystem:
Spark Core: RDD (Resilient Distributed Dataset)
Spark SQL
Spark Streaming
Spark MLlib: collaborative filtering, ALS, logistic regression, etc. --> machine learning
Spark GraphX: graph computation

I. Spark Core

1. What is Spark? Key features

https://spark.apache.org/
Apache Spark™ is a unified analytics engine for large-scale data processing.
Features: fast, easy to use, general-purpose, compatible (fully compatible with Hadoop)

Fast: claimed to be up to 100x faster than Hadoop MapReduce (pre-Hadoop 3)
Easy to use: supports development in multiple languages
General-purpose: a complete ecosystem (SQL, streaming, MLlib, GraphX)
Compatibility: compatible with Hadoop

II. Installing and Deploying Spark; Spark HA

1. Spark architecture

Spark deployment modes:

Yarn

Standalone: local debugging (demo)

Worker: the slave node; on each server it manages that node's resources and tasks, and only that one node

Execution flow:
A Worker runs multiple Executors. Executors execute the tasks, which are divided by stage. --> RDD

Client: the Driver Program submits jobs to the cluster via
1) spark-submit
2) spark-shell

2. Setting up Spark

1) Prerequisites: JDK, hostname configuration, passwordless SSH login

2) Pseudo-distributed mode
Simulate a distributed environment on a single VM (Master and Worker on the same node).
Configure spark-env.sh:
vi spark-env.sh

export JAVA_HOME=/root/hd/jdk1.8.0_192
export SPARK_MASTER_HOST=hsiehchou121
export SPARK_MASTER_PORT=7077

Configure slaves:
vi slaves
hsiehchou121

Open hsiehchou121:8080 in a browser.

Using Scala in Spark:

[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077

3) Fully distributed mode
Edit the slaves file, copy the configuration to the other three servers, and start the cluster.

3. Spark HA

HA (High Availability) review:
(*) HDFS, Yarn, HBase, and Spark all have a master-slave structure
(*) which means a single point of failure

(1) Single-point recovery based on a file directory
Mainly for development or test environments. Spark provides a directory that saves the registration information of Spark Applications and Workers, together with their recovery state. If the Master fails, restarting the Master process (sbin/start-master.sh) recovers the registration information of the running Applications and Workers.

Filesystem-based single-point recovery is enabled by setting SPARK_DAEMON_JAVA_OPTS in spark-env.sh.

Parameter / Reference value
spark.deploy.recoveryMode: set to FILESYSTEM to enable single-point recovery (default: NONE)
spark.deploy.recoveryDirectory: the directory where Spark saves recovery state

Example:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery"

(*) Essence: there is still only one Master. A recovery directory stores the cluster state and job information; when the Master dies and is restarted, it reads the state from the recovery directory and restores the previous state.

Use: development and testing only; production should use ZooKeeper.

(2) ZooKeeper-based: similar to Hadoop
ZooKeeper provides a Leader Election mechanism which guarantees that, although the cluster has multiple Masters, only one is Active and the rest are Standby. When the Active Master fails, a Standby Master is elected. Since the cluster information, including Workers, Drivers, and Applications, is already persisted in ZooKeeper, the switchover only affects the submission of new Jobs; running Jobs are unaffected.

Parameter / Reference value
spark.deploy.recoveryMode: set to ZOOKEEPER to enable recovery (default: NONE)
spark.deploy.zookeeper.url: the address of the ZooKeeper cluster
spark.deploy.zookeeper.dir: the ZooKeeper directory where Spark state is stored (default: /spark)

Example:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark"

(*) A quick ZooKeeper review:
ZooKeeper is essentially a small database that stores information such as cluster state.
It provides data synchronization, leader election, and distributed locks.

Data synchronization: data written to one node is synchronized to the other nodes.

Election: ZooKeeper has different roles, Leader and Follower; if the Leader dies, a new Leader is elected.

Distributed lock: e.g. flash sales. Data is stored as directory-style nodes (znodes).

Edit spark-env.sh:

export JAVA_HOME=/root/hd/jdk1.8.0_192
#export SPARK_MASTER_HOST=hsiehchou121
#export SPARK_MASTER_PORT=7077
#export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark"

Sync to the other three servers:
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou122:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou123:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou124:/root/hd/spark-2.1.0-bin-hadoop2.7/conf

Run start-all.sh on hsiehchou121: hsiehchou121 is Master; hsiehchou122, hsiehchou123, and hsiehchou124 are Workers.
Run start-master.sh on hsiehchou122: hsiehchou121 is Master (Active), hsiehchou122 is Master (Standby); hsiehchou122, hsiehchou123, and hsiehchou124 are Workers.

Kill the Master on hsiehchou121:
hsiehchou122 becomes Master (Active); hsiehchou122, hsiehchou123, and hsiehchou124 remain Workers.

The corresponding status can be seen at http://192.168.116.122:8080/.

III. Running Spark Jobs: Two Tools

1. spark-submit: submits Spark jobs

A job is packaged as a jar.

Example: estimating Pi with the Monte Carlo method

./spark-submit --master spark://hsiehchou121:7077 --class <main-class> <jar-file> [args]

--class specifies the name of the main class

[root@hsiehchou121 /]#cd /root/hd/spark-2.1.0-bin-hadoop2.7/bin
[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class org.apache.spark.examples.SparkPi /root/hd/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 100

The trailing 100 specifies the number of iterations (slices).
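For reference, here is a minimal sketch of the Monte Carlo idea behind SparkPi (an illustrative reimplementation, not the bundled example's exact source). It can be pasted into spark-shell, where sc already exists:

// Sketch of Monte Carlo Pi estimation (illustrative; not the exact SparkPi code)
val slices = 100                   // plays the role of the "100" argument above
val n = 100000 * slices            // total number of random points
val count = sc.parallelize(1 to n, slices).filter { _ =>
  val x = math.random * 2 - 1      // random point in [-1, 1] x [-1, 1]
  val y = math.random * 2 - 1
  x * x + y * y <= 1               // does it fall inside the unit circle?
}.count()
println(s"Pi is roughly ${4.0 * count / n}")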

2. spark-shell: effectively a REPL

spark-shell is Spark's built-in interactive shell. It makes interactive programming convenient: users can write Spark programs in Scala directly at the prompt.
(*) Start the Spark shell: spark-shell
It also accepts the following parameters:
Parameter description:

--master spark://hsiehchou121:7077 specifies the Master address
--executor-memory 2g gives each worker 2 GB of usable memory
--total-executor-cores 2 limits the application to 2 CPU cores across the whole cluster

For example:

spark-shell --master spark://hsiehchou121:7077 --executor-memory 2g --total-executor-cores 2

Note:
If no master address is given when starting the Spark shell, the shell still starts and runs programs normally; it simply runs in Spark's local mode, starting a single process on the local machine without connecting to the cluster.

It runs as an independent Application.
Two modes:
(1) Local mode
spark-shell with no arguments means local mode:
./spark-shell
Spark context available as 'sc' (master = local[*], app id = local-1554372019995).
sc is the name of the SparkContext object; local[*] means local mode, not submitted to the cluster.

(2) Cluster mode

[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077

Submitted to the cluster:
Spark context available as 'sc' (master = spark://hsiehchou121:7077, app id = app-20190404190030-0000).

master = spark://hsiehchou121:7077
Spark session available as 'spark'
SparkSession was introduced in Spark 2.0; through a SparkSession you can access all of Spark's components.
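A small sketch (spark-shell, Spark 2.x): the pre-built spark session wraps the SparkContext and exposes the SQL/DataFrame API from a single entry point.

spark.sparkContext.parallelize(1 to 3).collect()  // the Core API, via the session
val df = spark.range(5)                           // the DataFrame/Dataset API
df.show()                                         // prints ids 0..4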

Example: a WordCount program
The program:

sc.textFile("hdfs://192.168.116.121:9000/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.116.121:9000/output/wc")

Explanation:
sc is the SparkContext object, the entry point for submitting Spark programs.
textFile("hdfs://192.168.116.121:9000/data.txt") reads the data from HDFS.
flatMap(_.split(" ")) maps first, then flattens.
map((_,1)) turns each word into a (word, 1) tuple.
reduceByKey(_+_) reduces by key, accumulating the values.
saveAsTextFile("hdfs://192.168.116.121:9000/output/wc") writes the result to HDFS.

(*) Process a local file and print the result to the screen
vi /root/hd/tmp_files/test_WordCount.txt
I love China
I love Jiangsu
Jiangsu is a beautiful place in China

scala> sc.textFile("/root/hd/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (China,2), (a,1), (Jiangsu,2), (I,2), (in,1), (place,1), (beautiful,1))

(*) Process an HDFS file and save the result to HDFS

[root@hsiehchou121 tmp_files]# hdfs dfs -mkdir /tmp_files
[root@hsiehchou121 tmp_files]# hdfs dfs -copyFromLocal ~/hd/tmp_files/test_WordCount.txt /tmp_files
scala> sc.textFile("hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hsiehchou121:9000/out/0404/test_WordCount")

-rw-r--r-- 3 root supergroup 0 2019-04-04 19:12 /out/0404/test_WordCount/_SUCCESS
-rw-r--r-- 3 root supergroup 16 2019-04-04 19:12 /out/0404/test_WordCount/part-00000
-rw-r--r-- 3 root supergroup 65 2019-04-04 19:12 /out/0404/test_WordCount/part-00001

_SUCCESS indicates the job succeeded.

part-00000 and part-00001 are the result files, one per partition; their contents do not overlap.

(*) Running WordCount step by step --> RDDs

scala> val rdd1 = sc.textFile("/root/hd/tmp_files/test_WordCount.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /root/hd/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24
scala> rdd1.collect
res5: Array[String] = Array(I love China, I love Jiangsu, Jiangsu is a beautiful place in China)
scala> val rdd2 = rdd1.flatMap(_.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:26
scala> rdd2.collect
res6: Array[String] = Array(I, love, China, I, love, Jiangsu, Jiangsu, is, a, beautiful, place, in, China)
scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:28
scala> rdd3.collect
res7: Array[(String, Int)] = Array((I,1), (love,1), (China,1), (I,1), (love,1), (Jiangsu,1), (Jiangsu,1), (is,1), (a,1), (beautiful,1), (place,1), (in,1), (China,1))
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:30
scala> rdd4.collect
res8: Array[(String, Int)] = Array((is,1), (love,2), (China,2), (a,1), (Jiangsu,2), (I,2), (in,1), (place,1), (beautiful,1))

RDD: Resilient Distributed Dataset
(1) Dependencies: wide dependencies and narrow dependencies
(2) Operators (functions):
Transformation: lazily evaluated, e.g. map, flatMap, textFile
Action: triggers computation immediately, e.g. collect
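A minimal sketch of this laziness, runnable in spark-shell: the function passed to map does not run until an action (collect) is triggered.

val lazyRdd = sc.parallelize(1 to 3).map { x =>
  println(s"computing $x")  // nothing is printed when map is declared...
  x * 10
}
lazyRdd.collect()           // ...only now does "computing ..." run (on the executors; visible on the console in local mode)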

Note: Scala review
(*) flatten: expands a nested collection
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)

(*) flatMap: equivalent to map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))

scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)

myList.flatMap(x=>x.map(_*2))

Execution:
1. map(_*2) is applied to List(2, 4, 6, 8, 10) and List(1, 3, 5, 7, 9); x stands for one inner List.
2. The results are flattened.

Next: developing the Scala and Java versions of WordCount in an IDE.

IV. WordCount (Scala and Java Versions)

1. The Scala version of WordCount

Create a new project and add the Spark jars to it.

package day1

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration
    // Note: setMaster("local") set in code takes precedence over spark-submit's
    // --master flag; remove it when submitting to a cluster.
    val conf = new SparkConf().setAppName("My Scala WordCount 0404").setMaster("local")
    // Create the SparkContext
    val sc = new SparkContext(conf)

    // 1. Run in local mode: requires .setMaster("local")
    //val result = sc.textFile("hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt")
    //  .flatMap(_.split(" "))
    //  .map((_,1))
    //  .reduceByKey(_+_)
    //result.foreach(println)

    // 2. Run in cluster mode: input and output paths come from the command line
    sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

Export Demo1.jar (click Next through the wizard) and upload the jar to /root/hd/tmp_files/ on the server.

From Spark's bin directory, run:

[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class day1.WordCount /root/hd/tmp_files/Demo1.jar hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt hdfs://hsiehchou121:9000/out/0405/Demo1

2. The Java version of WordCount

package day1;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
        // Create the SparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the input data
        JavaRDD<String> lines = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_WordCount.txt");
        // Split into words: the input is one line, the output is an iterator of words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });
        // Count each word once
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String input) throws Exception {
                return new Tuple2<String, Integer>(input, 1);
            }
        });
        // Reduce: sum the counts per word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1 + ":" + tuple._2);
        }
        sc.stop();
    }
}
[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class day1.JavaWordCount /root/hd/tmp_files/Demo2.jar

V. Analyzing the Spark Job Flow

1. How the WordCount program is processed

(figure: the WordCount program's processing flow)

2. How Spark schedules tasks

When a job is submitted to the cluster, Spark performs task scheduling.

(figure: Spark's task-scheduling process)

VI. RDDs, Their Properties, and Their Operators

1. RDD: Resilient Distributed Dataset

(*) The most basic data abstraction in Spark
(*) RDD properties (a small inspection sketch follows the list below)

  • Internally, each RDD is characterized by five main properties:

  • A list of partitions
    1) A set of partitions
    An RDD is made up of partitions; each partition runs on a different Worker, which is how distributed computation is implemented.

  • A function for computing each split
    An RDD provides operators that process the data in each partition.

  • A list of dependencies on other RDDs
    RDDs have dependencies: wide and narrow.

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    A custom partitioning rule can be used when creating an RDD.

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    Nodes close to the data are preferred for execution.
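A small inspection sketch in spark-shell, showing some of the five properties on a live RDD:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)), 2).reduceByKey(_ + _)
pairs.partitions.length                        // the list of partitions (here: 2)
pairs.dependencies                             // dependencies on the parent RDD
pairs.partitioner                              // Some(HashPartitioner(2)) for this key-value RDD
pairs.preferredLocations(pairs.partitions(0))  // preferred locations (often empty for in-memory data)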

How to create an RDD
(1) With SparkContext.parallelize

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29
scala> rdd1.partitions.length
res35: Int = 3
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29
scala> rdd1.partitions.length
res36: Int = 2

(2) From an external data source

sc.textFile()
scala> val rdd2 = sc.textFile("/root/hd/tmp_files/test_WordCount.txt")
rdd2: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29

2. Operators

1) Transformations
map(func): like a for loop over the elements; returns a new RDD

filter(func): filters elements
flatMap(func): flat + map; flattens the results

mapPartitions(func): operates on each partition of the RDD
mapPartitionsWithIndex(func): operates on each partition and also exposes the partition number

sample(withReplacement, fraction, seed): sampling

Set operations:
union(otherDataset): returns a new RDD that is the union of the source RDD and the argument RDD
intersection(otherDataset): returns a new RDD that is the intersection of the source RDD and the argument RDD

distinct([numTasks]): removes duplicates

Aggregation (group by):
groupByKey([numTasks]): called on a (K,V) RDD; returns a (K, Iterator[V]) RDD
reduceByKey(func, [numTasks]): called on a (K,V) RDD; returns a (K,V) RDD in which the values for each key are aggregated with the given reduce function; as with groupByKey, the number of reduce tasks can be set via the optional second parameter
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]): aggregates by key

Sorting:
sortByKey([ascending], [numTasks]): called on a (K,V) RDD where K implements the Ordered interface; returns a (K,V) RDD sorted by key
sortBy(func, [ascending], [numTasks]): like sortByKey, but more flexible

join(otherDataset, [numTasks]): called on RDDs of types (K,V) and (K,W); returns a (K,(V,W)) RDD pairing all elements that share a key
cogroup(otherDataset, [numTasks]): called on RDDs of types (K,V) and (K,W); returns a (K,(Iterable[V],Iterable[W])) RDD
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)

Repartitioning:
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

Examples:
(1) Create an RDD, multiply every element by 2, then sort

scala> val rdd1 = sc.parallelize(Array(3,4,5,100,79,81,6,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.collect
res0: Array[Int] = Array(6, 8, 10, 200, 158, 162, 12, 16)
scala> rdd2.sortBy(x=>x,true).collect
res1: Array[Int] = Array(6, 8, 10, 12, 16, 158, 162, 200)
scala> rdd2.sortBy(x=>x,false).collect
res2: Array[Int] = Array(200, 162, 158, 16, 12, 10, 8, 6)
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true)

Filter out the elements greater than 20:

scala> val rdd3 = rdd2.filter(_>20)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at filter at <console>:33
scala> rdd3.collect
res3: Array[Int] = Array(200, 158, 162)

(2) An RDD of strings (characters)

scala> val rdd4 = sc.parallelize(Array("a b c","d e f","g h i"))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> rdd4.flatMap(_.split(" ")).collect
res4: Array[String] = Array(a, b, c, d, e, f, g, h, i)

3. RDD set operations

scala> val rdd5 = sc.parallelize(List(1,2,3,6,7,8,100))
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd6 = sc.parallelize(List(1,2,3,4))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd7 = rdd5.union(rdd6)
rdd7: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at <console>:28
scala> rdd7.collect
res5: Array[Int] = Array(1, 2, 3, 6, 7, 8, 100, 1, 2, 3, 4)
scala> rdd7.distinct.collect
res6: Array[Int] = Array(100, 4, 8, 1, 6, 2, 3, 7)

4. Grouping: reduceByKey

(key, value) pairs:
scala> val rdd1 = sc.parallelize(List(("Time",1800),("Dadi",2400),("Giu",1600)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("Dadi",1300),("Time",2900),("Mi",600)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[8] at union at <console>:28
scala> rdd3.collect
res3: Array[(String, Int)] = Array((Time,1800), (Dadi,2400), (Giu,1600), (Dadi,1300), (Time,2900), (Mi,600))
scala> val rdd4 = rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[9] at groupByKey at <console>:30
scala> rdd4.collect
res4: Array[(String, Iterable[Int])] = Array((Mi,CompactBuffer(600)),
(Time,CompactBuffer(1800, 2900)),
(Dadi,CompactBuffer(2400, 1300)),
(Giu,CompactBuffer(1600)))
scala> rdd3.reduceByKey(_+_).collect
res5: Array[(String, Int)] = Array((Mi,600), (Time,4700), (Dadi,3700), (Giu,1600))

reduceByKey will provide much better performance.
Officially, groupByKey is discouraged; reduceByKey is recommended instead.
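A sketch of why: the two lines below produce the same result, but reduceByKey pre-aggregates within each partition (a map-side combine) before the shuffle, so far less data crosses the network.

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
pairs.groupByKey().mapValues(_.sum).collect()  // shuffles every (key, value) pair
pairs.reduceByKey(_ + _).collect()             // combines locally first, then shuffles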

5. cogroup

Called on RDDs of types (K,V) and (K,W); returns a (K,(Iterable[V],Iterable[W])) RDD.

For the key-value elements of two RDDs, the elements sharing a key within each RDD are gathered into a collection. Unlike reduceByKey, it merges elements with the same key across the two RDDs; it differs from groupByKey mainly in its return type.

scala> val rdd1 = sc.parallelize(List(("Tim",1),("Tim",2),("Jert",3),("kiy",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(List(("Tim",1),("Tim",2),("Jert",3),("Kiy",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("Jert",2),("Tim",1),("Sun",2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[15] at cogroup at <console>:28
scala> rdd3.collect
res6: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
(Tim,(CompactBuffer(1, 2),CompactBuffer(1))),
(Sun,(CompactBuffer(),CompactBuffer(2))),
(Kiy,(CompactBuffer(2),CompactBuffer())),
(Jert,(CompactBuffer(3),CompactBuffer(2))))

6. The reduce operation (an Action)

Aggregation:

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> rdd1.reduce(_+_)
res7: Int = 15

7. Task: sort by value

Approach (an alternative using sortBy is sketched after the example):
1. Swap: exchange key and value, then call sortByKey.
2. Swap back.

scala> val rdd1 = sc.parallelize(List(("tim",1),("jery",3),("kef",2),("sun",2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(("jery",1),("tim",3),("sun",5),("kef",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at <console>:24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[19] at union at <console>:28
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:30
scala> rdd4.collect
res8: Array[(String, Int)] = Array((tim,4), (kef,3), (sun,7), (jery,4))
scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at <console>:32
scala> rdd5.collect
res10: Array[(String, Int)] = Array((sun,7), (tim,4), (jery,4), (kef,3))
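An equivalent, arguably simpler route uses sortBy to sort on the value directly, avoiding the swap / sortByKey / swap dance:

val rdd5b = rdd4.sortBy(_._2, false)  // sort by the value, descending
rdd5b.collect()                       // Array((sun,7), (tim,4), (jery,4), (kef,3)); ties may come back in either order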

(2) Actions
reduce(func): aggregates all elements of the RDD with func; the function must be commutative and associative so the reduction can run in parallel

collect(): returns all elements of the dataset to the driver program as an array
count(): returns the number of elements in the RDD
first(): returns the first element of the RDD (similar to take(1))
take(n): returns an array of the first n elements of the dataset

takeSample(withReplacement, num, [seed]): returns an array of num elements randomly sampled from the dataset, with or without replacement; seed optionally seeds the random number generator

takeOrdered(n, [ordering]): like top, but returns the elements in the opposite order

saveAsTextFile(path): saves the elements of the dataset as text files on HDFS or another supported filesystem; Spark calls toString on each element to turn it into a line of text

saveAsSequenceFile(path): saves the elements as a Hadoop SequenceFile in the given directory, on HDFS or any other Hadoop-supported filesystem

saveAsObjectFile(path): serializes the elements of the RDD into objects and stores them in a file

countByKey(): for a (K,V) RDD, returns a (K, Int) map giving the number of elements for each key

foreach(func): runs the function func on each element of the dataset.
Similar to map, but with no return value.

3. Properties
1) The RDD caching mechanism
An RDD can cache earlier computation results via the persist or cache method. The cache is not populated when these methods are called; instead, when a later action is triggered, the RDD is cached in memory on the compute nodes and reused afterwards.

Looking at the source, cache ultimately calls persist. The default storage level keeps a single copy in memory only; Spark offers many more storage levels, all defined in object StorageLevel.

A cache can be lost, and data held in memory can be evicted under memory pressure, but the cache's fault-tolerance mechanism still guarantees correct execution: using the RDD's chain of transformations, the lost data is recomputed. Because the RDD's partitions are relatively independent, only the missing partitions are recomputed, not all of them.

(*) Purpose: better performance
(*) Usage: mark an RDD as cacheable with persist or cache
(*) Available storage levels:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()

Example: test data, about 920,000 rows.
Start spark-shell:

./spark-shell --master spark://hsiehchou121:7077
scala> val rdd1 = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd1.count --> triggers the computation directly
res0: Long = 921911
scala> rdd1.cache --> marks the RDD as cacheable; does not trigger computation
res1: rdd1.type = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd1.count --> like the first count, this triggers the computation, but it also populates the cache
res2: Long = 921911
scala> rdd1.count --> reads the result straight from the cache
res3: Long = 921911
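A related sketch: instead of the cache() default (MEMORY_ONLY), an explicit storage level can be chosen with persist, and the cached blocks can be released with unpersist.

import org.apache.spark.storage.StorageLevel
val rdd = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk when memory is tight
rdd.count()                                // the first action materializes the cache
rdd.unpersist()                            // free the cached blocks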

2) RDD fault tolerance: checkpoints
A checkpoint (essentially writing an RDD to disk) supplements lineage-based fault tolerance. When the lineage grows too long, recovery becomes too expensive, so it pays to checkpoint at intermediate stages: if a node later fails and partitions are lost, the lineage is replayed from the checkpointed RDD, reducing the cost.

The checkpoint directory can be a local folder or HDFS. Generally, a checkpoint path is set on a fault-tolerant, highly reliable filesystem (such as HDFS or S3) to hold the checkpoint data.

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with SparkContext#setCheckpointDir and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */

(*) Checkpoint review:
Checkpoints in HDFS: the SecondaryNameNode implements the merging of the edit logs.

(*) RDD checkpoints: fault tolerance
Concept: lineage
Intuition: the life cycle of a job's execution
WordCount: textFile --> reduceByKey

The longer the lineage, the more likely something goes wrong along the way.

With a checkpoint, computation can resume from the most recent checkpoint and proceed forward, instead of starting over from the beginning.

(*) Types of RDD checkpoints
(1) A local directory: requires the Spark shell or job to run in local mode (setMaster("local"));
for development and testing

(2) An HDFS directory: for production
sc.setCheckpointDir(directory)

Example: setting a checkpoint

scala> var rdd1 = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

Set the checkpoint directory:
scala> sc.setCheckpointDir("hdfs://192.168.116.121:9000/sparkchkpt")

Mark rdd1 for checkpointing:
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 921911
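Following the docstring's advice above, a sketch combining persist with checkpoint (persisting first means the checkpoint job does not recompute the RDD from scratch):

sc.setCheckpointDir("hdfs://192.168.116.121:9000/sparkchkpt")
val rdd = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt")
rdd.cache()         // keep it in memory so checkpointing does not recompute it
rdd.checkpoint()    // mark for checkpointing; runs with the next action
rdd.count()         // triggers the computation and writes the checkpoint
rdd.isCheckpointed  // true once the checkpoint data has been written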

(3) Dependencies: wide and narrow
Dependencies determine how execution is divided into stages.
There are two kinds of relationships between an RDD and the parent RDD(s) it depends on: narrow dependencies and wide dependencies.

A narrow dependency means each partition of the parent RDD is used by at most one partition of the child RDD (one parent partition to one child partition).
In short: a narrow dependency is like an only child.

A wide dependency means multiple child-RDD partitions depend on the same parent-RDD partition (one parent partition to many child partitions).
In short: a wide dependency is like a family with many children.

A DAG (Directed Acyclic Graph): the original RDDs form a DAG through a series of transformations, and the DAG is divided into stages according to the dependencies between the RDDs. With narrow dependencies, the partition transformations complete within one stage. With wide dependencies, because a shuffle is involved, the next computation can start only after the parent RDD has been fully processed; this is why wide dependencies are the basis for stage boundaries.
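A quick way to see the stage split in spark-shell: toDebugString prints the lineage, and the indentation marks where the wide dependency (reduceByKey's shuffle) cuts the DAG into stages.

val wc = sc.textFile("/root/hd/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
println(wc.toDebugString)  // the ShuffledRDD sits on top; the indented MapPartitionsRDDs below it form the earlier stage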

VII. Advanced RDD Operators

1. mapPartitionsWithIndex

Operates on each partition of the RDD together with that partition's index.
This operator gives us access to the partition number.

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

Returns a new RDD by applying the function to each partition of this RDD, while tracking the index of the original partition.

preservesPartitioning indicates whether the input function preserves the partitioner; it should be false unless this is a pair RDD and the input function does not modify the keys.

Parameter: f is a function whose first argument is an Int (the partition number) and whose second is an Iterator[T] (the elements of that partition).

Example: print the elements of each partition together with the partition number

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => "[partId: "+ index +" , value = " + x + " ]").iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId: 0 , value = 1 ], [partId: 0 , value = 2 ],
[partId: 1 , value = 3 ], [partId: 1 , value = 4 ], [partId: 1 , value = 5 ],
[partId: 2 , value = 6 ], [partId: 2 , value = 7 ], [partId: 2 , value = 8 ])

2. aggregate

An aggregation operation, somewhat like grouping.
(*) Aggregates locally (per partition) first, then aggregates globally.

Calling the aggregation:

scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])
scala> import scala.math._
import scala.math._
scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7
Explanation of aggregate:
(0): the initial (zero) value is 0
max(_,_): the local (per-partition) function
_+_: the global (combining) function
scala> rdd2.aggregate(100)(max(_,_),_+_)
res8: Int = 300

Analysis: the initial value 100 is applied once per partition (so each partition's max becomes 100),
and once more in the global combine:
100 + 100 + 100 = 300

Summing the elements of an RDD:

  1. RDD.map, then reduce

  2. aggregate (more efficient than the map approach)

scala> rdd2.aggregate(0)(_+_,_+_)
res9: Int = 15
This plays the same role as a Combiner in MapReduce.
scala> rdd2.aggregate(10)(_+_,_+_)
res10: Int = 45

(*) String operations

scala> val rdd2 = sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27
scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef
scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc

Result analysis:

  1. Each partition produces *abc and *def.

  2. The global combine prepends * once more: **def*abc (or **abc*def; the order in which the partition results arrive is not deterministic).

(*) More complex examples
1)

scala> val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27
scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => "[partId : " + index + " , value = " + x + " ]").iterator
| }
scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ],
[partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42

Execution:
Partition 0:
first comparison: "" vs "12", max length 2, 2 --> "2"
second comparison: "2" vs "23", max length 2, 2 --> "2"

Partition 1:
first comparison: "" vs "345", max length 3, 3 --> "3"
second comparison: "3" vs "4567", max length 4, 4 --> "4"
Result: "24" or "42" (partition order is not deterministic)

2)

scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11

Execution:
Partition 0:
first comparison: "" vs "12", min length 0, 0 --> "0"
second comparison: "0" vs "23", min length 1, 1 --> "1"

Partition 1:
first comparison: "" vs "345", min length 0, 0 --> "0"
second comparison: "0" vs "4567", min length 1, 1 --> "1"

val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
scala> val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:27
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res19: String = 10
scala> rdd3.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res20: String = 01

3) aggregateByKey: like aggregate, except that it operates on key-value data

scala> val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> def fun3(index:Int, iter:Iterator[(String,Int)]) : Iterator[String] = {
| iter.toList.map(x=>"partId : " + index + " , value = " + x + " ]").iterator
| }
fun3: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
scala> pairRDD.mapPartitionsWithIndex(fun3).collect
res0: Array[String] = Array(
partId : 0 , value = (cat,2) ], partId : 0 , value = (cat,5) ], partId : 0 , value = (mouse,4) ],
partId : 1 , value = (cat,12) ], partId : 1 , value = (dog,12) ], partId : 1 , value = (mouse,2) ])

1. For each zoo (partition), take the count of the most numerous animal, then sum across zoos:
Zoo 0:
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ]

Zoo 1:
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ]

pairRDD.aggregateByKey(0)(math.max(_,_),_+_)
scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res1: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

2. Sum all the animals:

pairRDD.aggregateByKey(0)(_+_,_+_).collect
scala> pairRDD.reduceByKey(_+_).collect
res27: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

aggregateByKey is the more flexible of the two, and can be more efficient.

4) coalesce vs. repartition

Both repartition an RDD.

Difference:
coalesce does not shuffle by default (shuffle = false); to increase the number of partitions, shuffle must be set to true.

repartition always shuffles. (See the sketch after the example below.)

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val rdd2 = rdd1.repartition(3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at repartition at <console>:26
scala> rdd2.partitions.length
res4: Int = 3
scala> val rdd3 = rdd1.coalesce(3,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at coalesce at <console>:26
scala> rdd3.partitions.length
res5: Int = 3
scala> val rdd4 = rdd1.coalesce(4)
rdd4: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[14] at coalesce at <console>:26
scala> rdd4.partitions.length
res6: Int = 2
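A short sketch of the rule above: coalesce can only increase the partition count when shuffle = true, and repartition(n) is equivalent to coalesce(n, shuffle = true) in the RDD API.

val r = sc.parallelize(1 to 9, 2)
r.coalesce(4).partitions.length        // 2: without a shuffle the count cannot grow
r.coalesce(4, true).partitions.length  // 4: the shuffle allows growing
r.repartition(4).partitions.length     // 4: same as coalesce(4, true)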

5) Other advanced operators
A good blog covering the advanced operators (recommended):
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

VIII. Programming Examples

1. Log analysis

Goal: find the two most visited pages.
(*) Step 1: sum the visit counts per page
(*) Step 2: sort in descending order
Log data:
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] "GET /MyDemoWeb/mysql.jsp HTTP/1.1" 200 241
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] "GET /MyDemoWeb/web.jsp HTTP/1.1" 200 239
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242

package day2

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MyTomcatLogCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCount")
    val sc = new SparkContext(conf)

    /**
     * Read and parse the log, e.g.:
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(line => {
        // Parse the string to get the jsp name
        // 1. Extract the string between the two double quotes
        val index1 = line.indexOf("\"")
        val index2 = line.lastIndexOf("\"")
        val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
        // Find the two spaces
        val index3 = line1.indexOf(" ")
        val index4 = line1.lastIndexOf(" ")
        val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
        // Get the jsp name
        val jspName = line2.substring(line2.lastIndexOf("/")) // /oracle.jsp
        (jspName, 1)
      })

    // Count the visits per jsp
    val rdd2 = rdd1.reduceByKey(_ + _)
    // Sort by value, descending
    val rdd3 = rdd2.sortBy(_._2, false)
    rdd3.take(2).foreach(println)
    sc.stop()
  }
}

Result:
(/hadoop.jsp,9)
(/oracle.jsp,9)

2. Creating a custom partitioner

Goal: based on the jsp file name, route each page's access-log lines into a different partition file.

package day2

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

import scala.collection.mutable.HashMap

object MyTomcatLogPartitioner {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3")
    val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogPartitioner")
    val sc = new SparkContext(conf)

    /**
     * Read and parse the log, e.g.:
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(line => {
        // Parse the string to get the jsp name
        // 1. Extract the string between the two double quotes
        val index1 = line.indexOf("\"")
        val index2 = line.lastIndexOf("\"")
        val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
        // Find the two spaces
        val index3 = line1.indexOf(" ")
        val index4 = line1.lastIndexOf(" ")
        val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
        // Get the jsp name
        val jspName = line2.substring(line2.lastIndexOf("/")) // /oracle.jsp
        (jspName, line)
      })

    // Define the partitioning rule:
    // collect the distinct jsp names
    val rdd2 = rdd1.map(_._1).distinct().collect()
    // build the partitioner from them
    val myPartitioner = new MyWebPartitioner(rdd2)
    val rdd3 = rdd1.partitionBy(myPartitioner)
    // Write rdd3 out, one file per partition
    rdd3.saveAsTextFile("H:\\other\\test_partition")
    sc.stop()
  }
}

class MyWebPartitioner(jspList: Array[String]) extends Partitioner {
  // Map each jsp name (String) to a partition number (Int)
  val partitionMap = new HashMap[String, Int]()
  var partID = 0 // initial partition number
  for (jsp <- jspList) {
    partitionMap.put(jsp, partID)
    partID += 1
  }

  // The number of partitions
  def numPartitions: Int = partitionMap.size

  // Return the partition for a given jsp name
  def getPartition(key: Any): Int = partitionMap.getOrElse(key.toString(), 0)
}

3. Using JdbcRDD to access a database

Read data from a MySQL database into an RDD.

package day2

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

/**
 * Goal: find the employees whose salary is greater than 900 and at most 2000
 * select * from emp where sal > ? and sal <= ?
 */
object MyMysqlDemo {
  val connection = () => {
    Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3")
    val conf = new SparkConf().setMaster("local").setAppName("MyMysqlDemo")
    val sc = new SparkContext(conf)

    val mysqlRDD = new JdbcRDD(sc, connection, "select * from emp where sal > ? and sal <= ?", 900, 2000, 2, r => {
      val ename = r.getString(2)
      val sal = r.getInt(4)
      (ename, sal)
    })
    val result = mysqlRDD.collect()
    println(result.toBuffer)
    sc.stop()
  }
}

The emp data in the company database:
1 Tom 10 2400
2 Alis 11 1900
3 Kei 12 1500
4 Mi 11 900
Result:
ArrayBuffer((Alis,1900), (Kei,1500))

JdbcRDD parameters:

Parameter / Type / Description
sc / org.apache.spark.SparkContext / the SparkContext object
getConnection / scala.Function0[java.sql.Connection] / supplies a database Connection
sql / scala.Predef.String / the SQL statement to execute
lowerBound / scala.Long / the lower bound, i.e. the SQL's first parameter
upperBound / scala.Long / the upper bound, i.e. the SQL's second parameter
numPartitions / scala.Int / the number of partitions the query range is split into
mapRow / scala.Function1[java.sql.ResultSet, T] / maps each row of the ResultSet

JdbcRDD's drawbacks, which follow from the parameters above:
(1) The SQL statement must take exactly two parameters, both of type Long.
(2) The result is a ResultSet, i.e. only select statements are supported.

4. Writing to a database: storing the results in MySQL

package day3

import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * Store the Spark results in a MySQL database
 */
object MyTomcatLogCountToMysql {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext
    val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
    val sc = new SparkContext(conf)

    /**
     * Read and parse the log, e.g.:
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(line => {
        // Parse the string to get the jsp name
        // 1. Extract the string between the two double quotes
        val index1 = line.indexOf("\"")
        val index2 = line.lastIndexOf("\"")
        val line1 = line.substring(index1 + 1, index2) // GET /MyDemoWeb/oracle.jsp HTTP/1.1
        // Find the two spaces
        val index3 = line1.indexOf(" ")
        val index4 = line1.lastIndexOf(" ")
        val line2 = line1.substring(index3 + 1, index4) // /MyDemoWeb/oracle.jsp
        // Get the jsp name
        val jspName = line2.substring(line2.lastIndexOf("/")) // /oracle.jsp
        (jspName, 1)
      })

    // First attempt: create the connection on the driver
    //    var conn: Connection = null
    //    var pst: PreparedStatement = null
    //    try {
    //      /**
    //       * create table mydata(jspname varchar(50), countNumber Int);
    //       *
    //       * foreach has no return value; here we only write to the database and do
    //       * not need a new RDD, so foreach is the right choice.
    //       *
    //       * Fails at runtime with "Task not serializable": conn and pst live on the
    //       * driver and cannot be shipped to the executors.
    //       */
    //      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
    //      pst = conn.prepareStatement("insert into mydata values (?,?)")
    //      rdd1.foreach(f => {
    //        pst.setString(1, f._1)
    //        pst.setInt(2, f._2)
    //        pst.executeUpdate()
    //      })
    //    } catch {
    //      case t: Throwable => t.printStackTrace()
    //    } finally {
    //      if (pst != null) pst.close()
    //      if (conn != null) conn.close()
    //    }
    //    sc.stop()

    // Second attempt: create the connection inside foreach
    //    var conn: Connection = null
    //    var pst: PreparedStatement = null
    //    try {
    //      rdd1.foreach(f => {
    //        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
    //        pst = conn.prepareStatement("insert into mydata values (?,?)")
    //        pst.setString(1, f._1)
    //        pst.setInt(2, f._2)
    //        pst.executeUpdate()
    //      })
    //    } catch {
    //      case t: Throwable => t.printStackTrace()
    //    } finally {
    //      if (pst != null) pst.close()
    //      if (conn != null) conn.close()
    //    }
    //    sc.stop()

    /*
     * The second attempt works, but it opens a connection per record,
     * which puts heavy pressure on the database.
     *
     * Operate per partition instead: one connection per partition is enough.
     */
    rdd1.foreachPartition(saveToMysql)
    sc.stop()
  }

  def saveToMysql(it: Iterator[(String, Int)]) = {
    var conn: Connection = null
    var pst: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
      pst = conn.prepareStatement("insert into mydata values (?,?)")
      it.foreach(f => {
        pst.setString(1, f._1)
        pst.setInt(2, f._2)
        pst.executeUpdate()
      })
    } catch {
      case t: Throwable => t.printStackTrace()
    } finally {
      if (pst != null) pst.close()
      if (conn != null) conn.close()
    }
  }
}
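A further refinement (a sketch, assuming the same mydata(jspname, countNumber) table as above): batching the inserts with addBatch/executeBatch reduces the round trips to one per partition.

import java.sql.{Connection, DriverManager, PreparedStatement}

def saveToMysqlBatched(it: Iterator[(String, Int)]): Unit = {
  var conn: Connection = null
  var pst: PreparedStatement = null
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "root", "123456")
    pst = conn.prepareStatement("insert into mydata values (?,?)")
    it.foreach { case (name, count) =>
      pst.setString(1, name)
      pst.setInt(2, count)
      pst.addBatch()     // queue the statement instead of executing it immediately
    }
    pst.executeBatch()   // one round trip per partition
  } catch {
    case t: Throwable => t.printStackTrace()
  } finally {
    if (pst != null) pst.close()
    if (conn != null) conn.close()
  }
}
// usage: rdd1.foreachPartition(saveToMysqlBatched)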
Original source: https://www.cnblogs.com/hsiehchou/p/10670159.html