Hadoop生态圈-Flume的组件之拦截器与选择器

                   Hadoop生态圈-Flume的组件之拦截器与选择器

                                            作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

  本篇博客只是配置的是Flume主流的Interceptors,想要了解更详细的配置信息请参考官网:http://flume.apache.org/FlumeUserGuide.html#flume-interceptors。

   想必大家都知道Flume的组件有Source,channel和sink。其实在Flume还有一些更深层的东西,比如你知道soucre是如何将数据传送给channel的吗?那你有知道channel又是如何将数据发送给sink的吗?对于一个Agent来说,它只能有一个source,但是它可以有多个channel和sink,如下图: 

   接下来就跟着我一起了了解一下更深层次的知识吧。接下来我们就一起探讨一下source是如何将数据发送到channel中的,以及sink是处理数据的。

一.Source端源码查看

1>.获取一行数据,使用其构建Event

2>.使用processEvent处理数据

 

3>.在处理过程中,event需要通过拦截器链,相当于过滤数据

4>.在拦截器链中,通过迭代所有拦截器,对数据进行多次处理(例如:host拦截器,是对event进行添加头部操作)

5>.通过拦截器处理后的event,再次进入到通道挑选器

 

6>.迭代所有channel,将数据放进channel中

  通过上面的源码解析,看下面这张图应该就不是什么难事了吧:

  这个时候,你是否绝对第一张图画得并不自信呢?这个时候我们可以把第一张图的Source端流程画得更详细一点,如下:

二.拦截器(Interceptors)

1>.Interceptors 功能

  答:拦截器是在source端的在处理过程中能够对数据(event)进行修改或丢弃的组件。

2>.官方文档

 

3>.host interceptor(将发送的event添加主机名的header)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_hostInterceptor.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# 指定添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
# 指定header的key
a1.sources.r1.interceptors.i1.hostHeader = hostname
# 指定header的value为主机ip
a1.sources.r1.interceptors.i1.useIP = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

  b>启动agent进程:

  c>.source端产生数据(启动nc):

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

4>.static interceptor(静态拦截器,手动指定key-value)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_staticInterceptor.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# 指定添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = name
a1.sources.r1.interceptors.i1.value = yinzhengjie

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

  b>启动agent进程:

  c>.source端产生数据(启动nc): 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

5>.timestamp interceptor(将发送的event添加时间戳的header)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_timestampInterceptor.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# 指定添加拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[yinzhengjie@s101 ~]$ 

  b>启动agent进程:

 

  c>.source端产生数据(启动nc): 

 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

6>.interceptor chain(连接器链)配置案例

  a>.实际配置参数:

[yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_chainInterceptor.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 8888

# 指定添加拦截器
a1.sources.r1.interceptors = i1 i2 i3
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
# 指定header的key
a1.sources.r1.interceptors.i1.hostHeader = hostname
# 指定header的value为主机ip
a1.sources.r1.interceptors.i1.useIP = true

# 添加i2拦截器
a1.sources.r1.interceptors.i2.type = timestamp

# 添加i3拦截器
a1.sources.r1.interceptors.i3.type = remove_header
a1.sources.r1.interceptors.i3.withName = timestamp

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

[yinzhengjie@s101 ~]$ 

  b>启动agent进程:

  c>.source端产生数据(启动nc): 

  d>.检查sink端数据(检查定义好的目录"/home/yinzhengjie/log2")

原文地址:https://www.cnblogs.com/yinzhengjie/p/9205536.html