离线数仓(五)

6章 数仓搭建-DWD

  1)对用户行为数据解析。

  2)对业务数据采用维度模型重新建模。

6.1 DWD层(用户行为日志)

6.1.1 日志格式回顾

  (1)页面埋点日志

  (2)启动日志

6.1.2 get_json_object函数使用

  1)数据

[{"name":"大郎","sex":"","age":"25"},{"name":"西门庆","sex":"","age":"47"}]

  2)取出第一个json对象

select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0]');

  3)取出第一个jsonage字段的值

select get_json_object('[{"name":"大郎","sex":"男","age":"25"},{"name":"西门庆","sex":"男","age":"47"}]','$[0].age');

6.1.3 启动日志表

  启动日志解析思路:启动日志表中每行数据对应一个启动记录,一个启动记录应该包含日志中的公共信息和启动信息。先将所有包含start字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

  1)建表语句(数据采用parquet存储方式,是可以支持切片的,不需要再对数据创建索引。如果单纯的text方式存储数据,需要采用支持切片的,选用lzop压缩方式并创建索引)

drop table if exists dwd_start_log;
create external table dwd_start_log(
    `area_code` string comment '地区编码',
    `brand` string comment '手机品牌',
    `channel` string comment '渠道',
    `is_new` string comment '是否首次启动',
    `model` string comment '手机型号',
    `mid_id` string comment '设备id',
    `os` string comment '操作系统',
    `user_id` string comment '会员id',
    `version_code` string comment 'app版本号',
    `entry` string comment 'icon手机图标 notice 通知 install 安装后启动',
    `loading_time` bigint comment '启动加载时间',
    `open_ad_id` string comment '广告页ID ',
    `open_ad_ms` bigint comment '广告总共播放时间',
    `open_ad_skip_ms` bigint comment '用户跳过广告时点',
    `ts` bigint comment '时间'
) comment '启动日志表'
partitioned by (`dt` string) --按时间分区
stored as parquet   --采用parquet列式存储
location '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
tblproperties ('parquet.compression'='lzo') -- 采用LZO压缩

  2)数据导入

insert overwrite table dwd_start_log partition (dt='2021-06-08')
select
    --common中的字段
       get_json_object(line,'$.common.ar') area_code,
       get_json_object(line,'$.common.ba') brand,
       get_json_object(line,'$.common.ch') channel,
       get_json_object(line,'$.common.is_new') is_new,
       get_json_object(line,'$.common.md') model,
       get_json_object(line,'$.common.mid') mid_id,
       get_json_object(line,'$.common.os') os,
       get_json_object(line,'$.common.uid') user_id,
       get_json_object(line,'$.common.vc') version_code,
    --取start中的字段
       get_json_object(line,'$.start.entry') entry,
       get_json_object(line,'$.start.loading_time') loading_time,
       get_json_object(line,'$.start.open_ad_id') open_ad_id,
       get_json_object(line,'$.start.open_ad_ms') open_ad_ms,
       get_json_object(line,'$.start.open_ad_skip_ms') open_ad_skip_ms,

       get_json_object(line,'$.ts') ts
from ods_log
--过滤出启动日志,只有包含start属性的是启动日志
where dt='2021-06-08'
and get_json_object(line,'$.start') is not null;

  3)查看数据

select * from dwd_start_log where dt='2021-06-08' limit 5;

6.1.4 页面日志表

  页面日志解析思路:页面日志表中每行数据对应一个页面访问记录,一个页面访问记录应该包含日志中的公共信息和页面信息。先将所有包含page字段的日志过滤出来,然后使用get_json_object函数解析每个字段。

  1)建表语句

drop table if exists dwd_page_log;
create external table dwd_page_log(
    `area_code` string comment '地区编码',
    `brand` string comment '手机品牌',
    `channel` string comment '渠道',
    `is_new` string comment '是否首次启动',
    `model` string comment '手机型号',
    `mid_id` string comment '设备id',
    `os` string comment '操作系统',
    `user_id` string comment '会员id',
    `version_code` string comment 'app版本号',
    `during_time` bigint comment '持续时间毫秒',
    `page_item` string comment '目标id',
    `page_item_type` string comment '目标类型',
    `last_page_id` string comment '上页类型',
    `page_id` string comment '页面ID ',
    `source_type` string comment '来源类型',
    `ts` bigint comment '时间'
) comment '页面日志表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_page_log'
tblproperties('parquet.compression'='lzo');

  2)数据导入

insert overwrite table dwd_page_log partition (dt='2021-06-08')
select
    --common中的字段
       get_json_object(line,'$.common.ar') area_code,
       get_json_object(line,'$.common.ba') brand,
       get_json_object(line,'$.common.ch') channel,
       get_json_object(line,'$.common.is_new') is_new,
       get_json_object(line,'$.common.md') model,
       get_json_object(line,'$.common.mid') mid_id,
       get_json_object(line,'$.common.os') os,
       get_json_object(line,'$.common.uid') user_id,
       get_json_object(line,'$.common.vc') version_code,
    --取page中的字段
       get_json_object(line,'$.page.during_time') during_time,
       get_json_object(line,'$.page.item') page_item,
       get_json_object(line,'$.page.item_type') page_item_type,
       get_json_object(line,'$.page.last_page_id') last_page_id,
       get_json_object(line,'$.page.page_id') page_id,
       get_json_object(line,'$.page.source_type') source_type,

       get_json_object(line,'$.ts') ts
from ods_log
--过滤出页面日志,只有包含page属性的是yem日志
where dt='2021-06-08'
and get_json_object(line,'$.page') is not null;

  3)查看数据

select * from dwd_page_log where dt='2021-06-08' limit 5;

6.1.5 动作日志表

  动作日志解析思路:动作日志表中每行数据对应用户的一个动作记录,一个动作记录应当包含公共信息、页面信息以及动作信息。先将包含action字段的日志过滤出来,然后通过UDTF函数action数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段

  1)建表语句

create external table dwd_action_log(
    `area_code` string comment '地区编码',
    `brand` string comment '手机品牌',
    `channel` string comment '渠道',
    `is_new` string comment '是否首次启动',
    `model` string comment '手机型号',
    `mid_id` string comment '设备id',
    `os` string comment '操作系统',
    `user_id` string comment '会员id',
    `version_code` string comment 'app版本号',
    `during_time` bigint comment '持续时间毫秒',
    `page_item` string comment '目标id',
    `page_item_type` string comment '目标类型',
    `last_page_id` string comment '上页类型',
    `page_id` string comment '页面id',
    `source_type` string comment '来源类型',
    `action_id` string comment '动作id',
    `item` string comment '目标id ',
    `item_type` string comment '目标类型',
    `ts` bigint comment '时间'
) comment '动作日志表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_action_log'
tblproperties ('parquet.compression'='lzo');

  2)创建UDTF函数——设计思路

  3)创建UDTF函数——编写代码

    (1)创建一个maven工程hivefunction

    (2)创建包名:com.yuange.hive.udtf

    (3)引入如下依赖

<dependencies>
    <!--添加hive依赖-->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>

    (4)编码

package com.yuange.hive;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;

/**
 * @作者:袁哥
 * @时间:2021/6/14 13:19
 */
public class TestHiveFunction extends GenericUDTF {

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

        // 获取输入参数的所有字段类型的引用
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        //检查入参是否符合条件
        // 要求必须只能传入一列参数
        if (inputFields.size() != 1){
            throw  new UDFArgumentException("此函数只允许传入1列参数!");
        }

        // 要求传入的一列参数必须是string类型
        if (!"string".equals(inputFields.get(0).getFieldObjectInspector().getTypeName())){
            throw  new UDFArgumentException("此函数只允许传入1列string类型参数!");
        }

        //返回StructObjectInspector
        //为函数返回的每行每列的参数起个名称
        List<String> fieldNames=new ArrayList<String>();
        fieldNames.add("col1");

        //函数返回的每行每列的参数类型(ObjectInspector)
        List<ObjectInspector> fieldOIs=new ArrayList<ObjectInspector>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return   ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                fieldOIs);
    }


    private String[] result=new String[ 1 ];

    /*
                执行炸裂的逻辑,将炸裂的结果写出

                Object[] objects: 函数传入的参数
     */
    @Override
    public void process(Object[] objects) throws HiveException {

        //取出传入的1列参数  [{},{},{}]
        String jsonArrayStr = objects[0].toString();

        //将jsonArrayStr转为 Java对象
        JSONArray jsonArray = new JSONArray(jsonArrayStr);

        for (int i=0; i < jsonArray.length() ; i++){
            // {}
            JSONObject jsonObject = jsonArray.getJSONObject(i);

            // 将 Java对象 JSONObject 转为 jsonObjectStr
            // result数组就代表写出的一行数据
            result[0] = jsonObject.toString();

            //将函数计算的结果输出
            forward(result);
        }
    }

    // 选择性
    @Override
    public void close() throws HiveException {

    }
}

  4)创建函数

    (1)使用Maven打包

    (2hivefunction-1.0-SNAPSHOT.jar上传hadoop102的 $HIVE_HOME/auxlib目录(重启hive)

mkdir auxlib

    (3)创建永久函数与开发好的java class关联

CREATE function gmall.explode_jsonarray as 'com.yuange.hive.TestHiveFunction';
desc function gmall.explode_jsonarray;

    (4)注意:如果修改了自定义函数重新生成jar包怎么处理?只需要替换HDFS路径上的旧jar包,然后重启Hive客户端即可。

  5)数据导入

