Logstash configuration file

The log collection flow is as follows:

1. log4j2 writes log messages to a Kafka topic (see the config sketch after this list)
2. Logstash consumes the log messages from Kafka
3. Logstash then pushes the log messages to Elasticsearch
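For step 1, a minimal log4j2.xml sketch using log4j2's built-in KafkaAppender might look like the following. This is an assumption, not part of the original post: the topic and broker list are chosen to match the Logstash input below, and the PatternLayout is only a guess at a line format that the grok pattern in the filter could parse.

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <!-- Send every log event to the Kafka topic that Logstash consumes -->
    <Kafka name="KafkaAppender" topic="ms-job-console">
      <!-- Layout is a guess; "ms-job" stands in for the service name -->
      <PatternLayout pattern="%d{yyyy-MM-dd} %d{HH:mm:ss.SSS} %-5level ms-job  %logger [Class = %C] [File = %F] [Line = %L] [Method = %M] [%marker] %msg%n"/>
      <Property name="bootstrap.servers">127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094</Property>
    </Kafka>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="KafkaAppender"/>
    </Root>
  </Loggers>
</Configuration>

Note that the KafkaAppender needs the kafka-clients jar on the application's classpath.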

The Logstash configuration file is as follows:

# Receive log messages from Kafka
input{
    kafka {
        bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
        group_id => "ecs2"
        auto_offset_reset => "earliest"
        consumer_threads => 5
        decorate_events => false
        topics => ["ms-job-console"]
        type => "ms-job-console"
        tags => ["ms-job-console"]
   }
}
# Filter and parse the data
filter {
   if [type] == "ms-job-console" {
     grok {
        match => { "message" => "%{GREEDYDATA:date} %{GREEDYDATA:timer} %{LOGLEVEL:loglevel} %{GREEDYDATA:service}  %{GREEDYDATA:className} \[Class = %{GREEDYDATA:className1}\] \[File = %{GREEDYDATA:classFile}\] \[Line = %{NUMBER:classLine}\] \[Method = %{GREEDYDATA:classMethod}\] \[%{GREEDYDATA:logModule}\] %{GREEDYDATA:log-context}" }
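        # A hypothetical log line this pattern is meant to match (assumed, not from the source):
        #   2019-12-18 10:30:00.123 INFO ms-job  com.example.JobRunner [Class = com.example.JobRunner] [File = JobRunner.java] [Line = 42] [Method = run] [JOB] job started
        # The literal square brackets must be escaped in the pattern, and GREEDYDATA is very
        # permissive, so the extracted fields should be verified against real log lines.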
     }
   }
}
# Output to port 9200 on localhost, the port the Elasticsearch service listens on
output {
   if "ms-job-console" in [tags]{
     elasticsearch {
       hosts => ["127.0.0.1:9200"]
       index => "ms-job-console-%{+YYYY.MM.dd}"
     }
  }
}
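Assuming the pipeline above is saved as kafka-to-es.conf (a hypothetical file name), it can be started and the resulting daily index checked roughly as follows:

bin/logstash -f kafka-to-es.conf
curl 'http://127.0.0.1:9200/_cat/indices?v'
curl 'http://127.0.0.1:9200/ms-job-console-*/_search?size=1&pretty'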

Both Elasticsearch and Logstash are version 6.2.3.

Original article: https://www.cnblogs.com/yechen2019/p/12061364.html