Hive高阶开发_自定义函数UDF/UDAF/UDTF

Hive中编写自定义函数

自定义函数有三类
 UDF
 UDAF
 UDTF

Hive中编写UDF函数的方式

Hive有两个不同的接口编写UDF程序。一个是基础的UDF接口,一个是复杂的GenericUDF接口。
    01.UDF
	  重写 evaluate
	2.GenericUDF :增强版的 udf ( 支持复杂类型的输入和输出 )
	  01、继承GenericUDF
      02、实现initialize、evaluate、getDisplayString方法
	     initialize        : 这个方法的目标是确定参数的返回类型
	     evaluate          :实现主要逻辑
	     getDisplayString  :显示函数的用法  

Hive编写UDAF

1.继承 AbstractGenericUDAFResolver
2.继承 GenericUDAFEvaluator
3.Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate这几个函数 
   init 初始化
   iterate 函数处理读入的行数据
   terminatePartial 返回iterate处理的中间结果
   merge 合并上述处理结果
   terminate 返回最终值

 案例
 public static enum Mode {
    /**
     * PARTIAL1: from original data to partial aggregation data: iterate() and terminatePartial() will be called.
     * PARTIAL1: 从原始数据到部分聚合数据的过程,会调用 iterate()和 terminatePartial()
     * 可以理解为MapReduce过程中的map阶段
     */
    PARTIAL1,
        /**
     * PARTIAL2: from partial aggregation data to partial aggregation data: * merge() and terminatePartial() will be called.
     * PARTIAL2: 从部分聚合数据到部分聚合数据的过程(多次聚合),会调用 merge()和 terminatePartial()
     * 可以理解为MapReduce过程中的combine阶段
     */
    PARTIAL2,
        /**
     * FINAL: from partial aggregation to full aggregation: merge() and terminate() will be called.
     * FINAL: 从部分聚合数据到全部聚合数据的过程,会调用 merge()和 terminate()
     * 可以理解为MapReduce过程中的reduce阶段
     */
    FINAL,
        /**
     * COMPLETE: from original data directly to full aggregation: iterate() and terminate() will be called.
     * COMPLETE: 从原始数据直接到全部聚合数据的过程,会调用 iterate()和 terminate()
     * 可以理解为MapReduce过程中的直接map输出阶段,没有reduce阶段
     */
    COMPLETE
  };


3.程序执行过程:
      1)PARTIAL1(阶段1:map):init() --> iterate() --> terminatePartial()
      2)PARTIAL2(阶段2:combine):init() --> merge() --> terminatePartial()
      3)FINAL (最终阶段:reduce):init() --> merge() --> terminate()
      4)COMPLETE(直接输出阶段:只有map):init() --> iterate() --> terminate()
      
      注:每个阶段都会执行init()初始化操作。
一个GenericUDAF必须先了解以下两个抽象类: 
    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver 
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
过程理解:
   这个UDAF函数读取数据(mapper),
   聚集一堆mapper输出到部分聚集结果(combiner),
   并且最终创建一个最终的聚集结果(reducer)。
   因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果

完整的UDAF逻辑是一个mapreduce过程,
    如果有mapper和reducer,             就会经历PARTIAL1(mapper),FINAL(reducer),
	如果有mapper和reducer还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
    有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果

Hive写UDTF

  POM文件
  	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>2.3.4</version>
		<scope>provided</scope>
	 </dependency>
 类
   org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
重写的方法
   initialize   process  close
  //该方法中,指定输入输出参数:输入参数的ObjectInspector 与输出参数的StructObjectInspector
   abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;  
   //处理一条输入记录,输出若干条结果记录
   abstract void process(Object[] record) throws HiveException; 
   //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
   abstract void close() throws HiveException;

Hive UDTF代码示例

 代码主要参考网上资料,做了注释用于学习
`
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;

public class GenericUDTFEBitMap extends GenericUDTF {
/**
 *   输入数据
 */
private transient BinaryObjectInspector binaryOI = null;

/**
 *
 * @param args 输入参数的ObjectInspector
 * @return 输出参数的StructObjectInspector
 * @throws UDFArgumentException
 */
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException  {

    if (args.length != 1) {
        throw new UDFArgumentException("GenericUDTFEBitMap() takes  only one argument");
    }
    /** 有两种数据类型
     *  two categories of Hive Data types that are primitive data type and complex data type
     *  PRIMITIVE : Numeric  Date/time String Miscellaneous
     *  Complex Type:  Array  Map  Struct Union
     * public static enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION; private Category() {}}
     */
    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
            && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
        throw new UDFArgumentException("GenericUDTFEBitMap() () takes a binary as a parameter");
    }

    // 输入格式(inspectors)
    binaryOI = (BinaryObjectInspector) args[0];

    // ObjectInspector生成的方式
    // 输出格式(inspectors) -- 有两个属性的对象
    // 静态属性获得列名和列的类型 定义 List<ObjectInspector>列表
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    fieldNames.add("col_id");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
    // 通过 ObjectInspectorFactory 构造ObjectInspector
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

/**
 *  forwardListObj forwardMapObj
 *  forward
 * @param record
 * @throws HiveException   处理一条输入记录,输出若干条结果记录
 */
@Override
public void process(Object[] record) throws HiveException {
    byte[] idBytes = this.binaryOI.getPrimitiveJavaObject(record[0]);
    if(idBytes !=null && idBytes.length>0){
        ImmutableRoaringBitmap other = new ImmutableRoaringBitmap(ByteBuffer.wrap(idBytes));
        Iterator<Integer> iterator = other.iterator();
        while (iterator.hasNext()){
            //   forward  Passes an output row to the collector.
            //   真正的处理过程在process函数中,在process中,每一次forward()调用产生一行
            forward(new Object[] { iterator.next() });
        }
    }
}

/**
 * 当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
 * @throws HiveException
 */
@Override
public void close() throws HiveException {
}

/**
 * toString
 */
@Override
public String toString() {
    return "GenericUDTFEBitMap";
}

}`

参考

 Hive之自定义聚合函数UDAF  https://blog.csdn.net/weixin_39469127/article/details/89766266
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
 https://github.com/sunyaf/bitmapudf
 Hive Data Types – Primitive and Complex Data Types in Hive https://data-flair.training/blogs/hive-data-types/
原文地址:https://www.cnblogs.com/ytwang/p/13986913.html