利用logstash 把mysql 查询的数据定时自动导入ES (超简版)

上一篇我们已经完成Elasticsearch 和logstash安装,现在可以进行把数据从mysql 数据库同步es索引上

1、下载java 数据库连接池  

[root@localhost home]# cd /home
[root@localhost home]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.13/mysql-connector-java-8.0.13.jar

2、安装logstash-input-jdb  (如果你是按上一篇说的安装的是7.1以上的logstash 版本,调过这一步,因为它已经集成了)

直接在logstash的安装目录bin下运行

./logstash-plugin install logstash-input-jdb
View Code

3、增加logstash配置

[root@localhost home]# vi /etc/logstash/conf.d/users.conf

把以下代码复制进去并保存

input {
  jdbc {
    jdbc_driver_library => "/home/mysql-connector-java-8.0.13.jar" # 刚刚下载的 mysql-connector-java的绝对路径
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.8.42.10:3306/school?useSSL=false&characterEncoding=utf8&serverTimezone=Asia/Shanghai" #数据库IP:10.8.42.10 ,端口:3306,库名:school
    jdbc_user => "root" #数据库用户名
    jdbc_password => "dfc52f" #数据库密码
    schedule => "* * * * *" #cron表达式,设置多久跑一次,5个*代表每分钟跑一次
    jdbc_paging_enabled => "true" #如果数据较多,可以设置分页查询
    jdbc_page_size => "200" #每页查询的行数
    record_last_run => true
    clean_run => false
    statement => "SELECT * FROM users WHERE updated_at >= :sql_last_value" #查询语句,会把该语句查询出的数据推给output设置的服务
    use_column_value => true #When set to true, uses the defined tracking_column value as the :sql_last_value. When set to false, :sql_last_value reflects the last time the query was executed.
    tracking_column_type => "timestamp" #用来跟踪数据变化的列类型:numeric 或者 timestamp
    tracking_column => "updated_at" #通过哪个列来跟踪是否有更新
    last_run_metadata_path => "/data/meta/users2_offset.txt" #指定上次运行的偏移量值 目录需提前建,users2_offset.txt 文件如果不存在,系统会自动建
  }
}
output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["127.0.0.1:9200"]
        # 索引名称 可自定义
        index => "users"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

运行logstah来把mysql数据推给es

/usr/share/logstash/bin/logstash -f --path.settings=/etc/logstash/conf.d/users.conf -t

正常的话看到没有报错,并有sql语句数据

 

 查看es上的索引

[root@localhost conf.d]# curl 'localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .geoip_databases 4NHJwd3OSsmTu4rXj7Wmtw 1 0 42 45 48.2mb 48.2mb
yellow open users-fr-java8 IZfD2tbcRcirSga5KTjQSg 1 1 4 32 18.3kb 18.3kb
yellow open order_shopify iTj8aUqhRl2ulUXLM5-1fQ 1 1 9639 0 40.8mb 40.8mb
yellow open users _hYfruXyQUulZ5-TefybHg 1 1 125 49 653.9kb 653.9kb
yellow open user-6 7MxDLCMUQfe2pzrCCoDJHw 1 1 4 3339 172.1kb 172.1kb

------------END---------------------------

原文地址:https://www.cnblogs.com/jinshao/p/15698946.html