欧莱雅实时数仓设计与实现

系统分析
需求分析
公司现有多个的销售渠道,如京东、天猫等分销商,线下门店,以及品牌自身的官网和微信小程序等。

其中官网和微信小程序因为是公司内部管理的,所以数据可以及时的取到;而其他部分的销售数据都是第二天由各渠道统一推送。

随着业务的深入,对于数据的实时性要求越来越高,尤其是官网、微信小程序的数据,在实时数仓建设以前,已经由每天计算一次改为每小时计算一次。而数据原先是以csv文件形式给到IT部门,按照旧的架构,需要先把csv导入到内部SQL Server之后,再从SQL Server抽取数据到hdfs。这就意味着,我们需要每小时将这部分数据导入SQL Server,并做一次ETL。显然,对于现有的技术水平来说,这种处理方式已经落伍了。

由传统的ETL向流处理的转变迫在眉睫。

可行性分析
1.技术可行性
实时数仓这个字眼再目前已经算不上新颖了,很多大公司都已经部署了自己的实时数仓。目前市面上也有许多实时数仓的解决方案。通常以Flink、Spark-Streaming作为计算引擎,Kafka作为数仓中转与存储,HBase、KUDU、ClickHouse、Hive等OLAP作为数据落地层,并以Impala、Presto、Spark-SQL等对落地后的数据做查询。其各个组件都已发展成熟,并有成套的解决方案。

2.业务可行性
首先,实时数仓是对传统数仓的补充。

实时数仓不是用来取代传统数仓的,而是对传统数仓的补充。就拿需求分析中的例子来说,我们只要求部分渠道的订单数据具有实时性,而其他的仍然以T+1的模式计算,因为分销商的数据只有在第二天才能给到我们,等以后技术水平再次提升,打破了其中的数据壁垒,再考虑将其向实时数仓转变。再比如一些维度数据,本身更新频率就很低,T+1的模式完全能够满足其业务需要,也不需要做成实时数仓。

所以,传统数仓与实时数仓应该相辅相成,对于需要实时性的数据使用实时数仓,不需要实时性的仍以传统数仓为主,两者结合使用。

其次,实时数仓应该与传统数仓同时存在。

对于公司现有的业务流程来说,很多地方都需要人工参与,而有人的地方就有错误。

比如在人工上传某些数据时,发现多传了一份,如果是基于传统数仓,只要在做ETL前将这部分数据删除即可。

但是在实时数仓的情况下,可能上传的数据马上就被ETL了。到时候就得在应用数据时手动排除这部分数据(而且是在发现了这个问题的情况下,目前公司数仓业务分为多个部门,上传数据的和做ETL的也许互不相识,上传者根本不会告诉业务部门他多传了一份数据)。

所以项目初期,我们需要实时数仓和传统数仓一起跑,并每日校验两部分数据,观察实时数仓是否存在问题,并逐步规范化业务流程,减少人为干预,等到时机成熟后,才可以考虑将这块业务交由实时数仓实现。

总体架构模块介绍
原技术架构介绍


数据源层:在原先的数仓中,数据源头是来自维护人员每天不定时多次上传至服务器的CSV文件。由于这部分数据要给大数据部门和其他部门同时使用,所以先将CSV的数据导入到共有的SQL Server中。

数据抽取层:由于公司技术体系较旧,所以使用的抽数工具是Imformatica。将数据从SQL Server抽取到HDFS。其实使用Sqoop作为抽数工具可以提高效率。

数据计算层:即数据的ETL流程。

tmp:从SQL Server抽取到的数据会先放在tmp层,因为Imformatica抽取的数据是txt格式的,如果直接保存下来,会占用比较多的空间,且txt的查询效率较低。我们将数据以txt的格式暂时放在tmp层,然后将其转换成parquet格式再在src层永久保存。tmp层只存放当次抽取的数据,每次抽取都会覆盖原先的数据。
src:即ODS层,将数据原封不动保存在src层。
dwd:对src的数据做简单的去重、过滤。
dws:将dws中的数据与其他维度表做关联,最终形成一张大宽表。
数据应用层:经过ETL之后的数据,可供数据分析师查询分析;用机器学习做预测分析;以及用BI工具做报表展示。

新实时数仓技术架构介绍


新版的实时数仓与旧版离线数仓的区别在于数据抽取层和数据计算层。实时数仓的特征之一就是实时存,离线算。也就是说数据的ETL采用流式处理来做,但是在数据的应用层面仍以离线计算为主,使每次对数据做分析时,保证用到的数据都是最新的。

数据抽取层

由于历史原因,我们仍然只能得到CSV形式的数据。所以,在实时数仓中,我们需要将CSV里的数据第一时间写入Kafka。考虑到Flume比自己开发Kafka生产程序可靠,而业务对数据的实时性也允许些许延迟,所以我们没有直接将这部分数据发送到kafka,而是在中间加了一层Flume,使用Flume将文件写入到Kafka。

Flume可以监控指定目录下指定的文件变化,将文件数据写入下游数据节点。我们在使用Flume时,选用了Taildir Source、File Channel和Kafka Sink。由于Flume不支持CSV格式的文件内容,所以在用户上传CSV文件后,后台将CSV文件内容转换成JSON,并写入一个文本文件中,再将这个文本文件放入Flume监控的目录下。

数据计算层

