数据交换工具Kettle

网上搜集了一些关于开源数据交换工具Kattle的文章,特收藏例如以下:

文章一:ETL和Kettle简单介绍
ETL即数据抽取(Extract)、转换(Transform)、装载(Load)的过程。它是构建数据仓库的重要环节。数据仓库是面向主题的、集成的、稳定的且随时间不断变化的数据集合,用以支持经营管理中的决策制定过程。数据仓库系统中有可能存在着大量的噪声数据,引起的主要原因有:滥用缩写词、惯用语、数据输入错误、反复记录、丢失值、拼写变化等。即便是一个设计和规划良好的数据库系统,假设当中存在着大量的噪声数据,那么这个系统也是没有不论什么意义的,由于垃圾进,垃圾出garbage in, garbage out),系统根本就不可能为决策分析系统提供不论什么支持。为了清除噪声数据,必须在数据库系统中进行数据清洗。眼下有不少数据清洗研究和ETL研究,可是怎样在ETL过程中进行有效的数据清洗并使这个过程可视化,此方面研究不多。本文主要从两个方面阐述ETL和数据清洗的实现过程:ETL的处理方式和数据清洗的实现方法。
(1)       ETL的处理方式
本文所採用的ETL方法是数据库段区域中的ETL处理方式,它不使用外部引擎而是使用数据库作为唯一的控制点。因为源系统SQLserver2000是关系数据库,它的段表也是典型的关系型表。成功地将外部未改动数据加载数据库后,再在数据库内部进行转换。数据库段区域中的ETL处理方式运行的步骤是提取、装载、转换,即通常所说的ELT。这样的方式的长处是为抽取出的数据首先提供一个缓冲以便于进行复杂的转换,减轻了ETL进程的复杂度。
(2)       ETL过程中实现数据清洗的实现方法
首先,在理解源数据的基础上实现数据表属性一致化。为解决源数据的同义异名和同名异义的问题,可通过元数据管理子系统,在理解源数据的同一时候,对不同表的属性名依据其含义又一次定义其在数据挖掘库中的名字,并以转换规则的形式存放在元数据库中,在数据集成的时候,系统自己主动依据这些转换规则将源数据中的字段名转换成新定义的字段名,从而实现数据挖掘库中的同名同义。
其次,通过数据缩减,大幅度缩小数据量。因为源数据量非常大,处理起来非常耗时,所以能够优先进行数据缩减,以提高兴许数据处理分析效率。
最后,通过预先设定数据处理的可视化功能节点,达到可视化的进行数据清洗和数据转换的目的。针对缩减并集成后的数据,通过组合预处理子系统提供各种数据处理功能节点,可以以可视化的方式高速有效完毕数据清洗和数据转换过程。

2.   KETTLE简单介绍
    如今是一个Google的时代,而对于开发人员,开源已成为最重要的參考书。对于某课题,无论你是深入研究还是初窥门径。估且google一把,勾一勾同行的成就,你必会获益良多。
    说到ETL开源项目,Kettle当属翘首,项目名称非常有意思,水壶。按项目负责人Matt的说法:把各种数据放到一个壶里,然后呢,以一种你希望的格式流出。呵呵,外国人都非常有联想力。
    看了提供的文档,然后对公布程序的简单试用后,能够非常清楚得看到Kettle的四大块:
Chef——工作(job)设计工具 (GUI方式)
Kitchen——工作(job)运行器 (命令行方式)
Spoon——转换(transform)设计工具 (GUI方式)
Span——转换(trasform)运行器 (命令行方式)
2.1.Chef——工作(job)设计器
这是一个GUI工具,操作方式主要通过拖拖拉拉,勿庸多言,一看就会。
何谓工作?多个作业项,按特定的工作流串联起来,开成一项工作。正如:我的工作是软件开发。我的作业项是:设计、编码、測试!先设计,假设成功,则编码,否则继续设计,编码完毕则開始设计,周而复始,作业完毕。
2.1.1.    Chef中的作业项包含:
转换:指定更细的转换任务,通过Spoon生成。通过Field来输入參数;
SQLsql语句运行;
FTP:下载ftp文件;
邮件:发送邮件;
检查表是否存在
检查文件是否存在
运行shell脚本:如dos命令。
批处理(注意:windows批处理不能有输出到控制台)
Job:作为嵌套作业使用。
JavaScript运行:这个比較有意思,我看了一下源代码,假设你有自已的Script引擎,能够非常方便的替换成自己定义Script,来扩充其功能;
SFTP:安全的Ftp协议传输;
HTTP方式的上/下传
如上文所述,工作流是作业项的连接方式。分为三种:无条件,成功,失败,为了方便工作流使用,KETTLE提供了几个辅助结点单元(也可将其作为简单的作业项)
Start单元:任务必须由此開始。设计作业时,以此为起点。
OK单元:能够编制做为中间任务单元,且进行脚本编制,用来控制流程。
ERROR单元:用途同上。
DUMMY单元:什么都不做,主要是用来支持多分支的情况,文档中有样例。
支持XML存储,或存储到指定数据库中。
一些默认的配置(如数据库存储位置……),在系统的用户文件夹下,单独建立了一个.Kettle文件夹,用来保存用户的这些设置。
可查看运行日志。
2.2.Kitchen——作业运行器
是一个作业运行引擎,用来运行作业。这是一个命令行运行工具,没啥可讲的,就把它的參数说明列一下。
    -rep      : Repository name   任务包所在存储名
    -user     : Repository username   运行人
    -pass     : Repository password   运行人密码
    -job      : The name of the job to launch 任务包名称
    -dir      : The directory (don't forget the leading / or /)
    -file     : The filename (Job XML) to launch
    -level    : The logging level (Basic, Detailed, Debug, Rowlevel, Error, Nothing) 指定日志级别
    -log      : The logging file to write to 指定日志文件
    -listdir : List the directories in the repository 列出指定存储中的文件夹结构。
    -listjobs : List the jobs in the specified directory 列出指定文件夹下的全部任务
    -listrep : List the defined repositories 列出全部的存储
    -norep    : Don't log into the repository 不写日志
    嗯,竟然不支持调度。看了一下文档,建议使用操作系统提供的调度器来实现调度,比方:Windows能够使用它的任务计划工具。
2.3.Spoon——转换过程设计器
    GUI工作,用来设计数据转换过程,创建的转换能够由Pan来运行,也能够被Chef所包括,作为作业中的一个作业项。
    以下简单列举一下全部的转换过程。(简单描写叙述,具体的可见Spoon文档)
2.3.1.    Input-Steps:输入步骤
l         Text file input:文本文件输入
能够支持多文件合并,有不少參数,基本一看參数名就能明确其意图。
l         Table input:数据表输入
实际上是视图方式输入,由于输入的是sql语句。当然,须要指定数据源(数据源的定制方式在后面讲一下)
l         Get system info:取系统信息
就是取一些固定的系统环境值,如本月最后一天的时间,本机的IP地址之类。
l         Generate Rows:生成多行。
这个须要匹配使用,主要用于生成多行的数据输入,比方配合Add sequence能够生成一个指定序号的数据列。
l         XBase Input
l         Excel Input
l         XML Input
这三个没啥可讲的,看看參数就明了。
l         Text file output:文本文件输出。这个用来作測试蛮好,呵呵。非常方便的看到转换的输出。
l         Table output:输出到目的表。
l         Insert/Update:目的表和输入数据行进行比較,然后有选择的运行添加,更新操作。
l         Update:同上,仅仅是不支持添加操作。
l         XML Output
2.3.3.    Look-up:查找操作
l         Data Base
l         Stream
l         Procedure
l         Database join
l         Select values
对输入的行记录数据的字段进行更改 (更改数据类型,更改字段名或删除) 数据类型变更时,数据的转换有固定规则,可简单定制參数。可用来进行数据表的改装。
l         Filter rows
 对输入的行记录进行指定复杂条件的过滤。用途可扩充sql语句现有的过滤功能。但现有提供逻辑功能超出标准sql的不多。
l         Sort rows
对指定的列以升序或降序排序,当排序的行数超过5000时须要暂时表。
l         Add sequence
为数据流添加一个序列,这个配合其他Step(Generate rows, rows join),能够生成序列表,如日期维度表(年、月、日)
l         Dummy
不做不论什么处理,主要用来作为分支节点。
l         Join Rows
对全部输入流做笛卡儿乘积。
l         Aggregate
聚合,分组处理
l         Group by
分组,用途可扩充sql语句现有的分组,聚合函数。但我想可能会有其他方式的sql语句能实现。
l         Java Script value
使用mozillarhino作为脚本语言,并提供了非常多函数,用户能够在脚本中使用这些函数。
l         Row Normaliser
该步骤能够从透视表中还原数据到事实表,通过指定维度字段及其分类值,度量字段,终于还原出事实表数据。
l         Unique rows
去掉输入流中的反复行,在使用该节点前要先排序,否则仅仅能删除连续的反复行。 
l         Calculator
提供了一组函数对列值进行运算,用该方式比用户自己定义JAVA SCRIPT脚本速度更快。
l         Merge Rows
用于比較两组输入数据,一般用于更新后的数据又一次导入到数据仓库中。
l         Add constants
添加常量值。
l         Row denormaliser
Normaliser过程相反。
l         Row flattener
表扁平化处理,指定需处理的字段和扃平化后的新字段,将其他字段做为组合Key进行扃平化处理。
l         SPLIT FIELDS
按指定分隔符拆分字段
l         EXECUTE SQL SCRIPT
运行SQL语句
l         CUBE INPUT
l         CUBE OUTPUT
l         存储方式:Chef同样。
l         数据源(Connection);见后。
l         Hopssetp连接起来,形成Hops
l         Plugin step types等节点:这个没细致看,不知怎样制作Plugin step
l         LogView:可查看运行日志。
2.4.Pan——转换的运行工具
命令行运行方式,能够运行由Spoon生成的转换任务。相同,不支持调度。參数与Kitchen相似,可參见Pan的文档。
Connection
能够配置多个数据源,在Job或是Trans中使用,这意味着能够实现跨数据库的任务。支持大多数市面上流行的数据库。
2.6.个人感觉:(本人不成熟的看法)
1、 转换功能全,使用简洁。作业项丰富,流程合理。但缺少调度。
2、 java代码,支持的数据源范围广,所以,跨平台性较好。
3、 从实际项目的角度看,和其他开源项目相似,主要还是程序猿的思维,缺少与实际应用项目(专业领域)的很多其他接轨,当然,项目实施者的专注点可能在于一个平台框架,而非实际应用(实际应用须要二次开发)
4、 看过了大多数源代码,发现源代码的可重用性不是太好(缺少大粒度封装),有些关键部分好像有Bug。比方:个别class过于臃肿,线程实现的同步有问题。
5、 提供的工具有些小错,如參数的容错处理。
做数据仓库系统,ETL是关键的一环。说大了,ETL是数据整合解决方式,说小了,就是倒数据的工具。回顾一下工作这么些年来,处理数据迁移、转换的工作倒还真的不少。但是那些工作基本上是一次性工作或者非常小数据量,使用access、 DTS或是自己编个小程序搞定。但是在数据仓库系统中,ETL上升到了一定的理论高度,和原来小打小闹的工具使用不同了。到底什么不同,从名字上就能够看到,人家已经将倒数据的过程分成3个步骤,E、T、L分别代表抽取、转换和装载。
事实上ETL过程就是数据流动的过程,从不同的数据源流向不同的目标数据。但在数据仓库中,ETL有几个特点,一是数据同步,它不是一次性倒完数据就拉到,它是常常性的活动,依照固定周期执行的,甚至如今还有人提出了实时ETL的概念。二是数据量,一般都是巨大的,值得你将数据流动的过程拆分成E、T和L。
如今有许多成熟的工具提供ETL功能,比如datastage、powermart 等,且不说他们的好坏。从应用角度来说,ETL的过程事实上不是很复杂,这些工具给数据仓库project带来和很大的便利性,特别是开发的便利和维护的便利。但还有一方面,开发者easy迷失在这些工具中。举个样例,VB是一种很easy的语言而且也是很易用的编程工具,上手特别快,可是真正VB的高手有多少?微软设计的产品通常有个原则是“将使用者当作傻瓜”,在这个原则下,微软的东西确实很好用,可是对于开发者,假设你自己也将自己当作傻瓜,那就真的傻了。 ETL工具也是一样,这些工具为我们提供图形化界面,让我们将基本的精力放在规则上,以期提高开发效率。从使用效果来说,确实使用这些工具可以很高速地构建一个job来处理某个数据,只是从总体来看,并不见得他的总体效率会高多少。问题主要不是出在工具上,而是在设计、开发者上。他们迷失在工具中,没有去探求ETL的本质。
能够说这些工具应用了这么长时间,在这么多项目、环境中应用,它必定有它成功之处,它必定体现了ETL的本质。假设我们不透过表面这些工具的简单使用去看它背后蕴涵的思想,终于我们作出来的东西也就是一个个独立的job,将他们整合起来仍然有巨大的工作量。大家都知道“理论与实践相结合”,假设在一个领域有所超越,必需要在理论水平上达到一定的高度

4.1.ETL 特点
ETL的过程就是数据流动的过程,从不同异构数据源流向统一的目标数据。其间,数据的抽取、清洗、转换和装载形成串行或并行的过程。ETL的核心还是在于T这个过程,也就是转换,而抽取和装载一般能够作为转换的输入和输出,或者,它们作为一个单独的部件,其复杂度没有转换部件高。和OLTP系统中不同,那里充满这单条记录的insert、update和select等操作,ETL过程一般都是批量操作,比如它的装载多採用批量装载工具,一般都是DBMS系统自身附带的工具,比如Oracle SQLLoader和DB2的autoloader等。
ETL本身有一些特点,在一些工具中都有体现,以下以datastage和powermart举例来说。
1、静态的ETL单元和动态的ETL单元实例;一次转换指明了某种格式的数据怎样格式化成还有一种格式的数据,对于数据源的物理形式在设计时能够不用指定,它能够在执行时,当这个ETL单元创建一个实例时才指定。对于静态和动态的ETL单元,Datastage没有严格区分,它的一个Job就是实现这个功能,在早期版本号,一个Job同一时候不能执行两次,所以一个Job相当于一个实例,在后期版本号,它支持multiple instances,并且还不是默认选项。Powermart中将这两个概念加以区分,静态的叫做Mapping,动态执行时叫做Session。
2、ETL元数据;元数据是描写叙述数据的数据,他的含义很广泛,这里仅指ETL的元数据。主要包含每次转换前后的数据结构和转换的规则。ETL元数据还包含形式參数的管理,形式參数的ETL单元定义的參数,相对还有实參,它是执行时指定的參数,实參不在元数据管理范围之内。
3、数据流程的控制;要有可视化的流程编辑工具,提供流程定义和流程监控功能。流程调度的最小单位是ETL单元实例,ETL单元是不能在细分的ETL过程,当然这由开发人员来控制,比如能够将抽取、转换放在一个ETL单元中,那样这个抽取和转换仅仅能同一时候执行,而假设将他们分作两个单元,能够分别执行,这有利于错误恢复操作。当然,ETL单元到底应该细分到什么程度应该根据详细应用来看,眼下还没有找到非常好的细分策略。比方,我们能够规定将装载一个表的功能作为一个ETL单元,可是不可否认,这种ETL单元之间会有非常多共同的操作,比如两个单元共用一个Hash表,要将这个Hash表装入内存两次。
4、转换规则的定义方法;提供函数集提供经常使用规则方法,提供规则定义语言描写叙述规则。
5、对数据的高速索引;一般都是利用Hash技术,将參照关系表提前装入内存,在转换时查找这个hash表。Datastage中有Hash文件技术,Powermart也有相似的Lookup功能。
4.2.ETL 类型
昨在IT-Director上阅读一篇报告,关于ETL产品分类的。一般来说,我们眼中的ETL工具都是价格昂贵,可以处理海量数据的家伙,可是这是当中的一种。它可以分成4种,针对不同的需求,主要是从转换规则的复杂度和数据量大小来看。它们包含:
1、交互式执行环境,你能够指定数据源、目标数据,指定规则,立刻ETL。这样的交互式的操作无疑很方便,可是仅仅能适合小数据量和复杂度不高的ETL过程,由于一旦规则复杂了,可能须要语言级的描写叙述,不能简简单单拖拖拽拽就能够的。还有数据量的问题,这样的交互式必定建立在解释型语言基础上,另外他的灵活性必定要牺牲一定的性能为代价。所以假设要处理海量数据的话,每次读取一条记录,每次对规则进行解释执行,每次在写入一条记录,这对性能影响是很大的。
2、专门编码型的,它提供了一个基于某种语言的程序框架,你能够不必将编程精力放在一些周边的功能上,比如读文件功能、写数据库的功能,而将精力主要放在规则的实现上面。这样的近似手工代码的性能肯定是没话说,除非你的编程技巧只是关(这也是不可忽视的因素之中的一个)。对于处理大数据量,处理复杂转换逻辑,这样的方式的ETL实现是很直观的。
3、代码生成器型的,它就像是一个ETL代码生成器,提供简单的图形化界面操作,让你拖拖拽拽将转换规则都设定好,事实上他的后台都是生成基于某种语言的程序,要执行这个ETL过程,必需要编译才行。Datastage就是相似这种产品,设计好的job必需要编译,这避免了每次转换的解释执行,可是不知道它生成的中间语言是什么。曾经我设计的ETL工具大挪移事实上也是归属于这一类,它提供了界面让用户编写规则,最后生成C++语言,编译后就可以执行。这类工具的特点就是要在界面上下狠功夫,必须让用户轻松定义一个ETL过程,提供丰富的插件来完毕读、写和转换函数。大挪移在这方面就太弱了,规则必须手写,并且要写成标准c++语法,这未免还是有点难为终于用户了,还不如做成一个专业编码型的产品呢。另外一点,这类工具必须提供面向专家应用的功能,由于它不可能考虑到全部的转换规则和全部的读写,一方面提供插件接口来让第三方编写特定的插件,还有一方面还有提供特定语言来实现高级功能。比如Datastage提供一种类Basic的语言,只是他的Job的脚本化实现好像就做的不太好,仅仅能手工绘制job,而不能编程实现Job。
4、最后另一种类型叫做数据集线器,顾名思义,他就是像Hub一样地工作。将这样的类型分出来和上面几种分类在标准上有所差异,上面三种很多其它指ETL实现的方法,此类主要从数据处理角度。眼下有一些产品属于EAI(Enterprise Application Integration),它的数据集成主要是一种准实时性。所以这类产品就像Hub一样,不断接收各种异构数据源来的数据,经过处理,在实施发送到不同的目标数据中去。
尽管,这些类看似各又千秋,特别在BI项目中,面对海量数据的ETL时,中间两种的选择就開始了,在选择过程中,必需要考虑到开发效率、维护方面、性能、学习曲线、人员技能等各方面因素,当然还有最重要也是最现实的因素就是客户的意象。
4.3.ETL 中的转换-Transication
ETL探求之中的一个中提到,ETL过程最复杂的部分就是T,这个转换过程,T过程到底有哪些类型呢?
从对数据源的整个宏观处理分,看看一个ETL过程的输入输出,能够分成以下几类:
1、大小交,这样的处理在数据清洗过程是常见了,比如从数据源到ODS阶段,假设数据仓库採用维度建模,并且维度基本採用代理键的话,必定存在代码到此键值的转换。假设用SQL实现,必定须要将一个大表和一堆小表都Join起来,当然假设使用ETL工具的话,一般都是先将小表读入内存中再处理。这样的情况,输出数据的粒度和大表一样。
2、大大交,大表和大表之间关联也是一个重要的课题,当然当中要有一个主表,在逻辑上,应当是主表Left Join辅表。大表之间的关联存在最大的问题就是性能和稳定性,对于海量数据来说,必须有优化的方法来处理他们的关联,另外,对于大数据的处理无疑会占用太多的系统资源,出错的几率很大,怎样做到有效错误恢复也是个问题。对于这样的情况,我们建议还是尽量将大表拆分成适度的稍小一点的表,形成大小交的类型。这类情况的输出数据粒度和主表一样。
3、站着进来,躺着出去。事务系统中为了提高系统灵活性和扩展性,非常多信息放在代码表中维护,所以它的“事实表”就是一种窄表,而在数据仓库中,通常要进行宽化,从行变成列,所以称这样的处理情况叫做“站着进来,躺着出去”。大家对 Decode肯定不陌生,这是进行宽表化常见的手段之中的一个。窄表变宽表的过程主要体如今对窄表中那个代码字段的操作。这样的情况,窄表是输入,宽表是输出,宽表的粒度必然要比窄表粗一些,就粗在那个代码字段上。
4、聚集。数据仓库中重要的任务就是沉淀数据,聚集是不可缺少的操作,它是粗化数据粒度的过程。聚集本身事实上非常easy,就是相似SQL中Group by的操作,选取特定字段(维度),对度量字段再使用某种聚集函数。可是对于大数据量情况下,聚集算法的优化仍是探究的一个课题。比如是直接使用SQL的 Group by,还是先排序,在处理。
从数据的转换的微观细节分,能够分成以下的几个基本类型,当然另一些复杂的组合情况,比如先运算,在參照转换的规则,这样的基于基本类型组合的情况就不在此列了。ETL的规则是依赖目标数据的,目标数据有多少字段,就有多少条规则。
1、直接映射,原来是什么就是什么,原封不动照搬过来,对这种规则,假设数据源字段和目标字段长度或精度不符,须要特别注意看是否真的能够直接映射还是须要做一些简单运算;
2、字段运算,数据源的一个或多个字段进行数学运算得到的目标字段,这样的规则一般对数值型字段而言;
3、參照转换,在转换中通常要用数据源的一个或多个字段作为Key。
4.4.ETL中数据质量
“不要绝对的数据准确,但要知道为什么不准确。”这是我们在构建BI系统是对数据准确性的要求。确实,对绝对的数据准确谁也没有把握,不仅是系统集成商,包含客户也是无法确定。准确的东西须要一个标准,但首先要保证这个标准是准确的,至少如今还没有这样一个标准。客户会提出一个相对标准,比如将你的OLAP数据结果和报表结果对照。尽管这是一种不太公平的比較,你也仅仅好认了吧。
首先在数据源那里,已经非常难保证数据质量了,这一点也是事实。在这一层有哪些可能原因导致数据质量问题?能够分为以下几类:
1、数据格式错误,比如缺失数据、数据值超出范围或是数据格式非法等。要知道对于相同处理大数据量的数据源系统,他们一般会舍弃一些数据库自身的检查机制,比如字段约束等。他们尽可能将数据检查在入库前保证,可是这一点是非常难确保的。这类情况诸如身份证号码、手机号、非日期类型的日期字段等。
2、数据一致性,相同,数据源系统为了性能的考虑,会在一定程度上舍弃外键约束,这一般会导致数据不一致。比如在帐务表中会出现一个用户表中没有的用户ID,在比如有些代码在代码表中找不到等。
3、业务逻辑的合理性,这一点非常难说对与错。通常,数据源系统的设计并非非常严谨,比如让用户开户日期晚于用户销户日期都是有可能发生的,一个用户表中存在多个用户ID也是有可能发生的。对这样的情况,有什么办法吗?
构建一个BI系统,要做到全然理解数据源系统根本就是不可能的。特别是数据源系统在交付后,有很多其它维护人员的即兴发挥,那更是要花大量的时间去寻找原因。以前以前争辩过设计人员对规则描写叙述的问题,有人提出要在ETL開始之前务必将全部的规则弄得一清二楚。我并不允许这种意见,倒是觉得在ETL过程要有处理这些质量有问题数据的保证。一定要正面这些脏数据,是丢弃还是处理,无法逃避。假设没有质量保证,那么在这个过程中,错误会逐渐放大,抛开数据源质量问题,我们再来看看ETL过程中哪些因素对数据准确性产生重大影响。
1、规则描写叙述错误。上面提到对设计人员对数据源系统理解的不充分,导致规则理解错误,这是一方面。还有一方面,是规则的描写叙述,假设无二义性地描写叙述规则也是要探求的一个课题。规则是依附于目标字段的,在探求之三中,提到规则的分类。可是规则总不能总是用文字描写叙述,必须有严格的数学表达方式。我甚至想过,假设设计人员可以使用某种规则语言来描写叙述,那么我们的ETL单元就行自己主动生成、同步,省去非常多手工操作了。
2、ETL开发错误。即时规则非常明白,ETL开发的过程中也会发生一些错误,比如逻辑错误、书写错误等。比如对于一个分段值,开区间闭区间是须要指定的,可是经常开发者没注意,一个大于等于号写成大于号就导致数据错误。
3、人为处理错误。在总体ETL流程没有完毕之前,为了图省事,一般会手工执行ETL过程,这当中一个重大的问题就是你不会依照正常流程去执行了,而是依照自己的理解去执行,发生的错误可能是误删了数据、反复装载数据等。
4.5.ETL数据质量保证
上回提到ETL数据质量问题,这是无法根治的,仅仅能採取特定的手段去尽量避免,并且必需要定义出度量方法来衡量数据的质量是好还是坏。对于数据源的质量,客户对此应该更加关心,假设在这个源头不能保证比較干净的数据,那么后面的分析功能的可信度也都成问题。数据源系统也在不断进化过程中,客户的操作也在逐渐规范中,BI系统也相同如此。本文探讨一下对数据源质量和ETL处理质量的应对方法。
怎样应对数据源的质量问题?记得在onteldatastage列表中也讨论过一个话题-"-1的处理",在数据仓库模型维表中,通常有一条-1记录,表示“未知”,这个未知含义可广了,不论什么可能出错的数据,NULL数据甚至是规则没有涵盖到的数据,都转成-1。这是一种处理脏数据的方法,但这也是一种掩盖事实的方法。就好像写一个函数FileOpen(filename),返回一个错误码,当然,你能够只返回一种错误码,如-1,但这是一种不好的设计,对于调用者来说,他须要根据这个错误码进行某些推断,比如是文件不存在,还是读取权限不够,都有对应的处理逻辑。数据仓库中也是一样,所以,建议将不同的数据质量类型处理结果分别转换成不同的值,譬如,在转换后,-1表示參照不上,-2表示NULL数据等。只是这只对付了上回提到的第一类错误,数据格式错误。对于数据一致性和业务逻辑合理性问题,这仍有待探求。但这里有一个原则就是“必须在数据仓库中反应数据源的质量”。
对于ETL过程中产生的质量问题,必须有保障手段。从以往的经验看,没有保障手段给实施人员带来麻烦重重。实施人员对于重复装载数据一定不会陌生,甚至是最后数据留到最后的Cube,才发现了第一步ETL事实上已经错了。这个保障手段就是数据验证机制,当然,它的目的是可以在ETL过程中监控数据质量,产生报警。这个模块要将实施人员当作是终于用户,可以说他们是数据验证机制的直接收益者。
首先,必须有一个对质量的度量方法,什么是高质什么是低质,不能靠感官感觉,但这却是在没有度量方法条件下通常的做法。那经营分析系统来说,联通总部曾提出測试规范,这事实上就是一种度量方法,比如指标的误差范围不能高于5%等,对系统本身来说事实上必需要有这种度量方法,先不要说这个度量方法是否科学。对于ETL数据处理质量,他的度量方法应该比联通总部測试规范定义的方法更要严格,由于他很多其它将BI系统看作一个黑盒子,从数据源到展现的数据误差同意一定的误差。而ETL数据处理质量度量是一种白盒的度量,要注重每一步过程。因此理论上,要求输入输出的指标应该全然一致。可是我们必须正面全然一致仅仅是理想,对于有误差的数据,必须找到原因。
在质量度量方法的前提下,就能够建立一个数据验证框架。此框架根据总量、分量数据稽核方法,该方法在高的《数据仓库中的数据稽核技术》一文中已经指出。作为补充,以下提出几点功能上的建议:
1、提供前端。将开发实施人员当作用户,相同也要为之提供友好的用户界面。《稽核技术》一文中指出測试报告的形式,这样的形式还是要依赖人为推断,在一堆数据中去找规律。到不如用OLAP的方式提供界面,不光是加上測试统计出来的指标结果,而且配合度量方法的计算。比如误差率,对于误差率为大于0的指标,就要好好查一下原因了。
2、提供框架。数据验证不是一次性工作,而是每次ETL过程中都必须做的。因此,必须有一个框架,自己主动化验证过程,并提供扩展手段,让实施人员能够添加验证范围。有了这样一个框架,事实上它起到规范化操作的作用,开发实施人员能够将主要精力放在验证脚本的编写上,而不必过多关注验证怎样融合到流程中,怎样展现等工作。为此,要设计一套表,相似于DM表,每次验证结果数据都记录当中,而且自己主动触发多维分析的数据装载、公布等。这样,实施人员能够在每次装载,甚至在流程过程中就能够观察数据的误差率。特别是,假设数据仓库的模型能够统一起来,甚至数据验证脚本都能够确定下来,剩下的就是规范流程了。
3、规范流程。上回提到有一种ETL数据质量问题是因为人工处理导致的,其中最主要原因还是流程不规范。开发实施人员执行单独一个ETL单元是非常方便的,尽管曾经曾建议一个ETL单元必须是“可重入”的,这可以解决误删数据,反复装载数据问题。但要记住数据验证也是在流程其中,要让数据验证可以日常运作,就不要让实施者感觉到他的存在。总的来说,规范流程是提高实施效率的关键工作,这也是以后要继续探求的。
对于元数据(Metadata)的定义到眼下为止没有什么特别精彩的,这个概念很广,一般都是这样定义,“元数据是描写叙述数据的数据(Data about Data)”,这造成一种递归定义,就像问小强住在哪里,答,在旺財隔壁。依照这种定义,元数据所描写叙述的数据是什么呢?还是元数据。这样就可能有元元元...元数据。我还听说过一种对元数据,假设说数据是一抽屉档案,那么元数据就是分类标签。那它和索引有什么差别?
元数据体现是一种抽象,哲学家从古至今都在抽象这个世界,力图找到世界的本质。抽象不是一层关系,它是一种逐步由详细到一般的过程。比如我->男人->人->哺乳动物->生物这就是一个抽象过程,你要是在软件业混会发现这个样例非经常见,面向对象方法就是这样一种抽象过程。它对世界中的事物、过程进行抽象,使用面向对象方法,构建一套对象模型。相同在面向对象方法中,类是对象的抽象,接口又是对类的抽象。因此,我觉得能够将“元”和“抽象”换一下,叫抽象数据是不是好理解一些。
常听到这种话,“xx领导的讲话高屋建瓴,给我们后面的工作指引的清晰的方向”,这个成语“高屋建瓴”,站在10楼往下到水,居高临下,能砸死人,这是指站在一定的高度看待事物,这个一定的高度就是指他有够“元”。在设计模式中,强调要对接口编程,就是说你不要处理这类对象和那类对象的交互,而要处理这个接口和那个接口的交互,先别管他们内部是怎么干的。
元数据存在的意义也在于此,尽管上面说了一通都撤到哲学上去,但这个词必须还是要结合软件设计中看,我不知道在别的领域是不是存在Metadata这种叫法,尽管我相信别的领域必定有相似的东东。元数据的存在就是要做到在更高抽象一层设计软件。这肯定有优点,什么灵活性啊,扩展性啊,可维护性啊,都能得到提高,并且架构清晰,仅仅是弯弯太多,要是从下往上看,太复杂了。非常早曾经,我曾看过 backorifice的代码,我靠,一个简单的功能,从这个类转到父类,又转到父类,非常不理解,为什么一个简单的功能不在一个类的方法中实现就拉到了呢?如今想想,还真不能这样,这尽管使代码easy看懂了,可是结构确实混乱的,那他仅仅能干如今的事,假设有什么功能扩展,这些代码就废了。
我从98年刚工作时就開始接触元数据的概念,当时叫做元数据驱动的系统架构,后来在 QiDSS中也用到这个概念构建QiNavigator,可是如今觉得元数据也没啥,不就是建一堆表描写叙述界面的元素,再利用这些数据自己主动生成界面吗。到了数据仓库系统中,这个概念更强了,是数据仓库中一个重要的部分。可是至今,我还是觉得这个概念过于玄乎,看不到实际的东西,市面上有一些元数据管理的东西,可是从应用情况就得知,用的不多。之所以玄乎,就是由于抽象层次没有分清楚,关键就是对于元数据的分类(这样的分类就是一种抽象过程)和元数据的使用。你能够将元数据抽象成0和1,可是那样对你的业务实用吗?必须还得抽象到适合的程度,最后问题还是“度”。
数据仓库系统的元数据作用怎样?还不就是使系统自己主动运转,易于管理吗?要做到这一步,可不是必需将系统抽象到太极、两仪、八卦之类的,业界也曾定义过一些元数据规范,向CWM、XMI等等,能够借鉴,只是俺对此也是不精通的说,以后再说。


文章二:Kattle API 实战
前言:

为什么要用Kettle和KETTLE JAVA API?
Kettle是什么?kettle:是一个开源ETL工具。kettle提供了基于java的图形化界面,使用非常方便,kettle的ETL工具集合也比較多,经常使用的ETL工具都包括了。

为什么使用KETTLE JAVA API:就像kettle文档所说:KETTLE JAVA API : Program your own Kettle transformation,kettle提供了基于 JAVA的脚步编写功能,能够灵活地自己定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序猿须要做的工作,而不仅是象使用word一样操作 kettle用户界面。

KETTLE JAVA API 实战操作记录:

、         搭建好开发环境 :到http://www.kettle.be站点下载kettle的源代码包,加压缩,比如解压缩到d:/kettle文件夹

、         打开eclipse,新建一个项目,要使用jdk1.5.0,由于kettle的要使用System.getenv(),仅仅有在jdk1.5.0才被支持。提起getenv(),好像有一段几起几落的记录,曾一度被抛弃,如今又被jdk1.5支持了.

、         建一个class : TransBuilder.java,能够把d:/kettle/ extra/TransBuilder.java的内容原样复制到你的TransBuilder.java里。

、         根据须要编辑源代码。并须要对原程序进行例如以下改动,在头部添加:

import org.eclipse.swt.dnd.Transfer;

//这个包被遗漏了,原始位置kettle根文件夹/libswt/win32/swt.jar

//add by chq(www.chq.name) on  2006.07.20

(后来发现,不必加这个引用,由于编译时不须要)

、         编译准备,在eclipse中添加jar包,主要包括(主要根据extra/TransBuilder.bat):

/lib/kettle.jar
/libext/CacheDB.jar
/libext/SQLBaseJDBC.jar
/libext/activation.jar
/libext/db2jcc.jar
/libext/db2jcc_license_c.jar
/libext/edtftpj-1.4.5.jar
/libext/firebirdsql-full.jar
/libext/firebirdsql.jar
/libext/gis-shape.jar
/libext/hsqldb.jar
/libext/ifxjdbc.jar
/libext/javadbf.jar
/libext/jconn2.jar
/libext/js.jar
/libext/jt400.jar
/libext/jtds-1.1.jar
/libext/jxl.jar
/libext/ktable.jar
/libext/log4j-1.2.8.jar
/libext/mail.jar
/libext/mysql-connector-java-3.1.7-bin.jar
/libext/ojdbc14.jar
/libext/orai18n.jar
/libext/pg74.215.jdbc3.jar
/libext/edbc.jar

(注意 :以下这个包被遗漏了,要加上。原始位置kettle根文件夹/libswt/win32/swt.jar)
/libswt/win32/swt.jar 

、         编译成功后,准备执行

为使程序不必登陆就能够执行,须要设置环境署文件:kettle.properties,位置在用户文件夹里,一般在 /Documents and Settings/用户/.kettle/,主要内容例如以下:

KETTLE_REPOSITORY=kettle@m80

KETTLE_USER=admin

KETTLE_PASSWORD=passwd

、         好了,如今能够执行一下了,看看数据是不是已经复制到目标表了。

以下是执行时的控制台信息输出:



以下是自己主动生成的Transformation :



以下为改动后的程序源代码:


--------------------------------------------------------------------------------

  1. package name.chq.test;

  2.  

  3. import java.io.DataOutputStream;

  4. import java.io.File;

  5. import java.io.FileOutputStream;

  6.  

  7. import be.ibridge.kettle.core.Const;

  8. import be.ibridge.kettle.core.LogWriter;

  9. import be.ibridge.kettle.core.NotePadMeta;

  10. import be.ibridge.kettle.core.database.Database;

  11. import be.ibridge.kettle.core.database.DatabaseMeta;

  12. import be.ibridge.kettle.core.exception.KettleException;

  13. import be.ibridge.kettle.core.util.EnvUtil;

  14. import be.ibridge.kettle.trans.StepLoader;

  15. import be.ibridge.kettle.trans.Trans;

  16. import be.ibridge.kettle.trans.TransHopMeta;

  17. import be.ibridge.kettle.trans.TransMeta;

  18. import be.ibridge.kettle.trans.step.StepMeta;

  19. import be.ibridge.kettle.trans.step.StepMetaInterface;

  20. import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;

  21. import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;

  22. import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

  23.  

  24.  

  25. //这个包被遗漏了,原始位置kettle根文件夹/libswt/win32/swt.jar

  26. //add by chq([link=http://www.chq.name]www.chq.name[/link]) on  2006.07.20

  27. //import org.eclipse.swt.dnd.Transfer; 

  28.  

  29. /**

  30.  * Class created to demonstrate the creation of transformations on-the-fly.

  31.  * 

  32.  * @author Matt

  33.  * 

  34.  */

  35. public class TransBuilder

  36. {

  37.     public static final String[] databasesXML = {

  38.         "<?xml version=/"1.0/" encoding=/"UTF-8/"?>" +

  39.         "<connection>" +

  40.           "<name>target</name>" +

  41.           "<server>192.168.17.35</server>" +

  42.           "<type>ORACLE</type>" +

  43.                      "<access>Native</access>" +

  44.                      "<database>test1</database>" +

  45.                      "<port>1521</port>" +

  46.                      "<username>testuser</username>" +

  47.                      "<password>pwd</password>" +

  48.                      "<servername/>" +

  49.                      "<data_tablespace/>" +

  50.                      "<index_tablespace/>" +

  51.                      "<attributes>" +

  52.                        "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +

  53.                        "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +

  54.                           "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +

  55.                             "</attributes>" +

  56.                        "</connection>" ,

  57.          

  58.         "<?xml version=/"1.0/" encoding=/"UTF-8/"?>" +

  59.                          "<connection>" +

  60.                                 "<name>source</name>" +

  61.                                 "<server>192.168.16.12</server>" +

  62.                                 "<type>ORACLE</type>" +

  63.                                 "<access>Native</access>" +

  64.                                 "<database>test2</database>" +

  65.                                 "<port>1521</port>" +

  66.                                 "<username>testuser</username>" +

  67.                                 "<password>pwd2</password>" +

  68.                                 "<servername/>" +

  69.                                 "<data_tablespace/>" +

  70.                                 "<index_tablespace/>" +

  71.                                 "<attributes>" +

  72.                                     "<attribute><code>EXTRA_OPTION_MYSQL.defaultFetchSize</code><attribute>500</attribute></attribute>" +

  73.                                     "<attribute><code>EXTRA_OPTION_MYSQL.useCursorFetch</code><attribute>true</attribute></attribute>" +

  74.                                        "<attribute><code>PORT_NUMBER</code><attribute>1521</attribute></attribute>" +

  75.                                 "</attributes>" +

  76.                          "</connection>" 

  77.     };

  78.  

  79.     /**

  80.      * Creates a new Transformation using input parameters such as the tablename to read from.

  81.      * @param transformationName The name of the transformation

  82.      * @param sourceDatabaseName The name of the database to read from

  83.      * @param sourceTableName The name of the table to read from

  84.      * @param sourceFields The field names we want to read from the source table

  85.      * @param targetDatabaseName The name of the target database

  86.      * @param targetTableName The name of the target table we want to write to

  87.      * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)

  88.      * @return A new transformation

  89.      * @throws KettleException In the rare case something goes wrong

  90.      */

  91.     public static final TransMeta buildCopyTable(

  92.        String transformationName,String sourceDatabaseName, String sourceTableName, 

  93.        String[] sourceFields, String targetDatabaseName, String targetTableName, 

  94.        String[] targetFields)

  95.       throws KettleException

  96.     {

  97.         LogWriter log = LogWriter.getInstance();

  98.         EnvUtil.environmentInit();

  99.         try

  100.         {

  101.             //

  102.             // Create a new transformation...

  103.             //

  104.             TransMeta transMeta = new TransMeta();

  105.             transMeta.setName(transformationName);

  106.             

  107.             // Add the database connections

  108.  

  109.             for (int i=0;i<databasesXML.length;i++)

  110.             {

  111.                 DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);

  112.                 transMeta.addDatabase(databaseMeta);

  113.             }

  114.             

  115.             DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);

  116.             DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

  117.  

  118.             

  119.             //

  120.             // Add a note

  121.             //

  122.             String note = "Reads information from table [" + sourceTableName+ "] on database [" 

  123.                             + sourceDBInfo + "]" + Const.CR;

  124.             note += "After that, it writes the information to table [" + targetTableName + "] on database [" 

  125.                             + targetDBInfo + "]";

  126.             NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);

  127.             transMeta.addNote(ni);

  128.  

  129.             // 

  130.             // create the source step...

  131.             //

  132.             String fromstepname = "read from [" + sourceTableName + "]";

  133.             TableInputMeta tii = new TableInputMeta();

  134.             tii.setDatabaseMeta(sourceDBInfo);

  135.             String selectSQL = "SELECT "+Const.CR;

  136.             for (int i=0;i<sourceFields.length;i++)

  137.             {

  138.             /* modi by chq(www.chq.name): use * to replace the fields,经分析,下面语句能够处理‘*‘ */

  139.                 if (i>0) 

  140.                      selectSQL+=", "

  141.                 else selectSQL+="  ";

  142.                

  143.                 selectSQL+=sourceFields[i]+Const.CR;

  144.             }

  145.             selectSQL+="FROM "+sourceTableName;

  146.             tii.setSQL(selectSQL);

  147.  

  148.             StepLoader steploader = StepLoader.getInstance();

  149.  

  150.             String fromstepid = steploader.getStepPluginID(tii);

  151.             StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);

  152.             fromstep.setLocation(150, 100);

  153.             fromstep.setDraw(true);

  154.             fromstep.setDescription("Reads information from table [" + sourceTableName 

  155.                                      + "] on database [" + sourceDBInfo + "]");

  156.             transMeta.addStep(fromstep);

  157.  

  158.             //

  159.             // add logic to rename fields

  160.             // Use metadata logic in SelectValues, use SelectValueInfo...

  161.             //

  162.             /* 不必改名或映射 add by chq(www.chq.name) on 2006.07.20

  163.             SelectValuesMeta svi = new SelectValuesMeta();

  164.             svi.allocate(0, 0, sourceFields.length);

  165.             for (int i = 0; i < sourceFields.length; i++)

  166.             {

  167.                 svi.getMetaName()[i] = sourceFields[i];

  168.                 svi.getMetaRename()[i] = targetFields[i];

  169.             }

  170.  

  171.             String selstepname = "Rename field names";

  172.             String selstepid = steploader.getStepPluginID(svi);

  173.             StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);

  174.             selstep.setLocation(350, 100);

  175.             selstep.setDraw(true);

  176.             selstep.setDescription("Rename field names");

  177.             transMeta.addStep(selstep);

  178.  

  179.             TransHopMeta shi = new TransHopMeta(fromstep, selstep);

  180.             transMeta.addTransHop(shi);

  181.             fromstep = selstep; //设定了新的起点 by chq([link=http://www.chq.name]www.chq.name[/link]) on 2006.07.20

  182.             */

  183.             // 

  184.             // Create the target step...

  185.             //

  186.             //

  187.             // Add the TableOutputMeta step...

  188.             //

  189.             String tostepname = "write to [" + targetTableName + "]";

  190.             TableOutputMeta toi = new TableOutputMeta();

  191.             toi.setDatabase(targetDBInfo);

  192.             toi.setTablename(targetTableName);

  193.             toi.setCommitSize(200);

  194.             toi.setTruncateTable(true);

  195.  

  196.             String tostepid = steploader.getStepPluginID(toi);

  197.             StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);

  198.             tostep.setLocation(550, 100);

  199.             tostep.setDraw(true);

  200.             tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");

  201.             transMeta.addStep(tostep);

  202.  

  203.             //

  204.             // Add a hop between the two steps...

  205.             //

  206.             TransHopMeta hi = new TransHopMeta(fromstep, tostep);

  207.             transMeta.addTransHop(hi);

  208.  

  209.             // OK, if we're still here: overwrite the current transformation...

  210.             return transMeta;

  211.         }

  212.         catch (Exception e)

  213.         {

  214.             throw new KettleException("An unexpected error occurred creating the new transformation", e);

  215.         }

  216.     }

  217.  

  218.     /**

  219.      * 1) create a new transformation

  220.      * 2) save the transformation as XML file

  221.      * 3) generate the SQL for the target table

  222.      * 4) Execute the transformation

  223.      * 5) drop the target table to make this program repeatable

  224.      * 

  225.      * @param args

  226.      */

  227.     public static void main(String[] args) throws Exception

  228.     {

  229.        EnvUtil.environmentInit();

  230.         // Init the logging...

  231.         LogWriter log = LogWriter.getInstance("TransBuilder.log"true, LogWriter.LOG_LEVEL_DETAILED);

  232.         

  233.         // Load the Kettle steps & plugins 

  234.         StepLoader stloader = StepLoader.getInstance();

  235.         if (!stloader.read())

  236.         {

  237.             log.logError("TransBuilder",  "Error loading Kettle steps & plugins... stopping now!");

  238.             return;

  239.         }

  240.         

  241.         // The parameters we want, optionally this can be 

  242.         String fileName = "NewTrans.xml";

  243.         String transformationName = "Test Transformation";

  244.         String sourceDatabaseName = "source";

  245.         String sourceTableName = "testuser.source_table";

  246.         String sourceFields[] = { 

  247.                "*" 

  248.                };

  249.  

  250.         String targetDatabaseName = "target";

  251.         String targetTableName = "testuser.target_table";

  252.         String targetFields[] = { 

  253.                "*"

  254.                };

  255.  

  256.         

  257.         // Generate the transformation.

  258.         TransMeta transMeta = TransBuilder.buildCopyTable(

  259.                 transformationName,

  260.                 sourceDatabaseName,

  261.                 sourceTableName,

  262.                 sourceFields,

  263.                 targetDatabaseName,

  264.                 targetTableName,

  265.                 targetFields

  266.                 );

  267.         

  268.         // Save it as a file:

  269.         String xml = transMeta.getXML();

  270.         DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));

  271.         dos.write(xml.getBytes("UTF-8"));

  272.         dos.close();

  273.         System.out.println("Saved transformation to file: "+fileName);

  274.  

  275.         // OK, What's the SQL we need to execute to generate the target table?

  276.         String sql = transMeta.getSQLStatementsString();

  277.         

  278.         // Execute the SQL on the target table:

  279.         Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));

  280.         targetDatabase.connect();

  281.         targetDatabase.execStatements(sql);

  282.         

  283.         // Now execute the transformation...

  284.         Trans trans = new Trans(log, transMeta);

  285.         trans.execute(null);

  286.         trans.waitUntilFinished();

  287.         

  288.         // For testing/repeatability, we drop the target table again

  289.         /* modi by chq([link=http://www.chq.name]www.chq.name[/link]) on  2006.07.20 不必删表

  290.         //targetDatabase.execStatement("drop table "+targetTableName);

  291.         targetDatabase.disconnect();

  292.     }
  293.  

文章三:Kattle JAVA API

http://wiki.pentaho.com/display/EAI/Pentaho+Data+Integration+-+Java+API+Examples

KETTLE JAVA API   http://kettle.pentaho.org/downloads/api.php

Program your own Kettle transformation

The example described below performs the following actions:

  1. create a new transformation
  2. save the transformation as XML file
  3. generate the SQL for the target table
  4. Execute the transformation
  5. drop the target table to make this program repeatable

The complete source code for the example is distributed in the distribution zip file. You can find this file in the downloads section. (Kettle version 2.1.3 or higher)

After unzipping this file, you can find the source code in the 揟ransBuilder.java� file in the 揺xtra� directory.

The Kettle Java API for Kettle is found here: Kettle Java API

// Generate the transformation.
TransMeta transMeta = TransBuilder.buildCopyTable(
transformationName,
sourceDatabaseName,
sourceTableName,
sourceFields,
targetDatabaseName,
targetTableName,
targetFields
);

// Save it as a file:
String xml = transMeta.getXML();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
dos.write(xml.getBytes("UTF-8"));
dos.close();
System.out.println("Saved transformation to file: "+fileName);

// OK, What's the SQL we need to execute to generate the target table?
String sql = transMeta.getSQLStatementsString();

// Execute the SQL on the target table:
Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
targetDatabase.connect();
targetDatabase.execStatements(sql);

// Now execute the transformation...
Trans trans = new Trans(log, transMeta);
trans.execute(null);
trans.waitUntilFinished();

// For testing/repeatability, we drop the target table again
targetDatabase.execStatement("drop table "+targetTableName);
targetDatabase.disconnect();

Below is the source code for the method that creates the transformation:

/**
* Creates a new Transformation using input parameters such as the tablename to read from.
* @param transformationName The name of the transformation
* @param sourceDatabaseName The name of the database to read from
* @param sourceTableName The name of the table to read from
* @param sourceFields The field names we want to read from the source table
* @param targetDatabaseName The name of the target database
* @param targetTableName The name of the target table we want to write to
* @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
* @return A new transformation metadata object
* @throws KettleException In the rare case something goes wrong
*/