insert overwrite table dwd_action_log partition (dt='2021-06-08')
select
    --common中的字段
       get_json_object(line,'$.common.ar') area_code,
       get_json_object(line,'$.common.ba') brand,
       get_json_object(line,'$.common.ch') channel,
       get_json_object(line,'$.common.is_new') is_new,
       get_json_object(line,'$.common.md') model,
       get_json_object(line,'$.common.mid') mid_id,
       get_json_object(line,'$.common.os') os,
       get_json_object(line,'$.common.uid') user_id,
       get_json_object(line,'$.common.vc') version_code,
    --取page中的字段
       get_json_object(line,'$.page.during_time') during_time,
       get_json_object(line,'$.page.item') page_item,
       get_json_object(line,'$.page.item_type') page_item_type,
       get_json_object(line,'$.page.last_page_id') last_page_id,
       get_json_object(line,'$.page.page_id') page_id,
       get_json_object(line,'$.page.source_type') source_type,

       get_json_object(jsonStr,'$.action_id') action_id,
       get_json_object(jsonStr,'$.item') item,
       get_json_object(jsonStr,'$.item_type') item_type,
       get_json_object(jsonStr,'$.ts') ts
from ods_log
lateral view explode_jsonarray(get_json_object(line,'$.actions')) tmp as jsonStr
--过滤出动作日志,只有包含actions属性的是动作日志
where dt='2021-06-08'
and get_json_object(line,'$.actions') is not null;

  3)查看数据

select * from dwd_action_log where dt='2021-06-08' limit 5;

6.1.6 曝光日志表

  曝光日志解析思路:曝光日志表中每行数据对应一个曝光记录,一个曝光记录应当包含公共信息、页面信息以及曝光信息。先将包含display字段的日志过滤出来,然后通过UDTF函数,将display数组“炸开”(类似于explode函数的效果),然后使用get_json_object函数解析每个字段。

  1)建表语句

drop table if exists dwd_display_log;
create external table dwd_display_log(
    `area_code` string comment '地区编码',
    `brand` string comment '手机品牌',
    `channel` string comment '渠道',
    `is_new` string comment '是否首次启动',
    `model` string comment '手机型号',
    `mid_id` string comment '设备id',
    `os` string comment '操作系统',
    `user_id` string comment '会员id',
    `version_code` string comment 'app版本号',
    `during_time` bigint comment 'app版本号',
    `page_item` string comment '目标id ',
    `page_item_type` string comment '目标类型',
    `last_page_id` string comment '上页类型',
    `page_id` string comment '页面ID ',
    `source_type` string comment '来源类型',
    `ts` bigint comment 'app版本号',
    `display_type` string comment '曝光类型',
    `item` string comment '曝光对象id ',
    `item_type` string comment 'app版本号',
    `order` bigint comment '曝光顺序',
    `pos_id` bigint comment '曝光位置'
) comment '曝光日志表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_display_log'
tblproperties ('parquet.compression'='lzo');

  2)数据导入

insert overwrite table dwd_display_log partition(dt='2021-06-08')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.source_type'),
    get_json_object(line,'$.ts'),
    get_json_object(display,'$.display_type'),
    get_json_object(display,'$.item'),
    get_json_object(display,'$.item_type'),
    get_json_object(display,'$.order'),
    get_json_object(display,'$.pos_id')
from ods_log lateral view explode_jsonarray(get_json_object(line,'$.displays')) tmp as display
where dt='2021-06-08'
and get_json_object(line,'$.displays') is not null;

  3)查看数据

select * from dwd_display_log where dt='2021-06-08' limit 5;

6.1.7 错误日志表

  错误日志解析思路:错误日志表中每行数据对应一个错误记录,为方便定位错误,一个错误记录应当包含与之对应的公共信息、页面信息、曝光信息、动作信息、启动信息以及错误信息。先将包含err字段的日志过滤出来,然后使用get_json_object函数解析所有字段。

  1)建表语句

drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
    `area_code` STRING COMMENT '地区编码',
    `brand` STRING COMMENT '手机品牌',
    `channel` STRING COMMENT '渠道',
    `is_new` STRING COMMENT '是否首次启动',
    `model` STRING COMMENT '手机型号',
    `mid_id` STRING COMMENT '设备id',
    `os` STRING COMMENT '操作系统',
    `user_id` STRING COMMENT '会员id',
    `version_code` STRING COMMENT 'app版本号',
    `page_item` STRING COMMENT '目标id ',
    `page_item_type` STRING COMMENT '目标类型',
    `last_page_id` STRING COMMENT '上页类型',
    `page_id` STRING COMMENT '页面ID ',
    `source_type` STRING COMMENT '来源类型',
    `entry` STRING COMMENT ' icon手机图标  notice 通知 install 安装后启动',
    `loading_time` STRING COMMENT '启动加载时间',
    `open_ad_id` STRING COMMENT '广告页ID ',
    `open_ad_ms` STRING COMMENT '广告总共播放时间',
    `open_ad_skip_ms` STRING COMMENT '用户跳过广告时点',
    `actions` STRING COMMENT '动作',
    `displays` STRING COMMENT '曝光',
    `ts` STRING COMMENT '时间',
    `error_code` STRING COMMENT '错误码',
    `msg` STRING COMMENT '错误信息'
) COMMENT '错误日志表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_error_log'
tblproperties ('parquet.compression'='lzo');

    说明:此处为对动作数组和曝光数组做处理,如需分析错误与单个动作或曝光的关联,可先使用explode_jsonarray函数将数组“炸开”,再使用get_json_object函数获取具体字段

  2)数据导入

insert overwrite table dwd_error_log partition (dt='2021-06-08')
select
    --common中的字段
       get_json_object(line,'$.common.ar') area_code,
       get_json_object(line,'$.common.ba') brand,
       get_json_object(line,'$.common.ch') channel,
       get_json_object(line,'$.common.is_new') is_new,
       get_json_object(line,'$.common.md') model,
       get_json_object(line,'$.common.mid') mid_id,
       get_json_object(line,'$.common.os') os,
       get_json_object(line,'$.common.uid') user_id,
       get_json_object(line,'$.common.vc') version_code,
    --取page中的字段
       get_json_object(line,'$.page.item') page_item,
       get_json_object(line,'$.page.item_type') page_item_type,
       get_json_object(line,'$.page.last_page_id') last_page_id,
       get_json_object(line,'$.page.page_id') page_id,
       get_json_object(line,'$.page.source_type') source_type,
       --取start中的字段
        get_json_object(line,'$.start.entry') entry,
       get_json_object(line,'$.start.loading_time') loading_time,
       get_json_object(line,'$.start.open_ad_id') open_ad_id,
       get_json_object(line,'$.start.open_ad_ms') open_ad_ms,
       get_json_object(line,'$.start.open_ad_skip_ms') open_ad_skip_ms,
       get_json_object(line,'$.actions') actions,
       get_json_object(line,'$.displays') displays,

       get_json_object(line,'$.ts') ts,
       get_json_object(line,'$.err.error_code') error_code,
       get_json_object(line,'$.err.msg') msg
from ods_log
--过滤出错误日志,只有包含err属性的是错误日志
where dt='2021-06-08'
and get_json_object(line,'$.err') is not null;

  5)查看数据

select * from dwd_error_log where dt='2021-06-08' limit 5;

6.1.8 DWD用户行为数据加载脚本

  1)编写脚本

    (1)在hadoop102/home/atguigu/bin目录下创建脚本

vim ods_to_dwd_log.sh
#!/bin/bash

APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

dwd_start_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.start.entry'),
    get_json_object(line,'$.start.loading_time'),
    get_json_object(line,'$.start.open_ad_id'),
    get_json_object(line,'$.start.open_ad_ms'),
    get_json_object(line,'$.start.open_ad_skip_ms'),
    get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;"

dwd_page_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.source_type'),
    get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;"

dwd_action_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.source_type'),
    get_json_object(action,'$.action_id'),
    get_json_object(action,'$.item'),
    get_json_object(action,'$.item_type'),
    get_json_object(action,'$.ts')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;"

dwd_display_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.during_time'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.source_type'),
    get_json_object(line,'$.ts'),
    get_json_object(display,'$.display_type'),
    get_json_object(display,'$.item'),
    get_json_object(display,'$.item_type'),
    get_json_object(display,'$.order'),
    get_json_object(display,'$.pos_id')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;"
 
