Flume分析

       FlumeCloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

       当前Flume有两个版本Flume 0.9X版本的统称Flume-ogFlume1.X版本的统称Flume-ng。由于Flume-ng经过重大重构,与Flume-og有很大不同,使用时请注意区分。

Flume运行环境:机器必须安装JDK6.0以上的版本,并且Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。

 

1.1 体系架构

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些EventAgent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

 

Flumeagent为最小的独立运行单位。一个agent就是一个JVM。单agentSourceSinkChannel三大组件构成,如下图:

 

 

1-1 数据流模型

 

组件

功能

Agent

使用JVM运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sourcessinks

Client

生产数据,运行在一个独立的线程。

Source

Client收集数据,传递给Channel

Sink

Channel收集数据,运行在一个独立线程。

Channel

连接sourcessinks,这个有点像一个队列。

Events

可以是日志记录、avro对象等。

 

1.2 Flume特点

FLUMOG有三种角色的节点,如图1-2:代理节点(agent)、收集节点(collector)、主节点(master)。

 

 

 1-2 FLUM OG架构图

 

agent从各个数据源收集日志数据,将收集到的数据集中到 collector,然后由收集节点汇总存入 hdfsmaster负责管理 agentcollector的活动。

 

  Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。

 

对于agent数据流配置就是从哪得到数据,把数据发送到哪个collector

对于collector是接收agent发过来的数据,把数据发送到指定的目标机器上。

Flume框架对hadoopzookeeper的依赖只是在jar包上,并不要求flume启动时必须将hadoopzookeeper服务也启动。

 

1.3 Flume核心组件

1、Flume核心组件-Source

1. FlumeSource:完成对日志的收集,分成transtionevent打入到channel之中。

2. Flume提供了各种source的实现,包括:

AvroSourceExceSourceSpoolingDirectorySourceNetCatSourceSyslogSourceSyslogTCPSource

SyslogUDPSourceHTTPSourceHDFSSourceetc

3. Flume自带了很多直接可用的数据源(source),如下表:

1.Flume’s Tiered Event Sources

collectorSource[(port)]

Collector source,监听端口汇聚数据

autoCollectorSource

通过master协调物理节点自动汇聚数据

logicalSource

逻辑source,由master分配端口并监听rpcSink

 

 

2.Flume’s Basic Sources

null

 

console

监听用户编辑历史和快捷键输入,只在node_nowatch模式下可用

stdin

监听标准输入,只在node_nowatch模式下可用,每行将作为一个event source

rpcSource(port)

rpc框架(thrift/avro)监听tcp端口

text("filename")

一次性读取一个文本,每行为一个event

tail("filename"[, startFromEnd=false])

每行为一个event。监听文件尾部的追加行,如果startFromEndtruetail将从文件尾读取,如果为falsetail将从文件开始读取全部数据

multitail("filename"[, file2 [,file3… ] ])

同上,同时监听多个文件的末尾

tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]])

监听目录中的文件末尾,使用正则去选定需要监听的文件(不包含目录),recurseDepth为递归监听其下子目录的深度

seqfile("filename")

监听hdfssequencefile,全路径

syslogUdp(port)

监听Udp端口

syslogTcp(port)

监听Tcp端口

syslogTcp1(port)

只监听Tcp端口的一个链接

execPeriodic("cmdline", ms)

周期执行指令,监听指令的输出,整个输出都被作为一个event

execStream("cmdline")

执行指令,监听指令的输出,输出的每一行被作为一个event

exec("cmdline"[, aggregate=false[,restart=false[,period=0]]])

执行指令,监听指令的输出,aggregate如果为true,整个输出作为一个event如果为false,则每行作为一个event。如果restarttrue,则按period为周期重新运行

synth(msgCount,msgSize)

随即产生字符串event,msgCount为产生数量,msgSize为串长度

synthrndsize(msgCount,minSize,maxSize)

同上,minSize – maxSize

nonlsynth(msgCount,msgSize)

 

