kettle、pentaho 实现不同数据库之间表数据同步

1、pentaho 下载 pdi-ce-9.1.0.0-324.zip 并解压至 D:data-integration

https://sourceforge.net/projects/pentaho/files/

2、运行  D:data-integrationSpoon.bat   打开配置界面

3、找到并编辑  C:Users{用户名}.kettlekettle.properties 文件,增加标红内容后重新启动Spoon.bat

windows server 系统文件路径:C:Windowssystem32configsystemprofile.kettlekettle.properties

linux系统文件路径:/root/.kettle/kettle.properties

# This file was generated by Pentaho Data Integration version 9.1.0.0-324.
# 
# Here are a few examples of variables to set: 
#
# PRODUCTION_SERVER = hercules
# TEST_SERVER = zeus
# DEVELOPMENT_SERVER = thor
#
# Note: lines like these with a # in front of it are comments

#解决kettle把空字符串当成null的情况
KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y

4、在Spoon界面新建如下“转换”:

4.1 源数据:

4.2 目标数据

4.3 合并记录(标志字段bz为新定义的动态变量,不能出现在关键字或数据字段中)

4.4 数据同步

合并完成后,标志字段的值有4种,分别是:

“Identical” : 关键字段在新旧数据源中都存在,且域值相同

“changed” : 关键字段在新旧数据源中都存在,但域值不同

“new” : 旧数据源中没有找到该关键字段

“deleted”: 新数据源中没有找到关键字段

则数据同步的配置需要注意以下几点:

(1) 不论是查询的关键字,还是更新字段,都要把标志字段去掉(注意,去掉标志字段!);其他字段根据业务需求,进行设置;

(2) 高级标签中的规则要定义好,否则会报“It was not possible to find operation field [null] in the input stream!”错误。

5、JAVA 中调用.ktr转换配置文件

5.1 从D:data-integrationlib中拷贝必要的jar包到工程lib下

    包括kettle-dbdialog-9.1.0.0-324.jar、kettle-engine-9.1.0.0-324.jar、kettle-core-9.1.0.0-324.jar、commons-vfs2-2.3.jar、pentaho-encryption-support-9.1.0.0-324.jar、metastore-9.1.0.0-324.jar、guava-17.0.jar

5.2 在工程src下新建 kettle-password-encoder-plugins.xml 文件

    内容如下:

<password-encoder-plugins>
  <password-encoder-plugin id="kettle">
     <description>kettle Password Encoder</description>
     <classname>org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder</classname>
  </password-encoder-plugin>
</password-encoder-plugins>

否则会出现如下错误:

Unable to find plugin with ID 'Kettle'. If this is a test, make sure kettle-core tests jar is a dependency. If this is live make sure a kettle-password-encoder-plugins.xml exits in the classpath

5.3 JAVA 调用示例代码

package com.xrh.extend.quartz.jobs;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.logging.Logger;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;

import com.xrh.base.job.BN_Job;
import com.xrh.core.util.ObjectUtil;
import com.xrh.extend.quartz.QuartzJob;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * Kettle Job示例
 * @author 李小家
 *
 */
@DisallowConcurrentExecution
public class KettleJob implements QuartzJob {

    private static Logger logger = Logger.getLogger(KettleJob.class.getName());