dwd_error_log="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
select
    get_json_object(line,'$.common.ar'),
    get_json_object(line,'$.common.ba'),
    get_json_object(line,'$.common.ch'),
    get_json_object(line,'$.common.is_new'),
    get_json_object(line,'$.common.md'),
    get_json_object(line,'$.common.mid'),
    get_json_object(line,'$.common.os'),
    get_json_object(line,'$.common.uid'),
    get_json_object(line,'$.common.vc'),
    get_json_object(line,'$.page.item'),
    get_json_object(line,'$.page.item_type'),
    get_json_object(line,'$.page.last_page_id'),
    get_json_object(line,'$.page.page_id'),
    get_json_object(line,'$.page.source_type'),
    get_json_object(line,'$.start.entry'),
    get_json_object(line,'$.start.loading_time'),
    get_json_object(line,'$.start.open_ad_id'),
    get_json_object(line,'$.start.open_ad_ms'),
    get_json_object(line,'$.start.open_ad_skip_ms'),
    get_json_object(line,'$.actions'),
    get_json_object(line,'$.displays'),
    get_json_object(line,'$.ts'),
    get_json_object(line,'$.err.error_code'),
    get_json_object(line,'$.err.msg')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;"

case $1 in
    dwd_start_log )
        hive -e "$dwd_start_log"
    ;;
    dwd_page_log )
        hive -e "$dwd_page_log"
    ;;
    dwd_action_log )
        hive -e "$dwd_action_log"
    ;;
    dwd_display_log )
        hive -e "$dwd_display_log"
    ;;
    dwd_error_log )
        hive -e "$dwd_error_log"
    ;;
    all )
        hive -e "$dwd_start_log$dwd_page_log$dwd_action_log$dwd_display_log$dwd_error_log"
    ;;
esac
ods_to_dwd_log.sh

    (2)增加脚本执行权限

chmod 777 ods_to_dwd_log.sh

  2)脚本使用

    (1)执行脚本

ods_to_dwd_log.sh all 2021-06-08

    (2)查询导入结果

6.2 DWD层(业务数据)

  业务数据方面DWD层的搭建主要注意点在于维度建模。

6.2.1 评价事实表(事务型事实表)

  1)建表语句

drop table if exists dwd_comment_info;
create external table dwd_comment_info(
--来自ods_comment_info
`id` string comment '编号',
`user_id` string comment '用户ID',
`sku_id` string comment '商品sku',
`spu_id` string comment '商品spu',
`order_id` string comment '订单ID',
`appraise` string comment '评价(好评、中评、差评、默认评价)',
`create_time` string comment '评价时间'
) comment '评价事实表'
partitioned by (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_comment_info/'
tblproperties ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_comment_info partition (dt)
select id,
       user_id,
       sku_id,
       spu_id,
       order_id,
       appraise,
       create_time,
       date_format(create_time,'yyyy-MM-dd')
from ods_comment_info
where dt='2021-06-08';

    (2)每日装载

insert overwrite table dwd_comment_info partition (dt='2021-06-09')
select id,
       user_id,
       sku_id,
       spu_id,
       order_id,
       appraise,
       create_time
from ods_comment_info
where dt = '2021-06-09';

6.2.2 订单明细事实表(事务型事实表)

  1)建表语句

drop table if exists dwd_order_detail;
CREATE EXTERNAL TABLE dwd_order_detail (
--来自ods_order_detail
`id` STRING COMMENT '订单编号',
`order_id` STRING COMMENT '订单号',
`sku_id` STRING COMMENT 'sku商品id',
`sku_num` BIGINT COMMENT '商品数量',
`create_time` STRING COMMENT '创建时间',
`source_type` STRING COMMENT '来源类型',
`source_id` STRING COMMENT '来源编号',
`split_final_amount` DECIMAL(16,2) COMMENT '最终价格分摊',
`split_activity_amount` DECIMAL(16,2) COMMENT '活动优惠分摊',
`split_coupon_amount` DECIMAL(16,2) COMMENT '优惠券优惠分摊',
--来自ods_order_info
`province_id` STRING COMMENT '省份ID',
`original_amount` DECIMAL(16,2) COMMENT '原始价格',
`user_id` STRING COMMENT '用户id',
--来自ods_order_detail_activity
`activity_id` STRING COMMENT '活动ID',
`activity_rule_id` STRING COMMENT '活动规则ID',
--来自ods_order_detail_coupon
`coupon_id` STRING COMMENT '优惠券ID'
) COMMENT '订单明细事实表表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_order_detail/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载


set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_order_detail partition(dt)
select
od.id, --订单明细id
od.order_id, --订单id
od.sku_id, --skuid
od.sku_num, --商品数量
od.create_time, --创建时间
od.source_type, --来源类型
od.source_id, --来源编号
od.split_final_amount, --分摊最终金额
od.split_activity_amount, --分摊活动优惠
od.split_coupon_amount, --分摊优惠券优惠
oi.province_id, --省份id
od.order_price*od.sku_num, --原始价格:商品价格*sku数量
oi.user_id, --用户id
oda.activity_id, --活动id
oda.activity_rule_id, --活动规则id
odc.coupon_id, --优惠券id
date_format(create_time,'yyyy-MM-dd')
from
(
select id, --订单明细id
order_id, --订单id
sku_id, --skuid
sku_name, --skuname
order_price, --商品价格
sku_num, --商品数量
create_time, --创建时间
source_type, --来源类型
source_id, --来源编号
split_final_amount, --分摊最终金额
split_activity_amount, --分摊活动优惠
split_coupon_amount, --分摊优惠券优惠
dt
from ods_order_detail
where dt='2021-06-08'
)od
left join
(
select
id, --订单id
user_id, --用户id
province_id --省份id
from ods_order_info
where dt='2021-06-08'
)oi
on od.order_id=oi.id
left join
(
select
order_detail_id, --订单明细id
activity_id, --活动id
activity_rule_id --活动规则id
from ods_order_detail_activity
where dt='2021-06-08'
)oda
on od.id=oda.order_detail_id
left join
(
select
order_detail_id, --订单明细id
coupon_id --优惠券id
from ods_order_detail_coupon
where dt='2021-06-08'
)odc
on od.id=odc.order_detail_id;

  (2)每日装载

insert overwrite table dwd_order_detail partition(dt='2021-06-09')
select
od.id, --订单明细id
od.order_id, --订单id
od.sku_id, --skuid
od.sku_num, --商品数量
od.create_time, --创建时间
od.source_type, --来源类型
od.source_id, --来源编号
od.split_final_amount, --分摊最终金额
od.split_activity_amount, --分摊活动优惠
od.split_coupon_amount, --分摊优惠券优惠
oi.province_id, --省份id
od.order_price*od.sku_num, --原始价格:商品价格*sku数量
oi.user_id, --用户id
oda.activity_id, --活动id
oda.activity_rule_id, --活动规则id
odc.coupon_id --优惠券id
from
(
select id, --订单明细id
order_id, --订单id
sku_id, --skuid
sku_name, --skuname
order_price, --商品价格
sku_num, --商品数量
create_time, --创建时间
source_type, --来源类型
source_id, --来源编号
split_final_amount, --分摊最终金额
split_activity_amount, --分摊活动优惠
split_coupon_amount, --分摊优惠券优惠
dt
from ods_order_detail
where dt='2021-06-09'
)od
left join
(
select
id, --订单id
user_id, --用户id
province_id --省份id
from ods_order_info
where dt='2021-06-09'
)oi
on od.order_id=oi.id
left join
(
select
order_detail_id, --订单明细id
activity_id, --活动id
activity_rule_id --活动规则id
from ods_order_detail_activity
where dt='2021-06-09'
)oda
on od.id=oda.order_detail_id
left join
(
select
order_detail_id, --订单明细id
coupon_id --优惠券id
from ods_order_detail_coupon
where dt='2021-06-09'
)odc
on od.id=odc.order_detail_id;

6.2.3 退事实表(事务型事实表)

  1)建表语句

DROP TABLE IF EXISTS dwd_order_refund_info;
CREATE EXTERNAL TABLE dwd_order_refund_info(
--来自ods_order_refund_info
`id` STRING COMMENT '编号',
`user_id` STRING COMMENT '用户ID',
`order_id` STRING COMMENT '订单ID',
`sku_id` STRING COMMENT '商品ID',
`refund_type` STRING COMMENT '退单类型',
`refund_num` BIGINT COMMENT '退单件数',
`refund_amount` DECIMAL(16,2) COMMENT '退单金额',
`refund_reason_type` STRING COMMENT '退单原因类型',
`create_time` STRING COMMENT '退单时间',
--来自ods_order_info
`province_id` STRING COMMENT '地区ID'
) COMMENT '退单事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_order_refund_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载


insert overwrite table dwd_order_refund_info partition(dt)
select
ri.id, --退单id
ri.user_id, --用户id
ri.order_id, --订单id
ri.sku_id, --商品id
ri.refund_type, --退单类型
ri.refund_num, --退单件数
ri.refund_amount, --退单金额
ri.refund_reason_type, --退单原因类型
ri.create_time, --退单时间
oi.province_id, --省份id
date_format(ri.create_time,'yyyy-MM-dd')
from
(
select id, --退单id
user_id, --用户id
order_id, --订单id
sku_id, --商品id
refund_type, --退单类型
refund_num, --退单件数
refund_amount, --退单金额
refund_reason_type, --退单原因类型
create_time, --退单时间
refund_status, --退单状态
dt
from ods_order_refund_info
where dt='2021-06-08'
)ri
left join
(
select id, --订单id
province_id --省份id
from ods_order_info
where dt='2021-06-08'
)oi
on ri.order_id=oi.id;

    (2)每日装载

