06ELK 初步

1. Logstash

Logstash 是一个数据抽取工具,将数据从一个地方转移到另一个地方。如 hadoop 生态圈的 sqoop 等。

Logstash 之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。

Logstash 配置文件有如下三部分组成,其中 input、output 部分是必须配置,filter 部分是可选配置,而 filter 就是过滤器插件,可以在这部分实现各种日志过滤功能。

input {
    # 输入插件
}
filter {
    # 过滤匹配插件
}
output {
    # 输出插件
}

启动命令为 logstash.bat -e 'input{stdin{}} output{stdout{}}',为了好维护,将配置写入文件启动 logstash.bat -f ../config/demo.conf

1.1 input

a. 标准输入(Stdin)

input {
    stdin {
    }
}
output {
    stdout {
        codec=>rubydebug
    }
}

b. 读取文件

Logstash 使用一个名为 filewatch 的 Ruby gem 库来监听文件变化,并通过一个叫 .sincedb 的数据库文件来记录被监听的日志文件的读取进度(时间戳),这个 sincedb 数据文件的默认路径在 <path.data>/plugins/inputs/file 下面,文件名类似于 .sincedb_123456,而 <path.data> 表示 Logstash 插件存储目录,默认是 LOGSTASH_HOME/data。

input {
    file {
        path => ["/var/*/*"]
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug
    }
}

默认情况下,Logstash 会从文件的结束位置开始读取数据,也就是说 Logstash 进程会以类似 tail -f 命令的形式逐行获取数据。

c. 读取 KafkaTopic

input {
	file {
		path => "/root/logs/*.log"
		start_position => beginning
		add_field => {
			"from" => "localfile"
		}
	}
	kafka {
		bootstrap_servers => ["39.98.133.153:9103"]
		group_id => "logstash"
		topics => ["demo"]
		consumer_threads => 1
		decorate_events => true
		add_field => {
			"from" => "demo"
		}
	}
}
filter {}
output {
	elasticsearch {
		hosts => "localhost:9200"
		index => "mylog"
	}
	stdout {}
}

1.2 filter

a. Grok 正则捕获

Grok是一个十分强大的 Logstash filter 插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。他是目前 Logstash 中解析非结构化日志数据最好的方式。

Grok 的语法规则:%{语法: 语义}

例如输入的内容为:

172.16.213.132 [07/Feb/2019:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

%{IP:clientip} 匹配模式将获得的结果为 clientip: 172.16.213.132%{HTTPDATE:timestamp} 匹配模式将获得的结果为 timestamp: 07/Feb/2018:16:24:19 +0800,而 %{QS:referrer} 匹配模式将获得的结果为 referrer: "GET / HTTP/1.1"

下面是一个组合匹配模式,它可以获取上面输入的所有内容:

%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}

通过上面这个组合匹配模式,我们将输入的内容分成了 5 个部分,即 5 个字段,将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用 Grok 的目的。

例子:

input {
    stdin {}
}
filter {
    grok {
        match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
    }
}
output {
    stdout {
        codec => "rubydebug"
    }
}

输入内容:

