filebeat-2-通过kafka队列链接logstash

filebeat 直接到logstash, 由于logstash的设计问题, 可能会出现阻塞问题, 因为中间使用消息队列分开

可以使用redis, 或者kafka, 这儿使用的是kafka

1, 安装

kafka的安装, 解压可用, 但需要zookeeper, 内置了一个zookeeper, 直接使用即可

1), 启动内置zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

2), 修改kafka的配置文件

vim ./conf/server.properties

############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
 
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
 
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

3), 启动kafkaserver

/bin/kafka-server-start.sh ./config/server.properties &

4),修改filebeat文件, 最终形态

cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v '#' | grep -v '^$'
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats

5), 重新启动filebeat

./filebeat -c ./filebeat.yml &

6), 修改 logstash的input

input {
    kafka  {
      #codec => "json"
      topics_pattern => "elk-.*"
      bootstrap_servers => "127.0.0.1:9092"
      auto_offset_reset => "latest"
      group_id => "logstash-g1"
    }
}
output {
    elasticsearch {                                  #Logstash输出到elasticsearch;
      hosts => ["localhost:9200"]                    #elasticsearch为本地;
      index => "logstash-nginx-%{+YYYY.MM.dd}"       #创建索引;
      document_type => "nginx"                       #文档类型;
      workers => 1                                   #进程数量;
      user => elastic                                #elasticsearch的用户;
      password => changeme                           #elasticsearch的密码;
      flush_size => 20000
      idle_flush_time => 10
 }
}

7), 重启logstash

8 ), 页面访问 nginx, 可以查看消息队列中的消息

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-log -m-beginning

 参考: http://www.ywnds.com/?p=9776

原文地址:https://www.cnblogs.com/wenbronk/p/7412141.html