数据计算层我们只分了src、dwd、dws三层,对应的也是离线数仓中的这三层,取消了tmp层;ads层的计算仍然使用Impala等即席查询工具。

src层使用kafka来存储,数据由Flume直接抽取到src层;

在dwd层中会做一些过滤和轻度聚合,如我们一条订单分为两部分:header订单头和items订单详情,其中一个header可能对应多个items,每个items记录了这个订单购买的一种sku的信息。在dwd层会将这两部分的数据做join。

这就出现了常见的双流join问题:如果本次上传的header中,没有对应的items,而对应的items会在下次上传的csv文件中,则本次header就join不上对应的items;相反也是一样。

在上一家公司我们是用Spark-Streaming做计算引擎,处理这种情况时,会使用Redis+Hbase的组合,将未关联上的数据缓存至redis,当处理下一批关联不上的数据时,会在Redis中查找(如果Redis这中也没有对应的另一部分数据,则在Hbase中查找),如果redis中的数据长时间未被关联上,则刷写至Hbase。这样一来可以减少长时间未被关联上的数据占用过多Redis内存,二来可以减少Spark-Streaming与Hbase的交互。

但是,基于Flink的状态管理机制,我们可以用Flink状态代替Redis做为缓存工具,将未关联上的数据先写入状态后端(RockDB、内存或者磁盘等)。再设置定时器,将长时间未被关联上的状态中的数据写入HBase;关联上的则从状态中删除。状态管理机制是Flink的一大亮点,也是一大难点。

在dws层中,将join后得到的dwd层数据再与离线数仓中的维度表做join,然后写入到OLAP中供业务部门使用。在基于Spark-Streaming的实时数仓中,通常将维表数据先存在Hbase或Kudu等低延迟高存储的数据库中,得益于Flink 1.9和1.11的Hive Catlog新特性,现在Flink支持直接使用Hive中的维表数据做join,也可以将join后的数据写入Hive中,而不用使用其他组件,使架构更加轻量化。

详细设计与系统实现
数据抽取层
数据抽取层主要分为两部分模块:

格式转换模块:给用户提供一个文件上传接口,CSV文件上传后自动将CSV中的每行转换成JSON,并保存到一个文本文件中,再将这个文本文件保存至指定目录下。

数据抽取模块:使用Flume监控指定目录,当目录中添加格式转换模块写入的文件后,将这个文件发送到Kafka。

格式转换模块:

模块流程:

用户通过Client上传文件至服务器,如果成功则进入下一步;否则报错。
服务器收到用户上传的文件后,启动格式转换流程,如果成功,则进入下一步;否则报错;
格式转换成功后,将得到的新的文件移动到Flume监控的目录中,本模块结束,进入数据抽取模块。
数据抽取模块:

数据抽取模块使用Flume工具。

使用Taildir Source监控一个目录, 该目录会不定时添加包含JSON内容的文本文件,Taildir Source可以监控目录中文件变化,且支持断点续传。
使用File Channel,从Source中接收到的数据会先存入Channel中。由于业务对数据的实时性并没有非常高,所以没有选择Memory Channel和Kafka Channel,而是选择File Channel将数据暂存在磁盘文件中,提高可靠性。
使用Kafka Sink将channel中的数据写入Kafka。
Flume配置示例如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

## sources
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/test1.txt
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test1/test2.txt
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

## channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /apps/flume/checkpoint/behavior
a1.channels.c1.dataDirs = /apps/flume/checkpoint/data
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = omstest
a1.sinks.k1.kafka.bootstrap.servers = kudu1:9092,kudu2:9092,kudu3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
数据计算层
数据计算层主要分为两大部分:SRC-DWD的ETL流程、DWD-DWS的ETL流程。

SRC-DWD的ETL流程
以headers和items表为例。

从SRC层kafka sourve创建heasers和items两个DataStream,并将两个流connect起来,再对合并后的ConnectedStream流做关联,由于ConnectedStream中的两个流之间状态共享,所以关联不上的数据可以先存进状态,当下一批数据到来,如果关联不上则在状态中查找,看看能否在状态中关联上,关联上则删除状态中的数据,关联不上则继续在Hbase中关联;并设置一个定时器,当状态中的数据长时间未被删除(即未被关联上)则刷写至HBase。

对于能够关联上的数据,则将关联后的数据写入DWD层的kafka,进行dwd-dws层的逻辑处理。

DWD-DWS的ETL流程
在以前的实时数仓中,通常会将维表的数据先同步到Mysql、HBase或者Redis这些响应速度较快的数据库中,然后将流式的事实数据与这部分离线的维表数据做关联。

得益于Flink的Hive Catlog新特性,现在可以不用先将维度表数据写入其他数据库,而是可以直接将流式数据与Hive中的离线数据进行关联,并且再写回Hive中。虽然会有一定延迟,但是对于实时性要求不高的业务场景来说,这种架构更加的轻量。

关联Hive维表采用了 Flink 的 Temporal Table 的语法,就是把 Hive 的维表作为Temporal Table,然后与流式的表进行 join。具体参见官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html
————————————————
版权声明:本文为CSDN博主「啥也不会吴子渲」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/x950913/article/details/109356835

本文来自博客园,作者:Slashout,转载请注明原文链接:https://www.cnblogs.com/SlashOut/p/15502522.html 关注公众号:数字化转型

原文地址:https://www.cnblogs.com/SlashOut/p/15502522.html