logstash的filter

https://www.cnblogs.com/dyh004/p/9699813.html
官方文档https://www.elastic.co/guide/en/logstash/6.7/event-dependent-configuration.html#conditionals
https://windcoder.com/logstash6peizhiyufazhongdetiaojianpanduan
http://www.ttlsa.com/elk/elk-logstash-configuration-syntax/
logstash的filter的支持双引号 单引号,都可以,需要注意的是在grok的时候如果匹配字符串是双引号需要转义"

logstash的输出,如果有多个es地址,可以放到['es-node1','es-node2']

filter中的判断

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

如若action的值是login,则通过mutate filter删除secret字段:
filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}

若是需要通过正则匹配(即,当if的表达式为正则时),匹配成功后添加自定义字段:
filter {
    if [message] =~ /w+s+/w+(/learner/course/)/ {
        mutate {
            add_field => { "learner_type" => "course" }
        }
    }
}
在一个条件里指定多个表达式:
output {
  # Send production errors to pagerduty
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
    ...
    }
  }
}

在in条件,可以比较字段值:
filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}

not in示例:
output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}


在logstash1.5版本开始,有一个特殊的字段,叫做@metadata。@metadata包含的内容不会作为事件的一部分输出
input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:42:51.496Z,
      "@version" => "1",
          "host" => "windcoder.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}
"asdf"变成message字段内容。条件与@metadata内嵌的test字段内容判断成功,但是输出并没有展示@metadata字段和其内容。

不过,如果指定了metadata => true,rubydebug codec允许显示@metadata字段的内容。
stdout { codec => rubydebug { metadata => true } }
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:46:48.565Z,
     "@metadata字段及其子字段内容。" => {
           "test" => "Hello",
        "no_show" => "This data will not be in the output"
    },
      "@version" => "1",
          "host" => "windcoder.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}

现在就可以见到@metadata字段及其子字段内容。
只有rubydebug codec允许显示@metadata字段的内容。
只要您需要临时字段但不希望它在最终输出中,就可以使用@metadata字段。
最常见的情景是filter的时间字段,需要一临时的时间戳。如:
input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}
输出结果
$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100
{
    "@timestamp" => 2014-03-02T14:36:43.000Z,
      "@version" => "1",
          "host" => "windcoder.com",
       "message" => "02/Mar/2014:15:36:43 +0100"
}

一些示例:

input{
    file {
        path => "/usr/share/logstash/wb.cond/test.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter{
    mutate {
        gsub =>[
            "message", "'", '"'
        ]
    }
    json {
        source => "message"
    }
    mutate {
        convert => {
            "usdCnyRate" => "float"
            "futureIndex" => "float"
        }
    }
    date {
        match => [ "timestamp", "UNIX_MS" ]
        target => "logdate"
    }
}
output{
    stdout{
        codec=>rubydebug
    }
}

input {
  file {
    path => ["/data/test_logstash.log"]
    type => ["nginx_log"]
    start_position => "beginning"
  }
}
filter {
  if [type] =~ "nginx_log" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:create_time} %{IP:server_ip}  %{URIPATH:uri} %{GREEDYDATA:args}   %{IP:client_ip}  %{NUMBER:status}" }
    }
    urldecode{
    field =>args
    }
    kv {
    source =>"args"
    field_split =>"&"
    remove_field => [ "args","@timestamp","message","path","@version","path","host" ]
    }
    json {
        source => "yc_log"
        remove_field => [ "yc_log" ]
    }
    mutate {
      add_field => { "lg_value" => "%{lg_vl}" }
    }
    json {
        source => "lg_value"
        remove_field => [ "lg_vl","lg_value" ]
    }
  }
}

output {
  stdout { codec => rubydebug }
}


匹配失败

对于logstash的过滤,如果在match里面指定的正则语句对输入的日志信息无法过滤,会默认的添加一个tag字段,并且赋值 _grokparsefailure的字段
对于这种没有匹配成功的日志信息,可以选择丢弃

对于nginx的正则匹配。匹配不到通过tag字段中的值进行判断移除
filter {
    grok {
        match => {"message" => "%{IPV4:remote_addr} - %{DATA:nginx.access.user_name} [%{HTTPDATE:nginx.access.time}] "%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} "%{DATA:nginx.access.referrer}" "%{DATA:nginx.access.agent}" "%{DATA:nginx.access.other}""}
}
    if "_grokparsefailure" in [tags] {
        drop {}
}
}
原文地址:https://www.cnblogs.com/cizao/p/12549451.html