public static final TransMeta buildCopyTable(
String transformationName,
String sourceDatabaseName,
String sourceTableName,
String[] sourceFields,
String targetDatabaseName,
String targetTableName,
String[] targetFields) throws KettleException
{

LogWriter log = LogWriter.getInstance();

try
{

//
// Create a new transformation...
//
TransMeta transMeta = new TransMeta();
transMeta.setName(transformationName);

// Add the database connections
for (int i=0;i<databasesXML.length;i++)
{
DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
transMeta.addDatabase(databaseMeta);
}

DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);

//
// Add a note
//

String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
transMeta.addNote(ni);

//
// create the source step...
//

String fromstepname = "read from [" + sourceTableName + "]";
TableInputMeta tii = new TableInputMeta();
tii.setDatabaseMeta(sourceDBInfo);
String selectSQL = "SELECT "+Const.CR;
for (int i=0;i<sourceFields.length;i++)
{
if (i>0) selectSQL+=", "; else selectSQL+=" ";
selectSQL+=sourceFields[i]+Const.CR;
}
selectSQL+="FROM "+sourceTableName;
tii.setSQL(selectSQL);

StepLoader steploader = StepLoader.getInstance();

String fromstepid = steploader.getStepPluginID(tii);
StepMeta fromstep = new StepMeta(log, fromstepid, fromstepname, (StepMetaInterface) tii);
fromstep.setLocation(150, 100);
fromstep.setDraw(true);
fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
transMeta.addStep(fromstep);

//
// add logic to rename fields
// Use metadata logic in SelectValues, use SelectValueInfo...
//

SelectValuesMeta svi = new SelectValuesMeta();
svi.allocate(0, 0, sourceFields.length);
for (int i = 0; i < sourceFields.length; i++)
{

svi.getMetaName()[i] = sourceFields[i];
svi.getMetaRename()[i] = targetFields[i];

}

String selstepname = "Rename field names";
String selstepid = steploader.getStepPluginID(svi);
StepMeta selstep = new StepMeta(log, selstepid, selstepname, (StepMetaInterface) svi);
selstep.setLocation(350, 100);
selstep.setDraw(true);
selstep.setDescription("Rename field names");
transMeta.addStep(selstep);

TransHopMeta shi = new TransHopMeta(fromstep, selstep);
transMeta.addTransHop(shi);
fromstep = selstep;

//
// Create the target step...
//

//
// Add the TableOutputMeta step...
//

String tostepname = "write to [" + targetTableName + "]";
TableOutputMeta toi = new TableOutputMeta();
toi.setDatabase(targetDBInfo);
toi.setTablename(targetTableName);
toi.setCommitSize(200);
toi.setTruncateTable(true);

String tostepid = steploader.getStepPluginID(toi);
StepMeta tostep = new StepMeta(log, tostepid, tostepname, (StepMetaInterface) toi);
tostep.setLocation(550, 100);

tostep.setDraw(true);
tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
transMeta.addStep(tostep);

//
// Add a hop between the two steps...
//

TransHopMeta hi = new TransHopMeta(fromstep, tostep);
transMeta.addTransHop(hi);

// The transformation is complete, return it...
return transMeta;
}
catch (Exception e)
{

throw new KettleException("An unexpected error occurred creating the new transformation", e);

}

}


原文地址:https://www.cnblogs.com/blfshiye/p/4291245.html