Test Blog

Fluentd+Kafka

Fluentd

What is Fluentd?

Fluentd is an open source data collector for unified logging layer.

Unified Logging Layer

Fluentd decouples data sources from backend systems by providing a unified logging layer in between.

Simple yet Flexible

Fluentd's 300+ plugins connect it to many data sources and data outputs while keeping its core small and fast.

List of Data Outputs


INSTALL

# ubuntu 16.04 (nb: this script targets trusty/14.04; Treasure Data also publishes a xenial variant)
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
/etc/init.d/td-agent status
# log file: /var/log/td-agent/td-agent.log
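
A quick smoke test, assuming the stock /etc/td-agent/td-agent.conf (which enables an HTTP input on port 8888 and echoes debug.** records to the td-agent log):

curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test
tail -n 1 /var/log/td-agent/td-agent.log   # the record should appear here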

CONFIGURE

Fluentd ships with a web admin console (td-agent-ui), which has to be started manually:

nohup td-agent-ui start &
http://127.0.0.1:9292    user: admin  passwd: changeme


INSTALL OUTPUT PLUGINS

apt install gem2deb 
td-agent-gem install fluent-plugin-kafka
td-agent-gem install fluent-plugin-webhdfs
td-agent-gem install fluent-plugin-influxdb
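
To confirm the plugins were installed into td-agent's embedded Ruby (not the system gem path):

td-agent-gem list | grep fluent-plugin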

Kafka

INSTALL

Install the JDK:

wget http://120.52.72.23/download.oracle.com/c3pr90ntc0td/otn-pub/java/jdk/8u91-b14/jdk-8u91-linux-x64.tar.gz
# append to /etc/profile:
export JAVA_HOME=/247/ad
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
source /etc/profile
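
The tarball still needs to be unpacked, and JAVA_HOME must point at the unpacked directory (the /247/ad path above is site-specific; substitute your own). A quick check after sourcing /etc/profile:

tar -xzf jdk-8u91-linux-x64.tar.gz
java -version   # should report 1.8.0_91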

Binary Download

wget http://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
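
Unpack the archive; the bin/ and config/ paths below are relative to the extracted directory:

tar -xzf kafka_2.10-0.10.0.0.tgz
cd kafka_2.10-0.10.0.0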

CONFIGURE

#server.properties
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/ads247admin/kafka_2.9.2-0.8.2.2/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=6000

SETUP

Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Start Kafka:
bin/kafka-server-start.sh config/server.properties
Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syslog-topic
List topics:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Consume the topic to check whether data is coming in:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic syslog-topic
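
Before wiring Fluentd in, the topic can be exercised end to end: type a few lines into the console producer and they should show up in the consumer above.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic syslog-topic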

Four setups were tested:

Option 1: fluentd + influxdb
Option 2: fluentd + mongodb
Option 3: fluentd + webhdfs
Option 4: fluentd + kafka

A script writes JSON-formatted log lines into /var/log/20160608.log (chmod 645); Fluentd tails the file and ships the records to Kafka (a sketch of such a script follows the config):

<source>
  type tail
  path /var/log/20160608.log
  tag phplog.kafka
  format json
  time_key time
  pos_file /tmp/fluentd--1465375453.pos
</source>
<match phplog.kafka>
  @type kafka
  brokers localhost:9092        # newer Kafka versions may not need this?
  zookeeper localhost:2181
  default_topic syslog-topic
</match>
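
A minimal sketch of the kind of log-writing script mentioned above (the field names besides time are illustrative; any one-line JSON object works with format json):

#!/bin/bash
# append one JSON record per second; time is epoch seconds, matching time_key time
while true; do
  echo "{\"time\":$(date +%s),\"level\":\"info\",\"msg\":\"test event\"}" >> /var/log/20160608.log
  sleep 1
done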

Match records and write them to MongoDB:

<match mongo.nginx>
  type mongo
  host 127.0.0.1
  port 27017
  database zhw
  collection mongo.nginx
  capped
  capped_size 100m
  user root
  password abc123456
</match>
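
Because the collection name contains a dot, it has to be addressed via getCollection in the mongo shell. A spot check (assuming the credentials above authenticate against the zhw database):

mongo zhw -u root -p abc123456 --eval 'printjson(db.getCollection("mongo.nginx").findOne())'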

Set up match-and-forward: records tagged login.log are forwarded to 10.4.0.6:24225:

<match login.log>
  type forward
  heartbeat_type udp
  <server>
    name n-app247-te-04
    host 10.4.0.6
    port 24225
  </server>
  <secondary>
    type file
    path /var/log/td-agent/error
  </secondary>
</match>

loger.php generates log records and sends them directly to the Fluentd server on port 24224, which stores them in Hadoop HDFS:

<source>
  type forward
  bind 0.0.0.0
  port 24224
  linger_timeout 0
  log_level info
</source>
<match login.log>
  @type webhdfs
  host 172.31.22.245
  port 50070
  path /ad/login.log
  flush_interval 10s
</match>
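
The forward input can be tested without loger.php by pushing a record with fluent-cat, which ships with td-agent (the embedded path below may vary by version):

echo '{"user":"test","action":"login"}' | /opt/td-agent/embedded/bin/fluent-cat login.log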

Match nginx logs and write them to InfluxDB:

<source>
  type tail
  path /var/log/nginx/access.log
  tag access.nginx
  format nginx
  time_format %d/%b/%Y:%H:%M:%S
  pos_file /tmp/fluentd--1464852502.pos
</source>
<match access.nginx>
  type influxdb
  dbname zhw
  flush_interval 10s 
  host 127.0.0.1
  port 8086
  user root
  password abc123456
</match>
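
Written points can be spot-checked over InfluxDB's HTTP API (assuming InfluxDB 0.9+, and that the plugin wrote the measurement under the tag name access.nginx, its usual default):

curl -G 'http://127.0.0.1:8086/query' -u root:abc123456 \
  --data-urlencode 'db=zhw' \
  --data-urlencode 'q=SELECT * FROM "access.nginx" LIMIT 5'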

Created by ZhangWei @2016-07-08

INTERNAL USE ONLY


Original post: https://www.cnblogs.com/cx2c/p/6873277.html