[数据仓库/数据治理]浅谈历史拉链表

1 拉链表 概述

1.1 什么是拉链表?(定义)

拉链表,其实是数仓工程师针对变化数据的修改问题的一种解决方案下的某一类同一性质的表。

拉链表是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。

1.2拉链表的优点

  • 既可以保存历史数据,又防止了对历史数据的过多冗余存储(全量数据集越大,越节省磁盘存储空间

比如:用户的任一一个增、删,改操作,若都对一f条数据进行存储,这样就会出现大量重复的冗余数据,增加不必要的存储空间。

1.3 拉链表的缺点?

  • 降低系统检索、处理数据的效率。

1.4 拉链表的适用场景

  • 需要查看某一个时间点或者时间段的历史快照信息。比如,查看某一个订单在历史某一个时间点的状态;查看某一个用户在过去某一段时间内,更新过几次等等
  • 适合基于历史数据【更新频率低】(如果每天1000w订单,每天更新1000次以上)
  • 适合【更新数据集】占【全量数据集】的比例低
  • 须【节约存储空间】。如果对数据同步任务的目标表每天都保留1份全量,那么,每次全量数据集中会保存大量不变的信息,对存储是极大的浪费

2 拉链表场景1:每个周期仅对【增量更新(UPDATE + INSERT)】数据同步到数仓

2.1 需求描述

假定:对刚同步过来的【增量更新数据集】内每一条数据行是INSERT新增的,还是更新UPDATE更新的情况是未知的。
假定:数仓(ODS/DWD)是未开启(ACID特性) / 不支持对数据行 UPDATE、DELETE 操作的 HIVE 数仓。(即 拉链表的更新操作,需要全量取出,再全量写入)

局限性: 无法识别出 被删除的数据。(因为每个周期都是增量同步到数据仓库)

2.2 模型设计

exam_system.tb_examinee
  id / name / idcard / birthdate / sex / height / school_name

ods.ods_examinee; -- [T+1周期增量切片表 := T+1周期更新数据集] 仅存储应用系统在每个周期同步过来的 增量更新(INSERT + UPDATE) 数据集 
  id / name / idcard / birthdate / sex / height / school_name
  ods_time(在ODS库的最新[插入]更新时间; 假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)

dwd.ods_examinee_his; -- [拉链表] 存储来源应用数据表的所有(含:过去0-T(+1)周期中未变化或变化过的)数据集: (T周期拉链表) + (T+1周期更新数据集) ==> (T+1周期拉链表)
  id / name / idcard / birthdate / sex / height / school_name
  dwd_start_time / dwd_end_time (在DWD库的插入时间、结束时间;通过这2个时间字段,限定了该行数据的生命周期;假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)
  dwd_data_state (当前周期当前数据行的状态: INIT[针对初始数据集] / INSERT[针对T+1周期数据集] / UPDATE[针对T+1周期数据集] / UPDATED[针对T周期数据集] / DELETED[针对T周期数据集])

2.3 ETL任务设计

仅描述核心步骤

  • step1 T+1周期开始时:【增量更新数据集】同步到 ODS库的周期切片表
-- T+1周期开始时: tb_examinee --> ods_examinee
select id , name , idcard , birthdate , sex , height , school_name from tb_examinee; 
  • step2 T+1周期开始时: 通过 ods_examinee(T+1周期增量切片表) UNION(by: join 或 in /not in) ods_examinee_his(T周期的拉链表) ==> 生成 T+1周期的拉链表
[SQL-1] 方式1(JOIN) 
-- (INSERT 新增) LEFT JOIN update表A.key = 拉链表B.key WHERE 拉链表b.key IS NULL 
-- (UPDATE 更新) INNER JOIN update表A.key = 拉链表B.key 

INSERT OVERWRITE TABLE dwd_examinee_his_tmp -- 输出到【(拉链表的)临时表】;然后,再清空【HIS拉链表】;最后,把【临时表的数据】全量覆盖式(更新)到【拉链表】
  -- 1 处理/加载 T+1 周期内的 INSERT 数据
  UNION
  -- 2 处理/加载 T+1 周期内的 UPDATE(D) 数据 : T+1周期的数据 & T周期的被更新数据
  UNION
  -- 3 处理/加载 T 周期前的 [历史/无效]数据 (或可理解为: 处理 非T、非T+1周期的数据集)

DEMO : JOIN 简易版 ↑↑↑

[SQL-2] 方式2(IN) - NOT IN(INSERT 新增) 和 IN(UPDATE更新)
  INSERT OVERWRITE TABLE dwd_examinee_his_tmp -- 输出到【(拉链表的)临时表】;然后,再清空【HIS拉链表】;最后,把【临时表的数据】全量覆盖式(更新)到【拉链表】
    -- 1 处理/加载 INSERT 数据
    SELECT -- 针对 T+1 周期内 的 INSERT (最新/有效)数据 By 【 NOT IN 】
      tmp_update.*, 
      current_timestamp() AS dwd_start_time, '9999-12-31 00:00:00' AS dwd_end_time, 
      'INSERT' AS dwd_data_state
    FROM dwd.ods_examinee tmp_update WHERE tmp_update.id NOT IN ( -- 通过 NOT IN 筛选出 被 UPDATE 的数据
      SELECT id FROM dwd.dwd_examinee_his WHERE dw_end_time >= '9999-12-31 00:00:00' -- 通过 dw_end_time = '9999-12-31 00:00:00' 筛选出本表中[最新周期的]全部有效数据集
    )

    UNION -- UNION ALL(不会去重) / UNION(会去重)
      
    -- 2 处理/加载 UPDATE(D) 数据
    SELECT -- 针对 T+1 周期内 的 UPDATE (最新/有效)数据 By 【 IN 】
      tmp_update.*, 
      current_timestamp() AS dwd_start_time, '9999-12-31 00:00:00' AS dwd_end_time, 
      'UPDATE' AS dwd_data_state
    FROM dwd.ods_examinee tmp_update WHERE tmp_update.id IN ( -- 通过 IN 筛选出 被 UPDATE 的数据
      SELECT id FROM dwd.dwd_examinee_his WHERE dw_end_time >= '9999-12-31 00:00:00' -- 通过 dwd_end_time = '9999-12-31 00:00:00' 筛选出本表中[最新周期的]全部有效数据集
    )
    UNION
    SELECT -- 针对 T 周期的 UPDATED (T+1周期时过期的)数据 By 【 IN 】
      tmp_his.*, 
      dwd_start_time, '9999-12-31 00:00:00' AS dwd_end_time, 
      'UPDATED' AS dwd_data_state
    FROM dwd.dwd_examinee_his tmp_his 
    WHERE 1 =1 
      AND tmp_his.dwd_end_time >= '9999-12-31 00:00:00' -- 通过 dwd_end_time = '9999-12-31 00:00:00' 筛选出 本表中[最新周期的]全部有效数据集
      AND tmp_his.id IN ( -- 通过 IN 筛选出 被 UPDATE 的数据
        SELECT * from dwd.ods_examinee AS tmp_update
      )

  UNION

  -- 3 处理/加载 T周期前的[历史/无效]数据 (或可理解为: 处理 非T、非T+1周期的数据集)
  SELECT 
    tmp_his.*
    dwd_start_time, dwd_end_time,  -- 保持原样即可
    dwd_data_state -- 保持原样即可
  FROM dwd.dwd_examinee_his tmp_his 
  WHERE
    1 = 1
    AND tmp_his.dwd_end_time <= current_date() -- 通过 end_time <= current_date() 筛选出 历史/无效 数据集

3 拉链表场景2:每个周期【全量数据集(ALL: UPDATE + INSERT + ...)】数据同步到数仓

3.1 需求描述

假定:对刚同步过来的【全量数据集】内每一条数据行是INSERT新增的,还是UPDATE更新的、DELETE删除的状态信息是未知的。
假定:数仓(ODS/DWD)是未开启(ACID特性) / 不支持对数据行 UPDATE、DELETE 操作的 HIVE 数仓。(即 拉链表的更新操作,需要全量取出,再全量写入)

特性:

  • 能够识别出 被删除的数据。(因为每个周期都是全量同步到数据仓库)
  • 相比增量更新场景,全量更新会产生1个问题:如何计算出【更新数据集(INSERT新增、UPDATE修改、DELETE删除)】

3.2 模型设计

注1:同增量同步相比,在模型设计上也完全可以不做变化(切片表1张、拉链表1张)。

  • 上述的增量更新场景的模型设计已经是较为简洁的了(已尽量省去了中间表) 【故此,推荐】
  • 只不过需要注意的是,取T周期的全量数据,需要换成其它的表;新增1个问题:如何计算出更新数据集(重点、难点)

注2:下面提供的这种建模方式,建立的表较多(目的1:为的是更好地理解整个过程;目的2:提供另一种建模思路————空间换时间)

exam_system.tb_examinee
  id / name / idcard / birthdate / sex / height / school_name

ods.ods_examinee -- [T+1周期全量切片表 := T+1周期全量数据集] 仅存储应用系统在每个周期(T+1)同步过来的 全量数据集(... + INSERT + UPDATE) 
  id / name / idcard / birthdate / sex / height / school_name
  ods_time(在ODS库的最新[插入]更新时间; 假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)

ods.ods_examinee_previous -- [T周期全量切片表 := T周期全量数据集] 仅存储应用系统在每个周期(T)同步过来的 全量数据集(... + INSERT + UPDATE) 
  id / name / idcard / birthdate / sex / height / school_name
  ods_time(在ODS库的最新[插入]更新时间; 假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)

dwd.dwd_examinee_update -- [T+1周期更新数据集 := T+1周期增量切片表] :T+1周期全量切片表 +T期全量切片表 ==> T+1周期更新数据集 
  id / name / idcard / birthdate / sex / height / school_name
  dwd_time(在DWD库的最新[插入]更新时间; 假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)
  dwd_data_state (当前周期当前数据行的状态: INIT[针对初始数据集] / INSERT[针对T+1周期数据集] / UPDATE[针对T+1周期数据集] / UPDATED[针对T周期数据集] / DELETED[针对T周期数据集])

dwd.ods_examinee_his -- [拉链表] 存储来源应用数据表的所有(含:过去0-T(+1)周期中未变化或变化过的)数据集: (T周期拉链表) + (T+1周期更新数据集) ==> (T+1周期拉链表)
  id / name / idcard / birthdate / sex / height / school_name
  dwd_start_time / dwd_end_time (在DWD库的插入时间、结束时间;通过这2个时间字段,限定了该行数据的生命周期;假定在MYSQL数仓中数据类型:datetime,在HIVE数仓中数据类型:timestamp)
  dwd_data_state (当前周期当前数据行的状态: INIT[针对初始数据集] / INSERT[针对T+1周期数据集] / UPDATE[针对T+1周期数据集] / UPDATED[针对T周期数据集] / DELETED[针对T周期数据集])

特补充: 应用系统((MYSQL))应用表的DDL、DML(初始化数据集)

use exam_system;
-- ----------------------------
-- Table structure for tb_examinee
-- ----------------------------
DROP TABLE IF EXISTS `tb_examinee`; -- 考生
CREATE TABLE `tb_examinee` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `birthdate` date DEFAULT NULL,
  `idcard` varchar(255) DEFAULT NULL,
  `sex` char(255) DEFAULT NULL,
  `height` double DEFAULT NULL,
  `school_name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
-- ----------------------------
-- Records of tb_examinee
-- ----------------------------
use exam_system;
TRUNCATE TABLE `tb_examinee`;
INSERT INTO `tb_examinee` VALUES ('1','王麻子', '2004-05-23', '532345200405231998', '1',  '167.5', 	'南京师范大学附属中学');
INSERT INTO `tb_examinee` VALUES ('2','李二娃', '2005-07-17', '532367200507171994', '1',  '178.2', 	'南京市金陵中学');
INSERT INTO `tb_examinee` VALUES ('3','张小花', '2005-02-28', '532367200502281786', '2',  '165.6', 	null);
INSERT INTO `tb_examinee` VALUES ('4','赵树人', '2005-06-30', '532367200506302850', '1',  '178.3', 	'南京市梅园中学');
INSERT INTO `tb_examinee` VALUES ('5','宋汇汇', '2006-01-04', '532367200601044529', '2',  '160.7',	'南京市金陵中学');
INSERT INTO `tb_examinee` VALUES ('6','张大牛', '2006-09-25', null, 		   '2',  '165.02', 	'南京市梅园中学');
INSERT INTO `tb_examinee` VALUES ('7','钱伟伟', '2005-07-29', '321002200507293629', '1', '178.4',	'南京师范大学附属中学');

3.3 ETL任务设计

step1 数仓表的建模

  • 1 test_ods.ods_edu_es_tb_fm_examinee_previous
-- HIVE存储
use test_ods;

DROP TABLE IF EXISTS `ods_edu_es_tb_fm_examinee_previous`; 
-- 考生信息 [全量切片表(上一周期的全量数据]

CREATE TABLE `ods_edu_es_tb_fm_examinee_previous` (
    `id` int COMMENT 'ID',
    `name` varchar(255) COMMENT '姓名',
    `birthdate` date COMMENT '出生日期',
    `idcard` varchar(255) COMMENT '身份证号码',
    `sex` varchar(255) COMMENT '性别',
    `height` double COMMENT '身高',
    `school_name` varchar(255) COMMENT '学校',

    `etl_time` timestamp COMMENT '数据入仓的ETL时间'
 ) COMMENT '考生信息表[[previous]全量切片表]'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '	' 
LINES TERMINATED BY '
'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
  • 2 test_ods.ods_edu_es_tb_fm_examinee
use test_ods;
DROP TABLE IF EXISTS `ods_edu_es_tb_fm_examinee`; 
-- 考生信息 [全量切片表(最新周期的全量数据]

CREATE TABLE `ods_edu_es_tb_fm_examinee` (
    `id` int COMMENT 'ID',
    `name` varchar(255) COMMENT '姓名',
    `birthdate` date COMMENT '出生日期',
    `idcard` varchar(255) COMMENT '身份证号码',
    `sex` varchar(255) COMMENT '性别',
    `height` double COMMENT '身高',
    `school_name` varchar(255) COMMENT '学校',

  `etl_time` timestamp
) COMMENT '考生信息表[(最新周期)全量切片表]'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '	' 
LINES TERMINATED BY '
'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
  • 3 test_dwd.dwd_edu_es_tb_im_examinee_update
use test_dwd;
DROP TABLE IF EXISTS `dwd_edu_es_tb_im_examinee_update`; 
-- 考生信息更新表 [更新表(筛选出:更新的、新增的、删除的)]
CREATE TABLE `dwd_edu_es_tb_im_examinee_update` (
    `id` int COMMENT 'ID',
    `name` varchar(255) COMMENT '姓名',
    `birthdate` date COMMENT '出生日期',
    `idcard` varchar(255) COMMENT '身份证号码',
    `sex` varchar(255) COMMENT '性别',
    `height` double COMMENT '身高',
    `school_name` varchar(255) COMMENT '学校',

  `etl_time` timestamp,
  `etl_data_state` varchar(50)  -- 标记: UPDATE / INSERT / DELETE
) COMMENT '考生信息表[update表]'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '	' 
LINES TERMINATED BY '
'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
  • 4 test_dwd.dwd_edu_es_tb_fm_examinee_his
use test_dwd;
DROP TABLE IF EXISTS `dwd_edu_es_tb_fm_examinee_his`; 
-- 考生信息更新表 [更新表(筛选出:更新的、新增的、删除的)]
CREATE TABLE `dwd_edu_es_tb_fm_examinee_his` (
    `id` int COMMENT 'ID',
    `name` varchar(255) COMMENT '姓名',
    `birthdate` date COMMENT '出生日期',
    `idcard` varchar(255) COMMENT '身份证号码',
    `sex` varchar(255) COMMENT '性别',
    `height` double COMMENT '身高',
    `school_name` varchar(255) COMMENT '学校',

  `etl_time` timestamp, -- 最新周期切片数据集 的 原始业务数据记录插入到数仓的时间
  `etl_data_state` varchar(50) , -- 标记 UPDATE / INSERT / DELETE
  
 `his_start_time` timestamp, -- 拉链表字段 (生命周期的开始时间)
 `his_end_time` timestamp -- 拉链表字段 (生命周期的结束时间)
) COMMENT '考生信息表[HIS(拉链)表]'
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '	' 
LINES TERMINATED BY '
'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");

step1 T+1周期前,[业务系统数据集]全量抽取到[最新周期全量切片表(init)]

仅在拉链表建模后的首次进行此初始化操作。
此操作的目的:对当前周期的数据进行全量地快照,作为拉链表的初始化数据集。

-- ktr_tr_es_ods-ods_edu_es_tb_fm_examinee
-- ETL TASK [全量切片表 : 按需要的频率,随时可全量备份一次数据]
-- SQL (mysql(业务系统) --> hive(ods.最新周期全量切片表)) 
-- 【最新周期切片表初始化的数据 - just first time】

-- 表输入组件SQL:
select id, name, birthdate, idcard, sex, school_name, now() as etl_time 
from exam_system.tb_examinee; 

-- Hive表输出组件: ods_edu_es_tb_fm_examinee [数据覆盖]

step2 T+1周期前,[最新周期全量切片表]全量抽取到[拉链表(init)]

仅在拉链表建模后的首次进行此初始化操作。
此操作的目的:对当前周期的数据进行全量地快照,作为拉链表的初始化数据集。

-- ktr_tr_ods_dwd-dwd_edu_es_tb_fm_examinee_his
-- ETL TASK [最新数据]
-- SQL (mysql(ods.最新周期 全量切片表) ----> hive(dwd.his拉链表)) [just first time for init](最初周期的初始化数据)
--- truncate table test_dwd.dwd_edu_es_tb_fm_examinee_his;
-- insert into test_dwd.dwd_edu_es_tb_fm_examinee_his(id, name, birthdate, idcard, sex, height, school_name, etl_time, etl_data_state, his_start_time, his_end_time)  select id, name, birthdate, idcard, sex, height, school_name, etl_time, 'INIT' as etl_data_state, current_timestamp() as his_start_time, '9999-12-31 00:00:00' as his_end_time from test_ods.ods_edu_es_tb_fm_examinee

step3 T+1周期前,[最新周期全量切片表]全量抽取到[上一周期全量切片表]

-- ktr_tr_es_ods-ods_edu_es_tb_fm_examinee_previous
-- ETL TASK [全量切片表 : 按需要的频率,随时可全量备份一次数据]
---- SQL(mysql(ods.最新周期全量切片表) --> hive(ods.上一周期全量切片表))

-- SQL组件1
truncate table test_ods.ods_edu_es_tb_fm_examinee_previous;

-- SQL组件2
insert into test_ods.ods_edu_es_tb_fm_examinee_previous(
id, name, birthdate, idcard, sex, height, school_name, etl_time
) 
select id, name, birthdate, idcard, sex, height, school_name, etl_time 
from test_ods.ods_edu_es_tb_fm_examinee;

step4 T+1周期内,对业务系统表的数据进行增删改操作[假定]

MYSQL

-- MYSQL
INSERT INTO exam_system.`tb_examinee`(
name, birthdate, idcard, sex, height, school_name
) VALUES ('欧阳云茵', '2006-01-01', '3210022005010142853', '2', '163.4', '南京师范大学附属中学'); -- insert

update exam_system.tb_examinee set height =186.3,sex='M' where name = '张大牛' ; -- update

DELETE FROM exam_system.tb_examinee where id=7 AND name= '钱伟伟'; -- delete

step5 T+1周期时,重复执行步骤1 (业务系统→最新周期全量切片表)

-- ktr_tr_es_ods-ods_edu_es_tb_fm_examinee
-- ETL TASK [全量切片表 : 按需要的频率,随时可全量备份一次数据]
-- SQL (mysql(业务系统) --> hive(ods.最新周期全量切片表)) 

-- 表输入组件SQL:
select id, name, birthdate, idcard, sex, school_name, now() as etl_time 
from exam_system.tb_examinee; 

-- Hive表输出组件: ods_edu_es_tb_fm_examinee [数据覆盖]

【step6】 T+1周期时,[最新周期全量切片表]+[上一周期全量切片表]覆盖式抽取生成[更新数据集]

找出T+1周期内的“更新数据集”(包括:新增、修改、删除),是最关键的一步。
找出“更新数据集”的方式方法有很多,例如,可通过:数据库日志 、其它的数据库专门工具(mysql:canal等)、全量比对T周期与T+1周期数据等方案来找出“更新数据集”。本方案中以较为通用的、与具体数据库类型无关的第三种方式——全量比对T周期与T+1周期数据集来实现对“更新数据集”的抽取。
本案例中假定的是每个周期全量抽取数据到2张切片表,故可以通过[T+1周期的全量切片表]+[T周期的全量切片表],通过各种join操作,比对出最新周期(T+1)的数据变更情况:新增的、删除的、修改过的。修改过的数据集如何查找?可通过比对2个周期中对拼接成一个字符串的表内所有字段的MD5摘要值,若MD5不同,则为被修改的。

但,若每个周期增量抽取数据到2张切片表,这种场景有几个不同之处:

  • 1)增量,非全量;
  • 2)可能有新增的、修改的,乃至删除的数据,甚至未做过修改的数据(后2者情况较少);

此时,怎么生成[更新数据集]?
实现思路是:通过[最新周期全量切片表(此时,其表内仅有业务系统推送的增量数据集)]+[拉链表(需筛选出T周期时有效的数据集)],来生成[更新数据集]。本质上,是异曲同工的。

-- HIVE
-- ktr_tr_ods_dwd-dwd_edu_es_tb_im_examinee_update
-- ETL TASK [生成 变化数据集 到 update表]
-- SQL (hive(ods.最新周期全量切片表)、hive(ods.上一周期全量切片表) ----> hive(dwd.update表))

-- SQL组件1
truncate table test_dwd.dwd_edu_es_tb_im_examinee_update;

-- SQL组件2
insert into test_dwd.dwd_edu_es_tb_im_examinee_update -- 增量、修改数据
	select	-- 新增 INSERT
		a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
		,a.etl_time -- (最新周期)切片表的etl_time
		,'INSERT' as etl_data_state 
	from (
		select 
			id, name, birthdate, idcard, sex, height, school_name
			,etl_time
			-- ,concat(id, ',', name, ',', birthdate, ',', idcard, ',', sex, ',', height, ',', school_name) as linedata
			-- ,md5(concat(id, ',', name, ',', birthdate, ',', idcard, ',', sex, ',', height, ',', school_name)) as md5_linedata
		from test_ods.ods_edu_es_tb_fm_examinee
	) a 
	left join test_ods.ods_edu_es_tb_fm_examinee_previous b 
		on a.id = b.id
		where b.id is null

	union

	select	-- 删除 DELETE
		a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
		, b.etl_time -- (最新周期)切片表的etl_time (NULL)
		,'DELETE' as etl_data_state
	from (
		select 
			id, name, birthdate, idcard, sex, height, school_name
			-- ,concat(id, ',', name, ',', birthdate, ',', idcard, ',', sex, ',', height, ',', school_name) as linedata
			-- ,md5(concat(id, ',', name, ',', birthdate, ',', idcard, ',', sex, ',', height, ',', school_name)) as md5_linedata
		from test_ods.ods_edu_es_tb_fm_examinee_previous
	) a 
	left join test_ods.ods_edu_es_tb_fm_examinee b 
		on a.id = b.id
		where b.id is null
	
	union

	select -- 修改 UPDATE
		a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
		, a.etl_time -- (最新周期)切片表的 etl_time
		,'UPDATE' as etl_data_state
	from (
		select 
			id, name, birthdate, idcard, sex, height, school_name
			,etl_time
			-- ,concat(ifnull(id, '-'), ',', ifnull(name, '-'), ',', ifnull(birthdate, '-'), ',', ifnull(idcard, '-'), ',', ifnull(sex, '-'), ',', ifnull(height, '-'), ',', ifnull(school_name, '-') ) as linedata,
			,md5( concat(nvl(id, '-'), ',', nvl(name, '-'), ',', nvl(birthdate, '-'), ',', nvl(idcard, '-'), ',', nvl(sex, '-'), ',', nvl(height, '-'), ',', nvl(school_name, '-') ) ) as md5_linedata
		from test_ods.ods_edu_es_tb_fm_examinee
	) a 
	inner join test_ods.ods_edu_es_tb_fm_examinee_previous b 
		on a.id = b.id 
		where a.md5_linedata != md5( concat(nvl(b.id, '-'), ',', nvl(b.name, '-'), ',', nvl(b.birthdate, '-'), ',', nvl(b.idcard, '-'), ',', nvl(b.sex, '-'), ',', nvl(b.height, '-'), ',', nvl(b.school_name, '-') ) )

【step7】 T+1周期时,[拉链表]+[更新数据集]合并覆盖抽取到[拉链表]

-- ktr_job_dwd_dwd-dwd_edu_es_tb_fm_examinee_his (使用JOB编排流程,TR不支持流程编排,其并行执行会导致结果错误)
-- ETL TASK [最新数据]
-- SQL (hive(dwd.his拉链表表)、hive(dwd.update表) ----> hive(dwd.his拉链表))

-- SQL组件
CREATE TABLE IF NOT EXISTS test_dwd.dwd_edu_es_tb_fm_examinee_his_tmp
STORED AS ORC
AS

SELECT
    id, name, birthdate, idcard, sex, height, school_name, etl_time, etl_data_state, his_start_time, his_end_time
FROM
    test_dwd.dwd_edu_es_tb_fm_examinee_his
WHERE id= 0

-- 空操作组件

-- SQL组件
INSERT OVERWRITE TABLE test_dwd.dwd_edu_es_tb_fm_examinee_his_tmp
SELECT
 id, name, birthdate, idcard, sex, height, school_name, etl_time, etl_data_state, his_start_time, his_end_time 
FROM 
( 

    SELECT -- 筛选出 UPDATE 、 DELETE 的记录
	a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name,
	a.etl_time, a.etl_data_state,
	a.his_start_time, 
           	(CASE 
                		WHEN a.his_end_time = '9999-12-31' AND b.id IS NOT NULL THEN from_unixtime(unix_timestamp(current_timestamp()), 'YYYY-MM-dd HH:mm:ss')
			-- WHEN a.his_end_time = '9999-12-31' AND b.id IS NOT NULL THEN from_unixtime(unix_timestamp('2021-04-26'), '%Y-%m-%d %H:%i:%s') 
                	ELSE a.his_end_time END 
	) AS his_end_time
    FROM test_dwd.dwd_edu_es_tb_fm_examinee_his  AS a
    LEFT JOIN test_dwd.dwd_edu_es_tb_im_examinee_update AS b
    ON a.id = b.id

    -- select *, from_unixtime(unix_timestamp('2021-04-27'), '%Y-%m-%d %H:%i:%s') as his_start_time, '9999-12-31 00:00:00' AS his_end_time from test_dwd.dwd_edu_es_tb_im_examinee_update

    UNION -- UNION 去重 (UNION ALL 不去重)

    SELECT -- 筛选出 INSERT 的记录
	c.id, c.name, c.birthdate, c.idcard, c.sex, c.height, c.school_name,
	c.etl_time, c.etl_data_state,
           	-- from_unixtime(unix_timestamp('2021-04-27'), '%Y-%m-%d %H:%i:%s') AS his_start_time, 
	from_unixtime(unix_timestamp(current_timestamp()), 'YYYY-MM-dd HH:mm:ss') AS his_start_time,
           	-- from_unixtime(unix_timestamp('9999-12-31'), '%Y-%m-%d %H:%i:%s') AS his_end_time
	'9999-12-31 00:00:00' AS his_end_time
    FROM test_dwd.dwd_edu_es_tb_im_examinee_update AS c 

) AS T

-- SQL组件
TRUNCATE TABLE test_dwd.dwd_edu_es_tb_fm_examinee_his

-- SQL组件
INSERT OVERWRITE TABLE test_dwd.dwd_edu_es_tb_fm_examinee_his 
SELECT id,name,birthdate,idcard, sex, height, school_name, etl_time, etl_data_state, his_start_time,his_end_time FROM test_dwd.dwd_edu_es_tb_fm_examinee_his_tmp
-- SELECT * FROM test_dwd.dwd_edu_es_tb_fm_examinee_his_tmp; -- hive insert into select 时不能用 select *, 否制: 与其表内字段匹配不上,数据 全为 null

X 参考文献

Y 彩蛋

彩蛋1

建议1: 用于比对当前数据行是否被修改的MD5值,建议在数据入库时即单独存储好。(避免后期运算量太大,影响性能)

彩蛋2

with tmp_t_his as (
    select
        id, name, birthdate, idcard, sex, height, school_name
         ,dwd_time, dwd_start_time, dwd_end_time, data_state
         ,md5( concat(nvl(id, '-'), ',', nvl(name, '-'), ',', nvl(birthdate, '-'), ',', nvl(idcard, '-'), ',', nvl(sex, '-'), ',', nvl(height, '-'), ',', nvl(school_name, '-') ) ) as md5_linedata
    FROM
        test_dwd.dwd_examinee_his
    WHERE
       dwd_end_time >= to_date('9999-12-31', 'yyyy-MM-dd') -- 从拉链表中筛选出 T 周期的(有效)数据集
    -- limit 5
)
insert overwrite table test_dwd.dwd_examinee_his_tmp
select -- 1 拉链表 [0, T-1]
    id, name, birthdate, idcard, sex, height, school_name
    ,dwd_time, dwd_start_time, dwd_end_time, data_state
FROM
    test_dwd.dwd_examinee_his
WHERE 1 = 1 AND dwd_end_time < to_date('9999-12-31', 'yyyy-MM-dd') -- [0, T-1] 周期 的 历史记录
-- limit 5

union

select -- 2 拉链表 [T]  未被 UPDATE的 (保持原样即可) +  被 UPDATE 的 (拉链表T周期的原记录 : end_time 标记为当前时间戳 ; 数据状态 被标记为 UPDATED)
    b.id, b.name, b.birthdate, b.idcard, b.sex, b.height, b.school_name
     , a.dwd_time, a.dwd_start_time, -- a.dwd_end_time, 
	 (CASE 
	    WHEN md5( concat(nvl(a.id, '-'), ',', nvl(a.name, '-'), ',', nvl(a.birthdate, '-'), ',', nvl(a.idcard, '-'), ',', nvl(a.sex, '-'), ',', nvl(a.height, '-'), ',', nvl(a.school_name, '-') ) )= md5( concat(nvl(b.id, '-'), ',', nvl(b.name, '-'), ',', nvl(b.birthdate, '-'), ',', nvl(b.idcard, '-'), ',', nvl(b.sex, '-'), ',', nvl(b.height, '-'), ',', nvl(b.school_name, '-') ) )
	      THEN  current_timestamp() -- 标识 此数据行的生命周期在此刻结束
	   ELSE a.dwd_end_time
	  END) as dwd_end_time,
	 (CASE 
	    WHEN md5( concat(nvl(a.id, '-'), ',', nvl(a.name, '-'), ',', nvl(a.birthdate, '-'), ',', nvl(a.idcard, '-'), ',', nvl(a.sex, '-'), ',', nvl(a.height, '-'), ',', nvl(a.school_name, '-') ) ) = md5( concat(nvl(b.id, '-'), ',', nvl(b.name, '-'), ',', nvl(b.birthdate, '-'), ',', nvl(b.idcard, '-'), ',', nvl(b.sex, '-'), ',', nvl(b.height, '-'), ',', nvl(b.school_name, '-') ) )
	      THEN 'UPDATED' -- 标识 此数据行的生命周期在此刻结束
	   ELSE a.data_state
	  END) as data_state
from  test_dwd.dwd_examinee_his a
inner join test_ods.ods_examinee b on a.id = b.id 
where 1 = 1 
  AND a.dwd_end_time >= to_date('9999-12-31', 'yyyy-MM-dd')
  -- AND a.md5_linedata = md5( concat(nvl(b.id, '-'), ',', nvl(b.name, '-'), ',', nvl(b.birthdate, '-'), ',', nvl(b.idcard, '-'), ',', nvl(b.sex, '-'), ',', nvl(b.height, '-'), ',', nvl(b.school_name, '-') ) )

-- union

-- select -- 3 拉链表 [T]  被 UPDATE 的 (拉链表T周期的原记录 : end_time 标记为当前时间戳 ; 数据状态 被标记为 UPDATED)
--    a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
--     , a.dwd_time, a.dwd_start_time, current_timestamp as dwd_end_time,'UPDATED' as data_state
     -- ,'UPDATE' as etl_data_state
-- from  tmp_t_his a
-- inner join test_dwd.dwd_examinee_his b on a.id = b.id
-- where 1 = 1 AND a.md5_linedata = md5( concat(nvl(b.id, '-'), ',', nvl(b.name, '-'), ',', nvl(b.birthdate, '-'), ',', nvl(b.idcard, '-'), ',', nvl(b.sex, '-'), ',', nvl(b.height, '-'), ',', nvl(b.school_name, '-') ) )

union

select -- 4 修改 UPDATE
    a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
     , b.dwd_time,  b.dwd_start_time, '9999-12-31 00:00:00' as dwd_end_time,  -- 'UPDATE' as data_state
	-- (CASE 
	 --   WHEN b.md5_linedata = md5( concat(nvl(a.id, '-'), ',', nvl(a.name, '-'), ',', nvl(a.birthdate, '-'), ',', nvl(a.idcard, '-'), ',', nvl(a.sex, '-'), ',', nvl(a.height, '-'), ',', nvl(a.school_name, '-') ) )
	 --     THEN 'UPDATED'
	 --  ELSE a.data_state
	  -- END) as data_state
	  'UPDATED' as data_state
from  test_ods.ods_examinee a
inner join tmp_t_his b on a.id = b.id
where 1 = 1 AND b.md5_linedata = md5( concat(nvl(a.id, '-'), ',', nvl(a.name, '-'), ',', nvl(a.birthdate, '-'), ',', nvl(a.idcard, '-'), ',', nvl(a.sex, '-'), ',', nvl(a.height, '-'), ',', nvl(a.school_name, '-') ) )

union

select	-- 5 新增 INSERT
    a.id, a.name, a.birthdate, a.idcard, a.sex, a.height, a.school_name
    , current_timestamp as dwd_time,  current_timestamp as dwd_start_time, '9999-12-31 00:00:00' as dwd_end_time,  'INSERT' as data_state
     --  ,'INSERT' as etl_data_state
from test_ods.ods_examinee a
left join tmp_t_his b on a.id = b.id
where 1 = 1 AND b.id is null

彩蛋3

赞赏-支付宝二维码
本文作者千千寰宇
本文链接 https://www.cnblogs.com/johnnyzen
关于博文:评论和私信会在第一时间回复,或直接私信我。
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
声援博主:若本文对您有帮助,可点击右下角推荐一下。您的鼓励、【赞赏】(左侧赞赏支付码)是博主技术写作的重要动力!
原文地址:https://www.cnblogs.com/johnnyzen/p/14702870.html