通过Logstash由SQLServer向Elasticsearch同步数据

1下载与Elasticsearch对应版本Logstash7.13.2 与数据库驱动JDBC

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.13.2-windows-x86_64.zip
JDBC  https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15

2.Logstash下载完成,解压,在bin文件下面创建jdbcconfig文件夹

 3.Logstash配置

在jdbcconfig文件夹下面创建jdbc.conf,如图:

配置如下:


input {
stdin {
}
jdbc {
#数据库驱动所在位置,可以是绝对路径或者相对路径
jdbc_driver_library => "D:softwareElasticsearchlogstash-7.13.2injdbcconfigmssql-jdbc-9.2.1.jre8.jar"
#驱动类名
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
#数据库连接
jdbc_connection_string => "jdbc:sqlserver://testmssql.const.cc;DatabaseName=CstCRMTest;"
#用户
jdbc_user => "sa"
#密码
jdbc_password => "const-123456"
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
schedule => "* * * * *"
#sql语句
statement => "SELECT [Id] as id
,[QuoteCode] as quoteCode
,[QuoteName] as quoteName
,[CustomerId] as customerId
,[QuotePerson] as quotePerson
,[UserId] as userId
,[QuotePhone] as quotePhone
,[PayType] as payType
,[QuoteVailDate] as quoteVailDate
,[LeadTime] as leadTime
,[QuoteDate] as quoteDate
,[CompanyPhone] as companyPhone
,[CompanyAddress] as companyAddress
,[CompanyUrl] companyUrl
,[Remark] remark
,[CreatedTime] createdTime
,CONVERT (VARCHAR (30),UpdatedTime,25) AS updatedtime
,[CreatedUser] createdUser
,[UpdatedUser] updatedUser
FROM [CstCRMTEST].[dbo].[T_Quote] where updatedtime>:sql_last_value"
#或者sql可执行文件
#statement_filepath => "路径"
#是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
use_column_value => true
#tracking_column 对应字段的类型
tracking_column_type => "timestamp"
#如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
tracking_column => "updatedtime"
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 记录上一次追踪的结果值
last_run_metadata_path => "D:softwareElasticsearchlogstash-7.13.2injdbcconfigupdatedTime.txt"
# 索引类型
#type => "_doc"
# 数据库字段名称大写转小写
lowercase_column_names => false
#是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
#clean_run :
}
}


output {
elasticsearch {
# ES的IP地址及端口
hosts => ["localhost:9200"]
# 索引名称 可自定义(只可以小写)
index => "quote"
document_type => "_doc"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"

}
stdout {
# codec => json_lines
#设置输出的格式
codec => line {
format => "updatedtime: %{[updatedtime]}"
}
}

}

statement : sql 里面的字段首字母必须as 成小写,或者你直接小写也行,但是必须的小写。

配置完成   bin 目录下,执行.logstash -f .jdbcconfigjdbc.conf --path.data=/jdbcconfig/

无穷的伟大,也是从“0”开始的
原文地址:https://www.cnblogs.com/wxxf/p/15061452.html