Flume example: netcat-console


Flume 1.8

1. A hello world example

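# example.conf: a single-node Flume configuration
# Comments must go on their own lines: java.util.Properties treats a '#' after a value as part of the value.

# Name the components of agent a1
# Agent a1's source r1
a1.sources = r1
# Agent a1's sink k1
a1.sinks = k1
# Agent a1's channel c1
a1.channels = c1

# Configure source r1 of agent a1
# "netcat" selects the NetCat TCP Source. It is an alias: Flume's built-in components have aliases;
# for a component without an alias, give its fully qualified class name instead.
a1.sources.r1.type = netcat
# Hostname the NetCat TCP Source listens on (here, the local machine)
a1.sources.r1.bind = localhost
# Port to listen on
a1.sources.r1.port = 44444

# Configure sink k1 of agent a1: the Logger Sink, also given by its alias
a1.sinks.k1.type = logger

# Configure channel c1 of agent a1; the channel buffers events
# Memory channel: as the name suggests, it buffers events in memory
a1.channels.c1.type = memory
# Maximum number of events stored in the channel
a1.channels.c1.capacity = 1000
# Maximum number of events the channel takes from a source or gives to a sink per transaction
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Source r1 writes to channel c1
a1.sources.r1.channels = c1
# Sink k1 reads from channel c1
a1.sinks.k1.channel = c1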

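This configuration file defines an agent named a1. a1 has a source that listens for data on port 44444 of the local machine, a channel that buffers the data, and a sink that writes events to the console. The file names each component and sets its type and other properties. A single configuration file may define several agents, so when starting Flume you pass the name of the agent to run.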
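Because every component is referenced by name (agent.sources, agent.sources.NAME.channels, agent.sinks.NAME.channel), a typo or a stray inline comment easily leaves a component unwired. The sketch below is a hypothetical helper, not part of Flume: parse_properties and check_agent are illustrative names, and the parser is a simplified assumption that only handles "key = value" lines (no escapes or line continuations). Like java.util.Properties, it treats '#' as a comment only at the start of a line.

```python
# Hypothetical helper, not part of Flume: checks that every source and sink
# of an agent is bound to a channel that the agent actually declares.


def parse_properties(text: str) -> dict[str, str]:
    """Simplified 'key = value' parser.

    As in java.util.Properties, only lines that START with '#' or '!' are
    comments; a '#' after a value stays part of the value. Escapes and line
    continuations are not handled.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_agent(props: dict[str, str], agent: str) -> list[str]:
    """Return wiring problems for `agent`; an empty list means it looks OK."""

    def names(kind: str) -> list[str]:
        # e.g. "a1.sources = r1" -> ["r1"]; several names are space-separated
        return props.get(f"{agent}.{kind}", "").split()

    channels = set(names("channels"))
    problems: list[str] = []
    for src in names("sources"):
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src} is not bound to any channel")
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch!r}")
    for snk in names("sinks"):
        # a sink reads from exactly one channel (singular key: 'channel')
        ch = props.get(f"{agent}.sinks.{snk}.channel", "")
        if ch not in channels:
            problems.append(f"sink {snk} uses undeclared channel {ch!r}")
    return problems


if __name__ == "__main__":
    conf = """
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    """
    print(check_agent(parse_properties(conf), "a1"))  # []
```

Appending an inline comment such as "a1.sinks.k1.channel = c1   #bind k1 to c1" makes check_agent report sink k1, because the comment text becomes part of the channel name.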

Start Flume with this configuration file using the following command:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Parameters:

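--conf conf : the Flume configuration directory (where files such as flume-env.sh and log4j.properties live) is conf/

--name a1 : the name of the agent to start; it must match the agent name used in the configuration file (a1)

--conf-file example.conf : the agent configuration file to load for this run

-Dflume.root.logger=INFO,console : -D sets a Java system property when Flume starts; here it overrides flume.root.logger so that messages at level INFO and above are printed to the console. Common log levels are DEBUG, INFO, WARN and ERROR.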

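To test it, open a new terminal window, use telnet to connect to port 44444 on the local machine, type Hello world! and press Enter. The server replies OK (the NetCat TCP Source acknowledges every line by default), which shows that the line was sent successfully.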

# Check whether port 44444 is already in use
$ sudo netstat -tunlp | grep 44444
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
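If netstat or sudo is unavailable, a rough alternative is to try opening a TCP connection to the port. This is a minimal sketch; is_listening is a hypothetical name. It only tells you that something accepts connections on the port, not which process owns it, and it needs no root privileges.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds.

    Unlike `sudo netstat -tunlp`, this needs no root privileges, but it only
    shows that *something* accepts connections there, not which process.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unreachable, ...
        return False


if __name__ == "__main__":
    # 44444 is the port configured for the NetCat TCP Source in example.conf
    print("localhost:44444 in use:", is_listening("127.0.0.1", 44444))
```

Run it before starting Flume to confirm the port is free, and again afterwards to confirm the NetCat source is listening.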

The Flume terminal logs the received event:

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }
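The Logger Sink prints the event body as hex bytes followed by a printable rendering in which non-printable bytes appear as '.'. Decoding the bytes shows that the trailing '.' is 0D, a carriage return: telnet ends each line with CR LF, and the NetCat source appears to split lines at LF, so the CR stays in the body. A minimal sketch (decode_logger_body is a hypothetical name):

```python
def decode_logger_body(hex_dump: str) -> str:
    """Convert the hex bytes printed by the Logger Sink back into text."""
    # bytes.fromhex ignores the spaces between the byte pairs
    return bytes.fromhex(hex_dump).decode("utf-8")


body = decode_logger_body("48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D")
print(repr(body))  # 'Hello world!\r'
```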

Flume can substitute environment variables in the configuration file, for example:

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${NC_PORT}
a1.sources.r1.channels = c1

Warning: environment variables are currently supported only in values, that is, on the right-hand side of the =, not in keys.


To enable this, set the Java system property propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties when starting the agent.

For example:

$ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Note that this is just one example; environment variables can also be set in other ways, for example in conf/flume-env.sh.
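To illustrate the "values only" rule, here is a small Python sketch that resolves ${NAME} references on the right-hand side of '=' and leaves keys untouched. It is not Flume's EnvVarResolverProperties: resolve_env_vars is a hypothetical name, the ${NAME} syntax with NAME made of letters, digits and underscores is an assumption, and raising KeyError for an unset variable is a choice of this sketch (Flume's own handling may differ).

```python
import re
from collections.abc import Mapping

# Matches ${NAME}; NAME = letters, digits, underscore (assumed syntax)
_ENV_REF = re.compile(r"\$\{(\w+)\}")


def resolve_env_vars(conf_text: str, env: Mapping[str, str]) -> str:
    """Replace ${NAME} with env[NAME] on the right-hand side of '=' only.

    Keys, comment lines and lines without '=' are left untouched. An unset
    variable raises KeyError (a choice of this sketch).
    """
    resolved = []
    for line in conf_text.splitlines():
        key, sep, value = line.partition("=")
        if line.lstrip().startswith("#") or not sep:
            resolved.append(line)
            continue
        # only the value part is resolved; the key keeps any ${...} verbatim
        value = _ENV_REF.sub(lambda m: env[m.group(1)], value)
        resolved.append(key + sep + value)
    return "\n".join(resolved)


if __name__ == "__main__":
    conf = "a1.sources.r1.port = ${NC_PORT}\na1.sources.${SRC}.type = netcat"
    # Real use would pass os.environ instead of a literal dict
    print(resolve_env_vars(conf, {"NC_PORT": "44444", "SRC": "r1"}))
```

This prints "a1.sources.r1.port = 44444" and leaves "a1.sources.${SRC}.type = netcat" unchanged, mirroring the warning that keys are not resolved.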

2. Interceptor example

cat jobs/netcat_console.conf

# Name the agent's components
netcatConsole.sources = netcatSrc
netcatConsole.channels = consoleChn
netcatConsole.sinks = consoleSnk

# Configure the source
netcatConsole.sources.netcatSrc.type = netcat
netcatConsole.sources.netcatSrc.bind = 0.0.0.0
netcatConsole.sources.netcatSrc.port = 33333
netcatConsole.sources.netcatSrc.interceptors = i1 i2
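# Source interceptor i1: regex extractor that copies a date (yyyy-MM-dd, yyyy/MM/dd or yyyyMMdd) into a header
# Backslashes are doubled because Flume loads this file with java.util.Properties, which silently drops a single backslash before a character such as d
netcatConsole.sources.netcatSrc.interceptors.i1.type=regex_extractor
netcatConsole.sources.netcatSrc.interceptors.i1.regex = .*((\\d\\d\\d\\d-\\d\\d-\\d\\d)|(\\d\\d\\d\\d/\\d\\d/\\d\\d)|(\\d\\d\\d\\d\\d\\d\\d\\d)).*
#netcatConsole.sources.netcatSrc.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)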
netcatConsole.sources.netcatSrc.interceptors.i1.serializers = s1
netcatConsole.sources.netcatSrc.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
#netcatConsole.sources.netcatSrc.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
netcatConsole.sources.netcatSrc.interceptors.i1.serializers.s1.name = inc_day
#netcatConsole.sources.netcatSrc.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH

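# Source interceptor i2: a second regex extractor; each group captures exactly one digit, so 1:2:3 matches but 12:34:56 does not
netcatConsole.sources.netcatSrc.interceptors.i2.type=regex_extractor
netcatConsole.sources.netcatSrc.interceptors.i2.regex = (\\d):(\\d):(\\d)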
netcatConsole.sources.netcatSrc.interceptors.i2.serializers = s1 s2 s3
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s3.type = org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s1.name = one
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s2.name = two
netcatConsole.sources.netcatSrc.interceptors.i2.serializers.s3.name = three

# Configure the sink
netcatConsole.sinks.consoleSnk.type = logger

# Configure the channel: use a channel which buffers events in memory
netcatConsole.channels.consoleChn.type = memory
netcatConsole.channels.consoleChn.capacity = 1000
netcatConsole.channels.consoleChn.transactionCapacity = 100

# Bind the source and sink to the channel
netcatConsole.sources.netcatSrc.channels = consoleChn
netcatConsole.sinks.consoleSnk.channel = consoleChn

Start the Flume agent:

bin/flume-ng agent --conf conf/ --name netcatConsole --conf-file jobs/netcat_console.conf -Dflume.root.logger=INFO,console

To test, open a new terminal window and use telnet to connect to port 33333 on the local machine:

telnet 0.0.0.0 33333
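To predict which headers the Logger Sink should show for a given input line, here is a small Python simulation of the two interceptors. It is a sketch under assumptions: Python's re stands in for java.util.regex (they agree on these simple constructs), the patterns are written as plain regular expressions with single backslashes, and extract_headers and intercept are hypothetical names. As in the regex_extractor configuration above, it uses the first match and stores capture group i+1 under the header name of serializer i, with pass-through (unchanged) values.

```python
import re

# The two patterns from netcat_console.conf, as plain regular expressions
I1_REGEX = r".*((\d\d\d\d-\d\d-\d\d)|(\d\d\d\d/\d\d/\d\d)|(\d\d\d\d\d\d\d\d)).*"
I2_REGEX = r"(\d):(\d):(\d)"


def extract_headers(body: str, regex: str, names: list[str]) -> dict[str, str]:
    """Simulate regex_extractor with pass-through serializers.

    Uses the first match of `regex` in `body` and stores capture group i+1
    under header names[i]; groups beyond the list of names are ignored.
    """
    headers: dict[str, str] = {}
    match = re.search(regex, body)
    if match is None:
        return headers  # no match: no headers added
    for i, name in enumerate(names[: len(match.groups())]):
        value = match.group(i + 1)
        if value is not None:  # skip groups that took no part in the match
            headers[name] = value
    return headers


def intercept(body: str) -> dict[str, str]:
    """Apply i1 and then i2 to one event body, like the interceptor chain."""
    headers = extract_headers(body, I1_REGEX, ["inc_day"])
    headers.update(extract_headers(body, I2_REGEX, ["one", "two", "three"]))
    return headers


if __name__ == "__main__":
    for line in ["order 2020-09-02 paid", "2020/09/02 1:2:3", "12:34:56"]:
        print(repr(line), "->", intercept(line))
```

With these inputs the sketch yields {'inc_day': '2020-09-02'}, then {'inc_day': '2020/09/02', 'one': '1', 'two': '2', 'three': '3'}, then {}. The last case shows that (\d) captures a single digit, so multi-digit values would need (\d+). Because of the greedy leading .*, a line containing several dates puts the last one into inc_day.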
Original article: https://www.cnblogs.com/LIAOBO/p/13603047.html