    public String run (JobExecutionContext context) throws Exception {
        StringBuffer runInfo = new StringBuffer();
        BN_Job job = (BN_Job) context.getJobDetail().getJobDataMap().get("job");
        logger.info(job.getOpName() + "[" + job.getId() + "] run======");
        
        String jobParam = job.getJobParam();
        if (ObjectUtil.isNull(jobParam)){
            logger.warning("调度附加参数(JSON) 不能为空!");
            runInfo.append("调度附加参数(JSON) 不能为空!");
            return runInfo.toString();
        }
        
        JSONObject paramJson = JSONObject.fromObject(jobParam);
        String ktrFilePath = paramJson.optString("ktrFilePath"); //转换文件完整路径
        JSONArray argumentsJSONArray = paramJson.optJSONArray("arguments");
        String[] arguments = null;
        if (ObjectUtil.isNull(ktrFilePath)) {
            logger.warning("调度附加参数(JSON) 必须包含转换文件路径'ktrFilePath'参数!");
            runInfo.append("调度附加参数(JSON) 必须包含转换文件路径'ktrFilePath'参数!");
            return runInfo.toString();
        }
        if (!new File(ktrFilePath).exists()) {
            logger.warning("系统找不到转换文件["+ktrFilePath+"]!");
            runInfo.append("系统找不到转换文件["+ktrFilePath+"]!");
            return runInfo.toString();
        }
        if (argumentsJSONArray != null) {
            Object[] objArr = argumentsJSONArray.toArray(new Object[] {});
            if (objArr.length > 0) {
                arguments = new String[objArr.length];
                for (int i = 0 ; i < objArr.length; i ++) {
                    arguments[i] = objArr[i].toString();
                }
            }
        }
        
        Trans trans = null;  
        try {  
            initKettleProperties();
            KettleEnvironment.init();// 初始化  
            //EnvUtil.environmentInit();  
            TransMeta transMeta = new TransMeta(ktrFilePath);  
            // 转换  
            trans = new Trans(transMeta);  
            // 执行转换  
            trans.execute(arguments);  
            // 等待转换执行结束  
            trans.waitUntilFinished();  
            // 抛出异常  
            if (trans.getErrors() > 0) {  
                runInfo.append("There are errors during transformation exception!(传输过程中发生异常)");
                throw new Exception(  
                        "There are errors during transformation exception!(传输过程中发生异常)");  
            }
        } catch (Exception e) {  
            e.printStackTrace(); 
            runInfo.append(e.getMessage());
            return runInfo.toString();
        } 
        runInfo.append("执行完毕了, 未发现异常!");
        
        return runInfo.toString();
    }
    
    /**
     * 解决kettle无法写入空字符串的问题
     * window环境中,需要在C:Userswangll.kettlekettle.properties中写入如下配置;
     * linux环境中,需要在/root/.kettle/kettle.properties中写入如下配置。
     * 故为了方便直接使用它自带的方法去生成上述文件
     */
    public static void initKettleProperties() {
        String directory = Const.getKettleDirectory();
        String kpFile = directory + Const.FILE_SEPARATOR + "kettle.properties";
        logger.info("kpFile===" + kpFile);
        if (!new File(kpFile).exists()) {
            File dir = new File(directory);
            dir.mkdirs();
            KettleClientEnvironment.createKettleHome();
            
            File file = new File(kpFile);
            FileWriter fw = null;
            BufferedWriter bw = null;
            try {
                fw = new FileWriter(file);
                bw = new BufferedWriter(fw);
                bw.write("KETTLE_EMPTY_STRING_DIFFERS_FROM_NULL=Y");
                bw.flush();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (bw != null) {
                    try {
                        bw.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                    }
                }
                if (fw != null) {
                    try {
                        fw.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                    }
                }
            }
        } 
    }
}

6、常见问题

6.1 在使用转换mysql的tinyint(1)字段类型时,会将tinyint(1)类型当成Boolean类型来处理

     解决方法:通过拼接字符串,如select columnName+ "" as columnName

6.2 执行转换时出现以下错误:

2021/05/31 14:24:24 - 合并记录.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : Unexpected error
2021/05/31 14:24:24 - 合并记录.0 - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : java.lang.NullPointerException
2021/05/31 14:24:24 - 合并记录.0 - 完成处理 (I=0, O=0, R=0, W=0, U=0, E=1)
2021/05/31 14:24:24 - zl_products - 转换被检测
2021/05/31 14:24:24 - zl_products - 转换正在杀死其他步骤!
2021/05/31 14:24:24 - 源数据.0 - Finished reading query, closing connection.
2021/05/31 14:24:24 - 源数据.0 - 完成处理 (I=2, O=0, R=0, W=0, U=0, E=0)
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : 错误被检测到!
2021/05/31 14:24:24 - Spoon - 转换完成!!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : 错误被检测到!
2021/05/31 14:24:24 - zl_products - ERROR (version 9.1.0.0-324, build 9.1.0.0-324 from 2020-09-07 05.09.05 by buildguy) : 错误被检测到!

解决办法:确认连接处于生效状态(灰色表示未生效)

 6.3  将.ktr转换文件部署生产环境

     修改该文件connection数据源配置,其中<password>Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c</password>红色部分为数据库密码加密后的内容,可通过执行“JavaScript代码”获得加密后的值,如下图所示:

加密脚本:

//Script here
var setValue;
setValue = Packages.org.pentaho.di.core.encryption.Encr.encryptPassword('123456');

解密脚本:

//解密
var setValue1;
setValue1 = org.pentaho.di.core.encryption.Encr.decryptPasswordOptionallyEncrypted('Encrypted 2be98afc86aa7f2e4cb79ff228dc6fa8c');

李小家
原文地址:https://www.cnblogs.com/101key/p/14821144.html