Flume

Flume

flume的概述

Apache Flume是一个分布式,可靠且可用的系统,用于有效地从许多不同的source收集,聚合和移动大量日志数据到集中式数据存储。

Apache Flume的使用不仅限于日志数据聚合。由于数据source是可定制的,因此Flume可用于传输大量event 数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎任何可能的数据source。

系统要求

•Java运行时环境 - Java 1.8或更高版本

•内存 - 为source,channel或 sink 配置的内存

•磁盘空间 - channel或sink配置的磁盘空间

•目录权限 - agent使用的目录的读/写权限

flume的架构

Flume event 被定义为具有字节有效负载和可选字符串属性集的数据流单元。Flume agent 是一个(JVM)进程,它承载event 从外部source流向下一个目标(跃点)的组件。

​ 1.交互过程

Flume source消耗由外部 source(如Web服务器)传递给它的 event 。外部source以目标Flume source识别的格式向Flume发送event 。例如,Avro Flume source可用于从Avro客户端或从Avrosink发送event 的流中的其他Flume agent 接收Avroevent 。可以使用Thrift Flume Source定义类似的流程,以接收来自Thrift Sink或Flume Thrift Rpc客户端或Thrift客户端的event ,这些客户端使用Flume thrift协议生成的任何语言编写。当Flume source接收event 时,它将其存储到一个或多个channels 。该channel是一个被动存储器,可以保持event 直到它被Flume sink消耗。文件channel就是一个例子 - 它由本地文件系统支持。sink从channel中移除event 并将其放入外部存储库(如HDFS(通过Flume HDFS sink))或将其转发到流中下一个Flume agent (下一跳)的Flume source。给定 agent 中的source和sink与channel中暂存的event 异步运行。

​ 2.复杂的流程

Flume允许用户构建多跳流,其中event 在到达最终目的地之前经过多个 agent 。它还允许fan-in 和fan-out,上下文路由和故障跳跃的备份路由(故障转移)。

​ 3.可靠性

event 在每个 agent 的channel中进行。然后将event 传递到流中的下一个 agent 或终端存储库(如HDFS)。只有将event 存储在下一个 agent 的channel或终端存储库中后,才会从channel中删除这些event 。这就是Flume中的单跳消息传递语义如何提供流的端到端可靠性。

Flume使用事务方法来保证event 的可靠传递。source和sink分别在事务中封装由channel 提供的事务中放置或提供的event 的存储/检索。这可确保event 集在流中从一个点到另一个点可靠地传递。在多跳流的情况下,来自前一跳的sink和来自下一跳的source都运行其事务以确保数据安全地存储在下一跳的channel 中。

​ 4.可恢复性

event 在channel中进行,该channel管理从故障中恢复。Flume支持由本地文件系统支持的持久文件channel。还有一个内存channel,它只是将event 存储在内存中的队列中,这更快,但是当 agent 进程死亡时仍然留在内存channel中的任何event 都无法恢复。

flume的配置和安装

1.将flume的压缩文件上传到linux中

2.进入到flume/conf 将 flume-env.sh.templ 复制为flume-env.sh

​ 将export JAVA_OPTS 那行的注释去掉

3.进到flume目录下创建 dir-hdfs.conf 添加内容时记得把注解去掉

vi dir-hdfs.conf
#定义三大组件的名称
ag1.sources = source1
ag1.sinks = sink1
ag1.channels = channel1

# 配置source组件
ag1.sources.source1.type = spooldir
ag1.sources.source1.spoolDir = /root/data/log
ag1.sources.source1.fileSuffix=.FINISHED
ag1.sources.source1.inputCharset=utf-8
ag1.sources.source1.deserializer.maxLineLength=5120

# 配置sink组件
ag1.sinks.sink1.type = hdfs
ag1.sinks.sink1.hdfs.path =hdfs://192.168.56.2/access_log/%y-%m-%d/%H-%M
ag1.sinks.sink1.hdfs.filePrefix = app_log
ag1.sinks.sink1.hdfs.fileSuffix = .log
ag1.sinks.sink1.hdfs.batchSize= 100
ag1.sinks.sink1.hdfs.fileType = DataStream
ag1.sinks.sink1.hdfs.writeFormat =Text

## roll:滚动切换:控制写文件的切换规则
 ## 按文件体积(字节)来切   
ag1.sinks.sink1.hdfs.rollSize = 512000   

 ## 按event条数切
ag1.sinks.sink1.hdfs.rollCount = 1000000 

 ## 按时间间隔切换文件
ag1.sinks.sink1.hdfs.rollInterval = 60   

## 控制生成目录的规则(round回滚)
ag1.sinks.sink1.hdfs.round = true
ag1.sinks.sink1.hdfs.roundValue = 10
ag1.sinks.sink1.hdfs.roundUnit = minute
ag1.sinks.sink1.hdfs.useLocalTimeStamp = true

# channel组件配置
ag1.channels.channel1.type = memory

 ## event条数
ag1.channels.channel1.capacity = 500000  
##flume事务控制所需要的缓存容量600条event
ag1.channels.channel1.transactionCapacity = 600  

# 绑定source、channel和sink之间的连接
ag1.sources.source1.channels = channel1
ag1.sinks.sink1.channel = channel1

4.启动Flume 命令 在flume的目录下执行

bin/flume-ng agent -c conf/ -f dir-hdfs.conf -n ag1 -Dflume.root.logger=INFO,console

启动后flume会一直在运行,在创建一个Xshell窗口进行一下操作

5.根据你配置source组件 spoolDir 创建对应的路径 创建源目录log 给与777权限 并且在log目录下存放需要采集的数据

我的是ag1.sources.source1.spoolDir = /root/data/log

cd ~  #进入根目录 /root
 
mkdir data  #创建data

cd data   #进入data

mkdir log	#创建log

chmod 777 log  #给log赋予权限

touch a.txt  #创建一个文本插入数据用来测试

echo hello world > a.txt  

mv a.txt log/a.txt

6.观察flume中日志信息,查看采集后的文件是否加了后缀 .FINISHED

同时可在/root/data/log 中查看他的变化,和hdfs上的access_log文件夹里面的变化

Flume配置完成

原文地址:https://www.cnblogs.com/A-Nan-q/p/14175418.html