Hive实现用户访问路径还原

今天某位仁兄给了一道Hive的题目

hive里有个表存储了  (用户ID)  (点击时间)  (点击网址)       输出 用户ID,点击顺序,from url ,to url。  其中点击顺序是每个id执行按时间排序后的顺序号,from url为上一次点击的网址,to url 为当次点击的网址。 顺序号为1的时候from url 为空就行了    

1.实现基于纯Hive SQL的ETL过程

2.实现一个能加速上述处理过程的Hive Generic UDF,并给出使用此UDF实现ETL过程的Hive SQL

按照俺的理解,这个应该做UDAF跟UDTF结合比较容易

于是就着手写了一些代码

Part.1:

View Code
package Url;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 针对同一用户的访问序列构造访问序列对
 * @author juefan
 */
public class UDAFRank extends UDAF{
    //构造对象存储用户的每一条数据
    public static class State{
        String url;
        Long time;
        public State(){
            url = null;
            time = null;
        }
    }
    public static class UDAFRankEvaluator implements UDAFEvaluator{
        public final List<State> stateList;

        public UDAFRankEvaluator(){
            stateList  = new ArrayList<State>();
        }
        public void init(){
            if(stateList != null){
                stateList.clear();
            }
        }
        //聚合用户的访问数据
        public boolean iterate(String url, Long time){
            if (url == null || time == null) {
                return false;
            } else {
                State tmpState = new State();
                tmpState.url = url;
                tmpState.time = time;
                stateList.add(tmpState);
                tmpState = new State();
            }
            return true;
        }
        public List<State> terminatePartial(){
            return stateList;
        }
        //二次聚合同一用户的访问数据
        public boolean merge(List<State> other){
            if (stateList == null || other == null) {
                return false;
            }
            stateList.addAll(other);
            return true;
        }
        public String terminate(){
            Statecomparator1 compare = new Statecomparator1();
            StringBuilder resultBuilder = new StringBuilder();
            Collections.sort(stateList, compare);
            int size = stateList.size();
            //用户只有一个访问记录,不能构造序列对
                resultBuilder.append("NULL").append("SEP_01").append(stateList.get(0).url).append("SEP_02");
            //将用户序列对写进结果中
            for(int i = 0; i < size - 1; i++){
                resultBuilder.append(stateList.get(i).url).append("SEP_01").append(stateList.get(i + 1).url).append("SEP_02");
            }
            return resultBuilder.toString();
        }
        
        //对数组里面的用户访问数据按照访问时间从小到大排序
        public class Statecomparator1 implements Comparator<Object>{
            public int compare(Object o1, Object o2){
                State s1 = (State)o1;
                State s2 = (State)o2;
                return s1.time > s2.time ? 1:0;
            }
        }
    }

}

Part.2:

View Code
package Url;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * 拆分用户序列对
 * @author juefan
 */
public class UDTFPair extends GenericUDTF{
    @Override
    public void close() throws HiveException {
    }
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("from");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("to");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("rank");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] test = input.split("SEP_02");
        int size = test.length;
        for (int i = 0; i < size; i++) {
            try {
                String[] result  = new String[3];
                result[0] = test[i].split("SEP_01")[0];
                result[1] = test[i].split("SEP_01")[1];
                result[2] = Integer.toString(i + 1);
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }

}

 上面这种做法的好处就是运行速度较快

接下来按照题目的要求来写,就是纯Hive SQL的方式

代码如下:

create table dm_fan_test12 as
select a.ucookie, a.ranks, b.currenturl as from_url, a.currenturl as to_url
from(
     select a.ucookie, a.currenturl, sum(a.re) as ranks   --计算出当前url在该用户的访问序列中的次序
     from(
          select a.ucookie, a.currenturl, case when a.currenttime < b.currenttime then 0 else 1 end as re
          from dm_fan_test11 a
          join dm_fan_test11 b
          on a.ucookie = b.ucookie
          )a
     group by a.ucookie, a.currenturl
    )a
left outer join(
     select a.ucookie, a.currenturl, sum(a.re) as rank   --计算出当前url在该用户的访问序列中的次序
     from(
          select a.ucookie, a.currenturl, case when a.currenttime < b.currenttime then 0 else 1 end as re
          from dm_fan_test11 a
          join dm_fan_test11 b
          on a.ucookie = b.ucookie
          )a
     group by a.ucookie, a.currenturl
   )b
on a.ucookie = b.ucookie and (a.ranks - 1) = b.rank;

总结:

使用UDAF跟UDTF的方式最终计算只用到一个节点,运算时间是35秒,纯Hive SQL的也是只用到一个节点,运算时间是225秒,相差7倍,这就是自定义函数的魅力了,而且,纯Hive SQL还有一个问题就是当一个用户同一时间点访问的URL有多个的时候,数据会有所丢失,导致结果不正确!

原文地址:https://www.cnblogs.com/juefan/p/3030878.html