使用OGG同步Oracle12C数据至Kafka

使用OGG同步Oracle12C数据至Kafka

https://blog.csdn.net/andyguan01_2/article/details/86716388

一、环境

操作系统:CentOS6.9
软件版本:Zookeeper3.4.13,Kafka2.1.0
集群架构(Zookeeper和Kafka):
Node1:10.200.4.116(oracle03)
Node2:10.200.4.117(oracle02)
Node3:10.100.125.156(db01)

二、OGG源端安装配置

如果要了解OGG基本架构,可以查看:https://blog.csdn.net/andyguan01_2/article/details/87075927

我这里的源端Oracle数据库安装在Node1(10.200.4.116),按说应该是要有一台单独的数据库服务器,由于我这里资源有限,在Node1上面搭建了源端Oracle数据库。

1、源端Oracle数据库配置

1.1 开启归档模式

检查源端Oracle数据库是否开启归档模式,若没有可按以下方法开启:

Oracle12C开启归档模式:https://blog.csdn.net/andyguan01_2/article/details/86715413

1.2 启用ENABLE_GOLDENGATE_REPLICATION参数

参数ENABLE_GOLDENGATE_REPLICATION需要置为TRUE,否则后面启动OGG的Extract进程的时候会报错。

数据库默认是没有启用此参数,可按此方法启用:
https://blog.csdn.net/andyguan01_2/article/details/86748448

1.3 启用最小日志补全supplemental_log_data_min

需要启用最小日志补全supplemental_log_data_min,否则后面启动OGG的Extract进程的时候会报错。

数据库默认是没有启用此功能,可按此方法启用:https://blog.csdn.net/andyguan01_2/article/details/86743751

2、建立数据库OGG用户

创建OGG用户表空间:

create tablespace oggdata datafile '/data/oracle/rcas/RCAS/datafile/oggdata01.dbf' size 10M autoextend on next 10M maxsize 1G;

    1

创建OGG用户:

create user ogg identified by ogg default tablespace oggdata;  

    1

给OGG用户授权:

grant connect,resource to ogg;
--如果不做ddl trigger,dba权限可以不给
grant dba to ogg;
 
GRANT CREATE SESSION TO ogg;
GRANT ALTER SESSION  TO ogg;
GRANT SELECT ANY DICTIONARY TO ogg;
GRANT SELECT ANY TABLE TO ogg;
--用户配置表级追加日志
GRANT ALTER ANY TABLE TO ogg;
 
GRANT FLASHBACK ANY TABLE TO ogg;
GRANT EXECUTE on DBMS_FLASHBACK TO ogg;
GRANT EXECUTE ON utl_file TO ogg;
grant execute on sys.dbms_lob to ogg;

execute DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('OGG');
--以下语句是在oracle 11g之上版本用的,10g版本不需要执行
execute DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE(Grantee=> 'OGG',privilege_type=> 'CAPTURE',grant_select_privileges => TRUE,do_grants=> TRUE);

  
3、在源端安装OGG

安装方法见:

在CentOS6.9安装OGG18.1.0.0 for Oracle:https://blog.csdn.net/andyguan01_2/article/details/86719382

4、在源端配置OGG

4.1 在oracle用户打开ggsci命令:

cd $GG_HOME
./ggsci

4.2 创建OGG相关子目录
create subdirs


4.3 配置OGG Manager

编辑mgr参数文件:

edit params mgr

输入以下内容:

