Using Kafka as a Buffer for Log Collection

1. Environment Preparation

OS Version   Hostname   IP Address    Services
CentOS 7.5   kafka01    192.168.1.1   zookeeper, kafka, ES, kibana
CentOS 7.5   kafka02    192.168.1.2   zookeeper, kafka, logstash
CentOS 7.5   kafka03    192.168.1.3   zookeeper, kafka, ES, nginx, filebeat

Since the test machine has limited resources, the services are consolidated onto three hosts instead of running each on its own machine!

2. Using Kafka as a Buffer to Collect Logs

2.1 Installing ZooKeeper

# Add host entries (repeat on all three nodes)
$ echo -e "192.168.1.1 kafka01
192.168.1.2 kafka02
192.168.1.3 kafka03" >> /etc/hosts
$ wget https://downloads.apache.org/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
$ tar zxf zookeeper-3.4.14.tar.gz -C /opt/
$ ln -s /opt/zookeeper-3.4.14/ /opt/zookeeper
$ cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg 
$ mkdir -p /data/zookeeper
$ vim /opt/zookeeper/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.3:2888:3888
$ echo "1" >> /data/zookeeper/myid
#将kafka相关的文件目录远程传输到另外两台
$ rsync -avz /opt/zookeeper* kafka02:/opt/
$ rsync -avz /data/* kafka02:/data
$ rsync -avz /opt/zookeeper* kafka03:/opt/
$ rsync -avz /data/* kafka03:/data
# Start ZooKeeper on kafka01 (status reports an error until a quorum of two nodes is up)
$ /opt/zookeeper/bin/zkServer.sh start
$ /opt/zookeeper/bin/zkServer.sh status
# On kafka02 and kafka03, update myid and start ZooKeeper
$ echo "2" > /data/zookeeper/myid    # on kafka02
$ /opt/zookeeper/bin/zkServer.sh start
$ echo "3" > /data/zookeeper/myid    # on kafka03
$ /opt/zookeeper/bin/zkServer.sh start
# Check the status of each node
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: leader
$ /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower
# Exactly one of the three nodes should report leader
# Test
$ /opt/zookeeper/bin/zkCli.sh -server kafka01:2181
[zk: kafka01:2181(CONNECTED) 0] create /test "hello"
# Create a znode with some data
$ /opt/zookeeper/bin/zkCli.sh -server kafka02:2181
[zk: kafka02:2181(CONNECTED) 0] get /test
# Read the data back
$ /opt/zookeeper/bin/zkCli.sh -server kafka03:2181
[zk: kafka03:2181(CONNECTED) 0] get /test
# Read the data back
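
Beyond the interactive client, each node can be probed with ZooKeeper's four-letter commands. A minimal sketch, assuming nc is installed (yum -y install nmap-ncat) and that the stat command is whitelisted, which is the default in ZooKeeper 3.4:

$ for h in kafka01 kafka02 kafka03; do echo -n "$h: "; echo stat | nc $h 2181 | grep Mode; done

Every host should answer, and exactly one should report Mode: leader.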

2.2 Installing Kafka

$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz
$ tar zxf kafka_2.11-2.4.1.tgz -C /opt
$ ln -s /opt/kafka_2.11-2.4.1/ /opt/kafka
$ mkdir /opt/kafka/logs
$ vim /opt/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.1:9092
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
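
Note that log.dirs, despite the name, is where Kafka stores the partition data itself (the commit log), not the broker's application logs; with this setting the messages end up under /opt/kafka/logs.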

$ rsync -avz /opt/kafka* kafka02:/opt/
$ rsync -avz /opt/kafka* kafka03:/opt/
$ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties 
# If the last line prints a KafkaServer id and "started", startup succeeded; stop it (Ctrl+C) and relaunch it in the background:
$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# On kafka02, give the broker a unique id and its own listener address
$ vim /opt/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.2:9092
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
$ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# Likewise on kafka03
$ vim /opt/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.1.3:9092
log.dirs=/opt/kafka/logs
log.retention.hours=24
zookeeper.connect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
$ /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
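
Once all three brokers are up, each registers an ephemeral node in ZooKeeper. A quick way to confirm cluster membership (/brokers/ids is the path Kafka uses for live brokers):

$ /opt/zookeeper/bin/zkCli.sh -server kafka01:2181 ls /brokers/ids
# The last line of output should be [1, 2, 3]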
# Test
$ /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181 --partitions 3 --replication-factor 3 --topic messagetest
$ /opt/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic messagetest
# In the interactive prompt, type a few test messages
$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server  192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092 --topic messagetest --from-beginning
# Verify that the messages typed into the producer come through
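
It is also worth checking that the partitions and replicas landed where expected:

$ /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.1:2181 --topic messagetest

Each of the three partitions should list three replicas spread across brokers 1-3, with the Isr (in-sync replica) set matching the replica list.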

2.3 Deploying Elasticsearch

$ wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.6.0.rpm
$ yum -y install elasticsearch-6.6.0.rpm
$ egrep -v '#|^$' /etc/elasticsearch/elasticsearch.yml 
node.name: kafka01
path.data: /elk/data
path.logs: /elk/log
network.host: 192.168.1.1
http.port: 9200
$ mkdir -p /elk/{data,log}
$ chown -R elasticsearch:elasticsearch /elk
$ systemctl start elasticsearch
$ ss -lnt | grep 9200
LISTEN     0      128     ::ffff:192.168.1.1:9200                    :::*    
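
A quick sanity check against the REST API (the _cluster/health endpoint is part of the standard Elasticsearch API):

$ curl -s 'http://192.168.1.1:9200/_cluster/health?pretty'

A status of green or yellow means the node is up and serving requests.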

2.4 Deploying Kibana

$ wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-x86_64.rpm
$ yum -y install kibana-6.6.0-x86_64.rpm
$ egrep -v '#|^$' /etc/kibana/kibana.yml 
server.port: 5601
server.host: "192.168.1.1"
server.name: "kafka01"
elasticsearch.hosts: ["http://192.168.1.1:9200"]
kibana.index: ".kibana"
$ systemctl start kibana
$ ss -lnt | grep 5601
LISTEN     0      128    192.168.1.1:5601                     *:*         
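
Kibana 6.x also exposes a status endpoint, so reachability can be checked from the shell before opening a browser (expect HTTP 200):

$ curl -s -o /dev/null -w '%{http_code}\n' http://192.168.1.1:5601/api/status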

Open the Kibana page in a browser at http://192.168.1.1:5601 and confirm it loads.

2.5 Deploying Nginx and Filebeat

$ vim /etc/yum.repos.d/nginx.repo
[nginx-stable]
name=nginx stable repo
baseurl=http://nginx.org/packages/centos/$releasever/$basearch/
gpgcheck=0
enabled=1
$ yum -y install nginx httpd-tools
$ vim /etc/nginx/nginx.conf
# Add the following inside the http block to write access logs in JSON format
    log_format json '{ "@time_local": "$time_local", '
                        '"remote_addr": "$remote_addr", '
                        '"referer": "$http_referer", '
                        '"request": "$request", '
                        '"status": $status, '
                        '"bytes": $body_bytes_sent, '
                        '"agent": "$http_user_agent", '
                        '"x_forwarded": "$http_x_forwarded_for", '
                        '"up_addr": "$upstream_addr",'
                        '"up_host": "$upstream_http_host",'
                        '"up_resp_time": "$upstream_response_time",'
                        '"request_time": "$request_time"'
' }';  

    access_log  /var/log/nginx/access.log  json;
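
Before starting nginx, validate the edited configuration; nginx -t checks the syntax without starting the server:

$ nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful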
$ nginx	
$ wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-x86_64.rpm
$ yum -y install filebeat-6.6.0-x86_64.rpm 
$ vim /etc/filebeat/filebeat.yml 
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  json.keys_under_root: true
  json.overwrite_keys: true
  tags: ["access"]

- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]

output.kafka:
  hosts: ["192.168.1.1:9092","192.168.1.2:9092","192.168.1.3:9092"]
  topic: elklog
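
Before starting the service, Filebeat can verify both the configuration file and the connection to the Kafka brokers:

$ filebeat test config
$ filebeat test output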
$ systemctl start filebeat
$ ab -c 100 -n 100 http://192.168.1.3/
$ ab -c 100 -n 100 http://192.168.1.3/error
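
At this point the access and error events generated by ab should be sitting in the elklog topic. A quick spot check from any broker (--max-messages makes the consumer exit after the given count):

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.1:9092 --topic elklog --from-beginning --max-messages 5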

2.6 Deploying Logstash

$ wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.rpm
$ yum -y install logstash-6.6.0.rpm
$ vim /etc/logstash/conf.d/kafka.conf
# The file name is arbitrary; it just needs to live under this directory
input{
  kafka {
    bootstrap_servers => "192.168.1.2:9092"
    topics => ["elklog"]
    group_id => "logstash"
    codec => "json"
  }
}

filter {
  mutate {
    convert => ["upstream_time","float"]
    convert => ["request_time","float"]
  }
}

output {
  if "access" in [tags] {
    elasticsearch {
      hosts => "http://192.168.1.1:9200"
      manage_template => false
      index => "nginx_access-%{+yyyy.MM}"
    }
  }
  if "error" in [tags] {
    elasticsearch {
      hosts => "http://192.168.1.1:9200"
      manage_template => false
      index => "nginx_error-%{+yyyy.MM}"
    }
  }
}
$ /usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/kafka.conf
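
After Logstash has been running for a moment, the monthly indices should show up in Elasticsearch:

$ curl -s 'http://192.168.1.1:9200/_cat/indices?v'

Look for nginx_access-YYYY.MM and nginx_error-YYYY.MM entries with a non-zero docs.count.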

(Screenshot: Kibana showing the logs collected via the elklog topic.)
Since this did not succeed in a single pass, the log counts in the screenshot may not match exactly!

Add the index patterns in Kibana yourself; the result:
(Screenshot: the nginx_access-* and nginx_error-* indices in Kibana.)
