spark UDAF 函数解决

UDAF函数用户自定义函数:

用java代码实现UDAF函数

SparkConf conf =  new SparkConf();
        conf.setAppName("UDAF").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext hc = new SQLContext(sc);


        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu", "zhangsan", "lisi", "wangwu", "zhsangsan", "lisi", "zhsnagsan", "wangwu"));

        //将RDD转换成rddrow
        JavaRDD<Row> rddrow = rdd.map(new Function<String, Row>() {
            @Override
            public Row call(String str) throws Exception {
                return RowFactory.create(str);
            }
        });

        StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
        List<StructField> fields = Arrays.asList(name);
        StructType schema = DataTypes.createStructType(fields);

        //创建DataType
        Dataset<Row> df = hc.createDataFrame(rddrow, schema);
        df.show();

        df.registerTempTable("user");

        /**
         * 注册UDAF函数
         *
         */

        hc.udf().register("strCount", new UserDefinedAggregateFunction() {
            @Override
            //定义输入数据的类型
            public StructType inputSchema() {
                return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name",DataTypes.StringType,true)));
            }

            @Override
            //输出结果的类型
            public DataType dataType() {
                return DataTypes.IntegerType;
            }

            @Override
            //判断当相同的值输入的时候是否有相同的输出
            public boolean deterministic() {
                return true;
            }

            @Override
            public void update(MutableAggregationBuffer buffer, Row row) {

                buffer.update(0,buffer.getInt(0)+1);

            }

            @Override
            //缓存区定义的数据类型
            public StructType bufferSchema() {
                return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer",DataTypes.IntegerType,true)));
            }

            @Override
            public void merge(MutableAggregationBuffer buffer, Row row) {

                buffer.update(0,buffer.getInt(0)+row.getInt(0));
            }

            @Override
            //对数据进行初始化
            public void initialize(MutableAggregationBuffer buffer) {

                buffer.update(0,0);
            }

            @Override
            /**
             * 定义返回的结果
             *
             */
            public Object evaluate(Row row) {
                return row.getInt(0);
            }
        });


        hc.sql("select name,strCount(name) from user group by name").show();

UDAF函数实现时,方法UserDefinedAggregateFunction的八个需要实现方法的作用如上代码中注释,其中最重要的三个方法是:

update: 对相同元素进行合并时,row个数的更新

merge: shuffer归并的时候,将相同元素拉取的时候,对拉取的元素的总数进行累加

init:初始化函数。初始化函数在整个过程中总共调用了两次,作用是将数组中的元素进行初始化,第一次被调用初始化的时候,是在同一个分区相同数据进行统计的时候,第二次进行初始化的时候,是在讲相同元素从不同分区拉倒一起统计的数据,会生成数组,然后首先会对这个数据进行初始化。

本地执行Hive的条件:

1、拷贝当前配置文件到src目录:hive-site.xml,core-site.xml,hdfs-site.xml

2、添加jar,以data开头的三个jar 文件

3、window环境必须是以root用户名命名的

4、执行的时候可能内存不够,添加VM参数配置 -server -Xmas512M -Xmx1024M -XX:PermSize = 256M -XX:MaxNewSize = 512M -XX:MaxPermSize = 512M 

原文地址:https://www.cnblogs.com/wcgstudy/p/11066602.html