insert overwrite table dwd_order_refund_info partition(dt='2021-06-09')
select
ri.id, --退单id
ri.user_id, --用户id
ri.order_id, --订单id
ri.sku_id, --商品id
ri.refund_type, --退单类型
ri.refund_num, --退单件数
ri.refund_amount, --退单金额
ri.refund_reason_type, --退单原因类型
ri.create_time, --退单时间
oi.province_id --省份id
from
(
select id, --退单id
user_id, --用户id
order_id, --订单id
sku_id, --商品id
refund_type, --退单类型
refund_num, --退单件数
refund_amount, --退单金额
refund_reason_type, --退单原因类型
create_time, --退单时间
refund_status, --退单状态
dt
from ods_order_refund_info
where dt='2021-06-09'
)ri
left join
(
select id, --订单id
province_id --省份id
from ods_order_info
where dt='2021-06-09'
)oi
on ri.order_id=oi.id;

    3)查询加载结果

6.2.4 加购事实表周期型快照事实表,每日快照

  1)建表语句

CREATE EXTERNAL TABLE dwd_cart_info(
--来自ods_cart_info
`id` STRING COMMENT '编号',
`user_id` STRING COMMENT '用户ID',
`sku_id` STRING COMMENT '商品ID',
`cart_price` DECIMAL(16,2) COMMENT '加入购物车时的价格',
`sku_num` BIGINT COMMENT '加购数量',
`create_time` STRING COMMENT '创建时间',
`operate_time` STRING COMMENT '修改时间',
`is_ordered` STRING COMMENT '是否已下单',
`order_time` STRING COMMENT '下单时间',
`source_type` STRING COMMENT '来源类型',
`source_id` STRING COMMENT '来源编号'
) COMMENT '加购事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_cart_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

insert overwrite table dwd_cart_info partition(dt='2021-06-08')
select
id, --购物车id
user_id, --用户id
sku_id, --商品id
cart_price, --加入购物车时的价格
sku_num, --加购数量
create_time, --创建时间
operate_time, --修改时间
is_ordered, --是否下单
order_time, --下单时间
source_type, --来源类型
source_id --来源id
from ods_cart_info
where dt='2021-06-08';

    (2)每日装载

insert overwrite table dwd_cart_info partition(dt='2021-06-09')
select
id, --购物车id
user_id, --用户id
sku_id, --商品id
cart_price, --加入购物车时的价格
sku_num, --加购数量
create_time, --创建时间
operate_time, --修改时间
is_ordered, --是否下单
order_time, --下单时间
source_type, --来源类型
source_id --来源id
from ods_cart_info
where dt='2021-06-09';

6.2.5 收藏事实表周期型快照事实表,每日快照

  1)建表语句

DROP TABLE IF EXISTS dwd_favor_info;
CREATE EXTERNAL TABLE dwd_favor_info(
--来自ods_favor_info
`id` STRING COMMENT '编号',
`user_id` STRING COMMENT '用户id',
`sku_id` STRING COMMENT 'skuid',
`spu_id` STRING COMMENT 'spuid',
`is_cancel` STRING COMMENT '是否取消',
`create_time` STRING COMMENT '收藏时间',
`cancel_time` STRING COMMENT '取消时间'
) COMMENT '收藏事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_favor_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

insert overwrite table dwd_favor_info partition(dt='2021-06-08')
select
id, --收藏id
user_id, --用户id
sku_id, --skuid
spu_id, --spuid
is_cancel, --是否取消
create_time, --收藏时间
cancel_time --取消时间
from ods_favor_info
where dt='2021-06-08';

    (2)每日装载

insert overwrite table dwd_favor_info partition(dt='2021-06-09')
select
id, --收藏id
user_id, --用户id
sku_id, --skuid
spu_id, --spuid
is_cancel, --是否取消
create_time, --收藏时间
cancel_time --取消时间
from ods_favor_info
where dt='2021-06-09';

6.2.6 优惠券领用事实表累积型快照事实表

  1)建表语句

DROP TABLE IF EXISTS dwd_coupon_use;
CREATE EXTERNAL TABLE dwd_coupon_use(
--来自ods_coupon_use
`id` STRING COMMENT '编号',
`coupon_id` STRING COMMENT '优惠券ID',
`user_id` STRING COMMENT 'userid',
`order_id` STRING COMMENT '订单id',
`coupon_status` STRING COMMENT '优惠券状态',
`get_time` STRING COMMENT '领取时间',
`using_time` STRING COMMENT '使用时间(下单)',
`used_time` STRING COMMENT '使用时间(支付)',
`expire_time` STRING COMMENT '过期时间'
) COMMENT '优惠券领用事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_coupon_use/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table dwd_coupon_use partition(dt)
select
id, --优惠券使用id
coupon_id, --优惠券ID
user_id, --userid
order_id, --订单id
coupon_status, --优惠券状态
get_time, --领取时间
using_time, --使用时间(下单)
used_time, --使用时间(支付)
expire_time, --过期时间
coalesce(date_format(used_time,'yyyy-MM-dd'),date_format(expire_time,'yyyy-MM-dd'),'9999-99-99')
from ods_coupon_use
where dt='2021-06-08';

    (2)每日装载

insert overwrite table dwd_coupon_use partition(dt)
select
nvl(new.id,old.id),
nvl(new.coupon_id,old.coupon_id),
nvl(new.user_id,old.user_id),
nvl(new.order_id,old.order_id),
nvl(new.coupon_status,old.coupon_status),
nvl(new.get_time,old.get_time),
nvl(new.using_time,old.using_time),
nvl(new.used_time,old.used_time),
nvl(new.expire_time,old.expire_time),
coalesce(date_format(nvl(new.used_time,old.used_time),'yyyy-MM-dd'),date_format(nvl(new.expire_time,old.expire_time),'yyyy-MM-dd'),'9999-99-99')
from
(
select
id, --优惠券使用id
coupon_id, --优惠券ID
user_id, --userid
order_id, --订单id
coupon_status, --优惠券状态
get_time, --领取时间
using_time, --使用时间(下单)
used_time, --使用时间(支付)
expire_time --过期时间
from dwd_coupon_use
where dt='9999-99-99'
)old
full outer join
(
select
id, --优惠券使用id
coupon_id, --优惠券ID
user_id, --userid
order_id, --订单id
coupon_status, --优惠券状态
get_time, --领取时间
using_time, --使用时间(下单)
used_time, --使用时间(支付)
expire_time --过期时间
from ods_coupon_use
where dt='2021-06-09'
)new
on old.id=new.id;

6.2.7 支付事实表(累积快照事实表)

  1)建表语句

DROP TABLE IF EXISTS dwd_payment_info;
CREATE EXTERNAL TABLE dwd_payment_info (
--来自ods_payment_info
`id` STRING COMMENT '编号',
`out_trade_no` STRING COMMENT '对外交易编号',
`order_id` STRING COMMENT '订单编号',
`user_id` STRING COMMENT '用户编号',
`payment_type` STRING COMMENT '支付类型',
`trade_no` STRING COMMENT '交易编号',
`payment_amount` DECIMAL(16,2) COMMENT '支付金额',
`payment_status` STRING COMMENT '支付状态',
`create_time` STRING COMMENT '创建时间',--调用第三方支付接口的时间
`callback_time` STRING COMMENT '完成时间',--支付完成时间,即支付成功回调时间
--来自ods_order_info
`province_id` STRING COMMENT '地区ID'
) COMMENT '支付事实表表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_payment_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

insert overwrite table dwd_payment_info partition(dt)
select
pi.id, --支付id
pi.out_trade_no, --对外交易编号
pi.order_id, --订单编号
pi.user_id, --用户编号
pi.payment_type, --支付类型
pi.trade_no, --交易编号
pi.payment_amount, --支付金额
pi.payment_status, --支付状态
pi.create_time, --创建时间
pi.callback_time, --完成时间
oi.province_id, ----地区ID
nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
from
(
select id, --支付id
out_trade_no, --对外交易编号
order_id, --订单编号
user_id, --用户编号
payment_type, --支付类型
trade_no, --交易编号
payment_amount, --支付金额
subject, --交易内容
payment_status, --支付状态
create_time, --创建时间
callback_time, --完成时间
dt
from ods_payment_info
where dt='2021-06-08'
)pi
left join
(
select id, --订单编号
province_id --地区ID
from ods_order_info
where dt='2021-06-08'
)oi on pi.order_id=oi.id;

    (2)每日装载

