Logstash使用grok插件解析Nginx日志

grok表达式的打印复制格式的完整语法是下面这样的:

%{PATTERN_NAME:capture_name:data_type}
data_type 目前只支持两个值:int 和 float。

在线gork正则的地址:http://grokdebug.herokuapp.com/
Logstash基础正则地址:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

也可以在你的安装路径下查找grok-patterns内置的正则表达式:

/usr/local/logstash-6.2.4/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

例如:

URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%&_-]*)+
URIPARAM ?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]<>]*

-----------------------------------------------------------------------------------------

logstash-filter-mutate 插件是Logstash 另一个重要插件,它提供了丰富的基础类型数据处理能力,包括类型转换,字符串处理和字段处理等

可以设置的转换类型包括:"integer","float" 和 "string"。示例如下:

filter {
    mutate {
        convert => ["request_time", "float"]
        gsub => [ "message", "aa", "" ]  字符串替换,此处为清除
    }
}

注意:mutate 除了转换简单的字符值,还支持对数组类型的字段进行转换,即将 ["1","2"] 转换成 [1,2]。但不支持对哈希类型的字段做类似处理。

对已有索引进行修改并且平滑过渡

        mutate {
            convert => [ "request_time", "float" ]
            add_field => [ "response_time", "%{request_time}" ]
            remove_field => [ "request_time" ]         
        }

--------------------------------------------------------------------------------

在线grok表达式解析地址:http://grokdebug.herokuapp.com/  NGINXACCESS为规则名称,测试时不用填入)

根据已经生成的日志记录逐个参数进行调试即可。

需要注意的是当日志或可能出现“-”时,必须在NGINXACCESS中指定,例如: (?:%{NUMBER:serverelapsed}|-)

需要修改的文件位置:/usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

如果找不到的话,grep -rw NGINXACCESS /usr/local/logstash

首先对nginx日志格式设定如下:

log_format  main  '$remote_addr - $remote_user [$time_local] "$request_method $uri $query_string" $status $body_bytes_sent "$http_referer" "$http_user_agent" $request_time $upstream_response_time "$http_x_forwarded_for" ';

Logstash Grok默认的参数为:

NGUSERNAME [a-zA-Z.@-+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} [%{HTTPDATE:timestamp}] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response}  (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{IPORHOST:host} %{BASE10NUM:request_duration}

 我这边设置的是:

#nginx
WZ ([^ ]*)
NGINXACCESS %{IP:remote_ip} - - [%{HTTPDATE:timestamp}] "%{WORD:method} %{WZ:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:status} %{NUMBER:bytes} (?:%{QS:referer}|-) %{QS:agent} %{NUMBER:elapsed} (?:%{NUMBER:serverelapsed}|-) %{QS:xforward}

 或拆分请求的uri和请求参数:此处需要注意的是必须确保NGINXACCESS规则是一整行

WZ ([^ ]*)
URIPARAM [A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?-[]<>]*  #此处重新定义该匹配规则,不匹配参数开头的?
NGINXACCESS %{IP:remote_ip} - - [%{HTTPDATE:timestamp}] "%{WORD:method} (%{URIPATH:request}|-|) (%{URIPARAM:requestParam}|-)" %{NUMBER:status}
%{NUMBER:bytes} %{QS:referer} %{QS:agent} %{NUMBER:elapsed} %{NUMBER:serverelapsed} %{QS:xforward}

  

自定义规则时也可以在/usr/local/logstash主目录下新建一个目录:patterns,并在该目录下新建一个grok的表达式解析文件nginx,内容同上,也可以实现相同的目的。

Nginx logstash配置文件:

input{
  file{
    path => "/usr/local/nginx/logs/access.log"
    type => "nginx"
    start_position => "beginning"
  }
 
  file{
    path => "/usr/local/nginx/logs/admin.log"
    type => "admin-nginx"
    start_position => "beginning"
  }
}

filter {  
    grok {  
      match => { "message" => "%{NGINXACCESS}" }
    }
    mutate {
      convert => [ "elapsed", "float" ]
      convert => [ "serverelapsed", "float" ]
    }  
}
 
output{
  if [type] == "nginx" {
    elasticsearch {
    hosts=> ["172.17.102.202:9200"]
    index=> "nginx"
    }
  }
  else {
    elasticsearch {
    hosts=> ["172.17.102.202:9200"]
    index=> "admin-nginx"
    }
  }
}
原文地址:https://www.cnblogs.com/wjoyxt/p/9606269.html