ELK+filebeat+kafka搭建Oracle数据库日志平台

# ELK+Filebeat+Kafka搭建Oracle数据库日志平台

## 1. 数据流向结构图

![image-20200422153157808](assets/image-20200422153157808.png)

![image-20200422153305723](assets/image-20200422153305723.png)

==注意:需要JDK支持==

## 2. 安装配置Filebeat收集日志文件

### 2.1 安装配置

``` shell
# 解压
tar -xf filebeat-*-linux-x86_64.tar.gz -C /ups/app/elastic
cd /ups/app/elastic
ln -s filebeat-* filebeat
```

### 2.2 配置数据采集conf文件

``` yaml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/boot.log
- /var/log/secure
include_lines: ['ERROR', 'fail', 'error', 'Failed', 'Error']
fields:
log_topic: system
# myid: oslog
- type: log
enabled: true
paths:
- /tmp/alert_orcl1.log
include_lines: ['ERROR', 'WARN', 'ORA-', 'Warnning']
fields:
log_topic: oradb-alert-logs
# myid: dblog
multiline.pattern: '^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}'
# multiline.pattern: '^[Mon|Tue|Wed|Thu|Fri|Sat|Sun]{1} [Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec]{1} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}'
multiline.negate: true
multiline.match: after

- type: log
enabled: true
paths:
- /tmp/alert_+ASM1.log
include_lines: ['ERROR', 'WARN', 'ORA-', 'WARNING']
fields:
log_topic: asm-alert-logs
# myid: asmlog
# multiline.pattern: '^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T[[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}'
multiline.pattern: '^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}'
multiline.negate: true
multiline.match: after

- type: log
enabled: true
paths:
- /tmp/alertora.log
include_lines: ['ERROR', 'WARNING', 'OFFLINE', 'disconnect', 'error']
#fields_under_root: true
fields:
log_topic: crs-alert-logs
# myid: crslog
multiline.pattern: '^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}'
multiline.negate: true
multiline.match: after

name: "192.168.10.151"
# 删除事件中没意义的字段
processors:
- drop_fields:
fields: ["beat", "input", "offset", "prospector", "log", "@metadata"]

output.kafka:
enabled: true
hosts: ["192.168.10.151:9092"]
topic: '%{[fields.log_topic]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000

# 记录日志
logging.level: info
logging.to_files: true
logging.files:
  path: /ups/app/elastic/filebeat/logs
  name: filebeat
  keepfiles: 7
  permissions: 0644

close_older: 30m
force_close_files: true
close_inactive: 1m
close_timeout: 3h
clean_inactive: 28h
ignore_older: 20h

# 限制 CPU和内存资源
max_procs: 1
queue.mem:
events: 256
flush.min_events: 255
flush.timeout: 5s

```

### 2.3 启动服务

``` shell
nohup /ups/app/elastic/filebeat/filebeat -c /ups/app/elastic/filebeat/config/filebeat2kafka.yml >/dev/null 2>&1 &
```

## 3. 部署Kafka中间件

[文档](Kafka/Kafka的安装.md)

## 4. 部署Logstash(清洗数据)

### 4.1 安装配置

``` shell
# 解压
unzip -qo logstash-*.zip -d /ups/app/elastic
cd /ups/app/elastic
ln -s logstash-* logstash
mkdir -p /ups/app/elastic/logstash{conf.d,patterns.d}
```

==修改Linux系统日志格式==

``` shell
# 备份配置
cp /etc/rsyslog.conf{,$(date +%Y%m%d)}

# 修改配置时间格式 RSYSLOG_TraditionalFileFormat
# Use default timestamp format
$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat

# 修改如下:
# Use default timestamp format
#$ActionFileDefaultTemplate RSYSLOG_TraditionalFileFormat
$template CustomFormat,"%$NOW% %TIMESTAMP:8:15% %HOSTNAME% %syslogtag%%msg% "
$ActionFiledefaultTemplate CustomFormat

# 重启服务
service rsyslog restart
service rsyslog status
```

### 4.2 配置数据清洗文件