192.168.206.131 [07/JAN/2022:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

控制台输出:

b. 时间处理

date 插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成 LogStash::Timestamp 对象,然后转存到 @timestamp 字段里,这在之前已经做过简单的介绍。

下面是 date 插件的一个配置示例(这里仅仅列出 filter 部分):

filter {
    grok {
        match => ["message", "%{HTTPDATE:timestamp}"]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

c. 数据修改(Mutate)

(1)正则表达式替换匹配字段

gsub 可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效,下面是一个关于 mutate 插件中 gsub 的示例(仅列出 filter 部分):

filter {
    mutate {
        gsub => ["filed_name_1", "/" , "_"]
    }
}

这个示例表示将 filed_name_1 字段中所有 "/" 字符替换为 "_"。

(2)分隔符分割字符串为数组

split 可以通过指定的分隔符分割字段中的字符串为数组,下面是一个关于 mutate 插件中 split 的示例(仅列出 filter 部分):

filter {
    mutate {
        split => ["filed_name_2", "|"]
    }
}

这个示例表示将filed_name_2字段以"|"为区间分隔为数组。

(3)重命名字段

rename 可以实现重命名某个字段的功能,下面是一个关于 mutate 插件中 rename 的示例(仅列出 filter 部分):

filter {
    mutate {
        rename => { "old_field" => "new_field" }
    }
}

这个示例表示将字段 old_field 重命名为 new_field。

(4)删除字段

remove_field 可以实现删除某个字段的功能,下面是一个关于 mutate 插件中 remove_field 的示例(仅列出 filter 部分):

filter {
    mutate {
        remove_field  =>  ["timestamp"]
    }
}

这个示例表示将字段 timestamp 删除。

(5)GeoIP 地址查询归类

filter {
    geoip {
        source => "ip_field"
    }
}

(6)综合案例

input {
    stdin {}
}
filter {
    grok {
        match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
        remove_field => [ "message" ]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
    mutate {
          convert => [ "response","float" ]
           rename => { "response" => "response_new" }
           gsub => ["referrer","\"",""]
           split => ["clientip", "."]
    }
}
output {
    stdout {
        codec => "rubydebug"
    }
}

1.3 output

output 是 Logstash 的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。 一些常用的输出包括:

  • file:表示将日志数据写入磁盘上的文件;
  • elasticsearch:表示将日志数据发送给 Es。

(1)输出到标准输出(stdout)

output {
    stdout {
        codec => rubydebug
    }
}

(2)保存为文件(file)

output {
    file {
        path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
    }
}

(3)输出到 Es

output {
    elasticsearch {
        host => ["192.168.1.1:9200","172.16.213.77:9200"]
        index => "logstash-%{+YYYY.MM.dd}"
    }
}
  • host:是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
  • index:写入 Es 的索引的名称,这里可以使用变量。Logstash 提供了 %{+YYYY.MM.dd} 这种写法。在语法解析的时候,看到以 + 号开头的,就会自动认为后面是时间格式,尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。此外,注意索引名中不能有大写字母。
  • manage_template:用来设置是否开启 Logstash 自动管理模板功能,如果设置为 false 将关闭自动管理模板功能。如果我们自定义了模板,那么应该设置为 false。
  • template_name:这个配置项用来设置在 Es 中模板的名称。

2. Kibana

  1. 是什么:ELK 中数据展现工具;
  2. 下载:https://www.elastic.co/cn/downloads/kibana
  3. 使用:建立索引模式(index partten),discover 中使用 DSL 搜索。

功能:

  • 可视化:绘制图形;
  • 仪表盘:将各种可视化图形放入,形成大屏幕;
  • 使用模板数据指导绘图:点击主页的添加模板数据,可以看到很多模板数据以及绘图;
  • 其他功能:监控、日志、APM 等功能非常丰富。

3. 集群部署

https://www.cnblogs.com/crazymakercircle/p/15433680.html#autoid-h2-9-5-0

3.1 五大角色

(1)Master Node 主节点

  • 该节点不和应用创建连接,每个节点都保存了集群状态。
  • Master 节点控制整个集群的元数据。只有 Master Node 节点可以修改节点状态信息及元数据(metadata)的处理,比如索引的新增、删除、分片路由分配、所有索引和相关 Mapping 、Setting 配置等;
  • 从资源占用的角度来说:Master 节点不占用磁盘 IO 和 CPU,内存使用量一般, 没有 data 节点高。

(2)Master eligible Nodes 合格主节点

  • 有资格成为 Master 节点但暂时并不是 Master 的节点被称为 eligible 节点,该节点可以参加选主流程,成为 Master 节点;
  • 每个节点部署后不修改配置信息,默认就是一个 eligible 节点;
  • 该节点只是与集群保持心跳,判断 Master 是否存活,如果 Master 故障则参加新一轮的 Master 选举;
  • 从资源占用的角度来说:eligible 节点比 Master 节点更节省资源,因为它还未成为 Master 节点,只是有资格成功 Master 节点。

(3)Data Node 数据节点

  • 该节点用于建立文档索引, 接收应用创建连接、接收索引请求,接收用户的搜索请求;
  • 数据节点真正存储数据,Es 集群的性能取决于该节点的个数(每个节点最优配置的情况下);
  • 数据节点的分片执行查询语句获得查询结果后将结果反馈给 Coordinating Node,在查询的过程中非常消耗硬件资源,如果在分片配置及优化没做好的情况下,进行一次查询非常缓慢(硬件配置也要跟上数据量);
  • 保存包含索引文档的分片数据,执行CRUD、搜索、聚合相关的操作。属于:内存、CPU、IO密集型,对硬件资源要求高。从资源占用的角度来说:data 节点会占用大量的 CPU、IO 和内存。

(4)Coordinating Node 协调节点

  • 协调节点,该节点专用与接收应用的查询连接、接受搜索请求,但其本身不负责存储数据;
  • 协调节点的职责:接受客户端搜索请求后将请求转发到与查询条件相关的多个 data 节点的分片上,然后多个 data 节点的分片执行查询语句或者查询结果再返回给协调节点,协调节点把各个 data 节点的返回结果进行整合、排序等一系列操作后再将最终结果返回给用户请求。
  • 搜索请求在两个阶段中执行(query 和 fetch),这两个阶段由接收客户端请求的节点 —— 协调节点协调
    • 在请求 query 阶段,协调节点将请求转发到保存数据的数据节点。每个数据节点在本地执行请求并将其结果返回给协调节点;
    • 在收集 fetch 阶段,协调节点将每个数据节点的结果汇集为单个全局结果集;
  • 从资源占用的角度来说:协调节点,可当负责均衡节点,该节点不占用 CPU、IO 和内存。

(5)Ingest Node

  • Ingest 节点可以看作是数据前置处理转换的节点,支持 pipeline 管道设置,可以使用 Ingest 对数据进行过滤、转换等操作,类似于 Logstash 中 filter 的作用,功能相当强大;
  • Ingest 节点处理时机:在数据被索引之前,通过预定义好的处理管道对数据进行预处理。默认情况下,所有节点都启用 Ingest,因此任何节点都可以处理 Ingest 任务;
  • 我们也可以创建专用的 Ingest 节点。

3.2 Coordinating Node 详解

Es 本身是一个分布式的计算集群,每个 Node 都可以响应用户的请求,包括 Master Node、Data Node,它们都有完整的 Cluster State 信息。

正如我们知道的一样,在某个 Node 收到用户请求的时候,会将请求转发到集群中所有索引相关的 Node 上,之后将每个 Node 的计算结果合并后返回给请求方。

我们暂且将这个 Node 称为查询节点,整个过程跟分布式数据库原理类似。那问题来了,这个查询节点如果在并发和数据量比较大的情况下,由于数据的聚合可能会让内存和网络出现瓶颈。

因此,在职责分离指导思想的前提下,这些操作我们也应该从这些角色中剥离出来,官方称它是 Coordinating Nodes,只负责路由用户的请求,包括读、写等操作,对内存、网络和 CPU 要求比较高。

本质上,Coordinating Only Nodes 可以笼统的理解为是一个负载均衡器,或者反向代理,只负责读,本身不写数据。

它的配置是:

node.master: false
node.data: false
node.ingest: false
search.remote.connect: false

增加 Coordinating Nodes 的数量可以提高 API 请求响应的性能, 提升集群的吞吐量。

我们也可以针对不同量级的 Index 分配独立的 Coordinating Nodes 来满足请求性能。

那是不是越多越好呢?

在一定范围内是肯定的,但凡事有个度,过了负作用就会突显,太多的话会给集群增加负担。

3.3 Ingest Node 详解

a. 用途

可以把 Ingest 节点的功能抽象为大数据处理环节的“ETL”—— 抽取、转换、加载。

  1. Ingest 节点可以看作是数据前置处理转换的节点,支持 pipeline 管道设置,可以使用 ingest 对数据进行过滤、转换等操作,类似于 Logstash 中 filter 的作用,功能相当强大。
  2. Ingest 节点可用于执行常见的数据转换和丰富。 处理器配置为形成管道。 在写入时,Ingest Node 有 20 个内置处理器,例如 grok,date,gsub,小写/大写,删除和重命名。
  3. 在批量请求或索引操作之前,Ingest 节点拦截请求,并对文档进行处理。

Ingest 的例子:(1)可以是日期处理器,其用于解析字段中的日期;(2)转换处理器,它将字段值转换为目标类型,例如将字符串转换为整数。

b. 解决问题

Q1:线上写入数据改字段需求,如何在数据写入阶段修改字段名(不是修改字段值)?
Q2:线上业务数据添加特定字段需求,如何在批量写入数据的时候,每条 Doc 插入实时时间戳?

这时,脑海里开始对已有的知识点进行搜索。

T1:字段值的修改无非:update,update_by_query,但是字段名呢?貌似没有相关接口或实现;
T2:插入的时候,业务层面处理,读取当前时间并写入貌似可以,有没有不动业务层面的字段的方法呢?

答案是有的,这就是 Ingest 节点的妙处。

【A1】借助 Ingest 节点的 rename_hostname 管道的预处理功能,实现了字段名称的变更:由 hostname 改成 host。

PUT _ingest/pipeline/rename_hostname
{
  "processors": [
    {
        "field": "hostname",
        "target_field": "host",
        "ignore_missing": true
      }
    }
  ]
}

PUT server

POST server/values/?pipeline=rename_hostname
{
  "hostname": "myserver"
}

【A2】通过 indexed_at 管道的 set 处理器与 ms-test 的索引层面关联操作, ms-test 索引每插入一个 document,都会自动添加一个字段“index_at=最新时间戳”。

PUT _ingest/pipeline/indexed_at
{
  "description": "Adds indexed_at timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.indexed_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

PUT ms-test
{
  "settings": {
    "index.default_pipeline": "indexed_at"
  }
}

POST ms-test/_doc/1
{"title":"just testing"}

c. 核心原理

在实际文档索引发生之前,使用 Ingest 节点预处理文档。Ingest 节点拦截批量和索引请求,它应用转换,然后将文档传递回索引或 Bulk API。

Ingest 节点处理时机—— 在数据被索引之前,通过预定义好的处理管道对数据进行预处理。

默认情况下,所有节点都启用 Ingest,因此任何节点都可以处理 Ingest 任务。我们也可以创建专用的 Ingest 节点。

要禁用节点的 Ingest 功能,需要在 elasticsearch.yml 设置:node.ingest:false

涉及知识点:

  1. 预处理 pre-process:要在数据索引化之前预处理文档;
  2. 管道 pipeline:每个预处理过程可以指定包含一个或多个处理器的管道。
  3. 管道的实际组成
    {
      # 管道功能描述
      "description" : "...",
      # 注意是数组,可以指定 1 个或多个处理器
      "processors" : [ ... ]
    }
    
  4. 处理器 processors:每个处理器以某种特定方式转换文档。例如,管道可能有一个从文档中删除字段的处理器,然后是另一个重命名字段的处理器。

d. Ingest API

Ingest API 共分为 4 种操作,分别对应:PUT(新增)、GET(获取)、DELETE(删除)、Simulate (仿真模拟)。

模拟管道 AP Simulate 针对请求正文中提供的文档集执行特定管道。

除此之外,高阶操作包括:

  1. 支持复杂条件的 Nested 类型的操作;
  2. 限定条件的管道操作;
  3. 限定条件的正则操作等。

常见的处理器有如下 28 种,举例:

e. Ingest 节点和 Logstash Filter

(1)支持的数据源不同

  • Logstash:大量的输入和输出插件(比如 Kafka、Redis 等)可供使用,还可用来支持一系列不同的架构;
  • IngestNode:不能从外部来源(例如消息队列或数据库)提取数据,必须批量 bulk 或索引 index 请求将数据推送到 Elasticsearch。

(2)应对数据激增的能力不同

  • Logstash:Logstash 可在本地对数据进行缓冲以应对采集骤升情况。如前所述,Logstash 同时还支持与大量不同的消息队列类型进行集成。
  • IngestNode:极限情况下会出现:在长时间无法联系上 Es 或者 Es 无法接受数据的情况下,均有可能会丢失数据。

(3)处理能力不同

  • Logstash:支持的插件和功能点较 IngestNode 多很多。
  • IngestNode:支持28类处理器操作。IngestNode 管道只能在单一事件的上下文中运行。Ingest 通常不能调用其他系统或者从磁盘中读取数据。

3.4 架构规划

a. 节点的默认角色

通常只有较大的集群才能开始分离专用主节点、数据节点。 对于许多用户场景,路由节点根本不一定是必需的。

专用协调节点从数据节点中消除了聚合/查询的请求解析和最终阶段,并允许他们专注于处理数据。在多大程度上这对集群有好处将因情况而异。 通常我会说,在查询大量使用情况下路由节点更常见。

实际上,一个节点在默认情况下会同时扮演:Master Eligible Node、Data Node 和 Ingest Node。

节点类型 配置参数 默认值
Master Eligible node.master true
Data node.data true
Ingest node.ingest true
Coordinating only / 设置上面 3 个参数全为 false,节点为协调节点

b. 两个属性的四种组合

默认情况下这三个属性的值都是true。默认情况下,elasticsearch 集群中每个节点都有成为主节点的资格,也都存储数据,还可以提供查询服务,做预处理。

node.master    # 节点是否具有成为主节点的资格 (真正的主节点,是由多个具有主节点资格的节点进行选举产生的)
node.data      # 节点是否存储数据
node.ingest    # 是否允许成为协调节点

(1)node.master: true AND node.data: true AND node.ingest: true

  • 这种组合表示这个节点既有成为主节点的资格,又可以存储数据,还可以作为预处理节点;
  • 这个时候如果某个节点被选举成为了真正的主节点,那么他还要存储数据,这样对于这个节点的压力就比较大了;
  • elasticsearch 默认是:每个节点都是这样的配置,在测试环境下这样做没问题。实际工作中建议不要这样设置,这样相当于主节点和数据节点的角色混合到一块了。

(2)node.master: false AND node.data: true AND node.ingest: false

  • 这种组合表示这个节点没有成为主节点的资格,也就不参与选举,只会存储数据;
  • 这个节点我们称为 data 节点。在集群中需要单独设置几个这样的节点负责存储数据,后期提供存储和查询服务。

(3)node.master: true AND node.data: false AND node.ingest: false

这种组合表示这个节点不会存储数据,有成为主节点的资格,可以参与选举,有可能成为真正的主节点。这个节点我们称为 Master 节点。

(4)node.master: false AND node.data: false AND node.ingest: true

  • 这种组合表示这个节点即不会成为主节点,也不会存储数据,这个节点的意义是作为一个 client 节点,主要是针对海量请求的时候可以进行负载均衡。
  • 在新版 ElasticSearch 5.x 之后该节点称之为 Coordinate 节点,其中还增加了一个叫 Ingest 节点,用于预处理数据(索引和搜索阶段都可以用到)。
  • 当然,作为一般应用是不需要这个预处理节点做什么额外的预处理过程,那么这个节点和我们称之为 client 节点之间可以看做是等同的,我们在代码中配置访问节点就都可以配置这些 Ingest 节点即可。

4. Case: 日志分析

  1. 逻辑模块程序随时输出日志
  2. Logstash 收集日志到 Es
  3. Kibana 展现数据

Grok 内置类型:

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

5. Case: 站内搜索模块

  1. MySQL 导入 t_entity_goods 表;
  2. 创建索引 goods 及其映射;
  3. Logstash 的工作是从 MySQL 中读取数据,向 Es 中创建索引,这里需要提前创建 Mapping 的模板文件以便 Logstash 使用;
  4. 在 Logstash 的 config 目录创建 goods_template.json;
  5. Logstash 配置 mysql.conf;
    • Es 采用 UTC 时区,比北京时间早 8 小时,所以 Es 读取数据时让最后更新时间 +8 小时:where timestamp > date_add(:sql_last_value, INTERVAL 8 HOUR)
    • Logstash 每个执行完成会在 /config/logstash_metadata 记录执行时间下次以此时间为基准进行增量同步数据到索引库。
input {
  stdin {
  }
  jdbc {
      jdbc_connection_string => "jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC"
      # the user we wish to excute our statement as
      jdbc_user => "root"
      jdbc_password => root
      # the path to our downloaded jdbc driver
      jdbc_driver_library => "D:/develop/maven/repository3/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # 要执行的sql文件
      # statement_filepath => "/conf/course.sql"
      statement => "select * from t_entity_goods where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)"
      # 定时配置
      schedule => "* * * * *"
      record_last_run => true
      # sql_last_value 上一次扫表的时间存储在 ↓ 位置
      last_run_metadata_path => "D:/elasticsearch/logstash-6.2.4/config/logstash_metadata"
  }
}


output {
  elasticsearch {
    # Es 的 IP 和端口
    hosts => "localhost:9200"
    # hosts => ["localhost:9200"]
    # Es 索引库名称
    index => "goods"
    # 数据库记录中的 id 属性赋值给 docID
    document_id => "%{id}"
    document_type => "_doc"
    template =>"D:/elasticsearch/logstash-6.2.4/config/goods_template.json"
    template_name =>"goods_template"
    template_overwrite =>"true"
  }
  stdout {
    # 日志输出
    codec => json_lines
  }
}
原文地址:https://www.cnblogs.com/liujiaqi1101/p/15800206.html