通过elasticsearch对日志进行搜索热词统计

通过logstash搜集日志

这里搜集日志可以使用ELK的一个插件filebeat对日志进行处理,并传输到后端的程序
在这里有一个不好的地方, 如果想要直接使用filebeat将日志发送到elasticsearch的话, 它并不能对任何字段进行替换等处理
比较明显的问题就是, 一般我们需要将@timestamp替换成日志里面的时间而不是程序对日志的处理时间, 这一点它无法做到
还有一点, 使用filebeat对多行日志进行处理时似乎会发生日志收集错乱的现象, 这个问题有待测试, 因为filebeat程序是自带处理多行日志的
当然好处也是有点, 可以比较省资源

input {
    file {
        path => "/tmp/test.log"
        add_field => {"area"=>"beijing"}
        codec => multiline {
            pattern => "^["
            negate => true
            what => previous
        }
    }
}
filter {
    grok {
            match => { "message" => "^[(%{WORD:loglevel}s+)?%{TIMESTAMP_ISO8601:timestamp}?(?<file>[^@]+)s+@s+(?<pid>[^]]+)]s+-s+?%{GREEDYDATA:result}" }
            remove_field => [ "message" ]
    }

    if ([result] =~ "visitlog|") {
        mutate {
            split => ["result","visitlog|"]
                add_field => {
                    "field2" => "%{[result][1]}"
                }
                remove_field => [ "result" ]
        }

        json {
            source => "field2"
            target => "results"
            remove_field => [ "field2" ]
        }
        date {
            match => [ "[results][reqTime]", "yyyy-MM-dd HH:mm:ss" ]
        }
    }
}
output {
    elasticsearch {
        hosts => [ "127.0.0.1:9200" ]
        index => "logstash-name-%{+YYYY.MM.dd.HH}"
        flush_size => 20
        idle_flush_time => 3
        sniffing => true
        template_overwrite => true
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

上面是一个logstash的配置文件,处理的日志格式大概是这样的

[ERROR 2017-05-04 10:12:24,281 ./connect_info.py:336 @ 8299] - socket send and recieve Error: Traceback (most recent call last):
  File "./connect_info.py", line 305, in get_request
    retdata['handstr']=unpack('10s',client_socket.recv(10) )
error: unpack requires a string argument of length 10

[INFO 2017-05-04 10:12:24,282 ./connect_info.py:84 @ 8299] - before doing clean up...
[INFO 2017-05-04 10:12:24,282 ./connect_info.py:92 @ 8299] - end clean up.
[INFO 2017-05-04 10:12:24,289 ./connect_info.py:320 @ 8299] - from engine:{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}
[INFO 2017-05-04 10:12:24,290 /xxx/ooo/music_service.py:95 @ 8299] - visitlog|{"reqTime":"2017-05-04 10:12:24","time":{"receive": 0.006849050521850586, "init": 4.0531158447265625e-06, "reqTime": 0.008450031280517578, "send": 1.5974044799804688e-05},"req":{"pageSize": 20, "text": "abc", "appId": "appid", "uuid": "1e4e45365ae43b12cf31004f41013b23", "lengthMin": 0, "isCorrect": "1", "sessionId": "1493863935", "sid": "1493863935", "sort": "1", "pageIndex": 1, "searchFunc": "searchmusic", "lengthMax": 0, "timestamp": "1493863935", "isSemantic": "1", "isFilter": "0", "releaseDateMin": 0, "path": "/aa/bb/cc/searchmusic", "_": "1493863109797", "releaseDateMax": 0, "callback": "jQuery1900565385167_1456109742", "token": "aaaaaaaaaaaaaaaaaa", "queryId": "dfbab18a3bd7cfb28acb33f323ada1cd"},"response":{"data":{"isFromCache":0,"results":[{"aa":"bb","cc":dd"}],"semantic":[{"aa":"bb","cc":"dd"}],"total":1},"errmsg":"","retcode":0,"tolerance":["abc"]}}

这里分为三个段落

input段:

采用文件的形式, path可以采用*来匹配任意字符(匹配单个字符待测试),
add_field 可以增加字段, 可以很好的区分开日志
codec => multiline 采用多行的模式 如果不是以[开头的将后面的行算作第一行

filter段:

这里采用的是 grok 匹配前面的无规则(非json格式)内容, 其后的json格式内容统一存到 result 字段, 并移除message字段
再通过 if 判断, 提取需要处理的日志 使用 mutate 对日志进行切分, 标准的json格式日志将保存在 field2 字段 之后通过 json 进行格式化该字段
最好将格式化好的字段中的时间 替换默认的 @timestamp 字段

output字段:

elasticsearch 将日志输出到elasticsearch 中
stdout 将日志输出到屏幕终端

通过elasticsearch对日志进行检索

先通过results.req.searchFunc字段过滤出包含 searchmusic的内容, 再判断 results.response.data.total 是否大于 1 排除搜索无结果的内容
最后使用 aggregations 对 results.req.text.keyword 字段结果进行聚合 统计出该字段的每个内容的个数, size控制显示多少个内容

aggregations 上面的size控制不显示其他搜索内容, 只关注aggregations 统计结果

GET /logstash-name-2017.06*/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "results.req.searchFunc": "searchmusic"
          }
        },
        {
          "range": {
            "results.response.data.total": {
              "gte": "1"
            }
          }
        }
      ]
    }
  },
  "size":0,
  "aggregations": {
    "topSearch": {
      "terms": {
        "field": "results.req.text.keyword",
        "size": 100
      }
    }
  }
}
原文地址:https://www.cnblogs.com/mikeguan/p/6943311.html