Notes on shipping JSON logs to Elasticsearch with Filebeat

Preface

For ease of testing, the examples in this article were set up on Windows; Linux is much the same. Before starting you need a Windows machine, a Spring Boot application, and Docker for Windows.

I. Configuring logback to emit JSON logs

1. Add the dependency to pom.xml

        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>6.1</version>
        </dependency>

2. Modify the logback.xml configuration

            <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                    <pattern>
                        <pattern>
                            {
                            "date":"%d{yyyy-MM-dd HH:mm:ss.SSS}",
                            "project":"test",
                            "app":"${APP_NAME}",
                            "env":"${ENV:-dev}",
                            "level":"%level",
                            "logger":"%logger",
                            "thread":"%thread",
                            "traceId":"%X{traceId}",
                            "message": "%msg  %exception{20}"
                            }
                        </pattern>
                    </pattern>
                </providers>
            </encoder>
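
With this encoder, each log event is written out as a single JSON line. Assuming APP_NAME resolves to demo-app and ENV is unset (so env falls back to dev), a line looks roughly like this (all values illustrative):

{"date":"2021-01-18 14:06:18.452","project":"test","app":"demo-app","env":"dev","level":"INFO","logger":"com.example.DemoService","thread":"http-nio-8080-exec-1","traceId":"","message":"order created"}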

II. Elasticsearch

1. Install with Docker

docker run -d --name elasticsearch  -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.1

2. Install the ik Chinese analyzer plugin

Download the ik analyzer version matching your Elasticsearch version from https://github.com/medcl/elasticsearch-analysis-ik/releases

After unzipping, copy it into the container with docker cp, then restart the elasticsearch container (docker restart elasticsearch):

docker cp C:\Users\DELL\Desktop\ik elasticsearch:/usr/share/elasticsearch/plugins
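
Once the container is back up, you can verify that the plugin loaded by POSTing to http://127.0.0.1:9200/_analyze with a body such as {"analyzer" : "ik_smart", "text" : "中文分词测试"}; a response listing Chinese tokens confirms the ik analyzer is available.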

3. Open http://127.0.0.1:9200 in a browser; you should see something like the following

{
    "name": "08485dfd8dd0",
    "cluster_name": "docker-cluster",
    "cluster_uuid": "wZqRjN7nTb66aFMeAz1Hug",
    "version": {
        "number": "7.10.1",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "1c34507e66d7db1211f66f3513706fdf548736aa",
        "build_date": "2020-12-05T01:00:33.671820Z",
        "build_snapshot": false,
        "lucene_version": "8.7.0",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
    },
    "tagline": "You Know, for Search"
}

III. Collecting logs with Filebeat

1. Download Filebeat from https://www.elastic.co/cn/downloads/beats/filebeat

2. After unzipping, edit the filebeat.yml file

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - D:\data\log\*\*-root.log
  # place custom fields at the top level of the event instead of under "fields"
  fields_under_root: true
  # start reading at the end of each file instead of re-shipping old lines
  tail_files: true
  # decode each line as JSON and lift its keys to the top level of the event
  json.keys_under_root: true
  # let decoded JSON keys overwrite Filebeat's own keys (e.g. message)
  json.overwrite_keys: true
  # add an error key when a line cannot be decoded as JSON
  json.add_error_key: true


# ======================= Elasticsearch template setting =======================
# ILM must be disabled, otherwise Filebeat ignores the custom index/indices settings below
setup.ilm.enabled: false

setup.template.name: "filebeat"
setup.template.fields: "fields.yml"
setup.template.pattern: "*"
setup.template.enabled: true
setup.template.overwrite: false
#setup.template.append_fields:
#- name: levelee
#  type: keyword
setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 0
  index.codec: best_compression
  #_source.enabled: false

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["localhost:9200"]
  # fallback index name, used only when no entry under "indices" matches
  index: "filebeat_%{+yyyy-MM-dd}"
  # route each event to an index built from its own env/project/app fields
  indices:
    - index: "%{[env]}_%{[project]}_%{[app]}_%{+yyyy-MM-dd}"

# ================================= Processors =================================
processors:
  - add_host_metadata:
      netinfo:
        enabled: true
  # parse the "date" field written by logback (Go reference-time layout) into @timestamp
  - timestamp:
      field: date
      layouts:
        - '2006-01-02 15:04:05.999'
      test:
        - '2021-01-18 14:06:18.452'
      timezone: Asia/Shanghai
  # drop Filebeat metadata that is not useful in the index
  - drop_fields:
      fields: ["ecs", "agent", "log", "input"]

Next, modify the fields.yml file

- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
  - name: '@timestamp'
    level: core
    required: true
    type: date
    description: 'Date/time when the event originated.'
    example: '2016-05-23T08:05:34.853Z'
  - name: message
    level: core
    type: text
    analyzer: ik_max_word
    description: 'For log events the message field contains the log message, optimized
      for viewing in a log viewer.'
    example: Hello World

These two files define the Elasticsearch index template: the template maps the message field explicitly with the ik analyzer, while the other fields in the log file are mapped automatically by the index template. Once Filebeat has started, the generated template can be inspected at http://127.0.0.1:9200/_template/filebeat.

3. Start Filebeat: open cmd, change to the directory containing filebeat.exe, and run ./filebeat.exe (adding the -e flag prints Filebeat's own logs to the console, which helps with debugging)

4. Open 127.0.0.1:9200/_search in a browser and you will see the data that has been shipped to ES

IV. Querying ES logs from Java

Add the dependencies to pom.xml

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </dependency>

To query logs sorted by log time in descending order, the calling code is as follows:

// Assumes a RestHighLevelClient field named "client" in the surrounding class;
// StringUtils/DateUtils/DateFormatUtils come from commons-lang3, CollectionUtils from commons-collections4.
public String search(String env, String project, Integer size, String levels, String apps, String keywords, Date startDate) throws IOException {

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().sort("@timestamp", SortOrder.DESC).timeout(new TimeValue(60, TimeUnit.SECONDS))
                .fetchSource(new String[]{"message", "date", "level", "thread", "stackTrace", "app", "traceId", "logger", "@timestamp", "host.ip"}, null);

        if (size != null && size > 0) {
            sourceBuilder.from(0);
            sourceBuilder.size(size);
        }

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (startDate != null) {
            boolQueryBuilder.filter(QueryBuilders.rangeQuery("@timestamp").gte(DateFormatUtils.format(startDate, "yyyy-MM-dd'T'HH:mm:ss.SSSZZ")));
        }
        if (StringUtils.isNotBlank(levels)) {
            String[] array = levels.split(",");
            boolQueryBuilder.must(QueryBuilders.termsQuery("level", array));
        }
        if (StringUtils.isNotBlank(keywords)) {
            String[] array = keywords.split(",");
            for (String item : array) {
                // leading wildcards are expensive on large indices; a term query against the
                // ik-analyzed message field is the cheaper alternative
                boolQueryBuilder.must(QueryBuilders.wildcardQuery("message", "*" + item + "*"));
                //boolQueryBuilder.must(QueryBuilders.termQuery("message", item));
            }
        }
        sourceBuilder.query(boolQueryBuilder);
        System.out.println(sourceBuilder.toString());

        SearchResponse searchResponse = null;
        String result = null;

        Date tomorrow = DateUtils.ceiling(new Date(), Calendar.DATE);
        // fall back to today when startDate is null, otherwise tmpDate.before() below throws an NPE
        Date tmpDate = startDate != null ? startDate : DateUtils.truncate(new Date(), Calendar.DATE);
        String tmpProject = "*";

        if (StringUtils.isNotBlank(project)) {
            tmpProject = project;
        }

        List<String> indexList = new ArrayList<>();
        while (tmpDate.before(tomorrow)) {
            if (StringUtils.isNotBlank(apps)) {
                String[] array = apps.split(",");
                for (String item : array) {
                    indexList.add(env + "_" + tmpProject + "_" + item + "_" + DateFormatUtils.format(tmpDate, "yyyy-MM-dd"));
                }
            } else {
                indexList.add(env + "_" + tmpProject + "_*_" + DateFormatUtils.format(tmpDate, "yyyy-MM-dd"));
            }
            tmpDate = DateUtils.addDays(tmpDate, 1);
        }

        SearchRequest searchRequest = new SearchRequest();
        if (CollectionUtils.isNotEmpty(indexList)) {
            searchRequest.indices(indexList.toArray(new String[0]));
            searchRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        }

        searchRequest.source(sourceBuilder);
        System.out.println(searchRequest);
        searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        if (searchResponse != null) {
            SearchHits searchHits = searchResponse.getHits();
            StringBuilder sb = new StringBuilder();
            for (SearchHit hit : searchHits.getHits()) {
                Map<String, Object> map = hit.getSourceAsMap();
                String message = map.get("message").toString();
                String date = map.get("date").toString();
                String level = map.get("level").toString();
                sb.append(date).append(" ").append(level).append(" ").append(message).append(" ").append(System.lineSeparator()).append("-----------------------------------------------------------").append(System.lineSeparator());
            }
            result = sb.toString();
        }


        return result;
    }
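
A minimal sketch of wiring this up, assuming the surrounding class holds the RestHighLevelClient in the client field the method uses (host and argument values are illustrative):

RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
// query the last 24 hours of ERROR/WARN logs for one app, filtered by a keyword
String logs = search("dev", "test", 100, "ERROR,WARN", "demo-app", "timeout",
        DateUtils.addDays(new Date(), -1));
System.out.println(logs);
client.close();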

Commonly used ES APIs

http://127.0.0.1:9200/_analyze   analyze how text is tokenized, body e.g. {"analyzer" : "ik_smart",  "text" : "bsp.supplier.saleOrder.query"}
http://127.0.0.1:9200/indexname/_mapping   view the field types of an index
http://127.0.0.1:9200/indexname/_settings   view index settings, e.g. shard counts
http://127.0.0.1:9200/indexname   sent as a DELETE request, deletes the index
http://127.0.0.1:9200/_template/indexname   view an index template
http://127.0.0.1:9200/indexname/_termvectors/{doc id}?fields=message   view how a document's field was tokenized
http://127.0.0.1:9200/indexname/_delete_by_query   delete documents matching a query
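
The same operations can also be issued from Java through the high-level client; for example, deleting a single day's log index (index name illustrative):

// DeleteIndexRequest is org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
AcknowledgedResponse resp = client.indices()
        .delete(new DeleteIndexRequest("dev_test_demo-app_2021-01-18"), RequestOptions.DEFAULT);
System.out.println("acknowledged: " + resp.isAcknowledged());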
Original post: https://www.cnblogs.com/caizl/p/14344081.html