【sparkSQL】SparkSession的认识

https://www.cnblogs.com/zzhangyuhang/p/9039695.html

 https://www.jianshu.com/p/dea6a78b9dff

在Spark1.6中我们使用的叫Hive on spark,主要是依赖hive生成spark程序,有两个核心组件SQLcontext和HiveContext。

这是Spark 1.x 版本的语法

1
2
3
4
5
//set up the spark configuration and create contexts
 val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")
 // your handle to SparkContext to access other context like SQLContext
 val sc = new SparkContext(sparkConf).set("spark.some.config.option""some-value")
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)

而Spark2.0中我们使用的就是sparkSQL,是后继的全新产品,解除了对Hive的依赖。

从Spark2.0以上的版本开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext

来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。

我们在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。

SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。

然后使用SQL语句来操作数据,也提供了HiveQL以及其他依赖于Hive的功能支持。

创建SparkSession

SparkSession 是 Spark SQL 的入口。

使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候,第一个要创建的对象就是 SparkSession。

Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置。

Builder 的方法如下:

MethodDescription
getOrCreate 获取或者新建一个 sparkSession
enableHiveSupport 增加支持 hive Support
appName 设置 application 的名字
config 设置各种配置

你可以通过 SparkSession.builder 来创建一个 SparkSession 的实例,并通过 stop 函数来停止 SparkSession。

1
2
3
4
5
6
7
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional and will be autogenerated if not specified
  .master("local[*]")               // avoid hardcoding the deployment environment
  .enableHiveSupport()              // self-explanatory, isn't it?
  .config("spark.sql.warehouse.dir""target/spark-warehouse")
  .getOrCreate

这样我就就可以使用我们创建的SparkSession类型的spark对象了。

2.在SparkSession这个类中,有builder,通过builder去构建SparkSession实例,用法如下。

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://hadoopmaste:

7077").appName("test").config("spark.xxxx.conf", "some-value")

.getOrCreate()

master用于指定spark集群地址

appName用于设置app的名称

config中以key,value的形式进行一些配置

config可以以链式编程的方式多次调用,每次调用可设置一组key,value配置。而且conf中还可以传入一个关键字参数conf,指定外部的SparkConf配置对象getOrCreate,若存在sparksession实例直接返回,否则实例化一个sparksession返回

设置参数

创建SparkSession之后可以通过 spark.conf.set 来设置运行参数

1
2
3
4
5
//set new runtime options
 spark.conf.set("spark.sql.shuffle.partitions"6)
 spark.conf.set("spark.executor.memory""2g")
 //get all settings
 val configMap:Map[String, String] = spark.conf.getAll()//可以使用Scala的迭代器来读取configMap中的数据。

读取元数据

如果需要读取元数据(catalog),可以通过SparkSession来获取。

1
2
3
//fetch metadata data from the catalog
 spark.catalog.listDatabases.show(false)
 spark.catalog.listTables.show(false)

这里返回的都是Dataset,所以可以根据需要再使用Dataset API来读取。

注意:catalog 和 schema 是两个不同的概念

Catalog是目录的意思,从数据库方向说,相当于就是所有数据库的集合;

Schema是模式的意思, 从数据库方向说, 类似Catelog下的某一个数据库;

创建Dataset和Dataframe

通过SparkSession来创建Dataset和Dataframe有多种方法。

最简单的就是通过range()方法来创建dataset,通过createDataFrame()来创建dataframe。

1
2
3
4
5
6
7
8
9
10
11
12
//create a Dataset using spark.range starting from 5 to 100, with increments of 5
val numDS = spark.range(51005)//创建dataset
// reverse the order and display first 5 items
numDS.orderBy(desc("id")).show(5)
//compute descriptive stats and display them
numDs.describe().show()
// create a DataFrame using spark.createDataFrame from a List or Seq
val langPercentDF = spark.createDataFrame(List(("Scala"35), ("Python"30), ("R"15), ("Java"20)))//创建dataframe
//rename the columns
val lpDF = langPercentDF.withColumnRenamed("_1""language").withColumnRenamed("_2""percent")
//order the DataFrame in descending order of percentage
lpDF.orderBy(desc("percent")).show(false)

读取数据

可以用SparkSession读取JSON、CSV、TXT和parquet表。