insert overwrite table dwd_payment_info partition(dt)
select
nvl(new.id,old.id),
nvl(new.out_trade_no,old.out_trade_no),
nvl(new.order_id,old.order_id),
nvl(new.user_id,old.user_id),
nvl(new.payment_type,old.payment_type),
nvl(new.trade_no,old.trade_no),
nvl(new.payment_amount,old.payment_amount),
nvl(new.payment_status,old.payment_status),
nvl(new.create_time,old.create_time),
nvl(new.callback_time,old.callback_time),
nvl(new.province_id,old.province_id),
nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
from
(
select id, --支付id
out_trade_no, --对外交易编号
order_id, --订单编号
user_id, --用户编号
payment_type, --支付类型
trade_no, --交易编号
payment_amount, --支付金额
payment_status, --支付状态
create_time, --创建时间
callback_time, --完成时间
province_id --地区ID
from dwd_payment_info
where dt = '9999-99-99'
)old
full outer join
(
select
pi.id, --支付id
pi.out_trade_no, --对外交易编号
pi.order_id, --订单编号
pi.user_id, --用户编号
oi.province_id, --地区ID
pi.payment_type, --支付类型
pi.trade_no, --交易编号
pi.payment_amount, --支付金额
pi.payment_status, --支付状态
pi.create_time, --创建时间
pi.callback_time --完成时间
from
(
select id, --支付id
out_trade_no, --对外交易编号
order_id, --订单编号
user_id, --用户编号
payment_type, --支付类型
trade_no, --交易编号
payment_amount, --支付金额
subject, --交易内容
payment_status, --支付状态
create_time, --创建时间
callback_time, --完成时间
dt
from ods_payment_info
where dt='2021-06-09'
)pi
left join
(
select id, --订单编号
province_id --地区ID
from ods_order_info
where dt='2021-06-09'
)oi
on pi.order_id=oi.id
)new
on old.id=new.id;

6.2.8 退款事实表(累积快照事实表)

  1)建表语句

DROP TABLE IF EXISTS dwd_refund_payment;
CREATE EXTERNAL TABLE dwd_refund_payment (
--来自ods_refund_payment
`id` STRING COMMENT '编号',
`out_trade_no` STRING COMMENT '对外交易编号',
`order_id` STRING COMMENT '订单编号',
`sku_id` STRING COMMENT 'SKU编号',
`payment_type` STRING COMMENT '支付类型',
`trade_no` STRING COMMENT '交易编号',
`refund_amount` DECIMAL(16,2) COMMENT '退款金额',
`refund_status` STRING COMMENT '退款状态',
`create_time` STRING COMMENT '创建时间', --调用第三方支付接口的时间
`callback_time` STRING COMMENT '回调时间', --支付接口回调时间,即支付成功时间
--来自ods_order_info
`user_id` STRING COMMENT '用户ID',
`province_id` STRING COMMENT '地区ID'
) COMMENT '退款事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_refund_payment/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

insert overwrite table dwd_refund_payment partition(dt)
select
rp.id, --退款id
out_trade_no, --对外交易编号
order_id, --订单编号
sku_id, --SKU编号
payment_type, --支付类型
trade_no, --交易编号
refund_amount, --退款金额
refund_status, --退款状态
create_time, --创建时间
callback_time, --回调时间
user_id, --用户ID
province_id, --地区ID
nvl(date_format(callback_time,'yyyy-MM-dd'),'9999-99-99')
from
(
select
id, --退款id
out_trade_no, --对外交易编号
order_id, --订单编号
sku_id, --SKU编号
payment_type, --支付类型
trade_no, --交易编号
refund_amount, --退款金额
refund_status, --退款状态
create_time, --创建时间
callback_time --回调时间
from ods_refund_payment
where dt='2021-06-08'
)rp
left join
(
select
id, --订单编号
user_id, --用户ID
province_id --地区ID
from ods_order_info
where dt='2021-06-08'
)oi
on rp.order_id=oi.id;

    (2)每日装载

insert overwrite table dwd_refund_payment partition(dt)
select
nvl(new.id,old.id),
nvl(new.out_trade_no,old.out_trade_no),
nvl(new.order_id,old.order_id),
nvl(new.sku_id,old.sku_id),
nvl(new.payment_type,old.payment_type),
nvl(new.trade_no,old.trade_no),
nvl(new.refund_amount,old.refund_amount),
nvl(new.refund_status,old.refund_status),
nvl(new.create_time,old.create_time),
nvl(new.callback_time,old.callback_time),
nvl(new.user_id,old.user_id),
nvl(new.province_id,old.province_id),
nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
from
(
select
id, --退款id
out_trade_no, --对外交易编号
order_id, --订单编号
sku_id, --SKU编号
payment_type, --支付类型
trade_no, --交易编号
refund_amount, --退款金额
refund_status, --退款状态
create_time, --创建时间
callback_time, --回调时间
user_id, --用户ID
province_id --地区ID
from dwd_refund_payment
where dt='9999-99-99'
)old
full outer join
(
select
rp.id, --退款id
out_trade_no, --对外交易编号
order_id, --订单编号
sku_id, --SKU编号
payment_type, --支付类型
trade_no, --交易编号
refund_amount, --退款金额
refund_status, --退款状态
create_time, --创建时间
callback_time, --回调时间
user_id, --用户ID
province_id --地区ID
from
(
select
id, --退款id
out_trade_no, --对外交易编号
order_id, --订单编号
sku_id, --SKU编号
payment_type, --支付类型
trade_no, --交易编号
refund_amount, --退款金额
refund_status, --退款状态
create_time, --创建时间
callback_time --回调时间
from ods_refund_payment
where dt='2021-06-09'
)rp
left join
(
select
id, --订单编号
user_id, --用户ID
province_id --地区ID
from ods_order_info
where dt='2021-06-09'
)oi
on rp.order_id=oi.id
)new
on old.id=new.id;

  3)查询加载结果

6.2.9 订单事实表累积型快照事实表

  1)建表语句

DROP TABLE IF EXISTS dwd_order_info;
CREATE EXTERNAL TABLE dwd_order_info(
--来自ods_order_info
`id` STRING COMMENT '编号',
`final_amount` DECIMAL(16,2) COMMENT '订单最终价格',
`order_status` STRING COMMENT '订单状态',
`user_id` STRING COMMENT '用户ID',
`payment_way` STRING COMMENT '支付方式',
`delivery_address` STRING COMMENT '邮寄地址',
`out_trade_no` STRING COMMENT '对外交易编号',
`create_time` STRING COMMENT '创建时间(未支付状态)',
`expire_time` STRING COMMENT '过期时间',
`payment_time` STRING COMMENT '支付时间(已支付状态)',
`cancel_time` STRING COMMENT '取消时间(已取消状态)',
`finish_time` STRING COMMENT '完成时间(已完成状态)',
`refund_time` STRING COMMENT '退款时间(退款中状态)',
`refund_finish_time` STRING COMMENT '退款完成时间(退款完成状态)',
`tracking_no` STRING COMMENT '物流单号',
`province_id` STRING COMMENT '地区ID',
`activity_reduce_amount` DECIMAL(16,2) COMMENT '活动减免',
`coupon_reduce_amount` DECIMAL(16,2) COMMENT '优惠券减免',
`original_amount` DECIMAL(16,2) COMMENT '订单原始价格',
`feight_fee` DECIMAL(16,2) COMMENT '运费',
`feight_fee_reduce` DECIMAL(16,2) COMMENT '运费减免'
) COMMENT '订单事实表'
PARTITIONED BY (`dt` STRING)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_order_info/'
TBLPROPERTIES ("parquet.compression"="lzo");

  2)数据装载

    (1)首日装载

insert overwrite table dwd_order_info partition(dt)
select
oi.id, --订单号
final_amount, --订单最终价格
oi.order_status, --订单状态
oi.user_id, --用户ID
oi.payment_way, --支付方式
oi.delivery_address, --邮寄地址
oi.out_trade_no, --对外交易编号
oi.create_time, --创建时间(未支付状态)
oi.expire_time, --过期时间
times.ts['1002'] payment_time, --支付时间(已支付状态)
times.ts['1003'] cancel_time, --取消时间(已取消状态)
times.ts['1004'] finish_time, --完成时间(已完成状态)
times.ts['1005'] refund_time, --退款时间(退款中状态)
times.ts['1006'] refund_finish_time, --退款完成时间(退款完成状态)
oi.tracking_no, --物流单号
oi.province_id, --地区ID
activity_reduce_amount, --活动减免
coupon_reduce_amount, --优惠券减免
original_amount, --订单原始价格
feight_fee, --运费
feight_fee_reduce, ----运费减免
case
when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='2021-06-08' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
else '9999-99-99'
end
from
(
select id, --订单号
final_amount, --订单最终价格
order_status, --订单状态
user_id, --用户ID
payment_way, --支付方式
delivery_address, --邮寄地址
out_trade_no, --对外交易编号
create_time, --创建时间(未支付状态)
operate_time, --操作时间
expire_time, --过期时间
tracking_no, --物流单号
province_id, --地区ID
activity_reduce_amount, --活动减免
coupon_reduce_amount, --优惠券减免
original_amount, --订单原始价格
feight_fee, --运费
feight_fee_reduce, --运费减免
dt
from ods_order_info
where dt='2021-06-08'
)oi
left join
(
select
order_id, --订单编号
--订单状态:修改时间
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
from ods_order_status_log
where dt='2021-06-08'
group by order_id
)times
on oi.id=times.order_id;

    (2)每日装载

