Logstash配置

Logstash配置

一般logstash更常用于ELK日志监控系统,作为数据管道

Logstash:接收,处理,转发日志

Elasticsearch:文档型数据库,实时的分布式搜索和分析引擎

Kibana:查询、生成报表的web GUI

Logstash 有两个必要元素:input 和 output ,一个可选元素:filter。

input:采集数据,指定数据来源。可以是文件路径,也可以是kafka的某个topic、redis。

filter:筛选、处理数据,剔除多余的数据,生成想要的索引。

output:将数据传输到目的地,可以是具体文件路径,也可以是kafka、redis。

配置

在安装目录bin文件夹里新建一个logstash.conf,用来配置上述所说的输出输出等,将会覆盖logstash.yml默认设置。
一般配置如下:

# 输入输入
input { stdin {} }
# 数据处理
filter {
    grok {
        match => ["message",  "%{COMBINEDAPACHELOG}"]
    }
}
# 数据输出
output { stdout { codec => rubydebug } }

配置文件语法类似Ruby

filter

filter中的配置,基本格式如下,plugin都是官方提供的,针对文本做处理,比如正则表达式啊(grok),按固定的格式做切分(kv)等等。选择正确的plugin可以更快速的帮助你解析日志

filter {
    plugin {
        XX => "YY"
    }

    if expression {
        plugin {
             XX => "YY"
        }
    } else if expression {
        plugin {
            XX => "YY"
        }
    } else {
        plugin {
            XX => "YY"
        }
    }

    plugin {
        XX => "YY"
    }
}

关于插件

1、grok:正则表达式插件,功能强大,有许多内置的pattern

以何种规则从字符串中提取出结构化的信息,grok是logstash里的一款插件,可以使用正则表达式匹配日志,上文中的%{COMBINEDAPACHELOG}是内置的正则,用来匹配apache access日志
默认正则:

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([A-a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
2、mutate:字段的CRUD操作,比如替换内容,截取等等
mutate {
    split=>{"error_request_info"=>",fromuid"}
    add_field=>{"error_uri"=>"%{error_request_info[0]}"}
    remove_field=>["error_request_info"]
} 
3、kv:key-value插件,非常适合URL参数解析一类的具有固定分隔符的日志
#比如解析URL的querystring: a=1&b=2&c=3

filter {
    kv {
        field_split => "&"
    }
}
4、json:将json字符串直接转换成对应的key-value
#比如日志为:xxxxxxxxx:xxxxxx&result={"data":"1"}:xxxxxx
filter {
    json {
        #假设数据通过grok预处理,将result内容捕获
        source => "result"
    }
}
#通过json encode后
{
  "data": "1"
}

5、drop:直接丢掉本行日志,过滤不符合要求的日志
if "/app/log/logfront" in [content] {
        # 特定的处理
        drop {}  
}

output

配置保存解析结果

  • elasticsearch:将事件数据发送给 Elasticsearch(推荐模式)。
  • file:将事件数据写入文件或磁盘。
  • statsd:将事件数据发送到 statsd (这是一种侦听统计数据的服务,如计数器和定时器,通过UDP发送并将聚合发送到一个或多个可插入的后端服务)
原文地址:https://www.cnblogs.com/xiao-xue-di/p/11763964.html