Logstash

Logstash
在ELK三兄弟里,L是个特殊,EK才是亲兄弟,一个复制储存数据,一个复制展示数据,而L是一个控制数据进出的管道,ELK是指日记收集处理器,意思就是让Logstash从各种日志文件里把数据取出来,存进ES里,但是Logstash还可以从其他各种地方取得数据,也可以把数据存到很多的地方

Logstash的配置文件

  • 新建一个文件名xxx.conf
  • input,可以是日志文件,mysql,redis
  • filter
  • output,可以是es,redis,mysql
  • 执行logstash.bat -f xxx.conf启动服务

示例

  • 把nginx的日志存入redis
# 数据来自日志文件,比如tomcat,nginx对日志进行过滤可以获得一些黑名单什么的
# 日志文件如果是json格式,最好处理,如果不是json的,就需要在filter对象里进行特殊的过滤
# 往nginx的 http{} 里添加日志的格式,默认的日志不是json格式的
log_format json '{"@timestamp":"$time_iso8601",'
'"host":"$server_addr",'
'"clientip":"$remote_addr",'
'"request":"$request",'
'"size":$body_bytes_sent,'
'"responsetime":$request_time,'
'"upstreamtime":"$upstream_response_time",'
'"upstreamhost":"$upstream_addr",'
'"http_host":"$host",'
'"url":"$uri",'
'"referer":"$http_referer",'
'"agent":"$http_user_agent",'
'"status":"$status"}';
access_log /data/nginx/logs/access_json.log json;

# nginx2redis.conf
input {
    file {
        path => ['/data/nginx/logs/access_json.log']
        start_position => "beginning"
        codec => "json"
        tags => ['user']
        type => "nginx"
    }
}
output {
    if [type] == "nginx" {
        redis {
            host => "172.17.0.90"
            port => "6379"
            key => "nginx"
            db => "10"
            data_type => "list"
        }
    }
}
  • 把mysql的数据存入es
input {
   jdbc {
      // mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.10.1:3306/test"
      // 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      // 驱动(需单独下载)
      jdbc_driver_library => "/usr/local/mysql-connector-java-5.1.47-bin.jar"
      // 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      //处理中文乱码问题
      codec => plain {charset => "UTF-8"}
      //使用其它字段追踪,而不是用时间(这里是用来实现增量更新的)
      use_column_value => true
      //追踪的字段
      tracking_column => id
      //是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 
      last_run_metadata_path 指定的文件中
      record_last_run => true
      //上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/usr/local/logstash-6.2.2/bin/config-mysql/station_parameter.txt"
      //sql_last_value每次读取last_run_metadata_path中存放的值,下面语句增量更新是按照id值递增的顺序同步mysql中的内容
      statement => "select * from booklist where id > :sql_last_value"
      //开启分页查询
      jdbc_paging_enabled => true
      jdbc_page_size => 300
      //设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
   }
}
filter{
   //下面:当使用Logstash自动生成的mapping模板时过滤掉@timestamp和@version字段 
   mutate { remove_field => ["@timestamp","@version"] }
}
output {
   elasticsearch {
       //hosts:一般是localhost:9200
       hosts => ["localhost:9200"]
       //索引名
       index => "booklist"
       //表示按照id同步mysql数据
       document_id => "%{id}"
   }
   # 调试,打印出来看看
   stdout {}
}
  • 把redis的数据传到es里
input{
    redis {
       host => "192.168.1.202"
       port => "6379"
       password => 'test'
       db => '1'
       data_type => "list"
       key => 'elk-test'
       #这个值是指从队列中读取数据时,一次性取出多少条,默认125条(如果redis中没有125条,就会报错,所以在测试期间加上这个值)
       batch_count => 1
     }
}
output {
     elasticsearch {
       hosts => ['192.168.1.202:9200']
       index => 'redis-test-%{+YYYY.MM.dd}'
     }
}

更多的配置查看官网

原文地址:https://www.cnblogs.com/pengdt/p/13062361.html