1
2
3
4
import spark.implicits //使RDD转化为DataFrame以及后续SQL操作
//读取JSON文件,生成DataFrame
val jsonFile = args(0)
val zipsDF = spark.read.json(jsonFile)

使用SparkSQL

借助SparkSession用户可以像SQLContext一样使用Spark SQL的全部功能。

1
2
3
4
zipsDF.createOrReplaceTempView("zips_table")//对上面的dataframe创建一个表
zipsDF.cache()//缓存表
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM zips_table")//对表调用SQL语句
resultsDF.show(10)//展示结果

存储/读取Hive表 

下面的代码演示了通过SparkSession来创建Hive表并进行查询的方法。

1
2
3
4
5
6
7
//drop the table if exists to get around existing table error
 spark.sql("DROP TABLE IF EXISTS zips_hive_table")
 //save as a hive table
 spark.table("zips_table").write.saveAsTable("zips_hive_table")
 //make a similar query against the hive table
 val resultsHiveDF = spark.sql("SELECT city, pop, state, zip FROM zips_hive_table WHERE pop > 40000")
 resultsHiveDF.show(10)

下图是 SparkSession 的类和方法, 这些方法包含了创建 DataSet, DataFrame, Streaming 等等。

MethodDescription
builder "Opens" a builder to get or create a SparkSession instance
version Returns the current version of Spark.
implicits Use import spark.implicits._ to import the implicits conversions and create Datasets from (almost arbitrary) Scala objects.
emptyDataset[T] Creates an empty Dataset[T].
range Creates a Dataset[Long].
sql Executes a SQL query (and returns a DataFrame).
udf Access to user-defined functions (UDFs).
table Creates a DataFrame from a table.
catalog Access to the catalog of the entities of structured queries
read Access to DataFrameReader to read a DataFrame from external files and storage systems.
conf Access to the current runtime configuration.
readStream Access to DataStreamReader to read streaming datasets.
streams Access to StreamingQueryManager to manage structured streaming queries.
newSession Creates a new SparkSession.
stop Stops the SparkSession.

当我们使用Spark-Shell的时候,Spark会自动帮助我们建立好了一个名字为spark的SparkSesson和一个名字为sc的SparkContext。

开发实践

1. 读取mysql表数据

import com.test.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;

public class SparkSimple01 {

    public static void main(String[] args) {

        // 创建spark会话,实质上是SQLContext和HiveContext的组合
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();

        // 设置日志级别,默认会打印DAG,TASK执行日志,设置为WARN之后可以只关注应用相关日志
        sparkSession.sparkContext().setLogLevel("WARN");

        // 分区方式读取mysql表数据
        Dataset<Row> predicateSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people",
                (String[]) Arrays.asList(" name = 'tom'", " name = 'sam' ").toArray(), ConnectionInfos.getTestUserAndPasswordProperties());

        predicateSet.show();

    }
}

为了确认该查询对mysql发出的具体sql,我们先查看一下mysql执行sql日志,

#mysql 命令窗口执行以下命令打开日志记录
SHOW VARIABLES LIKE "general_log%";
SET GLOBAL general_log = 'ON';
 
mysql log.png

打开Lenovo.log得到以上代码在mysql上的执行情况:


 
分区执行sql

通过分区查询获取表数据的方式有以下几个优点:

  • 利用表索引查询提高查询效率
  • 自定义sql条件使分区数据更加均匀,方便后面的并行计算
  • 分区并发读取可以通过控制并发控制对mysql的查询压力
  • 可以读取大数据量的mysql表

spark jdbc 读取msyql表还有直接读取(无法读取大数据量表),指定字段分区读取(分区不够均匀)等方式,通过项目实践总结,以上的分区读取方式是我们目前认为对mysql最友好的方式。
分库分表的系统也可以利用这种方式读取各个表在内存中union所有spark view得到一张统一的内存表,在业务操作中将分库分表透明化。如果线上数据表数据量较大的时候,在union之前就需要将spark view通过指定字段的方式查询,避免on line ddl 在做变更时union表报错,因为可能存在部分表已经添加新字段,部分表还未加上新字段,而union要求所有表的表结构一致,导致报错。

2. Dataset 分区数据查看