insert overwrite table dwd_order_info partition(dt)
select
oi.id, --订单号
final_amount, --订单最终价格
oi.order_status, --订单状态
oi.user_id, --用户ID
oi.payment_way, --支付方式
oi.delivery_address, --邮寄地址
oi.out_trade_no, --对外交易编号
oi.create_time, --创建时间(未支付状态)
oi.expire_time, --过期时间
times.ts['1002'] payment_time, --支付时间(已支付状态)
times.ts['1003'] cancel_time, --取消时间(已取消状态)
times.ts['1004'] finish_time, --完成时间(已完成状态)
times.ts['1005'] refund_time, --退款时间(退款中状态)
times.ts['1006'] refund_finish_time, --退款完成时间(退款完成状态)
oi.tracking_no, --物流单号
oi.province_id, --地区ID
activity_reduce_amount, --活动减免
coupon_reduce_amount, --优惠券减免
original_amount, --订单原始价格
feight_fee, --运费
feight_fee_reduce, ----运费减免
case
when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='2021-06-09' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
else '9999-99-99'
end
from
(
select id, --订单号
final_amount, --订单最终价格
order_status, --订单状态
user_id, --用户ID
payment_way, --支付方式
delivery_address, --邮寄地址
out_trade_no, --对外交易编号
create_time, --创建时间(未支付状态)
operate_time, --操作时间
expire_time, --过期时间
tracking_no, --物流单号
province_id, --地区ID
activity_reduce_amount, --活动减免
coupon_reduce_amount, --优惠券减免
original_amount, --订单原始价格
feight_fee, --运费
feight_fee_reduce, --运费减免
dt
from ods_order_info
where dt='2021-06-09'
)oi
left join
(
select
order_id, --订单编号
--订单状态:修改时间
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
from ods_order_status_log
where dt='2021-06-09'
group by order_id
)times
on oi.id=times.order_id;

6.2.10 DWD业务数据首日装载脚本

  1)编写脚本

    (1)在/home/atguigu/bin目录下创建脚本ods_to_dwd_db_init.sh

vim ods_to_dwd_db_init.sh
#!/bin/bash
APP=gmall

if [ -n "$2" ] ;then
   do_date=$2
else 
   echo "请传入日期参数"
   exit
fi 

dwd_order_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_order_info partition(dt)
select
    oi.id,                      --订单号
    final_amount,               --订单最终价格
    oi.order_status,            --订单状态
    oi.user_id,                 --用户ID
    oi.payment_way,             --支付方式
    oi.delivery_address,        --邮寄地址
    oi.out_trade_no,            --对外交易编号
    oi.create_time,             --创建时间(未支付状态)
    oi.expire_time,             --过期时间
    times.ts['1002'] payment_time,  --支付时间(已支付状态)
    times.ts['1003'] cancel_time,   --取消时间(已取消状态)
    times.ts['1004'] finish_time,   --完成时间(已完成状态)
    times.ts['1005'] refund_time,   --退款时间(退款中状态)
    times.ts['1006'] refund_finish_time,    --退款完成时间(退款完成状态)
    oi.tracking_no,             --物流单号
    oi.province_id,             --地区ID
    activity_reduce_amount,     --活动减免
    coupon_reduce_amount,       --优惠券减免
    original_amount,            --订单原始价格
    feight_fee,                 --运费
    feight_fee_reduce,          ----运费减免
    case
        when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
        when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='$do_date' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
        when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
        when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
        else '9999-99-99'
    end
from
(
    select id,                  --订单号
           final_amount,        --订单最终价格
           order_status,        --订单状态
           user_id,             --用户ID
           payment_way,         --支付方式
           delivery_address,    --邮寄地址
           out_trade_no,        --对外交易编号
           create_time,         --创建时间(未支付状态)
           operate_time,        --操作时间
           expire_time,         --过期时间
           tracking_no,         --物流单号
           province_id,         --地区ID
           activity_reduce_amount,  --活动减免
           coupon_reduce_amount,    --优惠券减免
           original_amount,         --订单原始价格
           feight_fee,              --运费
           feight_fee_reduce,       --运费减免
           dt
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi
left join
(
    select
        order_id,       --订单编号
        --订单状态:修改时间
        str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
    from ${APP}.ods_order_status_log
    where dt='$do_date'
    group by order_id
)times
on oi.id=times.order_id;"

dwd_order_detail="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_order_detail partition(dt)
select
    od.id,                              --订单明细id
    od.order_id,                        --订单id
    od.sku_id,                          --skuid
    od.sku_num,                         --商品数量
    od.create_time,                     --创建时间
    od.source_type,                     --来源类型
    od.source_id,                       --来源编号
    od.split_final_amount,              --分摊最终金额
    od.split_activity_amount,           --分摊活动优惠
    od.split_coupon_amount,             --分摊优惠券优惠
    oi.province_id,                     --省份id
    od.order_price*od.sku_num,          --原始价格:商品价格*sku数量
    oi.user_id,                         --用户id
    oda.activity_id,                    --活动id
    oda.activity_rule_id,               --活动规则id
    odc.coupon_id,                      --优惠券id
    date_format(create_time,'yyyy-MM-dd')
from
(
    select id,                      --订单明细id
           order_id,                --订单id
           sku_id,                  --skuid
           sku_name,                --skuname
           order_price,             --商品价格
           sku_num,                 --商品数量
           create_time,             --创建时间
           source_type,             --来源类型
           source_id,               --来源编号
           split_final_amount,      --分摊最终金额
           split_activity_amount,   --分摊活动优惠
           split_coupon_amount,     --分摊优惠券优惠
           dt
    from ${APP}.ods_order_detail
    where dt='$do_date'
)od
left join
(
    select
        id,             --订单id
        user_id,        --用户id
        province_id     --省份id
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi
on od.order_id=oi.id
left join
(
    select
        order_detail_id,    --订单明细id
        activity_id,        --活动id
        activity_rule_id    --活动规则id
    from ${APP}.ods_order_detail_activity
    where dt='$do_date'
)oda
on od.id=oda.order_detail_id
left join
(
    select
        order_detail_id,    --订单明细id
        coupon_id           --优惠券id
    from ${APP}.ods_order_detail_coupon
    where dt='$do_date'
)odc
on od.id=odc.order_detail_id;"

dwd_payment_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_payment_info partition(dt)
select
    pi.id,              --支付id
    pi.out_trade_no,    --对外交易编号
    pi.order_id,        --订单编号
    pi.user_id,         --用户编号
    pi.payment_type,    --支付类型
    pi.trade_no,        --交易编号
    pi.payment_amount,  --支付金额
    pi.payment_status,  --支付状态
    pi.create_time,     --创建时间
    pi.callback_time,   --完成时间
    oi.province_id,     ----地区ID
    nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
from
(
    select id,              --支付id
           out_trade_no,    --对外交易编号
           order_id,        --订单编号
           user_id,         --用户编号
           payment_type,    --支付类型
           trade_no,        --交易编号
           payment_amount,  --支付金额
           subject,         --交易内容
           payment_status,  --支付状态
           create_time,     --创建时间
           callback_time,   --完成时间
           dt
    from ${APP}.ods_payment_info
    where dt='$do_date'
)pi
left join
(
    select id,          --订单编号
           province_id  --地区ID
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi on pi.order_id=oi.id;"

dwd_cart_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_cart_info partition(dt='$do_date')
select
    id,                 --购物车id
    user_id,            --用户id
    sku_id,             --商品id
    cart_price,         --加入购物车时的价格
    sku_num,            --加购数量
    create_time,        --创建时间
    operate_time,       --修改时间
    is_ordered,         --是否下单
    order_time,         --下单时间
    source_type,        --来源类型
    source_id           --来源id
from ${APP}.ods_cart_info
where dt='$do_date';"

dwd_comment_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_comment_info partition (dt)
select id,
       user_id,
       sku_id,
       spu_id,
       order_id,
       appraise,
       create_time,
       date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_comment_info
where dt='$do_date';
"

dwd_favor_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_favor_info partition(dt='$do_date')
select
    id,                 --收藏id
    user_id,            --用户id
    sku_id,             --skuid
    spu_id,             --spuid
    is_cancel,          --是否取消
    create_time,        --收藏时间
    cancel_time         --取消时间
from ${APP}.ods_favor_info
where dt='$do_date';"

dwd_coupon_use="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_coupon_use partition(dt)
select
    id,                 --优惠券使用id
    coupon_id,          --优惠券ID
    user_id,            --userid
    order_id,           --订单id
    coupon_status,      --优惠券状态
    get_time,           --领取时间
    using_time,         --使用时间(下单)
    used_time,          --使用时间(支付)
    expire_time,        --过期时间
    coalesce(date_format(used_time,'yyyy-MM-dd'),date_format(expire_time,'yyyy-MM-dd'),'9999-99-99')
from ${APP}.ods_coupon_use
where dt='$do_date';"

dwd_order_refund_info="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_order_refund_info partition(dt)
select
    ri.id,                      --退单id
    ri.user_id,                 --用户id
    ri.order_id,                --订单id
    ri.sku_id,                  --商品id
    ri.refund_type,             --退单类型
    ri.refund_num,              --退单件数
    ri.refund_amount,           --退单金额
    ri.refund_reason_type,      --退单原因类型
    ri.create_time,             --退单时间
    oi.province_id,             --省份id
    date_format(ri.create_time,'yyyy-MM-dd')
from
(
    select id,                  --退单id
           user_id,             --用户id
           order_id,            --订单id
           sku_id,              --商品id
           refund_type,         --退单类型
           refund_num,          --退单件数
           refund_amount,       --退单金额
           refund_reason_type,  --退单原因类型
           create_time,         --退单时间
           refund_status,       --退单状态
           dt
    from ${APP}.ods_order_refund_info
    where dt='$do_date'
)ri
left join
(
    select id,              --订单id
           province_id      --省份id
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi
on ri.order_id=oi.id;"

dwd_refund_payment="
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_payment_info partition(dt)
select
    pi.id,              --支付id
    pi.out_trade_no,    --对外交易编号
    pi.order_id,        --订单编号
    pi.user_id,         --用户编号
    pi.payment_type,    --支付类型
    pi.trade_no,        --交易编号
    pi.payment_amount,  --支付金额
    pi.payment_status,  --支付状态
    pi.create_time,     --创建时间
    pi.callback_time,   --完成时间
    oi.province_id,     ----地区ID
    nvl(date_format(pi.callback_time,'yyyy-MM-dd'),'9999-99-99')
from
(
    select id,              --支付id
           out_trade_no,    --对外交易编号
           order_id,        --订单编号
           user_id,         --用户编号
           payment_type,    --支付类型
           trade_no,        --交易编号
           payment_amount,  --支付金额
           subject,         --交易内容
           payment_status,  --支付状态
           create_time,     --创建时间
           callback_time,   --完成时间
           dt
    from ${APP}.ods_payment_info
    where dt='$do_date'
)pi
left join
(
    select id,          --订单编号
           province_id  --地区ID
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi on pi.order_id=oi.id;"

case $1 in
    dwd_order_info )
        hive -e "$dwd_order_info"
    ;;
    dwd_order_detail )
        hive -e "$dwd_order_detail"
    ;;
    dwd_payment_info )
        hive -e "$dwd_payment_info"
    ;;
    dwd_cart_info )
        hive -e "$dwd_cart_info"
    ;;
    dwd_comment_info )
        hive -e "$dwd_comment_info"
    ;;
    dwd_favor_info )
        hive -e "$dwd_favor_info"
    ;;
    dwd_coupon_use )
        hive -e "$dwd_coupon_use"
    ;;
    dwd_order_refund_info )
        hive -e "$dwd_order_refund_info"
    ;;
    dwd_refund_payment )
        hive -e "$dwd_refund_payment"
    ;;
    all )
        hive -e "$dwd_order_info$dwd_order_detail$dwd_payment_info$dwd_cart_info$dwd_comment_info$dwd_favor_info$dwd_coupon_use$dwd_order_refund_info$dwd_refund_payment"
    ;;
