Notes on installing Logstash with Docker and loading database data into ES

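Docker Hub has no example for installing Logstash with Docker. Most of the articles I found were copied from one another and vague. They left me confused: I got a container running by copying and pasting, but still understood little of it. Thinking back to how Logstash is installed without Docker and comparing the two, I got a rough idea. But when I wrote my own configuration and ran it, the container stopped by itself shortly after starting, which was frustrating.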

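After comparing carefully and checking step by step, I found the problem. It seems Logstash has to be started in interactive mode, that is, the run command needs -it. Otherwise the container stops right after it starts.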

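Also, according to the material I read, the official documentation says two files need to be configured, one of which is pipelines.yml.

I configured it several times following the instructions and it never worked. Judging from the startup log, my pipelines file was not being read at all. It turned out to be a version issue: the 5.6.12 release I was using has no such file, yet the official documentation for version 5 also mentions it, which puzzled me. I then downloaded Logstash 6 and found the file in its config folder. That made things clear: for 5.6.12 I only need to write a file with the .conf extension; only version 6 needs the additional pipelines file to specify which .conf file to read.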
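
For readers on version 6, here is a minimal sketch of what such a pipelines file might look like, written from the shell. The CONFIG_DIR variable, the pipeline id "main" and the mysql01.conf path are assumptions chosen for illustration, not values from my setup. Note also that Logstash ignores pipelines.yml when a config is passed on the command line with -f or -e.

```shell
# Sketch: write a minimal pipelines.yml for Logstash 6.x.
# CONFIG_DIR and the mysql01.conf location are assumptions for illustration.
CONFIG_DIR="${CONFIG_DIR:-./logstash-config}"
mkdir -p "$CONFIG_DIR"

cat > "$CONFIG_DIR/pipelines.yml" <<'EOF'
# One entry per pipeline; path.config points at the .conf file(s) to load.
- pipeline.id: main
  path.config: "/usr/share/logstash/config/mysql01.conf"
EOF

# Show the result so it can be checked before mounting it into the container.
cat "$CONFIG_DIR/pipelines.yml"
```

The directory would then be mounted as the container's config folder, in the same way as in the docker run command later in this post.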

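Screenshots of the config folder in version 5 and in version 6 followed here in the original post; only the version 6 folder contains pipelines.yml.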

This time my goal was to load data from a database into ES. I uploaded a MySQL driver jar and wrote a custom mysql01.conf file:

input {
  jdbc {
    jdbc_driver_library => "/home/kf/soft/logstash-5.6.12/config/mysql/mysql-connector-java-5.1.25.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.0.4:3306/test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    statement => "SELECT id,username,password,password_salt,status,insertTime,updateTime FROM users WHERE updateTime >= :sql_last_value"
    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_users_last_time"
    type => "users"
    # Whether to convert column names to lowercase
    lowercase_column_names => false
  }
}
output {
    elasticsearch {
        # ES IP address and port
        hosts => ["192.168.88.130:9200"]
        # Index name, can be customized
        index => "users"
        # The source table needs an id column; its value is used as the document id
        document_id => "%{id}"
        document_type => "users"
    }
    stdout {
        # Output in JSON format
        codec => json_lines
    }
}

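ELK was set up in a virtual machine, while MySQL ran on my local Windows machine. At first the connection failed with a connection refused error. By default MySQL only allowed local connections, so the privileges had to be changed in MySQL:

For example, to let myuser connect to the MySQL server from any host using mypassword
(% allows connections from all hosts):
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
If the local machine then cannot connect to MySQL, also grant access for localhost so that it can log in:
grant all privileges on luffy.* to 'luffy'@'localhost' identified by 'luffy';
After setting up accounts with restricted privileges, be sure to flush privileges; otherwise the current session is not notified of the change. Alternatively, just restart cmd.
FLUSH PRIVILEGES;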

At this point MySQL could be reached, but Logstash kept reporting an error (shown as a screenshot in the original post).

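The insertTime and updateTime columns in my database are named in camelCase. Searching on Baidu, some posts said that updateTime in tracking_column => "updateTime" in mysql01.conf must be wrapped in double quotes, which I had already done; others said the tracking column must appear in the SELECT statement, which it did. Neither helped. Then I looked at the log line just above the error and saw that Logstash had read the camelCase name with the capital T forced to a lowercase t. After more reading, I added this to mysql01.conf:

# Whether to convert column names to lowercase
lowercase_column_names => false

After that the data could be read. The jdbc input's lowercase_column_names option defaults to true, so column names are lowercased unless it is turned off.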

last_run_metadata_path => "./last_record/logstash_users_last_time"

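This file stores the time of the last update. At first I had not created the last_record folder, and startup failed with a file-not-found error; creating the folder by hand fixes it.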
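
Creating the folder can be sketched as follows. The LOGSTASH_HOME variable is an assumption: the relative path ./last_record resolves against whatever directory Logstash is started from, so set it to that directory.

```shell
# Assumption: Logstash is started from LOGSTASH_HOME, so the relative
# ./last_record in last_run_metadata_path resolves underneath it.
LOGSTASH_HOME="${LOGSTASH_HOME:-.}"

# -p makes this safe to repeat: no error if the folder already exists.
mkdir -p "$LOGSTASH_HOME/last_record"

# Confirm the folder is there before starting Logstash.
ls -ld "$LOGSTASH_HOME/last_record"
```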

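With that, loading database data into ES with Logstash was working.

In the test environment, the MySQL instance being connected to also runs in a Docker container.

The config file has to be specified when starting the Logstash container. The command is: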

docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash --network ELS -v /home/smartcity/logstash/config/:/usr/share/logstash/config/  logstash:5.6.13 -f /usr/share/logstash/config/union_blueplus.conf

After startup, the Logstash log reported an error:

Unable to connect to database. Trying again {:error_message=>"Java::ComMysqlJdbcExceptionsJdbc4::CommunicationsException: Communications link failure The last packet successfully received from the server was 5,867 milliseconds ago.  The last packet sent successfully to the server was 5,867 milliseconds ago."}

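I searched for a long time. Many posts say to change the MySQL configuration, which clearly did not apply here. Posts on sites outside China pointed at the jdbc plugin, so I checked my version too: it was already the latest, 4.3.13.

In the end, by comparing my config against working ones found online, I tracked the cause down to the extra SSL parameter (useSSL=true) in the URL.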
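
As an illustration of that fix, here is a small sketch that strips the useSSL parameter from a JDBC URL. The strip_use_ssl helper is hypothetical, written for this post; the sample URL is the one from mysql01.conf above.

```shell
# Hypothetical helper: remove any useSSL=<value> parameter from a JDBC URL.
strip_use_ssl() {
  # First expression drops the parameter and keeps the separator before it;
  # second expression removes a separator left dangling at the end.
  printf '%s\n' "$1" | sed -E 's/([?&])useSSL=[^&]*&?/\1/; s/[?&]$//'
}

url='jdbc:mysql://192.168.0.4:3306/test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8'
strip_use_ssl "$url"
# prints jdbc:mysql://192.168.0.4:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8
```

The cleaned URL is what goes into jdbc_connection_string; the multi-table config below uses a URL without the SSL parameter.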

Importing multiple tables:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
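    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1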
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select sal.alarmID, sal.villageID, 
            sal.deviceType,sal.alarmTypeName,sal.modelID,
            sal.alarmLevel,sal.alarmState,sal.alarmTime,
            sal.alarmContent,de.installAddr,sal.updateTime 
            from e_sense_alarm_log sal left join e_device de on de.deviceID = sal.deviceID 
            WHERE sal.updateTime >= :sql_last_value"
    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_alarm_last_time"
    type => "alarm"
    # Whether to convert column names to lowercase
    lowercase_column_names => false
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
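    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1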
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select de.deviceID, de.isDelete, de.villageID as villageid, de.installAddr as installadd, de.type as devicetype,
            de.buildingID,bu.buildingNo as  buildingno, bu.name as buildingName, de.productModel as productmodel, de.name as deviceName, de.code as code,
            de.state,  de.updateTime as updatetime from e_device de left join b_building bu on de.buildingID = bu.buildingID 
            WHERE de.updateTime >= :sql_last_value"
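    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    # Must match the column name as it appears in the query result (de.updateTime is aliased to updatetime above)
    tracking_column => "updatetime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_device_last_time"
    type => "device"
    # Whether to convert column names to lowercase
    lowercase_column_names => false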
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
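    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1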
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select  al.accessLogID, al.villageID as villageid, al.peopleName as peoplename, peo.gender, peo.phoneNo as phoneno,    
                  al.credentialNo as credentialno, lab.name as peoplelabel, bu.buildingNo as buildingno, al.buildingID as buildingid, 
                  al.cardNo as cardno, al.openType as opentype, al.updateTime as opentime                
                  from e_access_log  al
                  left join p_people peo on peo.credentialNo =al.credentialNo
                  left join p_people_label pl on pl.peopleID = peo.peopleID 
                  left join s_label lab on lab.labelID = pl.labelID
                  left join b_building bu on bu.buildingID = al.buildingID  
                WHERE al.updateTime >= :sql_last_value"
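    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    # Must match the column name as it appears in the query result (al.updateTime is aliased to opentime above)
    tracking_column => "opentime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_accessLog_last_time"
    type => "accessLog"
    # Whether to convert column names to lowercase
    lowercase_column_names => false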
  }
  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
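    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1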
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select fl.faceLogID,io.type as faceinouttype, io.villageID as villageid, io.ioID as ioid,
                  fl.personType as persontype, peo.peopleName as peoplename, peo.phoneNo as phoneno,
                  peo.credentialNo as credentialno,  sl.name as peoplelabel, fl.updateTime as facecapturetime
                  from e_face_log  fl 
                  left join b_in_out io on io.ioID = fl.ioID
                  left join p_people peo on peo.credentialNo = fl.credentialNo
                  left join p_people_label pl on pl.peopleID = peo.peopleID 
                  left join s_label sl on sl.labelID = pl.labelID
                  where  fl.updateTime >= :sql_last_value  and fl.faceSource = 0"
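    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    # Must match the column name as it appears in the query result (fl.updateTime is aliased to facecapturetime above)
    tracking_column => "facecapturetime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkface_last_time"
    type => "wkface"
    # Whether to convert column names to lowercase
    lowercase_column_names => false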
  }  
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
    #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
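    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Wait time between retries after a connection failure
    connection_retry_attempts_wait_time => 1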
    jdbc_page_size => "2000"    
    schedule => "* * * * *"
    statement => "select pr.parkingReserveID, pr.villageID as villageid, io.ioID as inioid,
                    io.ioID as outioid, pr.inParkingLogID, pr.outParkingLogID, pr.carBrand as cartype,
                    pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno,
                    peo.credentialNo as credentialno, pr.updateTime as intime
                    from e_parking_reserve pr
                    left join e_parking_channel pc on pc.parkingID = pr.parkingID
                    left join b_in_out io on io.ioID = pc.ioID
                    left join e_parking_car ec on ec.plateNo = pr.plateNo
                    left join p_people peo on peo.peopleID = ec.peopleID
                  where  pr.updateTime >= :sql_last_value"
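    # Whether to record the last run. If true, the last value of the tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    # Must match the column name as it appears in the query result (pr.updateTime is aliased to intime above)
    tracking_column => "intime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkcar_last_time"
    type => "wkcar"
    # Whether to convert column names to lowercase
    lowercase_column_names => false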
  }

}
output {
    if [type] == "alarm" {
        elasticsearch {
            # ES IP address and port
            hosts => ["192.168.66.34:9200"]
            # Index name, can be customized
            index => "alarmlogindex"
            # The query result needs an id column; its value is used as the document id
            document_id => "%{alarmID}"
            document_type => "alarm"
        }
    }
    if [type] == "device" {
        elasticsearch {
            # ES IP address and port
            hosts => ["192.168.66.34:9200"]
            # Index name, can be customized
            index => "deviceindex"
            # The query result needs an id column; its value is used as the document id
            document_id => "%{deviceID}"
            document_type => "device"
        }
    }
    if [type] == "accessLog" {
        elasticsearch {
            # ES IP address and port
            hosts => ["192.168.66.34:9200"]
            # Index name, can be customized
            index => "accesslogindex"
            # The query result needs an id column; its value is used as the document id
            document_id => "%{accessLogID}"
            document_type => "accessLog"
        }
    }
    if [type] == "wkface" {
        elasticsearch {
            # ES IP address and port
            hosts => ["192.168.66.34:9200"]
            # Index name, can be customized
            index => "facelogindex"
            # The query result needs an id column; its value is used as the document id
            document_id => "%{faceLogID}"
            document_type => "wkface"
        }
    }
    if [type] == "wkcar" {
        elasticsearch {
            # ES IP address and port
            hosts => ["192.168.66.34:9200"]
            # Index name, can be customized
            index => "parkingreservelogindex"
            # The query result needs an id column; its value is used as the document id
            document_id => "%{parkingReserveID}"
            document_type => "wkcar"
        }
    }
    stdout {
        # Output in JSON format
        codec => json_lines
    }
}
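
A config of this size is easy to break with a stray brace or quote, so it may be worth syntax-checking it before starting the real container. The sketch below wraps Logstash's --config.test_and_exit flag in a hypothetical helper, check_logstash_config. The host directory and image tag are copied from the docker run command above and are assumptions about your environment. The flag only validates the config syntax; it does not test the database connection.

```shell
# Hypothetical helper: syntax-check a Logstash config in a throwaway container.
# The mounted host directory and image tag mirror the docker run command above.
check_logstash_config() {
  conf_name="$1"
  docker run --rm \
    -v /home/smartcity/logstash/config/:/usr/share/logstash/config/ \
    logstash:5.6.13 \
    -f "/usr/share/logstash/config/$conf_name" --config.test_and_exit
}

# Usage (requires Docker and the image to be available):
# check_logstash_config union_blueplus.conf
```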

Reference: https://blog.csdn.net/u010887744/article/details/86708490

Original post: https://www.cnblogs.com/fuguang/p/11511834.html