PORT 7809
DYNAMICPORTLIST 7810-7860
AUTOSTART EXTRACT *
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS ./dirdat/*, usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

    MANAGER进程参数配置说明:

    PORT:指定服务监听端口,默认为7809。

    DYNAMICPORTLIST:动态端口,可以制定最大256个可用端口的动态列表,当指定的端口不可用时,管理进程将会从列表中选择一个可用的端口,源端和目标端的Collector、Replicat、GGSCI进程通信也会使用这些端口。

    COMMENT:注释行,也可以用–来代替。

    AUTOSTART:指定在管理进程启动时自动启动哪些进程。

    AUTORESTART:自动重启参数设置。本处设置表示每3分钟尝试重新启动所有EXTRACT进程,共尝试5次;

    PURGEOLDEXTRACTS:定期清理trail文件设置。本处设置表示对于超过3天的trail文件进行删除。

    LAGREPORTHOURS、LAGINFOMINUTES、LAGCRITICALMINUTES:定义数据延迟的预警机制。本处设置表示MGR进程每隔1小时检查EXTRACT的延迟情况,如果超过了30分钟就把延迟作为信息记录到错误日志中,如果延迟超过了45分钟,则把它作为警告写到错误日志中。

启动OGG Manager:

start mgr

4.4 创建Extract抽取进程

编辑Extract参数文件:

edit params ext_1
输入以下内容:

EXTRACT ext_1
Setenv (NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
USERID ogg@rcas, PASSWORD ogg
GETTRUNCATES
DISCARDFILE /data/ogg_app/dirrpt/ext_1.dsc, APPEND, MEGABYTES 1024
DBOPTIONS  ALLOWUNUSEDCOLUMN
REPORTCOUNT EVERY 1 MINUTES, RATE
WARNLONGTRANS 2h,CHECKINTERVAL 5m
FETCHOPTIONS NOUSESNAPSHOT
--TRANLOGOPTIONS  CONVERTUCS2CLOBS
EXTTRAIL  /data/ogg_app/dirdat/te
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
DYNAMICRESOLUTION
table ogg.test_1;

    EXTRACT进程参数配置说明:

    SETENV:配置系统环境变量。

    USERID/PASSWORD: 指定OGG连接数据库的用户名和密码,连接的是源端的数据库。例如:userid ogg@source, password gg。source是连接源库的tnsname。

    COMMENT:注释行,也可以用–来代替。

    TABLE:定义需复制的表,后面需以;结尾

    TABLEEXCLUDE:定义需要排除的表,如果在TABLE参数中使用了通配符,可以使用该参数指定排除掉的表。

    GETUPDATEAFTERS|IGNOREUPDATEAFTERS:是否在队列中写入后影像,缺省复制。

    GETUPDATEBEFORES| IGNOREUPDATEBEFORES:是否在队列中写入前影像,缺省不复制。

    GETUPDATES|IGNOREUPDATES:是否复制UPDATE操作,缺省复制。

    GETDELETES|IGNOREDELETES:是否复制DELETE操作,缺省复制。

    GETINSERTS|IGNOREINSERTS:是否复制INSERT操作,缺省复制。

    GETTRUNCATES|IGNORETRUNDATES:是否复制TRUNCATE操作,缺省不复制。

    RMTHOST:指定目标系统及其Goldengate Manager进程的端口号,还用于定义是否使用压缩进行传输。

    RMTTRAIL:指定写入到目标端的哪个队列。

    EXTTRAIL:指定写入到本地的哪个队列。

    SQLEXEC:在extract进程运行时首先运行一个SQL语句。

    PASSTHRU:禁止extract进程与数据库交互,适用于Data Pump传输进程。

    REPORT:定义自动定时报告。

    STATOPTIONS:定义每次使用stat时统计数字是否需要重置。

    REPORTCOUNT:报告已经处理的记录条数统计数字。

    TLTRACE:打开对于数据库日志的跟踪日志。

    DISCARDFILE:定义discardfile文件位置,如果处理中有记录出错会写入到此文件中。

    DBOPTIONS:指定对于某种特定数据库所需要的特殊参数。此处ALLOWUNUSEDCOLUMN参数表示,当抽取进程遇到一个没有使用的字段时只生成一个警告,进程会继续执行而不会被异常终止(abend)。

    TRANLOGOPTIONS:指定在解析数据库日志时所需要的特殊参数。本处配置参数CONVERTUCS2CLOBS只用在extract端UTF字符类型,并且OGG11.1.1版本之前处理CLOB才需要。

    WARNLONGTRANS:指定对于超过一定时间的长交易可以在gsserr.log里面写入警告信息,本处配置为每隔5分钟检查一次长交易,对于超过2小时的进行警告。

    DYNAMICRESOLUTION:有时候开启OGG进程的时候较慢,可能是因为需要同步的表太多,OGG在开启进程之前会将需要同步的表建立一个记录并且存入到磁盘中,这样就需要耗费大量的时间。使用该参数来解决此问题。

    FETCHOPTIONS:参数NOUSESNAPSHOT表示不会从闪回日志中获取数据。

添加Extract进程:

add extract ext_1, TRANLOG, BEGIN NOW


定义trail文件:

GGSCI> add exttrail /data/ogg_app/dirdat/te, EXTRACT ext_1, MEGABYTES 200

4.5 创建Pump传输进程

抽取进程Extract和传输进程Pump其实都是Extract进程,也可以配置在一个进程完成这两个功能,但是当网络传输有问题时,这样抽取也就不能继续运行了,所以推荐分开配置为两个进程。

编辑Pump参数文件:

edit param pump_1

输入以下内容:

EXTRACT pump_1
--PASSTHRU
RMTHOST 10.100.125.156, MGRPORT 7809
RMTTRAIL /data/ogg_app/dirdat/te
DYNAMICRESOLUTION
TABLE ogg.test_1;

    PUMP进程参数配置说明:

    PASSTHRU:如果在配置OGG 的时候既没有过滤行也没有选择列,并且源和目标数据结构都是一模一样,那么可以指定PASSTHRU参数。使用PASSTHRU参数可以使OGG绕过检测表定义数据文件从而提高性能。

    RMTHOST,MGRPORT:目标端主机IP,管理进程端口号。

    RMTTRAIL:目标端主机保存队列文件的目录。

添加Pump进程:

ADD EXTRACT pump_1, EXTTRAILSOURCE /data/ogg_app/dirdat/te


定义pump trail文件:

GGSCI> ADD RMTTRAIL /data/ogg_app/dirdat/te, EXTRACT pump_1, MEGABYTES 200


启动Extract和Pump进程:

start extract ext_1
start extract pump_1

查看进程状态:

 info all

三、OGG目标端安装配置

我这里的OGG目标端安装在Node3(10.100.125.156),按说应该是要有一台单独的接口服务器,由于我这里资源有限,在Node3上面搭建了目标端OGG。

1、在目标端安装OGG

可按以下方法在目标端安装OGG For BigData:

https://www.cnblogs.com/zzpblogs/p/13049698.html

2、在目标端配置OGG

2.1 创建相关子目录
./ggsci
create subdirs


2.2 复制example

cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirprm/
ll $OGG_HOME/dirprm/

在这里插入图片描述
2.3 配置manager

启动GGSCI后,编辑mgr参数:

edit params mgr

在这里插入图片描述
输入以下内容:

PORT 7809
DYNAMICPORTLIST 7810-7860
AUTORESTART REPLICAT *, RETRIES 5, WAITMINUTES 3, RESETMINUTES 60
PURGEOLDEXTRACTS /data/ogg_app/dirdat/*, usecheckpoints, minkeepdays 1
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

2.4 配置kafka.props

vi /data/ogg_app/dirprm/kafka.props

    1

配置以下内容:

gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
#gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.topicMappingTemplate=ogg
#The following selects the message key using the concatenated primary keys
#gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
#gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format=json
#gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.SchemaTopicName=myogg
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op

goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/u01/app/kafka_2.12-2.1.0/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

 

    配置说明:

    gg.handler.kafkahandler.topicMappingTemplate:kafka topic名称的映射,指定topic名称,也可以通过占位符的方式,例如${tableName},每一张表对应一个topic。

    gg.handler.kafkahandler.format:传输文件的格式,支持json,xml等。

    gg.handler.kafkahandler.mode:传输模式,op为一次SQL传输一次,tx为一次事务传输一次。

    gg.classpath:须指定相应的lib路径。

2.5 配置custom_kafka_producer.properties

vi /data/ogg_app/dirprm/custom_kafka_producer.properties

配置以下内容:

bootstrap.servers=10.200.4.117:9092,10.200.4.116:9092,10.100.125.156:9092
acks=1
reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0

    1
    2
    3
    4
    5
    6
    7
    8
    9

    配置说明:

    bootstrap.servers:用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。格式:host1:port1,host2:port2,…,数量尽量不止一个,以防其中一个down了。

    acks:Server完成 producer request 前需要确认的数量。
    acks=0时,producer不会等待确认,直接添加到socket等待发送;
    acks=1时,等待leader写到local log就行;
    acks=all或acks=-1时,等待isr中所有副本确认
    (注意:确认都是 broker 接收到消息放入内存就直接返回确认,不是需要等待数据写入磁盘后才返回确认,这也是kafka快的原因)
    batch.size:Producer可以将发往同一个Partition的数据做成一个Produce Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小。另外,每个Request请求包含多个Batch,每个Batch对应一个Partition,且一个Request发送的目的Broker均为这些partition的leader副本。若将该值设为0,则不会进行批处理。

    reconnect.backoff.ms:连接失败时,当我们重新连接时的等待时间。

    value.serializer:value序列化方式,类型为class,需实现Serializer interface。

    key.serializer:key 序列化方式,类型为class,需实现Serializer interface。

    linger.ms:Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
    官网解释翻译:producer会将request传输之间到达的所有records聚合到一个批请求。通常这个值发生在欠负载情况下,record到达速度快于发送。但是在某些场景下,client即使在正常负载下也期望减少请求数量。这个设置就是如此,通过人工添加少量时延,而不是立马发送一个record,producer会等待所给的时延,以让其他records发送出去,这样就会被聚合在一起。这个类似于TCP的Nagle算法。该设置给了batch的时延上限:当我们获得一个partition的batch.size大小的records,就会立即发送出去,而不管该设置;但是如果对于这个partition没有累积到足够的record,会linger指定的时间等待更多的records出现。该设置的默认值为0(无时延)。例如,设置linger.ms=5,会减少request发送的数量,但是在无负载下会增加5ms的发送时延。

2.6 定义表结构传递

    GoldenGate 提供了一个名为 DEFGEN 的专用工具,用于生成数据定义,当源表和目标表中的定义不同时(例如源数据库为Oracle,目标数据库为SQL Server),Oracle GoldenGate 进程将引用该专用工具。在运行 DEFGEN 之前,需要为其创建一个参数文件,指定该工具应检查哪些表以及在检查表之后存放类型定义文件的位置。

在源端执行:

ggsci
edit param defgen

    1
    2

在这里插入图片描述
输入以下内容:

DEFSFILE /data/ogg_app/dirdef/source.def, PURGE
USERID ogg@rcas, PASSWORD ogg
TABLE ogg.test_1;

    1
    2
    3

在源端执行defgen命令:

defgen paramfile /data/ogg_app/dirprm/defgen.prm

    1

在这里插入图片描述
复制源端的/data/ogg_app/dirdef/source.def文件到目标端的/data/ogg_app/dirdef目录下。在源端oracle用户执行:

--scp /data/ogg_app/dirprm/defgen.prm oracle@10.100.125.156:/data/ogg_app/dirdef
scp /data/ogg_app/dirdef/source.def oracle@10.100.125.156:/data/ogg_app/dirdef

    1
    2

2.7 定义Replication进程

2.7.1 定义参数文件

ggsci
edit params rep_1

    1
    2

在这里插入图片描述
输入以下内容:

REPLICAT rep_1
TARGETDB LIBFILE libggjava.so SET property=/data/ogg_app/dirprm/kafka.props
SOURCEDEFS /data/ogg_app/dirdef/source.def
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP ogg.*, TARGET ogg.*;

    1
    2
    3
    4
    5
    6

2.7.2 指定Trail文件

add replicat rep_1, exttrail /data/ogg_app/dirdat/te

    1

在这里插入图片描述
2.7.3 启动Replicat进程

先启动Manager进程:

start mgr
info all

    1
    2

在这里插入图片描述
再启动Replicat进程:

start replicat rep_1

在这里插入图片描述
查看进程状态:

info all


在这里插入图片描述
四、测试验证

1、启动Kafka ConsumerConsole

在任一节点启动:

kafka-console-consumer.sh --bootstrap-server  10.200.4.116:9092,10.200.4.117:9092,10.100.125.156:9092 --topic ogg

    1

2、在源端操作表数据

对表数据进行insert,update,delete。执行以下SQL:

insert into ogg.test_1 values (123);
commit;
update ogg.test_1 set c1 = 2 where c1 = 1;
commit;
delete from ogg.test_1 where c1 = 2;
commit;

3、查看Kafka ConsumerConsole是否接收到数据

发现ConsumerConsole对应有3行输出,显示如下:


另外,我测试了对数据表进行truncate,发现ConsumerConsole没有输出。因为OGG默认是不捕获truncate操作的,如要捕获,需在Extract和Replicat进程都添加GETTRUNCATES参数。(这种方法我没有测试,有兴趣的朋友可以试试看)

原文地址:https://www.cnblogs.com/zzpblogs/p/13051171.html