Filebeat direct to Elasticsearch: extracting fields from message with an ingest pipeline

Here Filebeat ships data straight to Elasticsearch. With no Logstash in the path, filtering the raw data is somewhat awkward (Logstash's filter stage is very powerful), but the business still requires extracting certain fields from message (the raw log line). An Elasticsearch ingest pipeline can do this, as follows:

1. Create a pipeline.json file under /path/
{
  "description" : "test-pipeline",
  "processors" : [
    {
      "grok" :{
        "field" : "message",
        "patterns" : ["%{DATA:puid}\\t%{DATA:datatime}\\t\\t%{DATA:content}"]
      }
    }
  ]
}
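Before wiring Filebeat up, the pattern can be sanity-checked outside Elasticsearch. Below is a rough Python equivalent of the grok expression above (grok's DATA pattern is the non-greedy `.*?`); this is just an illustration against one of the test lines from step 4, not Elasticsearch code:

```python
import re

# Regex equivalent of the grok pattern in pipeline.json:
# %{DATA:puid} \t %{DATA:datatime} \t\t %{DATA:content}
pattern = re.compile(r"(?P<puid>.*?)\t(?P<datatime>.*?)\t\t(?P<content>.*?)")

line = ("f1b25095cc823e63389ff299622b7e85\t"
        "2019/02/27 03:38:54\t\t"
        "send packet! opcode:3 message is in lua")

m = pattern.match(line)
print(m.group("puid"))           # f1b25095cc823e63389ff299622b7e85
print(m.group("datatime"))       # 2019/02/27 03:38:54
print(repr(m.group("content")))  # '' -- the trailing non-greedy field
                                 # matches the empty string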
2. Upload the pipeline definition to Elasticsearch
curl -H "Content-Type: application/json" -XPUT 'http://localhost:9200/_ingest/pipeline/test-pipeline' -d@/path/pipeline.json

3. In filebeat.yml
filebeat.prospectors:
 ******
 ******
output.elasticsearch:
  hosts: ["localhost:9200"]
  # add the following line:
  pipeline: "test-pipeline"
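With `pipeline: "test-pipeline"` set, Filebeat effectively selects the ingest pipeline on its bulk requests, so every event's message runs through the grok processor at index time. A minimal sketch of what such a bulk request amounts to (the index name `test` is taken from the `_index` field shown in step 5):

```python
import json

# One raw event, as Filebeat would place it in the "message" field.
lines = [
    "f1b25095cc823e63389ff299622b7e85\t2019/02/27 03:38:54\t\t"
    "send packet! opcode:3 message is in lua8282",
]

# NDJSON bulk body: one action line plus one source line per event,
# each newline-terminated.
body = "".join(
    json.dumps({"index": {"_index": "test"}}) + "\n"
    + json.dumps({"message": line}) + "\n"
    for line in lines
)

# The ingest pipeline can be chosen per request via a query parameter:
url = "http://localhost:9200/_bulk?pipeline=test-pipeline"
print(body)
```

This is only a sketch of the wire format; Filebeat builds these requests itself once the `pipeline` option is set.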
4. Test data
f1b25095cc823e63389ff299622b7e85    2019/02/27 03:38:54     send packet! opcode:3 message is in lua8282
f1b25095cc823e63389ff299622b7e85    2019/02/27 03:38:54     PacketManager::_onReceivedPacket opcode:3 size:27,rec_len:278282
5. Resulting documents in Elasticsearch
[
    {
        "_score":1,
        "_type":"doc",
        "_id":"zWmLj2kB7ah0Pw2MmQGw",
        "_source":{
            "datatime":"2019/02/27 03:38:54",
            "log":{
                "file":{
                    "path":"/path/test_1.log"
                }
            },
            "beat":{
                "hostname":":",
                "name":":",
                "version":"6.6.1"
            },
            "@timestamp":"2019-03-18T06:44:43.224Z",
            "host":{
                "name":":"
            },
            "content":"",
            "source":"/path/test_1.log",
            "puid":"f1b25095cc823e63389ff299622b7e85",
            "offset":0,
            "input":{
                "type":"log"
            },
            "message":"f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 send packet! opcode:3 message is in lua",
            "prospector":{
                "type":"log"
            }
        },
        "_index":"test"
    },
    {
        "_score":1,
        "_type":"doc",
        "_id":"0GmLj2kB7ah0Pw2MmQGw",
        "_source":{
            "datatime":"2019/02/27 03:38:54",
            "log":{
                "file":{
                    "path":"/path/test_1.log"
                }
            },
            "beat":{
                "hostname":":",
                "name":":",
                "version":"6.6.1"
            },
            "@timestamp":"2019-03-18T06:44:43.224Z",
            "host":{
                "name":":"
            },
            "content":"",
            "source":"/path/test_1.log",
            "puid":"f1b25095cc823e63389ff299622b7e85",
            "offset":318,
            "input":{
                "type":"log"
            },
            "message":"f1b25095cc823e63389ff299622b7e85 2019/02/27 03:38:54 PacketManager::_onReceivedPacket| ReceivedPacket size:27",
            "prospector":{
                "type":"log"
            }
        },
        "_index":"test"
    }
]
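Once puid, datatime, and content exist as their own fields, downstream code can use them directly instead of re-parsing message. A small illustration of grouping the hits above by puid (hit list abbreviated to the relevant fields):

```python
# Abbreviated hits from the query result above; only the fields the
# ingest pipeline extracted plus Filebeat's offset are kept.
hits = [
    {"_source": {"puid": "f1b25095cc823e63389ff299622b7e85",
                 "datatime": "2019/02/27 03:38:54", "offset": 0}},
    {"_source": {"puid": "f1b25095cc823e63389ff299622b7e85",
                 "datatime": "2019/02/27 03:38:54", "offset": 318}},
]

# Group offsets per puid -- possible only because the grok processor
# promoted puid out of the raw message into its own field.
by_puid = {}
for hit in hits:
    src = hit["_source"]
    by_puid.setdefault(src["puid"], []).append(src["offset"])

print(by_puid)  # {'f1b25095cc823e63389ff299622b7e85': [0, 318]}
```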
References:
  1. https://note.yuchaoshui.com/blog/post/yuziyue/filebeat-use-ingest-node-dealwith-log-then-load-into-elasticsearch
  2. http://www.axiaoxin.com/article/236/
  3. https://blog.csdn.net/spring_ming/article/details/62232331
Original post: https://www.cnblogs.com/remainsu/p/filebeat-zhi-lianelasticsearch-li-yongpipeline-ti-.html