Spark 2.x (54): Testing ds.selectExpr() under Spark Structured Streaming: the job hangs when many columns are returned

Business requirement: part of the fields are dynamic, and their expressions have to be loaded and evaluated at runtime in the program:

Option 1: evaluate the expressions with FelEngine inside a MapFunction / MapPartitionsFunction:

        FelEngine fel = FelEngine.instance;
        FelContext ctx = fel.getContext();
        // Bind variable values that the expression can reference by name.
        ctx.set("rsrp", 100);
        ctx.set("rsrq", 80);

        // Evaluate the dynamically loaded formula against the bound context.
        double expValue = Double.valueOf(String.valueOf(fel.eval("rsrp*10-rsrq*8")));
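The snippet above only shows FEL evaluation in isolation. Below is a minimal sketch, not from the original post, of how it could be embedded in a MapPartitionsFunction over the parsed rows (the rows Dataset built in the option 2 code further down); outEncoder is assumed to be a RowEncoder over a schema widened with the derived column and is not defined here:

// Sketch only: evaluate a configured formula per row with FelEngine inside mapPartitions.
Dataset<Row> derived = rows.mapPartitions(new MapPartitionsFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Iterator<Row> input) throws Exception {
        FelEngine fel = FelEngine.instance;
        FelContext ctx = fel.getContext();
        List<Row> out = new ArrayList<>();
        while (input.hasNext()) {
            Row row = input.next();
            // Bind the source columns so the formula can reference them by name.
            ctx.set("rsrp", Double.valueOf(row.getString(row.fieldIndex("rsrp"))));
            ctx.set("rsrq", Double.valueOf(row.getString(row.fieldIndex("rsrq"))));
            double rsrpqSum = Double.valueOf(String.valueOf(fel.eval("rsrp*10+rsrq*10")));
            // Emit the original columns plus the derived value.
            out.add(RowFactory.create(row.getString(0), row.getString(1),
                    row.getString(2), row.getString(3), String.valueOf(rsrpqSum)));
        }
        return out.iterator();
    }
}, outEncoder); // outEncoder: RowEncoder over the widened schema (assumed, not shown)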

Option 2: use the selectExpr() function

package com.dx.streaming.drivers.test;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.collection.JavaConversions;
import scala.collection.Seq;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class MrsExpressionDoWithSelectExp {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
        
        StructType type = new StructType();
        type = type.add("id", DataTypes.StringType);
        type = type.add("cellname", DataTypes.StringType);
        type = type.add("rsrp", DataTypes.StringType);
        type = type.add("rsrq", DataTypes.StringType);
        ExpressionEncoder<Row> encoder = RowEncoder.apply(type);

        Dataset<String> ds = sparkSession.readStream().textFile("E:\\test-structured-streaming-dir\\*");
        Dataset<Row> rows = ds.mapPartitions(new MapPartitionsFunction<String, Row>() {
            private static final long serialVersionUID = -1988302292518096148L;

            @Override
            public Iterator<Row> call(Iterator<String> input) throws Exception {
                List<Row> rows = new ArrayList<>();
                while (input.hasNext()) {
                    String line = input.next();
                    String[] items = line.split(",");
                    rows.add(RowFactory.create(items));
                }
                return rows.iterator();
            }
        }, encoder);
        rows.printSchema();

        int dynamicExprLength=10;
        Map<String, String> expMap = new LinkedHashMap<>();
        // Formulas loaded from a configuration file
        expMap.put("rsrpq_count", "rsrp+rsrp");
        expMap.put("rsrpq_sum", "rsrp*10+rsrq*10");
        for(int i=0;i<dynamicExprLength;i++){
            expMap.put("rsrpq_sum"+i, "rsrp*10+rsrq*10");    
        }
                
        expMap.put("$rsrpq_avg", "rsrpq_sum/rsrpq_count");

        List<String> firstLayerExpList = new ArrayList<>();
        List<String> secondLayerExpList = new ArrayList<>();
        firstLayerExpList.add("*");
        secondLayerExpList.add("*");

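        // Keys prefixed with "$" reference first-layer aliases (rsrpq_sum, rsrpq_count),
        // so those expressions are deferred to a second selectExpr() pass.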
        for (Map.Entry<String, String> kv : expMap.entrySet()) {
            if (kv.getKey().startsWith("$")) {
                secondLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey().replace("$", ""));
            } else {
                firstLayerExpList.add("(" + kv.getValue() + ") as " + kv.getKey());
            }
        }

        // First-layer computation: select *, (rsrp+rsrp) as rsrpq_count, (rsrp*10+rsrq*10) as rsrpq_sum, ...
        //rows = rows.selectExpr(firstLayerExpList.toArray(new String[firstLayerExpList.size()] ));
        Seq<String> firstLayerExpSeq = JavaConversions.asScalaBuffer(firstLayerExpList);
        rows = rows.selectExpr(firstLayerExpSeq);
        //rows.show();

        // Second-layer computation: select *, (rsrpq_sum/rsrpq_count) as rsrpq_avg
        //rows = rows.selectExpr(secondLayerExpList.toArray(new String[secondLayerExpList.size()] ));
        Seq<String> secondLayerExpSeq = JavaConversions.asScalaBuffer(secondLayerExpList);
        rows = rows.selectExpr(secondLayerExpSeq);
        
        rows.printSchema();
        //rows.show();
        rows.writeStream().format("console").outputMode(OutputMode.Append()).trigger(Trigger.ProcessingTime(1,TimeUnit.MINUTES)).start();
        try {
            sparkSession.streams().awaitAnyTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }

    }
}

With dynamicExprLength set to 10 dynamic columns, the job produces output normally.

Problem found with ds.selectExpr():

When the number of dynamic columns is raised to 500 or 1000, the following shows up in local testing:

19/07/18 14:18:18 INFO CodeGenerator: Code generated in 105.715218 ms
19/07/18 14:18:19 WARN CodeGenerator: Error calculating stats of compiled class.
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1509)
    at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
    at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
    at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:996)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:993)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:993)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:961)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1027)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1024)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:906)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:412)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:366)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:890)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection$lzycompute(ExpressionEncoder.scala:263)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.extractProjection(ExpressionEncoder.scala:263)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:287)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:573)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:235)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/07/18 14:18:19 INFO CodeGenerator: Code generated in 1354.475257 ms

When the job is deployed to YARN, in either yarn-client or yarn-cluster mode, it hangs: the driver and executors are created and resources are allocated, but no tasks are ever assigned.

Moreover, no error log is thrown at all; the job simply stays stuck, potentially forever.
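One way to narrow the problem down (a sketch only, not from the original post) is to replay the same expression lists against a static, in-memory Dataset using the schema and expression lists built above; if the Janino warning or excessive codegen time already appears there, the issue lies in the generated projection code for the wide selectExpr() rather than in Structured Streaming itself. The sample row values below are made up for illustration:

// Sketch: replay the same selectExpr() calls in batch mode with one in-memory row,
// reusing the schema (type) and expression lists (firstLayerExpList / secondLayerExpList) from the code above.
List<Row> sample = Collections.singletonList(RowFactory.create("1", "cell-1", "100", "80"));
Dataset<Row> staticRows = sparkSession.createDataFrame(sample, type);
staticRows = staticRows.selectExpr(firstLayerExpList.toArray(new String[firstLayerExpList.size()]));
staticRows = staticRows.selectExpr(secondLayerExpList.toArray(new String[secondLayerExpList.size()]));
staticRows.show(); // if this alone reproduces the CodeGenerator warning, streaming is not the trigger

If the batch version shows the same symptom, the size of the generated projection code is the likely culprit; if not, the streaming encoder path deserves a closer look.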

Original post: https://www.cnblogs.com/yy3b2007com/p/11207078.html