我们都知道 Dataset 的分区是否均匀,对于结果集的并行处理效果有很重要的作用,spark Java版暂时无法查看partition分区中的数据分布,这里用java调用scala 版api方式查看,线上不推荐使用,因为这里的分区查看使用foreachPartition,多了一次action操作,并且打印出全部数据。

import org.apache.spark.sql.{Dataset, Row}

/**
  * Created by lesly.lai on 2017/12/25.
  */
class SparkRddTaskInfo {
  def getTask(dataSet: Dataset[Row]) {
    val size = dataSet.rdd.partitions.length
    println(s"==> partition size: $size " )
    import scala.collection.Iterator
    val showElements = (it: Iterator[Row]) => {
      val ns = it.toSeq
      import org.apache.spark.TaskContext
      val pid = TaskContext.get.partitionId
      println(s"[partition: $pid][size: ${ns.size}] ${ns.mkString(" ")}")
    }
    dataSet.foreachPartition(showElements)
  }
}

还是用上面读取mysql数据的例子来演示调用,将predicateSet作为参数传入

new SparkRddTaskInfo().getTask(predicateSet);

控制台打印结果


 
分区结果.png

通过分区数据,我们可以看到之前的predicate 方式得到的分区数就是predicate size 大小,并且按照我们想要的数据分区方式分布数据,这对于业务数据的批处理,executor的local cache,spark job执行参数调优都很有帮助,例如调整spark.executor.cores,spark.executor.memory,GC方式等等。
这里涉及java和Scala容器转换的问题,Scala和Java容器库有很多相似点,例如,他们都包含迭代器、可迭代结构、集合、 映射和序列。但是他们有一个重要的区别。Scala的容器库特别强调不可变性,因此提供了大量的新方法将一个容器变换成一个新的容器。
在Scala内部,这些转换是通过一系列“包装”对象完成的,这些对象会将相应的方法调用转发至底层的容器对象。所以容器不会在Java和Scala之间拷贝来拷贝去。一个值得注意的特性是,如果你将一个Java容器转换成其对应的Scala容器,然后再将其转换回同样的Java容器,最终得到的是一个和一开始完全相同的容器对象(这里的相同意味着这两个对象实际上是指向同一片内存区域的引用,容器转换过程中没有任何的拷贝发生)。

3. sql 自定义函数

自定义函数,可以简单方便的实现业务逻辑。

import com.tes.spark.db.ConnectionInfos;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class SparkSimple02 {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Java Spark SQL basic example").getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        Dataset<Row> originSet = sparkSession.read().jdbc(ConnectionInfos.TEST_MYSQL_CONNECTION_URL, "people", ConnectionInfos.getTestUserAndPasswordProperties());
        originSet.cache().createOrReplaceTempView("people");

        // action操作 打印原始结果集
        originSet.show();

        // 注册自定义函数
        sparkSession.sqlContext().udf().register("genderUdf", gender -> {
            if("M".equals(gender)){
                return  "男";
            }else if("F".equals(gender)){
                return  "女";
            }
            return "未知";
        }, DataTypes.StringType);

        // 查询结果
        Dataset<Row> peopleDs = sparkSession.sql("select job_number,name,age, genderUdf(gender) gender, dept_id, salary, create_time from people ");

        // action操作 打印函数处理后结果集
        peopleDs.show();
    }
}

执行结果:


 
image.png

在sql中用使用java代码实现逻辑操作,这为sql的处理逻辑能力提升了好几个层次,将函数抽取成接口实现类可以方便的管理和维护这类自定义函数类。此外,spark也支持自定义内聚函数,窗口函数等等方式,相比传统开发实现的功能方式,使用spark sql开发效率可以明显提高。

4. mysql 查询连接复用

最近线上任务遇到一个获取mysql connection blocked的问题,从spark ui的executor thread dump 可以看到blocked的栈信息,如图:


 
connection blocked.png

查看代码发现DBConnectionManager 调用了 spark driver注册mysql driver 使用同步方式的代码


 
driverRegister.png

看到这里我们很容易觉得是注册driver 导致的blocked,其实再仔细看回报错栈信息,我们会发现,这里的getConnection是在dataset 的foreachpartition 中调用,并且是在每次db 操作时获取一次getConnection 操作,这意味着在该分区下有多次重复的在同步方法中注册driver获取连接的操作,看到这里线程blocked的原因就很明显了,这里我们的解决方式是:
a. 在同个partition中的connection 复用进行db操作
b. 为了避免partition数据分布不均导致连接active时间过长,加上定时释放连接再从连接池重新获取连接操作
通过以上的连接处理,解决了blocked问题,tps也达到了4w左右。

