ELK docker 安装logstash

logstash 是什么。

个人理解。就3个东西 输入input,输出output,过滤转换filter。

其中input可有有多种类型来源:beat网络传入,jdbc数据库查询来源,file来源文件。

什么时候用什么场景用。

因为logstash内存、cpu消耗非常大。所以每个机器上安装它去采集信息不合理。而可以结合其他的数据源轻便的,比如mq队列kafka、文件采集filebeat。

beat:通过端口传入信息。很多个多点都往同一个logstash中传输。比如分布式日志采集的时候,用filebeat。

jdbc:我们需要从数据库中定时查询出某些数据,放入其他的存储库中,比如文件,nosql库,es库等。。一般用于搜索引擎数据同步

file:采集本机文件入库。。。比如某些订单,某些日志。。。

Logstash 下载安装。

运行之前,需要准备好挂载目录中的内容上传到宿主机中:

配置文件logstash.yml
# 配置文件都在目录 conf.d/*.conf中,意味着,所有.conf结尾的都是配置文件,都会执行
path.config: /usr/share/logstash/conf.d/*.conf

# 指定日志目录
path.logs: /usr/share/logs
conf.d/*.conf 放了些什么东西。。
我这里放两个文件,
1是 filebeat.conf,采集文件的文件规则
input {
    beats {
        port => 5044
        codec => "json"
    }
}

filter {
    date {
       match => [ "@timestamp" , "yyyy/MM/dd:HH:mm:ss Z" ]
    }
    mutate { 
        remove_field => ["host","ecs","json","agent","input","log","@version","tags"]
    }
}

output {
  elasticsearch { 
    hosts => ["www.baidu.com:9200","www.baidu.com:9201"]
    user => "elastic"
    password => "elastic"
    index => "%{[fields][systemid]}-%{+YYYY.MM.dd}" 
  }
}



2是 syncmysql.conf,数据表同步到es的文件规则
input {
    jdbc {    
    # 数据类型,类似定义当前查询的这个类型是什么,如有多个input,那么可通过这个type值进行区分
        type => "jdbc_es_search"
    # 数据库连接
        jdbc_connection_string => "jdbc:mysql://www.baidu.com:9205/esdb"
        jdbc_user => "root"
        jdbc_password => "123456"
    # 驱动信息
        jdbc_driver_library => "/usr/share/logstash/myfile/mysql-connector-java-6.0.6.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 分页信息    
        jdbc_paging_enabled => "true"
        jdbc_page_size => "1000"
    # 查询语句,注意字段小写,不能出现大写,否则异常
        statement => "SELECT * FROM t_b_estate_search where c_time_update> :sql_last_value order by c_time_update asc"
    # 任务执行频率,最小力度是分
        schedule => "*/1 * * * *"
    # 转码utf8
          codec => plain { charset => "UTF-8"}
    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => true
    # 只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值    
        last_run_metadata_path => "/usr/share/logstash/myfile/station_time.txt"
    # 使用查询的字段值作为最后查询时间戳
        use_column_value => true
    # 使用的字段,一定要小写
        tracking_column => "c_time_update"
    # 数字或时间戳,一般都是时间戳
        tracking_column_type => "timestamp" 
    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
    }
}
 
filter {
    date {
        match => ["create_at", "yyyy-MM-dd HH:mm:ss,SSS", "UNIX"]
        target => "@timestamp"
        locale => "cn"
    }
}
 
output {
    if [type] == "jdbc_es_search" {
        elasticsearch { 
            hosts => ["www.baidu.com:9200","www.baidu.com:9201"]
            index => "mses-%{+YYYY.MM.dd}" 
            # pk_id 为数据库表字段,指定为es的ID信息
            document_id => "%{pk_id}"
        }
        stdout {
            codec => json_lines
        }
    }
}



myfile目录,有mysql驱动,一个空白的文件,用于记录数据库采集同步时间
 

安装运行命令:

// 意思 运行这个(logstash:7.1.1,有就执行,没有就下载后执行),如果异常了,自动重启(随机器启动过后重启),限制运行内存在1000m 后台运行-it -d ,
// 对外映射顿口5044,9600,容器名字叫logstash,挂载几个目录,
--privileged=true 让容器有特权写入同步到宿主机
docker run --restart=always  -m 1000M 
-it -d -p 5044:5044 -p 9600:9600 --name logstash 
-v /home/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml 
-v /home/soft/logstash/conf.d/:/usr/share/logstash/conf.d/ 
-v /home/soft/logstash/myfile/:/usr/share/logstash/myfile/ 
-v /home/soft/logstash/logs/:/usr/share/logs/ 
--privileged=true 
logstash:7.1.1
原文地址:https://www.cnblogs.com/a393060727/p/12573203.html