Spark开发-Spark中UDAF开发示例

Spark UDAF

 无泛型约束的UDAF  extends UserDefinedAggregateFunction  extends Aggregator  dataframe设计的
 有泛型约束的UDAF  extends Aggregator 该UDAF时允许添加泛型,保障函数更加安全.但是这种UDAF不可直接在SQL中被调用运算 适用于强类型Datasets
01.在Spark中使用
    1.编写UDAF<两种类型的UDAF都可以>
    2. 在spark中注册UDAF,为其绑定一个名字,使用
02.在Spark SQL 中使用
   1.编写UDAF<使用继承 UserDefinedAggregateFunction 类型编写>
   2. 打Jar包,并上传
   3. 注册临时聚合函数,并使用
        ADD  jar TestSpark.jar;
        CREATE  TEMPORARY FUNCTION  mean_my AS  'com.test.structure.udaf.MeanMy';
        select t1.data,mean_my(t1.age)
        from (select 33 as age,  '1' as data union all select 55  as age, '1' as data 
           union all select 66 as age, '2' as data)t1
        group by   t1.data;

Spark UDAF开发

`
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;

 public class MeanFloatUDAF extends UserDefinedAggregateFunction {
  /**
  * 聚合函数的输入数据结构
  *   函数的参数列表,不过需要写成StructType的格式
  */
 @Override
 public StructType inputSchema() {
    List<StructField> structFields = new ArrayList<>();
    structFields.add(DataTypes.createStructField( "field_nm", DataTypes.DoubleType, true ));
    return DataTypes.createStructType( structFields );
}

/**
 * 聚缓存区数据结构 - 产生中间结果的数据类型
 * 如果是求平均数,存储总和以及计数,总和及计数就是中间结果
 * count    buffer.getInt(0)
 * sum_field   buffer.getDouble(1)
 */
@Override
public StructType bufferSchema() {
    List<StructField> structFields = new ArrayList<>();
    structFields.add(DataTypes.createStructField( "count", DataTypes.IntegerType, true ));
    structFields.add(DataTypes.createStructField( "sum_field", DataTypes.DoubleType, true ));
    return DataTypes.createStructType( structFields );
}

/**
 * 聚合函数返回值数据结构
 */
@Override
public DataType dataType() {
    return DataTypes.DoubleType;
}

/**
 * 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
 */
@Override
public boolean deterministic() {
    return true;
}

/**
 * 初始化缓冲区
 * buffer是中间结果,是Row类的子类
 */
@Override
public void initialize(MutableAggregationBuffer buffer) {
    //相加的初始值,这里的要和上边的中间结果的类型和位置相对应 - buffer.getInt(0)
    buffer.update(0,0);
    //参与运算数字个数的初始值
    buffer.update(1,Double.valueOf(0.0) );
}

/**
 *  给聚合函数传入一条新数据进行处理
 *  //每有一条数据参与运算就更新一下中间结果(update相当于在每一个分区中的计算)
 *  buffer里面存放着累计的执行结果,input是当前的执行结果
 */
@Override
public void update(MutableAggregationBuffer buffer, Row input) {
    //个数加1
    buffer.update(0,buffer.getInt(0)+1);
    //每有一个数字参与运算就进行相加(包含中间结果)
    buffer.update(1,buffer.getDouble(1)+Double.valueOf(input.getDouble(0)));
}

/**
 *  合并聚合函数缓冲区   //全局聚合
 */
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0));
    buffer1.update(1,buffer1.getDouble(1)+buffer2.getDouble(1));
}

/**
 * 计算最终结果
 */
@Override
public Object evaluate(Row buffer) {
    return buffer.getDouble(1)/buffer.getInt(0);
}

}

`

Spark聚合函数使用

在Spark中使用  extends UserDefinedAggregateFunction类型的UDAF的使用
`
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;

public class MeanUDAFMain {
public static void main(String[] args){
    try {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL data sources example")
                .config("spark.some.config.option", "some-value")
                .master("local[2]")
                .getOrCreate();
        List<Row> dataExample = Arrays.asList(
                RowFactory.create( "2019-0801", 4,9.2),
                RowFactory.create( "2020-0802", 3,8.6),
                RowFactory.create( "2021-0803",2,5.5),
                RowFactory.create( "2021-0803",2,5.5),
                RowFactory.create( "2021-0803",7,4.5)
        );
        StructType schema = new StructType(new StructField[]{
                 new StructField("date", DataTypes.StringType, false, Metadata.empty()),
                new StructField("dist_mem", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("dm_mem", DataTypes.DoubleType, false, Metadata.empty())
        });
        Dataset<Row> itemsDF = spark.createDataFrame(dataExample, schema);
        itemsDF.printSchema();
        itemsDF.createOrReplaceTempView("test_mean_table");
        // 注册自定义聚合函数 -2. 在spark中注册UDAF,为其绑定一个名字
        spark.udf().register("mymean",new MeanFloatUDAF ());
        spark.sql("select dist_mem  from test_mean_table").show();
        spark.sql("select date,mymean(dm_mem) memdoubleMean from test_mean_table group by date").show();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

`

附录

 在开发Spark 的UDAF中,查询了相关资料,涉及一些其他的逻辑设计,但没有相关的思路。而搜集的资料和示例也不是很清晰。此时,将复杂的问题进行拆解。
 一是通过复习相关的基础点,对开发中涉及到的基础内容进行深入的学习和理解,结合当前的内容进行扩展,构建扎实的知识体系和对当前内容的理解。
 二是对搜集的资料和示例进行修改,通过修改相应的内容来确认不是很清晰的概念。
  本例就是使用第二种方式,来确认各个输入和输出之间的关系。通过对简单示例的学习修改和改进调试,不断深入。
 总结: 资料和示例搜集很重要,不断改变关键词,搜集相关内容
       资料的来源和准确性要进行确认和测试,不可盲信
       原理和基础内容,看着用处不大,但实际上功底所在,自然而然。基础扎实,是走的顺和走的远的一个必要条件。

参考

 Spark SQL 用户自定义函数UDF、用户自定义聚合函数UDAF 教程(Java踩坑教学版) https://www.cnblogs.com/xing901022/p/6436161.html
 Spark笔记之使用UDAF(User Defined Aggregate Function) https://www.cnblogs.com/cc11001100/p/9471859.html
 User Defined Aggregate Functions (UDAFs)  http://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html
 SparkSQL自定义聚合函数(UDAF)实现bitmap函数 https://blog.csdn.net/xiongbingcool/article/details/81282118
原文地址:https://www.cnblogs.com/ytwang/p/13985774.html