[Translation] Flume 1.8.0 User Guide: Processors

Translated from the official Flume 1.8 User Guide. Original: Flume 1.8.0 User Guide

Due to length limits, the translation is split into the following five parts:

[Translation] Flume 1.8.0 User Guide

[Translation] Flume 1.8.0 User Guide: Source

[Translation] Flume 1.8.0 User Guide: Sink

[Translation] Flume 1.8.0 User Guide: Channel

[Translation] Flume 1.8.0 User Guide: Processors

Flume Sink Processors

Sink groups allow users to group multiple sinks into one entity. Sink processors can be used to provide load-balancing capabilities over all sinks inside the group, or to achieve failover from one sink to another in case of temporal failure.

Required properties are in bold.

Property Name | Default | Description
sinks | - | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be default, failover or load_balance

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Default Sink Processor

The default sink processor accepts only a single sink. The user is not forced to create a processor (sink group) for a single sink; instead, the user can follow the source-channel-sink pattern explained earlier in this guide.

Failover Sink Processor

The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).

The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period, increasing with sequential failures, before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Sinks have a priority associated with them: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried next for sending events. For example, a sink with priority 100 is activated before a sink with priority 80. If no priority is specified, priorities are determined based on the order in which the sinks are specified in the configuration.

To configure, set a sink group processor to failover and set priorities for all individual sinks. All specified priorities must be unique. Furthermore, an upper limit on failover time can be set (in milliseconds) using the maxpenalty property.

Required properties are in bold.

Property Name | Default | Description
sinks | - | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be failover
processor.priority.<sinkName> | - | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
processor.maxpenalty | 30000 | The maximum backoff period for the failed Sink (in millis)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor

The load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed. The implementation supports distributing load using either round_robin or random selection mechanisms. The choice of selection mechanism defaults to round_robin, but can be overridden via configuration. Custom selection mechanisms are supported via custom classes that inherit from AbstractSinkSelector.

When invoked, this selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via its configured selection mechanism. This implementation does not blacklist the failing sink and instead continues to optimistically attempt every available sink. If all sink invocations result in failure, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor will blacklist sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive sinks. With this disabled, in round-robin all the failed sinks' load will be passed to the next sink in line and thus not evenly balanced.

Required properties are in bold.

Property Name | Default | Description
processor.sinks | - | Space-separated list of sinks that are participating in the group
processor.type | default | The component type name, needs to be load_balance
processor.backoff | false | Should failed sinks be backed off exponentially
processor.selector | round_robin | Selection mechanism. Must be either round_robin, random or the FQCN of a custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds)

Example for agent named a1:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Custom Sink Processor

Custom sink processors are not supported at the moment.

Event Serializers

The file_roll sink and the hdfs sink both support the EventSerializer interface. Details of the EventSerializers that ship with Flume are provided below.

Body Text Serializer

Alias: text. This serializer writes the body of the event to the output stream without any transformation or modification. The event headers are ignored. Configuration options are as follows:

Property Name | Default | Description
appendNewline | true | Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

Example for agent named a1:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

“Flume Event” Avro Event Serializer

Alias: avro_event.

This serializer serializes Flume events into an Avro container file. The schema used is the same schema used for Flume events in the Avro RPC mechanism.

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name | Default | Description
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes.
compressionCodec | null | Avro compression codec. For supported codecs, see Avro's CodecFactory docs.

 Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

Avro Event Serializer

Alias: This serializer does not have an alias, and must be specified using the fully qualified class name.

This serializes Flume events into an Avro container file like the “Flume Event” Avro Event Serializer, however the record schema is configurable. The record schema may be specified either as a Flume configuration property or passed in an event header.

To pass the record schema as part of the Flume configuration, use the schemaURL property listed below.

To pass the record schema in an event header, specify either the event header flume.avro.schema.literal containing a JSON-format representation of the schema, or flume.avro.schema.url with a URL where the schema may be found (hdfs:/... URIs are supported).

This serializer inherits from the AbstractAvroEventSerializer class.

Configuration options are as follows:

Property Name | Default | Description
syncIntervalBytes | 2048000 | Avro sync interval, in approximate bytes.
compressionCodec | null | Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
schemaURL | null | Avro schema URL. Schemas specified in the header override this option.

Example for agent named a1:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc
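The header-based alternative can be sketched as follows, assuming the static interceptor (described in the Flume Interceptors section below) is used to attach the schema URL to every event; the URL itself is just a placeholder:

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = flume.avro.schema.url
a1.sources.r1.interceptors.i1.value = hdfs://namenode/path/to/schema.avsc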

Flume Interceptors

Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors are classes that implement the org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports chaining of interceptors. This is made possible by specifying the list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace-separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors can modify or drop events. If an interceptor needs to drop events, it just does not return that event in the list that it returns. If it is to drop all events, then it simply returns an empty list. Interceptors are named components; here is an example of how they are created through configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like any other configurable component. In the above example, events are passed to the HostInterceptor first, and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use the HostInterceptor.

Timestamp Interceptor

This interceptor inserts into the event headers the time, in millis, at which it processes the event. It inserts a header with the key timestamp (or as specified by the header property) whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present in the configuration.

Property Name | Default | Description
type | - | The component type name, has to be timestamp or the FQCN
header | timestamp | The name of the header in which to place the generated timestamp.
preserveExisting | false | If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

Host Interceptor

This interceptor inserts the hostname or IP address of the host that the agent is running on. It inserts a header with the key host, or a configured key, whose value is the hostname or IP address of the host, based on configuration.

Property Name | Default | Description
type | - | The component type name, has to be host
preserveExisting | false | If the host header already exists, should it be preserved - true or false
useIP | true | Use the IP address if true, else use hostname.
hostHeader | host | The header key to be used.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

Static Interceptor 

The static interceptor allows the user to append a static header with a static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead, the user can chain multiple static interceptors, each defining one static header; see the sketch after the example below.

Property Name | Default | Description
type | - | The component type name, has to be static
preserveExisting | true | If the configured header already exists, should it be preserved - true or false
key | key | Name of header that should be created
value | value | Static value that should be created

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
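Building on this, a sketch of attaching two static headers by chaining two interceptors (the second header name and value are illustrative):

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = environment
a1.sources.r1.interceptors.i2.value = production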

Remove Header Interceptor

This interceptor manipulates Flume event headers by removing one or more headers. It can remove a statically defined header, headers based on a regular expression, or headers in a list. If none of these is defined, or if no header matches the criteria, the Flume events are not modified.

Note that if only one header needs to be removed, specifying it by name provides better performance than the other two methods.

Property Name | Default | Description
type | - | The component type name has to be remove_header
withName | - | Name of the header to remove
fromList | - | List of headers to remove, separated with the separator specified by fromListSeparator
fromListSeparator | \s*,\s* | Regular expression used to separate multiple header names in the list specified by fromList. The default is a comma surrounded by any number of whitespace characters
matching | - | All headers whose names match this regular expression are removed
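The guide gives no example for this interceptor; a minimal sketch that removes a single header by name (the header name is illustrative) might look like:

a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = remove_header
a1.sources.r1.interceptors.i1.withName = datacenter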

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fab-b7a586fc1b97, which represents a 128-bit value.

Consider using UUIDInterceptor to automatically assign a UUID to an event if no application-level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network, that is, in the first Flume source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application-level key is available, it is preferable to an auto-generated UUID because it enables subsequent updates and deletes of the event in data stores using that well-known application-level key.

Property Name | Default | Description
type | - | The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName | id | The name of the Flume header to modify
preserveExisting | true | If the UUID header already exists, should it be preserved - true or false
prefix | "" | The prefix string constant to prepend to each generated UUID
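A minimal sketch (this interceptor has no short alias, so the builder FQCN from the table above is used):

a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.r1.interceptors.i1.headerName = id
a1.sources.r1.interceptors.i1.preserveExisting = true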

Morphline Interceptor

This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example, the morphline can ignore certain events, or alter or insert certain event headers via regular-expression-based pattern matching, or it can auto-detect and set a MIME type via Apache Tika on events that are intercepted. This kind of packet sniffing can be used, for example, for content-based dynamic routing in a Flume topology. MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).

Currently there is a restriction that the morphline of an interceptor must not generate more than one output record per input event. This interceptor is not intended for heavy-duty ETL processing; if you need that, consider moving the ETL processing from the Flume source to a Flume sink, e.g. to a MorphlineSolrSink.

Required properties are in bold.

Property Name | Default | Description
type | - | The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFile | - | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file

Sample flume.conf file:

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

Search and Replace Interceptor

This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Backtracking / group capture is also available. This interceptor uses the same rules as the Java Matcher.replaceAll() method.

Property Name | Default | Description
type | - | The component type name has to be search_replace
searchPattern | - | The pattern to search for and replace.
replaceString | - | The replacement string.
charset | UTF-8 | The charset of the event body. Assumed by default to be UTF-8.

Example configuration:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

Another example:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

Regex Filtering Interceptor

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.

Property Name | Default | Description
type | - | The component type name has to be regex_filter
regex | ".*" | Regular expression for matching against events
excludeEvents | false | If true, the regex determines events to exclude; otherwise the regex determines events to include.
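No example is given above; a sketch that drops events whose body matches a DEBUG pattern (the regex is illustrative) might be:

a1.sources = r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = .*DEBUG.*
a1.sources.r1.interceptors.i1.excludeEvents = true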
 

Regex Extractor Interceptor

This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.

Property Name | Default | Description
type | - | The component type name has to be regex_extractor
regex | - | Regular expression for matching against events
serializers | - | Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below.) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer, org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.type | default | Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers.<s1>.name | - |
serializers.* | - | Serializer-specific properties

The serializers are used to map the matches to a header name and a formatted header value; by default, you only need to specify the header name and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in any way you like.

Example 1:

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used

a1.sources.r1.interceptors.i1.regex = (\d):(\d):(\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

The extracted event will contain the same body, but the following headers will have been added: one=>1, two=>2, three=>3

Example 2:

If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used

a1.sources.r1.interceptors.i1.regex = ^(?:\n)?(\d\d\d\d-\d\d-\d\d\s\d\d:\d\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

The extracted event will contain the same body, but the following header will have been added: timestamp=>1350611220000

Flume Properties

Property Name | Default | Description
flume.called.from.service | - | If this property is specified then the Flume agent will continue polling for the config file even if the config file is not found at the expected location. Otherwise, the Flume agent will terminate if the config doesn't exist at the expected location. No property value is needed when setting this property (e.g., just specifying -Dflume.called.from.service is enough)

Property: flume.called.from.service

Flume periodically polls, every 30 seconds, for changes to the specified config file. A Flume agent loads a new configuration from the config file if either an existing file is polled for the first time, or if an existing file's modification date has changed since the last time it was polled. Renaming or moving a file does not change its modification time. When a Flume agent polls a non-existent file, one of two things happens: 1. When the agent polls a non-existent config file for the first time, the agent behaves according to the flume.called.from.service property: if the property is set, the agent continues polling (always at the same period, every 30 seconds); if it is not set, the agent terminates immediately. 2. When the agent polls a non-existent config file and this is not the first time the file is polled, the agent makes no config changes for this polling period and continues polling rather than terminating.
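Since the property carries no value, a sketch of starting an agent with it set (reusing the example.conf and agent name a1 from the examples above) would be:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.called.from.service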

Log4J Appender

Appends Log4j events to a Flume agent's avro source. A client using this appender must have the flume-ng-sdk in the classpath (eg, flume-ng-sdk-1.8.0.jar). Required properties are in bold.

Property Name | Default | Description
Hostname | - | The hostname on which a remote Flume agent is running with an avro source.
Port | - | The port at which the remote Flume agent's avro source is listening.
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events. (Do not use when users log strings)
AvroSchemaUrl | - | A URL from which the Avro schema can be retrieved.

Sample log4j.properties file:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

By default, each event is converted to a string by calling toString(), or by using the Log4j layout if one is specified.

If the event is an instance of org.apache.avro.generic.GenericRecord or org.apache.avro.specific.SpecificRecord, or if the property AvroReflectionEnabled is set to true, then the event will be serialized using Avro serialization.

Serializing every event with its Avro schema is inefficient, so it is good practice to provide a schema URL from which the schema can be retrieved by the downstream sink, typically the HDFS sink. If AvroSchemaUrl is not specified, the schema will be included as a Flume header.

Sample log4j.properties file configured to use Avro serialization:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Load Balancing Log4J Appender

Appends Log4j events to a list of Flume agents' avro sources. A client using this appender must have the flume-ng-sdk in the classpath (eg, flume-ng-sdk-1.8.0.jar). This appender supports round-robin and random schemes for performing the load balancing. It also supports a configurable backoff timeout so that down agents are removed temporarily from the set of hosts.

Property Name | Default | Description
Hosts | - | A space-separated list of host:port at which Flume (through an AvroSource) is listening for events
Selector | ROUND_ROBIN | Selection mechanism. Must be either ROUND_ROBIN, RANDOM or the FQCN of a custom class that inherits from LoadBalancingSelector.
MaxBackoff | - | A long value representing the maximum amount of time in milliseconds the load balancing client will back off from a node that has failed to consume an event. Defaults to no backoff
UnsafeMode | false | If true, the appender will not throw exceptions on failure to send the events.
AvroReflectionEnabled | false | Use Avro Reflection to serialize Log4j events.
AvroSchemaUrl | - | A URL from which the Avro schema can be retrieved.

Sample log4j.properties file configured using defaults:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Sample log4j.properties file configured using RANDOM load balancing:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Sample log4j.properties file configured using backoff:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Security

The HDFS sink, HBase sink, Thrift source, Thrift sink and Kite Dataset sink support Kerberos authentication. Please refer to the corresponding sections for configuring the Kerberos-related options.

A Flume agent authenticates to the Kerberos KDC as a single principal, which is used by the different components that require Kerberos authentication. The principal and keytab configured for the Thrift source, Thrift sink, HDFS sink, HBase sink and DataSet sink should be the same; otherwise the components will fail to start.
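As a sketch of what the Kerberos-related options look like for the HDFS sink case (the principal and keytab path are placeholders; hdfs.kerberosPrincipal and hdfs.kerberosKeytab are the HDFS sink options covered in the Sink part of this guide):

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events
# Principal and keytab the agent uses to authenticate to the KDC for secure HDFS access
a1.sinks.k1.hdfs.kerberosPrincipal = flume/_HOST@EXAMPLE.COM
a1.sinks.k1.hdfs.kerberosKeytab = /etc/flume-ng/conf/flume.keytab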

Monitoring

Monitoring in Flume is still a work in progress, and changes can happen often. Several Flume components report metrics to the JMX platform MBean server. These metrics can be queried using JConsole.

JMX Reporting

JMX reporting can be enabled by specifying the JMX parameters in the JAVA_OPTS environment variable using flume-env.sh, like so:

export JAVA_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

NOTE: The sample above disables security. To enable security, please refer to http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html

Ganglia Reporting

Flume can also report these metrics to Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a Flume agent must be started with this support. The following parameters must be passed to the agent as system properties prefixed by flume.monitoring., and can be specified in flume-env.sh:

Property Name | Default | Description
type | - | The component type name, has to be ganglia
hosts | - | Comma-separated list of hostname:port of Ganglia servers
pollFrequency | 60 | Time, in seconds, between consecutive reporting to the Ganglia server
isGanglia3 | false | Ganglia server version is 3. By default, Flume sends in Ganglia 3.1 format

We can start Flume with Ganglia support as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume can also report metrics in JSON format. To enable JSON reporting, Flume hosts a Web server on a configurable port. Flume reports metrics in the following JSON format:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

Here is an example:

{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
                      "Type":"CHANNEL",
                      "StopTime":"0",
                      "EventPutAttemptCount":"468086",
                      "ChannelSize":"233428",
                      "StartTime":"1344882233070",
                      "EventTakeSuccessCount":"458200",
                      "ChannelCapacity":"600000",
                      "EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
                   "Type":"CHANNEL",
                   "StopTime":"0",
                   "EventPutAttemptCount":"22948908",
                   "ChannelSize":"5",
                   "StartTime":"1344882209413",
                   "EventTakeSuccessCount":"22948900",
                   "ChannelCapacity":"100",
                   "EventTakeAttemptCount":"22948908"}
}
Property Name | Default | Description
type | - | The component type name, has to be http
port | 41414 | The port to start the server on.

We can start Flume with JSON reporting support as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

Metrics will then be available at the http://<hostname>:<port>/metrics web page. Custom components can report metrics as mentioned in the Ganglia section above.

Custom Reporting

It is possible to report metrics to other systems by writing servers that do the reporting. Any reporting class has to implement the interface org.apache.flume.instrumentation.MonitorService. Such a class can be used in the same way the GangliaServer is used for reporting: it can poll the platform MBean server for the metrics exposed by the MBeans. For example, an HTTP monitoring service called HTTPReporting can be used as follows:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property Name | Default | Description
type | - | The component type name, has to be FQCN

Reporting metrics from custom components

Any custom Flume component should inherit from the org.apache.flume.instrumentation.MonitoredCounterGroup class. The class should then provide getter methods for each of the metrics it exposes. See the code below. The MonitoredCounterGroup expects a list of attributes whose metrics are exposed by this class. As of now, this class only supports exposing metrics as long values.

public class SinkCounter extends MonitoredCounterGroup implements
    SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
    "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
    "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
    "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
    "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
    "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
    "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
    "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };


  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }

}

Tools

File Channel Integrity Tool

The File Channel Integrity tool verifies the integrity of individual events in the file channel and removes corrupted events.

The tool can be run as follows:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir

where datadir is a comma-separated list of data directories to be verified.

The following options are available:

Option Name | Description
h/help | Displays help
l/dataDirs | Comma-separated list of data directories which the tool must verify
 

Event Validator Tool

The Event Validator tool can be used to validate file channel events in an application-specific manner. The tool applies user-provided validation logic to each event and drops events that do not conform to that logic.

The tool can be run as follows:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

where datadir is a comma-separated list of data directories to be verified.

The following options are available:

Option Name | Description
h/help | Displays help
l/dataDirs | Comma-separated list of data directories which the tool must verify
e/eventValidator | Fully qualified name of the Event Validator implementation. The jar must be on the Flume classpath

An Event Validator implementation must implement the EventValidator interface. It is recommended not to throw any exceptions from the implementation, as events that trigger them are treated as invalid events. Additional parameters can be passed to the EventValidator implementation via -D options.

Let's look at a simple size-based event validator, which rejects events larger than a specified maximum size.

public static class MyEventValidator implements EventValidator {

  private int value = 0;

  private MyEventValidator(int val) {
    value = val;
  }

  @Override
  public boolean validateEvent(Event event) {
    return event.getBody().length <= value;
  }

  public static class Builder implements EventValidator.Builder {

    private int sizeValidator = 0;

    @Override
    public EventValidator build() {
      return new MyEventValidator(sizeValidator);
    }

    @Override
    public void configure(Context context) {
      sizeValidator = context.getInteger("maxSize");
    }
  }
}

Topology Design Considerations

Flume is very flexible and allows a large range of possible deployment scenarios. If you plan to use Flume in a large, production deployment, it is prudent to spend some time thinking about how to express your problem in terms of a Flume topology. This section covers a few considerations.

Is Flume a good fit for your problem?

If you need to ingest textual log data into Hadoop/HDFS, then Flume is the right fit for your problem, full stop. For other use cases, here are some guidelines:

Flume is designed to transport and ingest regularly-generated event data over relatively stable, potentially complex topologies. The notion of "event data" is very broadly defined: to Flume, an event is just a generic blob of bytes. There are some limitations on how large an event can be (for instance, it cannot be larger than what you can store in memory or on disk on a single machine), but in practice Flume events can be everything from textual log entries to image files. The key property of an event is that events are generated in a continuous, streaming fashion. If your data is not regularly generated (for example, you are trying to do a single bulk load of data into a Hadoop cluster), then Flume will still work, but it is probably overkill for your situation. Flume likes relatively stable topologies. Your topologies do not need to be immutable, because Flume can deal with changes in topology without losing data and can also tolerate periodic reconfiguration due to failover or provisioning. However, it probably won't work well if you try to change topologies every day, because reconfiguration takes some thought and overhead.

Flow reliability in Flume

The reliability of a Flume flow depends on several factors. By adjusting these factors, you can achieve a wide array of reliability options with Flume.

What type of channel you use. Flume has both durable channels (those which persist data to disk) and non-durable channels (those which will lose data if a machine fails). Durable channels use disk-based storage, and data stored in such channels persists across machine restarts and non-disk-related failures.

Whether your channels are sufficiently provisioned for the workload. Channels in Flume act as buffers at the various hops. These buffers have a fixed capacity, and once that capacity is full, back pressure is created on earlier points in the flow. If this pressure propagates to the source of the flow, Flume will become unavailable and may lose data. A provisioning sketch follows at the end of this section.

Whether you use redundant topologies. Flume lets you replicate flows across redundant topologies. This can provide a very easy source of fault tolerance that overcomes disk or machine failures.

The best way to think about reliability in a Flume topology is to consider various failure scenarios and their outcomes. What happens if a disk fails? What happens if a machine fails? What happens if your terminal sink (e.g. HDFS) goes down for some time while you have back pressure? The space of possible designs is huge, but the underlying questions you need to ask are just a handful.
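As a sketch tying the first two considerations together (the paths and capacity figures are illustrative, not recommendations), a durable, explicitly provisioned file channel could be configured like this:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /flume/checkpoint
a1.channels.c1.dataDirs = /flume/data
# capacity: maximum number of events the channel will buffer
a1.channels.c1.capacity = 1000000
# transactionCapacity: maximum number of events per put/take transaction
a1.channels.c1.transactionCapacity = 10000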

Flume topology design

The first step in designing a Flume topology is to enumerate all sources and destinations (terminal sinks) for your data. These will define the edge points of your topology. The next consideration is whether to introduce intermediate aggregation tiers or event routing. If you are collecting data from a large number of sources, it can be helpful to aggregate the data in order to simplify ingestion at the terminal sinks. An aggregation tier can also act as a buffer, smoothing out burstiness from the sources or unavailability at the sinks. If you are routing data between different locations, you may also want to split flows at various points: this creates sub-topologies which may themselves include aggregation points.

Sizing a Flume deployment

Once you have an idea of what your topology will look like, the next question is how much hardware and networking capacity is needed. This starts by quantifying how much data you generate. That is not always a simple task! Most data streams are bursty (for instance, due to diurnal patterns) and potentially unpredictable. A good starting point is to think about the maximum throughput you'll have in each tier of the topology, both in terms of events per second and bytes per second. Once you know the required throughput of a given tier, you can calculate a lower bound on how many nodes you require for that tier. To determine attainable throughput, it's best to experiment with Flume on your own hardware, using synthetic or sampled event data. In general, disk-based channels should get tens of MB/s and memory-based channels should get hundreds of MB/s or more, but performance varies widely depending on hardware and operating environment.

Sizing based on aggregate throughput gives you a lower bound on the number of nodes you will need at each tier. There are several reasons to add nodes beyond that, such as increased redundancy and better ability to absorb bursts in load.

Troubleshooting

Handling agent failures

If the Flume agent goes down, all the flows hosted on that agent are aborted. Once the agent is restarted, the flows resume. A flow using the file channel or another stable channel will resume processing events where it left off. If the agent cannot be restarted on the same hardware, there is an option to migrate the database to another machine and set up a new Flume agent that can resume processing the events saved in the db. Database HA futures can be leveraged to move the Flume agent to another host.

Compatibility

HDFS

Currently Flume supports HDFS 0.20.2 and 0.23.

AVRO

TBD

Additional version requirements

TBD

Tracing

TBD

More Sample Configs

TBD

Component Summary

Component Interface | Type Alias | Implementation Class
org.apache.flume.Channel | memory | org.apache.flume.channel.MemoryChannel
org.apache.flume.Channel | jdbc | org.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channel | file | org.apache.flume.channel.file.FileChannel
org.apache.flume.Channel | - | org.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channel | - | org.example.MyChannel
org.apache.flume.Source | avro | org.apache.flume.source.AvroSource
org.apache.flume.Source | netcat | org.apache.flume.source.NetcatSource
org.apache.flume.Source | seq | org.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Source | exec | org.apache.flume.source.ExecSource
org.apache.flume.Source | syslogtcp | org.apache.flume.source.SyslogTcpSource
org.apache.flume.Source | multiport_syslogtcp | org.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Source | syslogudp | org.apache.flume.source.SyslogUDPSource
org.apache.flume.Source | spooldir | org.apache.flume.source.SpoolDirectorySource
org.apache.flume.Source | http | org.apache.flume.source.http.HTTPSource
org.apache.flume.Source | thrift | org.apache.flume.source.ThriftSource
org.apache.flume.Source | jms | org.apache.flume.source.jms.JMSSource
org.apache.flume.Source | - | org.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Source | - | org.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Source | - | org.example.MySource
org.apache.flume.Sink | null | org.apache.flume.sink.NullSink
org.apache.flume.Sink | logger | org.apache.flume.sink.LoggerSink
org.apache.flume.Sink | avro | org.apache.flume.sink.AvroSink
org.apache.flume.Sink | hdfs | org.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sink | hbase | org.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sink | asynchbase | org.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sink | elasticsearch | org.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sink | file_roll | org.apache.flume.sink.RollingFileSink
org.apache.flume.Sink | irc | org.apache.flume.sink.irc.IRCSink
org.apache.flume.Sink | thrift | org.apache.flume.sink.ThriftSink
org.apache.flume.Sink | - | org.example.MySink
org.apache.flume.ChannelSelector | replicating | org.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelector | multiplexing | org.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelector | - | org.example.MyChannelSelector
org.apache.flume.SinkProcessor | default | org.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessor | failover | org.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessor | load_balance | org.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.interceptor.Interceptor | timestamp | org.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptor | host | org.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptor | static | org.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptor | regex_filter | org.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptor | regex_extractor | org.apache.flume.interceptor.RegexExtractorInterceptor$Builder
org.apache.flume.channel.file.encryption.KeyProvider$Builder | jceksfile | org.apache.flume.channel.file.encryption.JCEFileKeyProvider
org.apache.flume.channel.file.encryption.KeyProvider$Builder | - | org.example.MyKeyProvider
org.apache.flume.channel.file.encryption.CipherProvider | aesctrnopadding | org.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider
org.apache.flume.channel.file.encryption.CipherProvider | - | org.example.MyCipherProvider
org.apache.flume.serialization.EventSerializer$Builder | text | org.apache.flume.serialization.BodyTextEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder | avro_event | org.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builder | - | org.example.MyEventSerializer$Builder
 

Alias Conventions

These alias conventions are used in the component-specific examples above, to keep the names short and consistent across all examples.

Alias Name | Alias Type
a | agent
c | channel
r | source
k | sink
g | sink group
i | interceptor
y | key
h | host
s | serializer


The end.

Thanks go to Youdao Translate, which did most of the work; I just moved the bricks.

Original post: https://www.cnblogs.com/Springmoon-venn/p/10371412.html