064 UDF

一:UDF

1.自定义UDF

  

二:UDAF 

2.UDAF

  

3.介绍AbstractGenericUDAFResolver

  

4.介绍GenericUDAFEvaluator

  

5.程序

  1 package org.apache.hadoop.hive_udf;
  2 
  3 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  4 import org.apache.hadoop.hive.ql.metadata.HiveException;
  5 import org.apache.hadoop.hive.ql.parse.SemanticException;
  6 import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
  7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
  8 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
  9 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 10 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 11 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 12 import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
 13 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 14 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
 15 import org.apache.hadoop.io.LongWritable;
 16 
 17 /**
 18  * 
 19  * 需求:实现sum函数,支持int和double类型
 20  *
 21  */
 22 
 23 public class UdafProject extends AbstractGenericUDAFResolver{
 24     public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
 25             throws SemanticException {
 26         //判断参数是否是全部列
 27         if(info.isAllColumns()){
 28             throw new SemanticException("不支持*的参数");
 29         }
 30         
 31         //判断是否只有一个参数
 32         ObjectInspector[] inspector = info.getParameterObjectInspectors();
 33         if(inspector.length != 1){
 34             throw new SemanticException("参数只能有一个");
 35         }
 36         //判断输入列的数据类型是否为基本类型
 37         if(inspector[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
 38             throw new SemanticException("参数必须为基本数据类型");
 39         }
 40         
 41         AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspector[0];
 42         
 43         //判断是那种基本数据类型
 44         
 45         switch(woi.getPrimitiveCategory()){
 46         case INT:
 47         case LONG:
 48         case BYTE:
 49         case SHORT:
 50             return new udafLong();
 51         case FLOAT:
 52         case DOUBLE:
 53             return new udafDouble();
 54             default:
 55                 throw new SemanticException("参数必须是基本类型,且不能为string等类型");
 56         
 57         
 58         }
 59           
 60     }
 61     
 62     /**
 63      * 对整形数据进行求和
 64      */
 65     public static class udafLong extends  GenericUDAFEvaluator{
 66         
 67         //定义输入数据类型
 68         public  PrimitiveObjectInspector inputor;
 69         
 70         //实现自定义buffer
 71         static class sumlongagg implements AggregationBuffer{
 72             long sum;
 73             boolean empty;
 74         }
 75         
 76         //初始化方法
 77         @Override
 78         public ObjectInspector init(Mode m, ObjectInspector[] parameters)
 79                 throws HiveException {
 80             // TODO Auto-generated method stub
 81             
 82             super.init(m, parameters);
 83             if(parameters.length !=1 ){
 84                 throw new UDFArgumentException("参数异常");
 85             }
 86             if(inputor == null){
 87                 this.inputor = (PrimitiveObjectInspector) parameters[0];
 88             }
 89             //注意返回的类型要与最终sum的类型一致
 90             return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
 91         }
 92 
 93         @Override
 94         public AggregationBuffer getNewAggregationBuffer() throws HiveException {
 95             // TODO Auto-generated method stub
 96             sumlongagg slg = new sumlongagg();
 97             this.reset(slg);
 98             return slg;
 99         }
100 
101         @Override
102         public void reset(AggregationBuffer agg) throws HiveException {
103             // TODO Auto-generated method stub
104             sumlongagg slg = (sumlongagg) agg;
105             slg.sum=0;
106             slg.empty=true;
107         }
108 
109         @Override
110         public void iterate(AggregationBuffer agg, Object[] parameters)
111                 throws HiveException {
112             // TODO Auto-generated method stub
113             if(parameters.length != 1){
114                 throw new UDFArgumentException("参数错误");
115             }
116             this.merge(agg, parameters[0]);
117             
118         }
119 
120         @Override
121         public Object terminatePartial(AggregationBuffer agg)
122                 throws HiveException {
123             // TODO Auto-generated method stub
124             return this.terminate(agg);
125         }
126 
127         @Override
128         public void merge(AggregationBuffer agg, Object partial)
129                 throws HiveException {
130             // TODO Auto-generated method stub
131             sumlongagg slg = (sumlongagg) agg;
132             if(partial != null){
133                 slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, inputor);
134                 slg.empty=false;
135             }
136         }
137 
138         @Override
139         public Object terminate(AggregationBuffer agg) throws HiveException {
140             // TODO Auto-generated method stub
141             sumlongagg slg = (sumlongagg) agg;
142             if(slg.empty){
143                 return null;
144             }
145             return new LongWritable(slg.sum);
146         }
147         
148     }
149     
150     /**
151      * 实现浮点型的求和
152      */
153     public static class udafDouble extends GenericUDAFEvaluator{
154         
155         //定义输入数据类型
156         public  PrimitiveObjectInspector input;
157         
158         //实现自定义buffer
159         static class sumdoubleagg implements AggregationBuffer{
160             double sum;
161             boolean empty;
162         }
163         
164         //初始化方法
165         @Override
166         public ObjectInspector init(Mode m, ObjectInspector[] parameters)
167                 throws HiveException {
168             // TODO Auto-generated method stub
169             
170             super.init(m, parameters);
171             if(parameters.length !=1 ){
172                 throw new UDFArgumentException("参数异常");
173             }
174             if(input == null){
175                 this.input = (PrimitiveObjectInspector) parameters[0];
176             }
177             //注意返回的类型要与最终sum的类型一致
178             return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
179         }
180         
181         
182 
183         @Override
184         public AggregationBuffer getNewAggregationBuffer() throws HiveException {
185             // TODO Auto-generated method stub
186             sumdoubleagg sdg = new sumdoubleagg();
187             this.reset(sdg);
188             return sdg;
189         }
190 
191         @Override
192         public void reset(AggregationBuffer agg) throws HiveException {
193             // TODO Auto-generated method stub
194             sumdoubleagg sdg = (sumdoubleagg) agg;
195             sdg.sum=0;
196             sdg.empty=true;
197         }
198 
199         @Override
200         public void iterate(AggregationBuffer agg, Object[] parameters)
201                 throws HiveException {
202             // TODO Auto-generated method stub
203             if(parameters.length != 1){
204                 throw new UDFArgumentException("参数错误");
205             }
206             this.merge(agg, parameters[0]);
207         }
208 
209         @Override
210         public Object terminatePartial(AggregationBuffer agg)
211                 throws HiveException {
212             // TODO Auto-generated method stub
213             return this.terminate(agg);
214         }
215 
216         @Override
217         public void merge(AggregationBuffer agg, Object partial)
218                 throws HiveException {
219             // TODO Auto-generated method stub
220             sumdoubleagg sdg =(sumdoubleagg) agg;
221             if(partial != null){
222                 sdg.sum += PrimitiveObjectInspectorUtils.getDouble(sdg, input);
223                 sdg.empty=false;
224             }
225         }
226 
227         @Override
228         public Object terminate(AggregationBuffer agg) throws HiveException {
229             // TODO Auto-generated method stub
230             sumdoubleagg sdg = (sumdoubleagg) agg;
231             if (sdg.empty){
232                 return null;
233             }
234             return new DoubleWritable(sdg.sum);
235         }
236         
237     }
238 
239 }

6.打成jar包

  并放入路径:/etc/opt/datas/

7.添加jar到path

  格式:

    add jar linux_path;

  即:

    add jar /etc/opt/datas/af.jar;

8.创建临时函数

  create temporary function af as 'org.apache.hadoop.hive_udf.UdafProject';

9.在hive中运行

  select sum(id),af(id) from stu_info;

三:UDTF

1.UDTF

  

2.程序 

 1 package org.apache.hadoop.hive.udf;
 2 
 3 import java.util.ArrayList;
 4 
 5 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 6 import org.apache.hadoop.hive.ql.metadata.HiveException;
 7 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 8 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 9 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
10 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
11 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
12 
13 public class UDTFtest extends GenericUDTF {
14 
15     @Override
16     public StructObjectInspector initialize(StructObjectInspector argOIs)
17             throws UDFArgumentException {
18         // TODO Auto-generated method stub
19         if(argOIs.getAllStructFieldRefs().size() != 1){
20             throw new UDFArgumentException("参数只能有一个");
21         }
22         ArrayList<String> fieldname = new ArrayList<String>();
23         fieldname.add("name");
24         fieldname.add("email");
25         ArrayList<ObjectInspector> fieldio = new ArrayList<ObjectInspector>();
26         fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
27         fieldio.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
28         
29         return ObjectInspectorFactory.getStandardStructObjectInspector(fieldname, fieldio);
30     }
31     
32     @Override
33     public void process(Object[] args) throws HiveException {
34         // TODO Auto-generated method stub
35         if(args.length == 1){
36             String name = args[0].toString();
37             String email = name + "@ibeifneg.com";
38             super.forward(new String[] {name,email});
39         }
40     }
41 
42     @Override
43     public void close() throws HiveException {
44         // TODO Auto-generated method stub
45         super.forward(new String[] {"complete","finish"});
46     }
47 
48 }

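3.同样的步骤:打成jar包,用add jar添加到hive,再创建临时函数,例如:create temporary function tf as 'org.apache.hadoop.hive.udf.UDTFtest';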

4.在hive中运行

  select tf(ename) as (name,email) from emp;

原文地址:https://www.cnblogs.com/juncaoit/p/6079378.html