esac

    (2)增加执行权限

chmod +x ods_to_dwd_db_init.sh

  2)脚本使用

    (1)执行脚本

ods_to_dwd_db_init.sh all 2021-06-08

    (2)查看数据是否导入成功

6.2.11 DWD业务数据每日装载脚本

  1)编写脚本

    (1)在/home/atguigu/bin目录下创建脚本ods_to_dwd_db.sh

vim ods_to_dwd_db.sh
#!/bin/bash

APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
    do_date=$2
else 
    do_date= date -d "-1 day" +%F
fi


# 假设某累积型快照事实表,某天所有的业务记录全部完成,则会导致9999-99-99分区的数据未被覆盖,从而导致数据重复,该函数根据9999-99-99分区的数据
clear_data(){
    current_date=`date +%F`
    current_date_timestamp=`date -d "$current_date" +%s`

    last_modified_date=`hadoop fs -ls /warehouse/gmall/dwd/$1 | grep '9999-99-99' | awk '{print $6}'`
    last_modified_date_timestamp=`date -d "$last_modified_date" +%s`

    if [[ $last_modified_date_timestamp -lt $current_date_timestamp ]]; then
        echo "clear table $1 partition(dt=9999-99-99)"
        hadoop fs -rm -r -f /warehouse/gmall/dwd/$1/dt=9999-99-99/*
    fi
}

dwd_order_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_order_info partition(dt)
select
    oi.id,                      --订单号
    final_amount,               --订单最终价格
    oi.order_status,            --订单状态
    oi.user_id,                 --用户ID
    oi.payment_way,             --支付方式
    oi.delivery_address,        --邮寄地址
    oi.out_trade_no,            --对外交易编号
    oi.create_time,             --创建时间(未支付状态)
    oi.expire_time,             --过期时间
    times.ts['1002'] payment_time,  --支付时间(已支付状态)
    times.ts['1003'] cancel_time,   --取消时间(已取消状态)
    times.ts['1004'] finish_time,   --完成时间(已完成状态)
    times.ts['1005'] refund_time,   --退款时间(退款中状态)
    times.ts['1006'] refund_finish_time,    --退款完成时间(退款完成状态)
    oi.tracking_no,             --物流单号
    oi.province_id,             --地区ID
    activity_reduce_amount,     --活动减免
    coupon_reduce_amount,       --优惠券减免
    original_amount,            --订单原始价格
    feight_fee,                 --运费
    feight_fee_reduce,          ----运费减免
    case
        when times.ts['1003'] is not null then date_format(times.ts['1003'],'yyyy-MM-dd')
        when times.ts['1004'] is not null and date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)<='$do_date' and times.ts['1005'] is null then date_add(date_format(times.ts['1004'],'yyyy-MM-dd'),7)
        when times.ts['1006'] is not null then date_format(times.ts['1006'],'yyyy-MM-dd')
        when oi.expire_time is not null then date_format(oi.expire_time,'yyyy-MM-dd')
        else '9999-99-99'
    end
from
(
    select id,                  --订单号
           final_amount,        --订单最终价格
           order_status,        --订单状态
           user_id,             --用户ID
           payment_way,         --支付方式
           delivery_address,    --邮寄地址
           out_trade_no,        --对外交易编号
           create_time,         --创建时间(未支付状态)
           operate_time,        --操作时间
           expire_time,         --过期时间
           tracking_no,         --物流单号
           province_id,         --地区ID
           activity_reduce_amount,  --活动减免
           coupon_reduce_amount,    --优惠券减免
           original_amount,         --订单原始价格
           feight_fee,              --运费
           feight_fee_reduce,       --运费减免
           dt
    from ${APP}.ods_order_info
    where dt='2021-06-09'
)oi
left join
(
    select
        order_id,       --订单编号
        --订单状态:修改时间
        str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') ts
    from ${APP}.ods_order_status_log
    where dt='$do_date'
    group by order_id
)times
on oi.id=times.order_id;"

dwd_order_detail="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_order_detail partition(dt='$do_date')
select
    od.id,                              --订单明细id
    od.order_id,                        --订单id
    od.sku_id,                          --skuid
    od.sku_num,                         --商品数量
    od.create_time,                     --创建时间
    od.source_type,                     --来源类型
    od.source_id,                       --来源编号
    od.split_final_amount,              --分摊最终金额
    od.split_activity_amount,           --分摊活动优惠
    od.split_coupon_amount,             --分摊优惠券优惠
    oi.province_id,                     --省份id
    od.order_price*od.sku_num,          --原始价格:商品价格*sku数量
    oi.user_id,                         --用户id
    oda.activity_id,                    --活动id
    oda.activity_rule_id,               --活动规则id
    odc.coupon_id                      --优惠券id
from
(
    select id,                      --订单明细id
           order_id,                --订单id
           sku_id,                  --skuid
           sku_name,                --skuname
           order_price,             --商品价格
           sku_num,                 --商品数量
           create_time,             --创建时间
           source_type,             --来源类型
           source_id,               --来源编号
           split_final_amount,      --分摊最终金额
           split_activity_amount,   --分摊活动优惠
           split_coupon_amount,     --分摊优惠券优惠
           dt
    from ${APP}.ods_order_detail
    where dt='$do_date'
)od
left join
(
    select
        id,             --订单id
        user_id,        --用户id
        province_id     --省份id
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi
on od.order_id=oi.id
left join
(
    select
        order_detail_id,    --订单明细id
        activity_id,        --活动id
        activity_rule_id    --活动规则id
    from ${APP}.ods_order_detail_activity
    where dt='$do_date'
)oda
on od.id=oda.order_detail_id
left join
(
    select
        order_detail_id,    --订单明细id
        coupon_id           --优惠券id
    from ${APP}.ods_order_detail_coupon
    where dt='$do_date'
)odc
on od.id=odc.order_detail_id;"


dwd_payment_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_payment_info partition(dt)
select
    nvl(new.id,old.id),
    nvl(new.out_trade_no,old.out_trade_no),
    nvl(new.order_id,old.order_id),
    nvl(new.user_id,old.user_id),
    nvl(new.payment_type,old.payment_type),
    nvl(new.trade_no,old.trade_no),
    nvl(new.payment_amount,old.payment_amount),
    nvl(new.payment_status,old.payment_status),
    nvl(new.create_time,old.create_time),
    nvl(new.callback_time,old.callback_time),
    nvl(new.province_id,old.province_id),
    nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
from
(
    select id,              --支付id
           out_trade_no,    --对外交易编号
           order_id,        --订单编号
           user_id,         --用户编号
           payment_type,    --支付类型
           trade_no,        --交易编号
           payment_amount,  --支付金额
           payment_status,  --支付状态
           create_time,     --创建时间
           callback_time,   --完成时间
           province_id      --地区ID
    from ${APP}.dwd_payment_info
    where dt = '9999-99-99'
)old
full outer join
(
    select
        pi.id,              --支付id
        pi.out_trade_no,    --对外交易编号
        pi.order_id,        --订单编号
        pi.user_id,         --用户编号
        oi.province_id,     --地区ID
        pi.payment_type,    --支付类型
        pi.trade_no,        --交易编号
        pi.payment_amount,  --支付金额
        pi.payment_status,  --支付状态
        pi.create_time,     --创建时间
        pi.callback_time    --完成时间
    from
    (
        select id,              --支付id
               out_trade_no,    --对外交易编号
               order_id,        --订单编号
               user_id,         --用户编号
               payment_type,    --支付类型
               trade_no,        --交易编号
               payment_amount,  --支付金额
               subject,         --交易内容
               payment_status,  --支付状态
               create_time,     --创建时间
               callback_time,   --完成时间
               dt
        from ${APP}.ods_payment_info
        where dt='$do_date'
    )pi
    left join
    (
        select id,          --订单编号
               province_id  --地区ID
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on pi.order_id=oi.id
)new
on old.id=new.id;"

dwd_cart_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_cart_info partition(dt='$do_date')
select
    id,                 --购物车id
    user_id,            --用户id
    sku_id,             --商品id
    cart_price,         --加入购物车时的价格
    sku_num,            --加购数量
    create_time,        --创建时间
    operate_time,       --修改时间
    is_ordered,         --是否下单
    order_time,         --下单时间
    source_type,        --来源类型
    source_id           --来源id
from ${APP}.ods_cart_info
where dt='$do_date';"


dwd_comment_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_comment_info partition (dt='$do_date')
select id,
       user_id,
       sku_id,
       spu_id,
       order_id,
       appraise,
       create_time
from ${APP}.ods_comment_info
where dt = '$do_date';"


dwd_favor_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_favor_info partition(dt='$do_date')
select
    id,                 --收藏id
    user_id,            --用户id
    sku_id,             --skuid
    spu_id,             --spuid
    is_cancel,          --是否取消
    create_time,        --收藏时间
    cancel_time         --取消时间
from ${APP}.ods_favor_info
where dt='$do_date';"


dwd_coupon_use="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_coupon_use partition(dt)
select
    nvl(new.id,old.id),
    nvl(new.coupon_id,old.coupon_id),
    nvl(new.user_id,old.user_id),
    nvl(new.order_id,old.order_id),
    nvl(new.coupon_status,old.coupon_status),
    nvl(new.get_time,old.get_time),
    nvl(new.using_time,old.using_time),
    nvl(new.used_time,old.used_time),
    nvl(new.expire_time,old.expire_time),
    coalesce(date_format(nvl(new.used_time,old.used_time),'yyyy-MM-dd'),date_format(nvl(new.expire_time,old.expire_time),'yyyy-MM-dd'),'9999-99-99')
from
(
    select
        id,                 --优惠券使用id
        coupon_id,          --优惠券ID
        user_id,            --userid
        order_id,           --订单id
        coupon_status,      --优惠券状态
        get_time,           --领取时间
        using_time,         --使用时间(下单)
        used_time,          --使用时间(支付)
        expire_time        --过期时间
    from ${APP}.dwd_coupon_use
    where dt='9999-99-99'
)old
full outer join
(
    select
        id,                 --优惠券使用id
        coupon_id,          --优惠券ID
        user_id,            --userid
        order_id,           --订单id
        coupon_status,      --优惠券状态
        get_time,           --领取时间
        using_time,         --使用时间(下单)
        used_time,          --使用时间(支付)
        expire_time        --过期时间
    from ${APP}.ods_coupon_use
    where dt='$do_date'
)new
on old.id=new.id;"

dwd_order_refund_info="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_order_refund_info partition(dt='$do_date')
select
    ri.id,                      --退单id
    ri.user_id,                 --用户id
    ri.order_id,                --订单id
    ri.sku_id,                  --商品id
    ri.refund_type,             --退单类型
    ri.refund_num,              --退单件数
    ri.refund_amount,           --退单金额
    ri.refund_reason_type,      --退单原因类型
    ri.create_time,             --退单时间
    oi.province_id             --省份id
from
(
    select id,                  --退单id
           user_id,             --用户id
           order_id,            --订单id
           sku_id,              --商品id
           refund_type,         --退单类型
           refund_num,          --退单件数
           refund_amount,       --退单金额
           refund_reason_type,  --退单原因类型
           create_time,         --退单时间
           refund_status,       --退单状态
           dt
    from ${APP}.ods_order_refund_info
    where dt='$do_date'
)ri
left join
(
    select id,              --订单id
           province_id      --省份id
    from ${APP}.ods_order_info
    where dt='$do_date'
)oi
on ri.order_id=oi.id;"


dwd_refund_payment="
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table ${APP}.dwd_refund_payment partition(dt)
select
    nvl(new.id,old.id),
    nvl(new.out_trade_no,old.out_trade_no),
    nvl(new.order_id,old.order_id),
    nvl(new.sku_id,old.sku_id),
    nvl(new.payment_type,old.payment_type),
    nvl(new.trade_no,old.trade_no),
    nvl(new.refund_amount,old.refund_amount),
    nvl(new.refund_status,old.refund_status),
    nvl(new.create_time,old.create_time),
    nvl(new.callback_time,old.callback_time),
    nvl(new.user_id,old.user_id),
    nvl(new.province_id,old.province_id),
    nvl(date_format(nvl(new.callback_time,old.callback_time),'yyyy-MM-dd'),'9999-99-99')
from
(
    select
        id,             --退款id
        out_trade_no,   --对外交易编号
        order_id,       --订单编号
        sku_id,         --SKU编号
        payment_type,   --支付类型
        trade_no,       --交易编号
        refund_amount,  --退款金额
        refund_status,  --退款状态
        create_time,    --创建时间
        callback_time,  --回调时间
        user_id,        --用户ID
        province_id    --地区ID
    from ${APP}.dwd_refund_payment
    where dt='9999-99-99'
)old
full outer join
(
    select
        rp.id,          --退款id
        out_trade_no,   --对外交易编号
        order_id,       --订单编号
        sku_id,         --SKU编号
        payment_type,   --支付类型
        trade_no,       --交易编号
        refund_amount,  --退款金额
        refund_status,  --退款状态
        create_time,    --创建时间
        callback_time,  --回调时间
        user_id,        --用户ID
        province_id    --地区ID
    from
    (
        select
            id,             --退款id
            out_trade_no,   --对外交易编号
            order_id,       --订单编号
            sku_id,         --SKU编号
            payment_type,   --支付类型
            trade_no,       --交易编号
            refund_amount,  --退款金额
            refund_status,  --退款状态
            create_time,    --创建时间
            callback_time   --回调时间
        from ${APP}.ods_refund_payment
        where dt='$do_date'
    )rp
    left join
    (
        select
            id,             --订单编号
            user_id,        --用户ID
            province_id     --地区ID
        from ${APP}.ods_order_info
        where dt='$do_date'
    )oi
    on rp.order_id=oi.id
)new
on old.id=new.id;"

case $1 in
    dwd_order_info )
        hive -e "$dwd_order_info"
        clear_data dwd_order_info
    ;;
    dwd_order_detail )
        hive -e "$dwd_order_detail"
    ;;
    dwd_payment_info )
        hive -e "$dwd_payment_info"
        clear_data dwd_payment_info
    ;;
    dwd_cart_info )
        hive -e "$dwd_cart_info"
    ;;
    dwd_comment_info )
        hive -e "$dwd_comment_info"
    ;;
    dwd_favor_info )
        hive -e "$dwd_favor_info"
    ;;
    dwd_coupon_use )
        hive -e "$dwd_coupon_use"
        clear_data dwd_coupon_use
    ;;
    dwd_order_refund_info )
        hive -e "$dwd_order_refund_info"
    ;;
    dwd_refund_payment )
        hive -e "$dwd_refund_payment"
        clear_data dwd_refund_payment
    ;;
    all )
        hive -e "$dwd_order_info$dwd_order_detail$dwd_payment_info$dwd_cart_info$dwd_comment_info$dwd_favor_info$dwd_coupon_use$dwd_order_refund_info$dwd_refund_payment"
        clear_data dwd_order_info
        clear_data dwd_payment_info
        clear_data dwd_coupon_use
        clear_data dwd_refund_payment
    ;;
esac

    (2)增加脚本执行权限

chmod 777 ods_to_dwd_db.sh

  2)脚本使用

    (1)执行脚本

ods_to_dwd_db.sh all 2021-06-09

    (2)查看数据是否导入成功

原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/14875005.html