``` yaml
input {
kafka {
bootstrap_servers => "localhost:9092"
codec => json {
charset => "UTF-8"
}
topics => ["system", "oradb-alert-logs", "asm-alert-logs", "crs-alert-logs"]
}
}

filter {
mutate {
add_field => {
"Serverhost" => "%{[host][name]}"
"logTopic" => "%{[fields][log_topic]}"
}
}
if [logTopic] == "oradb-alert-logs" or [logTopic] == "asm-alert-logs" {
grok {
match => {
"message" => "(?m)%{HTTPDERROR_DATE:timestamp}[\n]+%{GREEDYDATA:log_message}"
}
}
date {
# Wed Apr 22 11:50:59 2020
match => [ "timestamp", "EEE MMM dd HH:mm:ss yyyy" ]
}
mutate {
replace => [ "message", "%{log_message}" ]
remove_field => [ "timestamp", "log_message", "host", "fields" ]
}
}
else if [logTopic] == "crs-alert-logs" {
grok {
match => {
#"message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:log_message}"
"message" => "(?m)%{TIMESTAMP_ISO8601:timestamp}[: \n]+%{GREEDYDATA:log_message}"
}
}
date {
# 2015-07-12 18:02:37.719
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss.SSS" ]
}
mutate {
replace => [ "message", "%{log_message}" ]
remove_field => [ "timestamp", "log_message", "host", "fields" ]
}
}
else if [logTopic] == "system" {
grok {
match => {
# "message" => "%{SYSLOGLINE}"
#"message" => "(?m)%{SYSLOGTIMESTAMP:timestamp}[ ]+%{GREEDYDATA:log_message}"
"message" => "(?m)%{TIMESTAMP_ISO8601:timestamp}[: \n]+%{GREEDYDATA:log_message}"
}
}
date {
# 2020-05-01 22:08:22
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
}
mutate {
replace => [ "message", "%{log_message}" ]
remove_field => [ "timestamp", "log_message", "host", "fields", "%{[host][name]}" ]
}
}
}

output {
#stdout { codec => rubydebug }

if [logTopic] == "system" {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "oslog-%{+YYYY.MM.dd}"
}
}
else if [logTopic] == "crs-alert-logs" {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "crslog-%{+YYYY.MM.dd}"
}
}
else if [logTopic] == "oradb-alert-logs" {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "dblog-%{+YYYY.MM.dd}"
}
}
else if [logTopic] == "asm-alert-logs" {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "asmlog-%{+YYYY.MM.dd}"
}
}
}
```

### 4.3 启动服务

``` shell
cd /ups/app/elastic/logstash
nohup ./bin/logstash -f config/kafka2es.conf --config.reload.automatic >/dev/null 2>&1 &
```

## 5. 部署Elasticsearch检索服务

### 5.1 安装配置

``` shell
# 创建用户
su - root
useradd -c "Elasticsearch Server" -r -m -p elastic -d /ups/app/elastic -s /bin/bash elastic

# 配置内核参数
cp /etc/sysctl.conf{,_$(date +%Y%m%d)}
cat >> /etc/sysctl.conf <<EOF
# elk for elasticsearch
vm.max_map_count = 262144
EOF

# 配置资源限制参数
-- cp /etc/security/limits.conf{,_$(date +%Y%m%d)}
cat >> /etc/security/limits.d/99-elastic.conf << EOF
# Elasticsearch config
elastic soft nproc 4096
elastic hard nproc 4096
elastic soft nofile 65536
elastic hard nofile 65536
EOF

# 解压软件包
su - elastic
ELSASTIC_VERSION='6.8.0'
tar -xf elasticsearch-${ELSASTIC_VERSION}.tar.gz -C /ups/app/elastic/
ln -s elasticsearch-${ELSASTIC_VERSION} elasticsearch

# 编辑配置文件
cd /ups/app/elastic/elasticsearch/config
cp elasticsearch.yml{,_$(date +%Y%m%d)}
vi /ups/app/elastic/elasticsearch/config/elasticsearch.yml

cluster.name: elastic-cluster
node.name: elastic-1
bootstrap.memory_lock: false
network.host: 192.168.10.151
http.port: 9200
transport.tcp.port: 9300
action.destructive_requires_name: true
#
bootstrap.system_call_filter: false
# 开启跨域访问支持,默认为false
http.cors.enabled: true
# 跨域访问允许的域名地址,(允许所有域名)以上使用正则
http.cors.allow-origin: "*"
```

### 5.2 启动服务

``` shell
su - elastic
cd /ups/app/elalstic/elasticsearch
nohup ./bin/elasticsearch -d >/dev/null 2>&1 &
```

## 6. kibana数据展示平台

## 6.1 安装配置

``` shell
# 解压
tar -xf kibana-*-linux-x86_64.tar.gz -C /ups/app/elastic
cd /ups/app/elastic
ln -s kibana-* kibana
```

### 6.2 服务启动

``` shell
cd /ups/app/elastic/kibana
nohup ./bin/kibana -l /ups/app/elastic/kibana/logs/kibana.log >/dev/null 2>&1 &
```

### 6.3 检查状态

​ 通过 `localhost:5601/status` 来访问 Kibana 的服务器状态页,状态页展示了服务器资源使用情况和已安装插件列表。

![kibana status page](assets/kibana-status-page.png)

原文地址:https://www.cnblogs.com/binliubiao/p/12744509.html