通过Logstash由MySQL和SQL Server向Elasticsearch导入数据

所需软件版本信息

说明
部分软件官网下载地址的下载速度很慢,如果有类似阿里云服务器的云服务器,
可先在云服务器上进行下载(超快),然后拷贝到本地。
如若网址无法访问,请安装 蓝灯
也可下载我已整理好的,下载链接如下:

说明
我已在本地“hosts”(C:WindowsSystem32driversetchosts)文件中添加“127.0.0.1 es”,
以下内容中的“es”请自行替换为“localhost”

安装ELK

  1. 安装 elasticsearch-6.0.0-beta2.msi
    注意安装路径,中间不要有中文和空格,这里我选择安装在C:ElasticElasticsearch
    其他三个文件(lib、data、log)也放在C:ElasticElasticsearch下(可通过勾选框一键修改);
    不安装其他插件,如x-pack(安装很慢,还需要配置以及license,后期可以加进去);
    在浏览器中访问“es:9200”,跳出类似如下内容说明正常(服务已正常启动):

clipboard.png

  1. kibana-6.0.0-beta2-windows-x86_64.ziplogstash-6.0.0-beta2.zip解压到C:Elastic
  2. 创建kibana的后台启动文件
    可直接下载上方提供的配置文件,或自行在C:Elastickibana-6.0.0-beta2-windows-x86_64in下创建RunKibana.vbs,编辑内容如下:

    1.  
      Set ws = CreateObject("Wscript.Shell")
    2.  
      ws.run "cmd /c kibana.bat",vbhide

    保存文件后双击“RunKibana.vbs”运行,然后在浏览器中访问“es:5601”。
    正常跳出kibana的界面且没有错误提示说明kibana服务已正常启动(如有问题,请自行谷歌解决问题,然后继续下面的操作)。

安装Java并配置环境变量

JAVA_HOME : C:Program FilesJavajdk1.8.0_131(请根据自己的安装路径进行替换);
CLASSPATH : .;%JAVA_HOME%lib;%JAVA_HOME%libd
Path : %JAVA_HOME%in;%JAVA_HOME%jrein
具体步骤可参照百度经验

安装MySQL

