flume简介

组件介绍:

代理 Flume Agent

1 Flume内部有一个或者多个Agent
2 每一个Agent是一个独立的守护进程(JVM)
3 从客户端哪儿接收收集,或者从其他的Agent哪儿接收,然后迅速的将获取的数据传到下一个目的节点Agent
4 Agent主要由source、channel、sink三个组件组成。

agent source

1 一个flume源
2 负责一个外部源(数据生成器),如一个web服务器传递给他的事件
3 该外部源将它的事件以flume可以识别的格式(event)发送到flume中
4 当一个flume源接收到一个事件时,其将通过一个或多个通道存储该事件

agent channel

1 通道: 采用被动存储的形式,即通道会缓存该事件直到该事件被sink组件处理
2 所以Channel是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉它在source和sink间起着一共桥梁的作用,
channel是一个完整的事务,这一点保证了数据在收发的时候的一致性.并且它可以和任意数量的source和sink链接
3 可以通过参数设置event的最大个数 4 Flume通常选择FileChannel,而不使用Memory Channel。 Memory Channel: 内存存储事务,吞吐率极高,但存在丢数据风险 File Channel: 本地磁盘的事务实现模式,保证数据不会丢失(WAL实现)

监控网络端口使用

netcat
 1 # example.conf: A single-node Flume configuration
 2 
 3 # Name the components on this agent
 4 a1.sources = r1
 5 a1.sinks = k1
 6 a1.channels = c1
 7 
 8 # Describe/configure the source
 9 a1.sources.r1.type = netcat
10 a1.sources.r1.bind = localhost
11 a1.sources.r1.port = 44444
12 
13 # Describe the sink
14 a1.sinks.k1.type = logger
15 
16 # Use a channel which buffers events in memory
17 a1.channels.c1.type = memory
18 a1.channels.c1.capacity = 1000
19 a1.channels.c1.transactionCapacity = 100
20 
21 # Bind the source and sink to the channel
22 a1.sources.r1.channels = c1
23 a1.sinks.k1.channel = c1

启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/example.conf -Dflume.root.logger=INFO,console

监控具体文件使用

exec
 1 # Name the components on this agent
 2 a1.sources = r1
 3 a1.sinks = k1
 4 a1.channels = c1
 5 
 6 # Describe/configure the source
 7 a1.sources.r1.type = exec
 8 a1.sources.r1.command = tail -F /home/log/data.log
 9 a1.sources.r1.shell = /bin/bash -c
10 
11 # Describe the sink
12 a1.sinks.k1.type = logger
13 
14 # Use a channel which buffers events in memory
15 a1.channels.c1.type = memory
16 
17 # Bind the source and sink to the channel
18 a1.sources.r1.channels = c1
19 a1.sinks.k1.channel = c1

启动命令:flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console

flume监控日志文件并持久化到hdfs

 1 # Name the components on this agent
 2 a1.sources = r1
 3 a1.sinks = k1
 4 a1.channels = c1
 5 
 6 # Describe/configure the source
 7 a1.sources.r1.type = exec
 8 a1.sources.r1.command = tail -F /home/data/data.log
 9 a1.sources.r1.channels = c1
10 
11 # Describe the sink
12 a1.sinks.k1.type = hdfs
13 a1.sinks.k1.channel = c1
14 a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
15 a1.sinks.k1.hdfs.filePrefix = events-
16 a1.sinks.k1.hdfs.fileType=DataStream
17 a1.sinks.k1.hdfs.useLocalTimeStamp = true  #必须得写 应为14行用到时间数据
18 
19 # Use a channel which buffers events in memory
20 a1.channels.c1.type = memory
21 
22 # Bind the source and sink to the channel
23 a1.sources.r1.channels = c1
24 a1.sinks.k1.channel = c1

别人写的

 1 # Name the components on this agent
 2 a1.sources = r1
 3 a1.sinks = k1
 4 a1.channels = c1
 5 
 6 # Describe/configure the source
 7 ## exec表示flume回去调用给的命令,然后从给的命令的结果中去拿数据
 8 a1.sources.r1.type = exec
 9 ## 使用tail这个命令来读数据
10 a1.sources.r1.command = tail -F /home/tuzq/software/flumedata/test.log
11 a1.sources.r1.channels = c1
12 
13 # Describe the sink
14 ## 表示下沉到hdfs,类型决定了下面的参数
15 a1.sinks.k1.type = hdfs
16 ## sinks.k1只能连接一个channel,source可以配置多个
17 a1.sinks.k1.channel = c1
18 ## 下面的配置告诉用hdfs去写文件的时候写到什么位置,下面的表示不是写死的,而是可以动态的变化的。表示输出的目录名称是可变的
19 a1.sinks.k1.hdfs.path = /flume/tailout/%y-%m-%d/%H%M/
20 ##表示最后的文件的前缀
21 a1.sinks.k1.hdfs.filePrefix = events-
22 ## 表示到了需要触发的时间时,是否要更新文件夹,true:表示要
23 a1.sinks.k1.hdfs.round = true
24 ## 表示每隔1分钟改变一次
25 a1.sinks.k1.hdfs.roundValue = 1
26 ## 切换文件的时候的时间单位是分钟
27 a1.sinks.k1.hdfs.roundUnit = minute
28 ## 表示只要过了3秒钟,就切换生成一个新的文件
29 a1.sinks.k1.hdfs.rollInterval = 3
30 ## 如果记录的文件大于20字节时切换一次
31 a1.sinks.k1.hdfs.rollSize = 20
32 ## 当写了5个事件时触发
33 a1.sinks.k1.hdfs.rollCount = 5
34 ## 收到了多少条消息往dfs中追加内容
35 a1.sinks.k1.hdfs.batchSize = 10
36 ## 使用本地时间戳
37 a1.sinks.k1.hdfs.useLocalTimeStamp = true
38 #生成的文件类型,默认是Sequencefile,可用DataStream:为普通文本
39 a1.sinks.k1.hdfs.fileType = DataStream
40 
41 # Use a channel which buffers events in memory
42 ##使用内存的方式
43 a1.channels.c1.type = memory
44 a1.channels.c1.capacity = 1000
45 a1.channels.c1.transactionCapacity = 100
46 
47 # Bind the source and sink to the channel
48 a1.sources.r1.channels = c1
49 a1.sinks.k1.channel = c1
原文地址:https://www.cnblogs.com/luozeng/p/8491149.html