Spark工作中遇到的问题

1、java: 找不到符号

map(o->o._2)处提示找不到符号

SparkSession spark = SparkSession.builder().appName(appName).getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<ObjectArrayWritable> rdd = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf).map(o->o._2);

使用sequenceFile代替newAPIHadoopFile

或者将原先代码分为两行

JavaPairRDD<ObjectArrayWritable,ObjectArrayWritable> pairRDD = jsc.newAPIHadoopFile(dir, SequenceFileInputFormat.class, ObjectArrayWritable.class, ObjectArrayWritable.class, hdConf);
JavaRDD<ObjectArrayWritable> rdd = pairRDD.map(o->o._2);

 

2、java.lang.ClassCastException: java.util.Arrays$ArrayList cannot be cast to scala.collection.Seq

原因:在创建Row时突发奇想把java的List当做元素放入

List<Object> list = new ArrayList<>();
Row r = RowFactory.create(list);

然后在shuffle后取出时报错

Row r = ...
List<Object> list = r.getList(0);

看了下scala的代码发现,虽然放入时是按照java类型放入的;但是使用getList取出时是按照scala的seq序列取出,所以导致类型转换异常

根据源码看到getAs方法,将getList替换为getAs,这次没有报ClassCastException;但是出现另外一个错误!

Caused by: java.lang.UnsupportedOperationException

原因是因为我获取到List后是没有具体子类实现的,所以在调用addAll时,最后使用了AbstractList的add方法

创建新的list,将getAs的list遍历放入新的list。

 

3、spark Container killed on request. Exit code is 143

很大可能是由于物理内存达到限制,导致container被kill掉报错。

粗暴简单的解决方式,增加executor内存大小

spark.executor.memory 4g

再增加内存后依然不行,查看application的log日志发现(yarn logs -applicationId …)

ERROR RetryingBlockFetcher: Exception while beginning fetch of 5 outstanding blocks (after 2 retries) java.io.IOException: Failed to connect to 主机:端口

原来是某个executor挂了,某个exetutor想要fetch数据(应该是shuffle read),但那个有数据的executor挂了,导致fetch失败

shuffle分为shuffle write和shuffle read两部分。

shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

如果shuffle read的量很大,那么将会导致一个task需要处理的数据非常大,从而导致JVM crash以及取shuffle数据失败,最后executor也丢失了,看到Failed to connect to host的错误(executor lost)或者造成长时间的gc。

 

解决方案:

(a) 减少shuffle数据和操作 思考是否可以使用map side join或是broadcast join来规避shuffle的产生。 将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

(b) 控制分区数 对于SparkSQL和DataFrame的join,group by等操作 通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。 对于Rdd的join,groupBy,reduceByKey等操作 通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

(c)提高executor的内存 通过spark.executor.memory适当提高executor的memory值。

(d)增加并行task的数目 通过增加并行task的数目,从而减小每个task的数据量。(spark.default.parallelism)

(e)查看是否存在数据倾斜的问题 是否存在某个key数据特别大导致倾斜?如果存在可以单独处理或者考虑改变数据分区规则。

原文地址:https://www.cnblogs.com/java-meng/p/15189266.html