Logstash: collecting MongoDB logs

1. Logstash 6.5.3

Configuring collection of MongoDB logs:

First, deploy Filebeat on the MongoDB server to collect the log file and ship it to Logstash for processing; Logstash then forwards the parsed events to Elasticsearch.

filebeat-conf:

filebeat.inputs:
- type: log

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /data/log/mongod.log
  tags: ['db']

output.logstash:
    hosts: ["10.1.1.12:5044"]

Only the key parts of the configuration are shown here.
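
For reference, the grok patterns below are written against MongoDB 3.x log lines. A slow-query entry in /data/log/mongod.log looks roughly like this (a hypothetical example; the database, collection, and timings are made up):

2019-05-14T10:23:45.678+0800 I COMMAND  [conn12345] command mydb.orders command: find { find: "orders", filter: { status: "pending" } } planSummary: COLLSCAN keysExamined:0 docsExamined:10240 nreturned:12 reslen:4096 protocol:op_query 120ms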

logstash-conf:

input {
    beats {
        port => "5044"
    }
}
filter {
    if "db" in [tags] {
        grok {
            # split the raw line into timestamp / severity / component / context / body
            match => ["message", "%{TIMESTAMP_ISO8601:timestamp}\s+%{MONGO3_SEVERITY:severity}\s+%{MONGO3_COMPONENT:component}%{SPACE}(?:\[%{DATA:context}\])?\s+%{GREEDYDATA:body}"]
            remove_field => [ "message", "beats_input_codec_plain_applied" ]
        }
        if [body] =~ "ms$" {
            grok {
                match => ["body", "command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+query:\s+%{DATA:query}\s+planSummary:\s+.*\s+%{NUMBER:spend_time:int}ms$"]
            }
        }
        if [body] =~ "aggregate" {
            grok {
                match => ["body", "command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+pipeline:\s+%{DATA:pipeline}\s+keyUpdates.*\s+%{NUMBER:spend_time:int}ms"]
            }
        }
        if [body] =~ "find" {
            grok {
                match => ["body", "command\s+%{DATA:collection}\s+command:\s+%{DATA:action}\s+.*\s+filter:\s+%{DATA:filter}\s+planSummary:\s+.*\s+%{NUMBER:spend_time:int}ms"]
            }
        }
        date {
            match => [ "timestamp", "ISO8601" ]
            remove_field => [ "timestamp" ]
        }
    }
}

output {
    if "db" in [tags] {
        elasticsearch {
            hosts => "192.4.7.16:9200"
            index => "logstash-mongodb-slow-%{+YYYY-MM-dd}"
        }
    }
}
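
Before (re)starting Logstash, the pipeline syntax can be validated first; a minimal check, assuming the configuration above is saved as /etc/logstash/conf.d/mongodb.conf:

# parse the configuration, report any syntax errors, and exit without starting the pipeline
bin/logstash -f /etc/logstash/conf.d/mongodb.conf --config.test_and_exit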

The grok patterns should be tested before they go live; Kibana 6.3 and later ships with a Grok Debugger:
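
In Kibana the debugger sits under Dev Tools. Paste a raw log line as the sample data and the expression to verify as the grok pattern; for example, reusing the hypothetical slow-query line from above:

Sample data:
2019-05-14T10:23:45.678+0800 I COMMAND  [conn12345] command mydb.orders command: find { find: "orders", filter: { status: "pending" } } planSummary: COLLSCAN keysExamined:0 docsExamined:10240 nreturned:12 reslen:4096 protocol:op_query 120ms

Grok pattern:
%{TIMESTAMP_ISO8601:timestamp}\s+%{MONGO3_SEVERITY:severity}\s+%{MONGO3_COMPONENT:component}%{SPACE}(?:\[%{DATA:context}\])?\s+%{GREEDYDATA:body}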
Test result:
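
For the hypothetical sample line above, a successful match should report structured fields roughly like:

{
  "timestamp": "2019-05-14T10:23:45.678+0800",
  "severity": "I",
  "component": "COMMAND",
  "context": "conn12345",
  "body": "command mydb.orders command: find { find: \"orders\", filter: { status: \"pending\" } } planSummary: COLLSCAN ... 120ms"
}
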
Original article: https://www.cnblogs.com/cuishuai/p/10861932.html