根据自身需求进行安装,记住用户名和密码。我这里设置为“root”,密码“123qweASD”。
创建数据库“forelk”,并在“forelk”下创建表“elktable”,之后插入数据:

  1.  
    CREATE DATABASE `forelk` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;
  2.  
     
  3.  
    CREATE TABLE `elktable` (
  4.  
    `elkid` int(11) NOT NULL,
  5.  
    `elkname` varchar(45) COLLATE utf8_bin DEFAULT NULL,
  6.  
    `elkage` int(11) DEFAULT NULL,
  7.  
    `elksex` tinyint(4) DEFAULT NULL,
  8.  
    `elkbirth` date DEFAULT NULL,
  9.  
    PRIMARY KEY (`elkid`)
  10.  
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  11.  
     
  12.  
    INSERT INTO `forelk`.`elktable`
  13.  
    (`elkid`,
  14.  
    `elkname`,
  15.  
    `elkage`,
  16.  
    `elksex`,
  17.  
    `elkbirth`)
  18.  
    VALUES
  19.  
    (111,
  20.  
    aa,
  21.  
    11,
  22.  
    1,
  23.  
    2006);
  24.  
     
  25.  
    INSERT INTO `forelk`.`elktable`
  26.  
    (`elkid`,
  27.  
    `elkname`,
  28.  
    `elkage`,
  29.  
    `elksex`,
  30.  
    `elkbirth`)
  31.  
    VALUES
  32.  
    (222,
  33.  
    bb,
  34.  
    22,
  35.  
    0,
  36.  
    1995);

准备MySQL的驱动

C:Elasticlogstash-6.0.0-beta2lib下新建mysqldriver文件夹,并将mysql-connector-java-5.1.44-bin.jar拷贝到mysqldriver文件夹中。

准备Logstash的配置文件

C:Elasticlogstash-6.0.0-beta2config下新建mysql.conf文件,编辑内容如下:

  1.  
    input {
  2.  
    jdbc {
  3.  
    jdbc_driver_library => "C:/Elastic/logstash-6.0.0-beta2/lib/mysqldriver/mysql-connector-java-5.1.44-bin.jar"
  4.  
    jdbc_driver_class => "com.mysql.jdbc.Driver"
  5.  
    jdbc_connection_string => "jdbc:mysql://es:3306/forelk?autoReconnect=true&useSSL=false"
  6.  
    jdbc_user => "root"
  7.  
    jdbc_password => "123qweASD"
  8.  
    schedule => "* * * * *"
  9.  
    jdbc_default_timezone => "Asia/Shanghai"
  10.  
    statement => "SELECT * FROM elktable;"
  11.  
    }
  12.  
    }
  13.  
    output {
  14.  
    elasticsearch {
  15.  
    index => "elkdb"
  16.  
    document_type => "elktable"
  17.  
    document_id => "%{elkid}"
  18.  
    hosts => ["es:9200"]
  19.  
    }
  20.  
    }

参数说明:

  • jdbc_driver_library:

数据库驱动路径,这里我填写的是绝对路径,可自行尝试相对路径;

  • jdbc_driver_class:

驱动名称;

  • jdbc_connection_string:
    数据库的连接字符串;
    forelk为数据库名;
    ?autoReconnect=true&useSSL=false自动重连并禁用SSL;
  • jdbc_user:

数据库用户名;

  • jdbc_password:

数据库密码;

  • schedule:

重复执行导入任务的时间间隔;

  • jdbc_default_timezone:

默认时区设置;

  • statement:

导入的表(查询SQL,可以过滤数据)

  • index:

索引名称(类似数据库名称);

  • document_type:

类型名称(类似数据库表名);

  • document_id:

类似主键;

  • hosts:

要导入到的Elasticsearch所在的主机;

执行导入

C:Elasticlogstash-6.0.0-beta2Shift+鼠标右键,选择在此处打开命令窗口(W)
执行binlogstash -f configmysql.conf,执行结果如下:

clipboard.png

执行查询

在Kibana的Dev Tools下执行GET elkdb/_search命令,
右侧的结果显示类似下图(我的MySQL表中插入了四条数据,所以这里导入的也是四条),
则说明导入成功。

clipboard.png

以上为从MySQL导入数据到Elasticsearch的过程


从SQL Server导入数据到Elasticsearch的过程与上方类似。

安装SQL Server 2016

自动谷歌/百度安装,记住用户名和密码。我这里的用户名为sa,密码为123qweASD
注意防火墙关闭或添加入站规则(开通1433端口)。
SQL Server 2016配置管理器中启用SQL Server网络配置MSSQLSERVER的协议中的Named Pipes,然后重启SQL Server(MSSQLSERVER)服务。
确认通过客户端可以通过用户名和密码正常连接数据库。
执行下面的脚本,创建DB并插入数据:

  1.  
    USE [master]
  2.  
    GO
  3.  
    CREATE DATABASE [elkdb]
  4.  
     
  5.  
    USE [elkdb]
  6.  
    GO
  7.  
    SET ANSI_NULLS ON
  8.  
    GO
  9.  
    SET QUOTED_IDENTIFIER ON
  10.  
    GO
  11.  
    CREATE TABLE [dbo].[elktable](
  12.  
    [elkid] [NVARCHAR](50) NOT NULL,
  13.  
    [elkname] [NVARCHAR](50) NULL,
  14.  
    [elkage] [INT] NULL,
  15.  
    [elksex] [BIT] NULL,
  16.  
    [elkbirth] [DATETIME] NULL
  17.  
    ) ON [PRIMARY]
  18.  
    GO
  19.  
    ALTER TABLE [dbo].[elktable] ADD CONSTRAINT [DF_elktable_elkid] DEFAULT (NEWID()) FOR [elkid]
  20.  
    GO
  21.  
    ALTER TABLE [dbo].[elktable] ADD CONSTRAINT [DF_elktable_elkbirth] DEFAULT (GETDATE()) FOR [elkbirth]
  22.  
    GO
  23.  
     
  24.  
    INSERT [dbo].[elktable] ([elkid], [elkname], [elkage], [elksex], [elkbirth]) VALUES (N'B0048C6E-05AD-4685-861F-E8F4A861D3AB', N'ssa', 123, 0, CAST(N'2017-09-08T09:39:12.567' AS DateTime))
  25.  
    GO
  26.  
    INSERT [dbo].[elktable] ([elkid], [elkname], [elkage], [elksex], [elkbirth]) VALUES (N'EAC8CC17-4E66-4C1C-964B-36D3CD875BDD', N'ssb', 456, 1, CAST(N'2017-09-08T09:39:23.090' AS DateTime))
  27.  
    GO
  28.  
    INSERT [dbo].[elktable] ([elkid], [elkname], [elkage], [elksex], [elkbirth]) VALUES (N'D672806D-6C4F-4CCB-9DCC-434A7ECDA083', N'ssc', NULL, 0, CAST(N'2017-09-08T09:39:33.050' AS DateTime))
  29.  
    GO
  30.  
    INSERT [dbo].[elktable] ([elkid], [elkname], [elkage], [elksex], [elkbirth]) VALUES (N'20DDA41B-A679-4627-B6ED-1D96AF953CF7', N'ssd', 789, NULL, CAST(N'2017-09-08T09:39:41.467' AS DateTime))
  31.  
    GO
  32.  
    INSERT [dbo].[elktable] ([elkid], [elkname], [elkage], [elksex], [elkbirth]) VALUES (N'6F9F8CD8-FCFB-4D88-B191-40DFD360C8C3', NULL, 135, NULL, CAST(N'2017-09-08T09:39:57.353' AS DateTime))
  33.  
    GO

准备SQL Server的驱动

与MySQL类似,在C:/Elastic/logstash-6.0.0-beta2/lib下创建sqlserverdriver文件夹,并将mssql-jdbc-6.2.1.jre8.jar拷贝到该文件夹下。

准备SQL Server的配置文件

C:/Elastic/logstash-6.0.0-beta2/config下创建sqlserver.conf,编辑内容:

  1.  
    input {
  2.  
    jdbc {
  3.  
    jdbc_driver_library => "C:/Elastic/logstash-6.0.0-beta2/lib/sqlserverdriver/mssql-jdbc-6.2.1.jre8.jar"
  4.  
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  5.  
    jdbc_connection_string => "jdbc:sqlserver://es:1433;databaseName=elkdb;"
  6.  
    jdbc_user => "sa"
  7.  
    jdbc_password => "123qweASD"
  8.  
    schedule => "* * * * *"
  9.  
    jdbc_default_timezone => "Asia/Shanghai"
  10.  
    statement => "SELECT * FROM elktable"
  11.  
    }
  12.  
    }
  13.  
    output {
  14.  
    elasticsearch {
  15.  
    index => "sselkdb"
  16.  
    document_type => "sselktable"
  17.  
    document_id => "%{elkid}"
  18.  
    hosts => ["es:9200"]
  19.  
    }
  20.  
    }

参数说明同MySQL配置文件中的说明一致。

执行导入

C:Elasticlogstash-6.0.0-beta2Shift+鼠标右键,选择在此处打开命令窗口(W)
执行binlogstash -f configsqlserver.conf,执行结果类似MySQL。

执行查询

在Kibana的Dev Tools下执行GET sselkdb/_search命令,
查询结果类似MySQL。

参考文档

声明

如需转载,请注明原文链接https://segmentfault.com/a/11...

原文地址:https://www.cnblogs.com/wangwenlong8/p/13022121.html