Spark调优与调试

1.使用SparkConf配置Spark
(1)在java中使用SparkConf创建一个应用:
SparkConf conf =new SparkConf();
//设置应用名称
conf.set("spark.app.name",",my spark app");
//设置master
conf.set("spark.master","local");
//设置ui端口号
conf.set("spark.ui.port","36000");
//使用这个配置对象创建一个SparkContext
JavaSparkContext sc =new JavaSparkContext(conf);
当然也可以调用SaprkConf的set方法,使用如下:
SparkConf conf =new SparkConf();
conf.setAppName("my spark app");
conf.setMaster("local");
JavaSparkContext sc =new JavaSparkContext(conf);
(2)也可以在spark-submit中指定conf的相关设置,如下:
   
  1. bin/spark-submit
  2. --class com.example.myapp
  3. --master local
  4. --name myspp
  5. --conf spark.ui.port=36000
  6. myApp.jar
    spark-submit也支持从文件中读取配置项的值。这对于设置一些与环境相关的配置项比较有用,方便不同用户共享这些配置。默认情况下,spark-submit脚本会在Spark安装目录中找到conf/spark-defaults.conf文件,尝试读取该文件中以空格隔开的键值对数据。你也可以通过spark-submit的--properties-File标记,自定义该文件的路径。
    示例如下:
  1. bin/spark-submit
  2. --class com.example.myapp
  3. --properties-file my-config.conf
  4. myApp.jar
  5. ##Content of my-config.conf ##
  6. spark.master local[4]
  7. spark.app.name "my spark app"
  8. spark.ui.port 360000
    注意:有时候,有可能同一个配置项在多个地方被配置了。Spark的优先级顺序是:优先级最高的是在用户代码中显示调用set()方法设置的选项。其次是通过spark-submit传递的参数,再次是卸载配置文件中的值,最后是系统的默认值。
常用的Spark配置项的值
常用的Spark配置项的值默认值描述
spark.executor.memory
(--executor-memory)
512M为每个执行器进程分配的内存,格式与JVM内存字符串格式一样。
spark.executor.cores
(--executor-cores)
spark.core.max
(--total-executor-cores)
1(无)限制应用使用的核心个数的配置项。在YARN模式下,spark.executor.cores会为每个任务分配指定书目的核心。在独立模式和Mesos模式下,spark.core.max设置了所有执行器进程使用的核心总数的上限。
spark.speculationfalse设为true时开启任务预测执行机制。当出现比较慢的任务是,这种机制会在另外的节点上也尝试执行该任务的一个副本。打开此选项会帮助减少大规模集群中个别慢的任务带来的影响。
spark.storage.blockMan
agerTimeoutIntervalMs
45000内部用来通过超时机制追踪执行器进程是否存活的阀值。对于会引发长时间垃圾回收暂停的作业,需要把这个值调到100秒以上来放置失败。在Spark将来的版本中,这个配置项可能会被一个统一的超时设置所替代。
spark.executor.extraJava Options
spark.executor.extra
ClassPath
spark.executor.extra 
Library
这三个参数用来自动以如何启动执行器进程的JVM,分别用来添加额外的Java参数,classpath以及程序库路径。使用字符串来设置这些参数。推荐使用--jars标记来添加依赖。
spark.serizlizerorg.apache.spark.serializer.JavaSerializer指定用来进行序列化的类库,包含通过网络传输数据或缓存数据时的徐泪花。默认的Java序列化对于任何可以被徐泪花的Java独享都适用,但是速度很慢。我们推荐在追求速度时适用org.apache.spark.serializer.KryoSerializer并且对Kyro进行适当的调优。该项可以配置为任何org.apache.serializer的子类。
spark.eventLog.enabledfalse设为true时,开启时间日志机制,这样已完成的Saprk作业就可以通过历史服务器查看。
spark.eventLog.dirfile///tmp/spark-event值开启事件日志机制时,事件日志文件的存储位置。这个值指向的路径需要设置一个全局可见的文件系统中,比如HDFS



2.spark执行流程
(1)用户代码定义RDD的有向无环图
    RDD上的操作会创建出新的RDD,并引用它们的父节点,这样就创建出了一个图
(2)行动操作把有向无环图强制转译为执行计划
    当你调用RDD的一个行动操作是,这个RDD就必须计算出来。这也要求计算出该RDD的父节点。Saprk调度器提交一个作业来计算所有必要的RDD。这个作业会包含一个或者多个步骤,每个步骤其实也就是一波并行执行的计算任务。一个步骤对应有向无环图中的一个或者多个RDD,一个步骤对应多个RDD是因为发生了流水线执行。
(3)任务于集群中调度并执行
    步骤是按照顺序处理的,任务则独立地启动来计算出RDD的一部分。一旦作业的最后一个步骤结束,一个行动操作也就执行完毕了。
3.使用Kryo序列化工具并注册所需要的类 重点!
    spark在开发过程中,当数据量很大的时候,为了减少内存的消耗,可以使用KryoSerializer进行序列化,其具体操作步骤如下:
    (1)指定spark序列化类
    (2)增加类MyRegistrator类,注册需要Kryo序列化的类
以下是示例程序:
MyRegistrator.java
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

/**
* Created by Administrator on 2015/11/30.
*/
public class MyRegistrator implements KryoRegistrator{
public void registerClasses(Kryo kryo) {
//注册需要使用KryoSerizlizer的类
kryo.register(javaBean.class);
}
}
javaBean.java
public class javaBean implements Serializable {
public int i;
//构造函数
public javaBean(int i){
this.i=i;
}
//getter
public int getI() {
return i;
}
//setter
public void setI(int i) {
this.i = i;
}
public String toString(){
return "this is the "+i+"th element!";

}
}
main.java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
* Created by Administrator on 2015/11/30.
*/
public class main {
public static void main(String args[]){
SparkConf conf =new SparkConf();
conf.setAppName("my spark app");
conf.setMaster("local");
//设置序列化类
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
//注册要使用KryoSerializer的类
conf.set("spark.kryo.registrator","MyRegistrator");
JavaSparkContext sc =new JavaSparkContext(conf);
List<javaBean> list =new ArrayList<javaBean>();
for(int i=0;i<10000;i++){
javaBean bean =new javaBean(i);
list.add(bean);
}
JavaRDD<javaBean> rdd =sc.parallelize(list);
for(javaBean bean:rdd.collect())
System.out.println(bean);



}
}













我的github: https://github.com/zhoudayang
原文地址:https://www.cnblogs.com/zhoudayang/p/5007759.html