asciisynth(msgCount,msgSize)

Ascii码字符

twitter("username","pw"[,"url"])

尼玛twitter的插件啊

irc("server",port, "nick","chan")

 

scribe[(+port)]

Scribe插件

report[(periodMillis)]

生成所有physical node报告为事件源

 

 

2、Flume核心组件-Sink

1. Flume Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

2. Flume提供了各种sink的实现,包括:

HDFSsinkLoggersinkAvrosinkFileRollsinkNullsinkHbasesinketc

3. Flume自带了很多直接可用的数据源(source),如下表:

1.Flume’s Collector Tier Event Sinks

collectorSink( "fsdir","fsfileprefix",rollmillis)

collectorSink,数据通过collector汇聚之后发送到hdfs, fsdirhdfs目录,fsfileprefix为文件前缀码

     

2.Flume’s Agent Tier Event Sinks

agentSink[("machine"[,port])]

Defaults to agentE2ESink,如果省略,machine参数,默认使用flume.collector.event.hostflume.collector.event.port作为默认collecotr(以下同此)

agentE2ESink[("machine"[,port])]

执着的agent,如果agent发送event没有收到collector成功写入的状态码,该event将被agent重复发送,直到接到成功写入的状态码

agentDFOSink[("machine" [,port])]

本地热备agentagent发现collector节点故障后,不断检查collector的存活状态以便重新发送event,在此间产生的数据将缓存到本地磁盘中

agentBESink[("machine"[,port])]

不负责的agent,如果collector故障,将不做任何处理,它发送的数据也将被直接丢弃

agentE2EChain("m1[:_p1_]" [,"m2[:_p2_]"[,…]])

指定多个collector提高可用性。 当向主collector发送event失效后,转向第二个collector发送,当所有的collector失败后,它会非常执着的再来一遍...

agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

同上,当向所有的collector发送事件失效后,他会将event缓存到本地磁盘,并检查collector状态,尝试重新发送

agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

同上,当向所有的collector发送事件失效后,他会将event丢弃

autoE2EChain

无需指定collector,master协调管理event的流向

autoDFOChain

同上

autoBEChain

同上

 

3.Flume’s Logical Sinks

logicalSink("logicalnode")

 

 

4.Flume’s Basic Sinks

null

null

console[("formatter")]

转发到控制台

text("txtfile" [,"formatter"])

转发到文本文件

seqfile("filename")

转发到seqfile

dfs("hdfspath")

转发到hdfs

customdfs("hdfspath"[, "format"])

自定义格式dfs

+escapedCustomDfs("hdfspath", "file", "format")

 

rpcSink("host"[, port])

Rpc框架

syslogTcp("host"[,port])

发向网络地址

 

 

3、Flume核心组件-Channel

1. MemoryChannel可以实现高速的的吞吐,但是无法保证数据的完整性。

2. MemoryRecoverChannel在官方文档的建议上已经建议使用FileChannel来替换。

3. FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置目录和程序日志保存的目录设成不同的磁盘,以便提高效率。

 

4、Flume核心组件-collector

1. collector的作用是将多个agent的数据汇总后,加载到storage中。它的sourcesinkagent类似。

1.source

collectorSource[(port)]

Collector source,监听端口汇聚数据

autoCollectorSource

通过master协调物理节点自动汇聚数据

logicalSource

逻辑source,由master分配端口并监听rpcSink

 

2.sinks

collectorSink( "fsdir","fsfileprefix",rollmillis)

collectorSink,数据通过collector汇聚之后发送到hdfs, fsdirhdfs目录,fsfileprefix为文件前缀码

customdfs("hdfspath"[, "format"])

自定义格式dfs

 

5、Flume核心组件-storage

1. storage是存储系统,可以是一个普通file,也可以是HDFSHIVEHBase,分布式存储等。

6、Flume核心组件-Master

1. Master是管理协调agentcollector的配置等信息,是flume集群的控制器。

 

原文地址:https://www.cnblogs.com/pigdata/p/10305566.html