5. executor 并发控制

我们都知道,利用spark 集群分区并行能力,可以很容易实现较高的并发处理能力,如果是并发的批处理,那并行处理的能力可以更好,但是,mysql 在面对这么高的并发的时候,是有点吃不消的,因此我们需要适当降低spark 应用的并发和上下游系统和平相处。控制spark job并发可以通过很多参数配置组合、集群资源、yarn队列限制等方式实现,经过实践,我们选择以下参数实现:

#需要关闭动态内存分配,其他配置才生效
spark.dynamicAllocation.enabled = false
spark.executor.instances = 2
spark.executor.cores = 2
 
image.png

这里发现除了设置executor配置之外,还需要关闭spark的动态executor分配机制,spark 的ExecutorAllocationManager 是 一个根据工作负载动态分配和删除 executors 的管家, ExecutorAllocationManager 维持一个动态调整的目标executors数目, 并且定期同步到资源管理者,也就是 yarn ,启动的时候根据配置设置一个目标executors数目, spark 运行过程中会根据等待(pending)和正在运行(running)的tasks数目动态调整目标executors数目,因此需要关闭动态配置资源才能达到控制并发的效果。

除了executor是动态分配之外,Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,我们先看看worker中的内存规划是怎样的:


 
worker memory schedule.png

worker 可以根据实例配置,内存配置,cores配置动态生成executor数量,每一个executor为一个jvm进程,因此executor 的内存管理是建立在jvm的内存管理之上的。从本文第一张spark on yarn图片可以看到,yarn模式的 executor 是在yarn container 中运行,因此container的内存分配大小同样可以控制executor的数量。
RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID ),从上图可以看出,开发过程中常用的分区(partition)数据是以block的方式存储在堆内的storage内存区域的,还有为了减少网络io而做的broadcast数据也存储在storage区域;堆内的另一个区域内存则主要用于缓存rdd shuffle产生的中间数据;此外,worker 中的多个executor还共享同一个节点上的堆外内存,这部分内存主要存储经序列化后的二进制数据,使用的是系统的内存,可以减少不必要的开销以及频繁的GC扫描和回收。

为了更好的理解executor的内存分配,我们再来看一下executor各个内存块的参数设置:


 
executor jvm

 
off-heap.png

了解spark 内存管理的机制后,就可以根据mysql的处理能力来设置executor的并发处理能力,让我们的spark 应用处理能力收放自如。调整executor数量还有另外一个好处,就是集群资源规划,目前我们的集群队列是yarn fair 模式,


 
yarn fair 集群模式.png

先看看yarn fair模式,举个例子,假设有两个用户A和B,他们分别拥有一个队列。当A启动一个job而B没有任务时,A会获得全部集群资源;当B启动一个job后,A的job会继续运行,当A的job执行完释放资源后,不过一会儿之后两个任务会各自获得一半的集群资源。如果此时B再启动第二个job并且其它job还在运行,则它将会和B的第一个job共享B这个队列的资源,也就是B的两个job会用于四分之一的集群资源,而A的job仍然用于集群一半的资源,结果就是资源最终在两个用户之间平等的共享。

在这种情况下,即使有多个队列执行任务,fair模式容易在资源空闲时占用其他队列资源,一旦占用时间过长,就会导致其他任务都卡住,这也是我们遇到的实际问题。如果我们在一开始能评估任务所用的资源,就可以在yarn队列的基础上指定应用的资源,例如executor的内存,cpu,实例个数,并行task数量等等参数来管理集群资源,这有点类似于yarn Capacity Scheduler 队列模式,但又比它有优势,因为spark 应用可以通过spark context的配置来动态的设置,不用在配置yarn 队列后重启集群,稍微灵活了一点。

除了以上提到的几点总结,我们还遇到很多其他的疑问和实践,例如,什么时候出现shuffle;如何比较好避开或者利用shuffle;Dataset 的cache操作会不会有性能问题,如何从spark ui中分析定位问题;spark 任务异常处理等等,暂时到这里,待续...

原文地址:https://www.cnblogs.com/yuluoxingkong/p/10635469.html