flume集群日志收集

一、Flume简介

  Flume是一个分布式的、高可用的海量日志收集、聚合和传输日志收集系统,支持在日志系统中定制各类数据发送方(如:Kafka,HDFS等),便于收集数据。其核心为agent,agent是一个java进程,运行在日志收集节点。

agent里面包含3个核心组件:source、channel、sink。
   source组件是专用于收集日志的,可以处理各种类型各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义,同时 source组件把数据收集

以后,临时存放在channel中。

  channel组件是在agent中专用于临时存储数据的,可以存放在memory、jdbc、file、自定义等。channel中的数据只有在sink发送成功之后才会被删除。

  sink组件是用于把数据发送到目的地的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义。

  在整个数据传输过程中,流动的是event。事务保证是在event级别。flume可以支持多级flume的agent,支持扇入(fan-in)、扇出(fan-out)。

二、环境准备

  1)hadoop集群(楼主用的版本2.7.3,共6个节点,可参考http://www.cnblogs.com/qq503665965/p/6790580.html

  2)flume集群规划:

HOST

作用

方式

路径

hadoop01

agent

spooldir

/home/hadoop/logs

hadoop05

collector

HDFS

/logs

hadoop06

collector

HDFS

/logs

  其基本结构官网给出了更加具体的说明,这里楼主就直接copy过来了:

三、集群配置

  1)系统环境变量配置  

1 export FLUME_HOME=/home/hadoop/apache-flume-1.7.0-bin
2 export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$FLUME_HOME/bin

  记得 source /etc/profile

  2)flume JDK环境  

1 mv flume-env.sh.templete flume-env.sh
2 vim flume-env.sh
3 export JAVA_HOME=/usr/jdk1.7.0_60//增加JDK安装路径即可

  3)hadoop01中flume配置

  在conf目录增加配置文件 flume-client ,内容为:  

 1 #agent1名称
 2 agent1.channels = c1
 3 agent1.sources = r1
 4 agent1.sinks = k1 k2
 5 
 6 #sink组名称
 7 agent1.sinkgroups = g1 
 8 
 9 #set channel
10 agent1.channels.c1.type = memory
11 agent1.channels.c1.capacity = 1000
12 agent1.channels.c1.transactionCapacity = 100
13 
14 agent1.sources.r1.channels = c1
15 agent1.sources.r1.type = spooldir
16 #日志源
17 agent1.sources.r1.spoolDir =/home/hadoop/logs
18 
19 agent1.sources.r1.interceptors = i1 i2
20 agent1.sources.r1.interceptors.i1.type = static
21 agent1.sources.r1.interceptors.i1.key = Type
22 agent1.sources.r1.interceptors.i1.value = LOGIN
23 agent1.sources.r1.interceptors.i2.type = timestamp
24 
25 # 设置sink1
26 agent1.sinks.k1.channel = c1
27 agent1.sinks.k1.type = avro
28 #sink1所在主机
29 agent1.sinks.k1.hostname = hadoop05
30 agent1.sinks.k1.port = 52020
31 
32 #设置sink2
33 agent1.sinks.k2.channel = c1
34 agent1.sinks.k2.type = avro
35 #sink2所在主机
36 agent1.sinks.k2.hostname = hadoop06
37 agent1.sinks.k2.port = 52020
38 
39 #设置sink组包含sink1,sink2
40 agent1.sinkgroups.g1.sinks = k1 k2
41 
42 #高可靠性
43 agent1.sinkgroups.g1.processor.type = failover
44 #设置优先级
45 agent1.sinkgroups.g1.processor.priority.k1 = 10
46 agent1.sinkgroups.g1.processor.priority.k2 = 1
47 agent1.sinkgroups.g1.processor.maxpenalty = 10000

  4)hadoop05配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channlels
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop05
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop05
20 a1.sources.r1.channels = c1
21 
22 #sink的hdfs地址
23 a1.sinks.k1.type=hdfs
24 a1.sinks.k1.hdfs.path=/logs
25 a1.sinks.k1.hdfs.fileType=DataStream
26 a1.sinks.k1.hdfs.writeFormat=TEXT
27 #没1K产生文件
28 a1.sinks.k1.hdfs.rollInterval=1
29 a1.sinks.k1.channel=c1
30 #文件后缀
31 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

  5)hadoop06配置

 1 #设置 Agent名称
 2 a1.sources = r1
 3 a1.channels = c1
 4 a1.sinks = k1
 5 
 6 #设置channel
 7 a1.channels.c1.type = memory
 8 a1.channels.c1.capacity = 1000
 9 a1.channels.c1.transactionCapacity = 100
10 
11 # 当前节点信息
12 a1.sources.r1.type = avro
13 #绑定主机名
14 a1.sources.r1.bind = hadoop06
15 a1.sources.r1.port = 52020
16 a1.sources.r1.interceptors = i1
17 a1.sources.r1.interceptors.i1.type = static
18 a1.sources.r1.interceptors.i1.key = Collector
19 a1.sources.r1.interceptors.i1.value = hadoop06
20 a1.sources.r1.channels = c1
21 #设置sink的hdfs地址目录
22 a1.sinks.k1.type=hdfs
23 a1.sinks.k1.hdfs.path=/logs
24 a1.sinks.k1.hdfs.fileType=DataStream
25 a1.sinks.k1.hdfs.writeFormat=TEXT
26 a1.sinks.k1.hdfs.rollInterval=1
27 a1.sinks.k1.channel=c1
28 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d

四、启动flume集群

  1)启动collector,即hadoop05,hadoop06 

1 flume-ng agent -n a1 -c conf -f flume-server -Dflume.root.logger=DEBUG,console

  2)启动agent,即hadoop01

flume-ng agent -n a1 -c conf -f flume-client -Dflume.root.logger=DEBUG,console

  agent启动后,hadoop05,hadoop06的控制台可看到如下打印信息:

  

五、日志收集测试

  1)启动zookeeper集群(未搭建zookeeper的同学可以忽略)

  2)启动hdfs start-dfs.sh 

  3)模拟网站日志,楼主这里随便弄的测试数据

  4)上传到/hadoop/home/logs

  hadoop01输出:

   hadoop05输出:

 

  由于hadoop05设置的优先级高于hadoop06,因此hadoop06无日志写入。

  我们再看hdfs上,是否成功上传了日志文件:

  

六、高可用性测试

  由于楼主设置的hadoop05的优先级要高于hadoop06,这也是上面的日志收集hadoop05输出而不是hadoop06输出的原因。现在我们干掉优先级高的hadoop05,看hadoop06是否能正常进行日志采集工作。

  

  我们向日志源添加一个测试日志文件:

  

  hadoop06输出:

   查看hdfs:

  

  好了!flume集群配置及日志收集就介绍到这里,下次楼主楼主会具体介绍利用mapreduce对日志进行清洗,并存储到hbase相关的内容。

  

原文地址:https://www.cnblogs.com/qq503665965/p/6853209.html