logstash读取Oracle导入到es中

产生背景:某天领导说需要将Oracle中的一些表导入到es中,于是就开始了.....

方案说明

  1. 人生苦短我用py,直接jdbc连接,批量读取,存储es轻轻松松?
  2. 干大数据的怎么能少的了spark或flink?一个离线批处理不就ok了?
  3. 既然目标端是es那么还是用它自家产品logstash,轻松配置,无缝全量增量?

方案选择

需求明确的领导,不是一个好领导,而不考虑需求实现的程序员不是一个好的数据搬运工。

尝试方案一

  1. 百度查看了一下py和Oracle交互的模块用到最多的有两个,一个是cx_Oracle,一个是jaydebeapi

二者区别: cx_Oracle对py版本,系统环境版本依赖比较好,必须是门当户对,如果你的es是在Windows下,那就更呵呵了,缺了VC++ 2014不行。而网上的答案千篇一律,装环境都这么困难,直接next,jaydebeapi,这个模块的连接方式类似于cx_Oracle,但是至少好安装,直接下载tar.gz的源码包,python setup.py install

  1. 环境ok了,开始打开pycharm,远程连接服务器上的python环境,一切都很顺利,但是写着写着,发现,没有考虑到数据量很大咋整?得实现批量,数据中有大字段(clob)又咋整?又得判断字段类型(前面已经提到是一些表,而非一张),判断到是CLOB类型后还得读取大字段转为字符串类型...一来二去,我只是想迁移一些表的需求,结果活生生写成了一个迁移工具的影子。于是果断放弃。

尝试方案二

  1. 经历了方案一,那么同理只要是通过程序迁移避免不了数据量,更避免不了大字段的处理,想想方案二和方案一要经历的过程一样,只是选择的语言和方式不同而已,果断放弃。

尝试方案三

  1. 基于之前已经安装好input-jdbc插件的logstash,只需要解压,写一下配置文件即可,于是一个 增量 配置文件诞生了。
input {
        jdbc {
                #驱动包路径
                jdbc_driver_library => "/home/risen/ojdbc.jar"
                #驱动类
                jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
                #jdbc url
                jdbc_connection_string => "jdbc:oracle:thin:@ip:port:orcl"
                jdbc_user => "账号"
                jdbc_password => "密码"
                schedule => "* * * * *"
                statement => "select * from 表名"
                #增量标识字段名
                tracking_column => "主键"
                #是否使用字段值作为增量标识
                use_column_value => true
                #源表字段名导入ES后是否忽略大小写
                lowercase_column_names=> false
                #分页
                jdbc_paging_enabled => "true"
                #每页数据量
                jdbc_page_size => "5000"
                #默认时区
                jdbc_default_timezone => "UTC"
        }
}
output{
        elasticsearch {
                hosts =>["ip:port"]
                index => "小写表名"
                document_type => "小写表名"
                document_id => "%{主键}"
        }
}
  1. 经过步骤一的完美配置后,开始启动
bin/logstash -f ../oracle2es.conf  -w 3

正常运行不报错,赶紧上es看看,它有没有处理大字段,事实证明,还是"孪生兄弟"强,数据没问题,还是增量同步,而且大字段也都被转换为了字符串存储。针对大数据量还可以通过-w指定线程数,同时还可以指定batch数量。

  1. 那么问题来了,这配置文件虽然好用,但是那可是200多张表呢?总不能导入一张修改一下配置文件吧,当然不能,接下来就需要 shell 登场了

!> logstash + shell 脚本有没有搞头?当然有

讲过查询,logstash的配置文件可以读取外部设置的环境变量,那这个问题就妥了。

#!/bin/bash +x

#author:AmCoder

#设置logstash_path路径
logstash_path=/home/risen/logstash-5.4.3

#设置logstash_conf_path路径
logstash_conf_path=/home/risen/logstash-5.4.3/oracle2es.conf

if [ ! -n "$1" ] && [ ! -n "$2" ];then
        echo "Usage: sh oracle2es.sh 表名 主键"
        exit 1
fi

#由于logstash支持引用系统环境变量所以

##设置系统环境变量表名
export table_name=`echo $1 | tr 'a-z' 'A-Z'`

##设置系统环境变量表中的唯一主键
export unique_key=`echo $2 | tr 'a-z' 'A-Z'`

##设置索引名称(表名转小写--es中的索引不能是大写)
export es_index=`echo $table_name | tr 'A-Z' 'a-z'`

#启动logstash并指定配置文件
$logstash_path/bin/logstash -f $logstash_conf_path -w 5

只需要修改logstash的安装目录和配置文件的目录,然后将用到的公共变量(表名和主键)作为外部参数传入即可了。当然之前的logstash的配置文件Oracle2es.conf也是需要修改为以下的(局部地方采用变量的方式):

input {
        jdbc {
                jdbc_driver_library => "/home/risen/ojdbc.jar"
                jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
                jdbc_connection_string => "jdbc:oracle:thin:@ip:port:orcl"
                jdbc_user => "用户名"
                jdbc_password => "密码"
                schedule => "* * * * *"
                statement => "select * from ${table_name}"
                #增量标识字段名
                tracking_column => "${unique_key}"
                #是否使用字段值作为增量标识
                use_column_value => true
                #源表字段名导入ES后是否忽略大小写
                lowercase_column_names=> false
                #分页
                jdbc_paging_enabled => "true"
                #每页数据量
                jdbc_page_size => "5000"
                #默认时区
                jdbc_default_timezone => "UTC"
        }
}
output{
        elasticsearch {
                hosts =>["ip:port"]
                index => "${es_index}"
                document_type => "${es_index}"
                document_id => "%{${unique_key}}"
        }
}

至此,问题基本已经解决了,剩下的就是你需要拿到200多张表的表名称和主键了。大不了在写个脚本套一层,当然也可以在上面的脚本中添加即可。总结一下,需要知道logstash可全量可增量,还可以引用外部变量,既可以多线程又可以指定分页批次量。(但是,logstash目前还不支持国产服务器)

狭路相逢勇者胜!
原文地址:https://www.cnblogs.com/amcoder/p/13920680.html