flume监控

flume提供了一个度量框架,可以通过http的方式进行展现,当启动agent的时候通过传递参数 -Dflume.monitoring.type=http参数给flume agent:

1
2
3
4
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 
-Dflume.monitoring.type=http
-Dflume.monitoring.port=5653
-Dflume.root.logger=INFO,console

这样flume会在5653端口上启动一个HTTP服务器,访问如下地址,将返回JSON格式的flume相关指标参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
demo:

访问: http://flume-agent-host:5653/metrics
结果: 其中src-1是子自定义的source名称
{
"SOURCE.src-1":{
"OpenConnectionCount":"0", //目前与客户端或sink保持连接的总数量(目前只有avro source展现该度量)
"Type":"SOURCE",
"AppendBatchAcceptedCount":"1355", //成功提交到channel的批次的总数量
"AppendBatchReceivedCount":"1355", //接收到事件批次的总数量
"EventAcceptedCount":"28286", //成功写出到channel的事件总数量,且source返回success给创建事件的sink或RPC客户端系统
"AppendReceivedCount":"0", //每批只有一个事件的事件总数量(与RPC调用中的一个append调用相等)
"StopTime":"0", //source停止时自Epoch以来的毫秒值时间
"StartTime":"1442566410435", //source启动时自Epoch以来的毫秒值时间
"EventReceivedCount":"28286", //目前为止source已经接收到的事件总数量
"AppendAcceptedCount":"0" //单独传入的事件到Channel且成功返回的事件总数量
},
"CHANNEL.ch-1":{
"EventPutSuccessCount":"28286", //成功写入channel且提交的事件总数量
"ChannelFillPercentage":"0.0", //channel满时的百分比
"Type":"CHANNEL",
"StopTime":"0", //channel停止时自Epoch以来的毫秒值时间
"EventPutAttemptCount":"28286", //Source尝试写入Channe的事件总数量
"ChannelSize":"0", //目前channel中事件的总数量
"StartTime":"1442566410326", //channel启动时自Epoch以来的毫秒值时间
"EventTakeSuccessCount":"28286", //sink成功读取的事件的总数量
"ChannelCapacity":"1000000", //channel的容量
"EventTakeAttemptCount":"313734329512" //sink尝试从channel拉取事件的总数量。这不意味着每次事件都被返回,因为sink拉取的时候channel可能没有任何数据
},
"SINK.sink-1":{
"Type":"SINK",
"ConnectionClosedCount":"0", //下一阶段或存储系统关闭的连接数量(如在HDFS中关闭一个文件)
"EventDrainSuccessCount":"28286", //sink成功写出到存储的事件总数量
"KafkaEventSendTimer":"482493",
"BatchCompleteCount":"0", //与最大批量尺寸相等的批量的数量
"ConnectionFailedCount":"0", //下一阶段或存储系统由于错误关闭的连接数量(如HDFS上一个新创建的文件因为超时而关闭)
"EventDrainAttemptCount":"0", //sink尝试写出到存储的事件总数量
"ConnectionCreatedCount":"0", //下一个阶段或存储系统创建的连接数量(如HDFS创建一个新文件)
"BatchEmptyCount":"0", //空的批量的数量,如果数量很大表示souce写数据比sink清理数据慢速度慢很多
"StopTime":"0",
"RollbackCount":"9", //
"StartTime":"1442566411897",
"BatchUnderflowCount":"0" //比sink配置使用的最大批量尺寸更小的批量的数量,如果该值很高也表示sink比souce更快
}
}

Flume也可发送度量信息给Ganglia,用来监控Flume。在任何时候只能启用一个Ganglia或HTTP监控。Flume默认一分钟一次周期性的向Ganglia报告度量:

1
2
3
4
5
6
7
demo:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1
-Dflume.monitoring.type=ganglia # 默认情况下flume以Ganglia3.1格式报告指标
-Dflume.monitoring.pollFrequency=45 # 报告间隔时间(秒)
-Dflume.monitoring.isGanglia3=true # 启用ganglia3个格式报告
-Dflume.root.logger=INFO,console

日志相关

1
2
3
4
$ bin/flume-ng agent --conf conf --conf-file example.conf  --name a1 -Dflume.root.logger=INFO,console

# -Dflume.root.logger=INFO,console 该参数将会把flume的日志输出到console,为了将其输出到日志文件(默认
$FLUME_HOME/logs),可以将console改为LOGFILE形式,具体的配置可以修改$FLUME_HOME/conf/log4j.properties

自定义Flume组件

Flume本身可插拔的架构设计,使得开发自定义插件变得很容易。Flume本身提供了非常丰富的source、channel、sink以及拦截器等插件可供选择,基本可以满足生产需要。具体可以参考Flume用户文档.

plugins.d目录

plugins.d是flume事先约定的存放自定义组件的目录。flume在启动的时候会自动将该目录下的文件添加到classpath下,当然你也可以在flume-ng 启动时通过指定--classpath,-C <cp>参数将自己的文件手动添加到classpath下。
相关目录说明:

1
2
3
plugins.d/xxx/lib - 插件jar
plugins.d/xxx/libext - 插件依赖jar
plugins.d/xxx/native - 本地库文件如 .so文件

拦截器

拦截器(Interceptor)是简单插件式组件,设置在Source和Source写入数据的Channel之间。Source接收到的事件在写入对应的Channel之前,拦截器都可以转换或删除这些事件。每个拦截器实例只处理同一个Source接收的事件。拦截器可以基于任意标准删除或转换事件,但是拦截器必须返回尽可能多(尽可能少)的事件,如同原始传递过来的事件.因为拦截器必须在事件写入Channel之前完成操作,只有当拦截器已成功转换事件后,RPC Source(和任何其他可能产生超时的Source)才会响应发送事件的客户端或Sink。因此尽量不要在拦截器中做大量耗时的处理操作。如果不得已这么处理了,那么需要相应的调整超时时间属性。Flume自身提供了多种类型的拦截器,比如:时间戳拦截器主机拦截器正则过滤拦截器等等。更多内容可以参考Flume Interceptors

拦截器一般用于分析事件以及在需要的时候丢弃事件。编写拦截器时,实现者只需要写以一个实现Interceptor接口的类,同时实现Interceptor$Builder接口的Builer类。所有的Builder类必须有一个公共无参的构造方法,Flume使用该方法来进行实例化。可以使用传递到Builder类的Context实例配置拦截器。所有需要的参数都要传递到Context实例。下面是时间戳拦截器的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class TimestampInterceptor implements Interceptor {

private final boolean preserveExisting;

/**
* 该构造方法只能被Builder调用
*/
private TimestampInterceptor(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
}

@Override
public void initialize() {
// no-op
}

/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
if (preserveExisting && headers.containsKey(TIMESTAMP)) {
// we must preserve the existing timestamp
} else {
long now = System.currentTimeMillis();
headers.put(TIMESTAMP, Long.toString(now));
}
return event;
}

/**
* Delegates to {@link #intercept(Event)} in a loop.
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}

@Override
public void close() {
// no-op
}

/**
* Builder which builds new instances of the TimestampInterceptor.
*/
public static class Builder implements Interceptor.Builder {

private boolean preserveExisting = PRESERVE_DFLT;

@Override
public Interceptor build() {
return new TimestampInterceptor(preserveExisting);
}

//通过Context传递配置参数
@Override
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
}

}

public static class Constants {
public static String TIMESTAMP = "timestamp";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
}

}

注:

  1. intercept()的两个方法必须是线程安全的,因为如果source运行在多线程情况下,这些方法可能会被多个线程调用。
  2. 自定义拦截器的配置方式,interceptors type配置的是XXXInterceptor$Builder:

    1
    2
    3
    #自定义拦截器  --producer agent名称  --src-1 source名称   —-i1 拦截器名称  
    producer.sources.src-1.interceptors = i1
    producer.sources.src-1.interceptors.i1.type = com.networkbench.browser.flume.interceptor.MyBrowserInterceptor$Builder
  3. 将自定义代码打包放置到前面的plugins.d/ext-interceptors(可以自己命名)/lib目录下,启动flume时会自动加载该jar到classpath

解析器

Source使用嵌入式的反序列化器读取监控目录下的文件(这里以Spooling Directory Source为例),默认的反序列化器是LineDeserializer。该反序列化器会按行读取文件中的内容,封装成一个Event消息。默认一次读取的最大长度是2048个字符,你可以通过如下配置参数设置改值:

1
2
# --producer agent名称  --src-1 source名称 
producer.sources.src-1.deserializer.maxLineLength = 20480

因此在使用LineDeserializer时对源文件内容有个粗略的估计,否则,当某行的内容超出最大长度时。该行内容会被截取成两个部分,封装成两个Event发送到channel中。这样,在某些场景下该行消息相当于非法消息了。如,某个文件按行记录一个http请求的所有内容,而事先我们无法预知一行http请求的最大长度(当然理论上你可以将maxLineLength设置成一个较大的值,解决该问题)。但是这里要说的是另外一种解决方案,很简单,参考LineDeserializer实现一个不限制最大长度的解析器(flume之所以这么设计是出于什么角度考虑?)。反序列化器的定义和前面的拦截器基本相同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class LineDeserializer implements EventDeserializer {

private static final Logger logger = LoggerFactory.getLogger(LineDeserializer.class);

private final ResettableInputStream in;
private final Charset outputCharset;
private final int maxLineLength;
private volatile boolean isOpen;

public static final String OUT_CHARSET_KEY = "outputCharset";
public static final String CHARSET_DFLT = "UTF-8";

public static final String MAXLINE_KEY = "maxLineLength";
public static final int MAXLINE_DFLT = 2048;

LineDeserializer(Context context, ResettableInputStream in) {
this.in = in;
this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
this.maxLineLength = context.getInteger(MAXLINE_KEY, MAXLINE_DFLT);
this.isOpen = true;
}

@Override
public Event readEvent() throws IOException {
ensureOpen();
String line = readLine();
if (line == null) {
return null;
} else {
return EventBuilder.withBody(line, outputCharset);
}
}

...

// TODO: consider not returning a final character that is a high surrogate
// when truncating
private String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int c;
int readChars = 0;
while ((c = in.readChar()) != -1) {
readChars++;

// FIXME: support
if (c == ' ') {
break;
}

sb.append((char)c);

// 限制最大长度
if (readChars >= maxLineLength) {
logger.warn("Line length exceeds max ({}), truncating line!", maxLineLength);
break;
}
}

if (readChars > 0) {
return sb.toString();
} else {
return null;
}
}

//这里和Interceptor$Builder很像
public static class Builder implements EventDeserializer.Builder {

@Override
public EventDeserializer build(Context context, ResettableInputStream in) {
return new LineDeserializer(context, in);
}

}

}

接下来的步骤和拦截器一致

1
2
3
#自定义解析器
producer.sources.src-1.deserializers = d1
producer.sources.src-1.deserializer = com.networkbench.browser.flume.interceptor.MyLineDeserializer$Builder

source

Flume提供了丰富的source类型,如Avro SourceExec SourceSpooling Directory Source .
这里要说的是实际使用过程中遇到的一个问题。还是前面记录http请求内容的场景,为了及时分析http请求的数据,我们将记录http请求的原始文件按照分钟进行切割,然后移动到spooling directory监控目录(如/tmp-logs)下。但是由于一些原因,会出现监控目录下文件重名的情况.

1
2
/tmp-logs/access_2015_10_01_16_30.log.COMPLETED   #flume处理完的文件会自动进行重命名.COMPLETED 
/tmp-logs/access_2015_10_01_16_30.log #刚进来的文件

这种情况下后进来的access_2015_10_01_16_30.log,在flume读取完成后会对其进行重命名,但是该文件名已经被占用了,flume就会抛出如下的异常信息,停止处理该监控目录下的其他文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
25 九月 2015 16:48:59,228 INFO  [pool-22-thread-1] (org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile:348)  - Preparing to move file /opt/nginx/tmp_logs/access-2015-09-25-13-51.log to /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
25 九月 2015 16:48:59,229 ERROR [pool-22-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source src-1: { spoolDir: /opt/nginx/tmp_logs }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /opt/nginx/tmp_logs/access-2015-09-25-13-51.log.COMPLETED
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:378)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:330)
at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:259)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

跟踪抛出异常的源码,SpoolDirectorySource会启动一个线程轮询监控目录下的目标文件,当读取完该文件(readEvents)之后会对该文件进行重名(rollCurrentFile),当重命名失败时会抛出IllegalStateException,被SpoolDirectoryRunnable catch重新抛出RuntimeException,导致当前线程退出,从源码看SpoolDirectoryRunnable是单线程执行的,因此线程结束后,监控目录下其他文件不再被处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# SpoolDirectorySource 启动SpoolDirectoryRunnable

executor = Executors.newSingleThreadScheduledExecutor();

File directory = new File(spoolDirectory);
...

Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
//默认500毫秒
executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
...


# SpoolDirectorySource$SpoolDirectoryRunnable.run():

@Override
public void run() {
int backoffInterval = 250;
try {
while (!Thread.interrupted()) {
//这里读取文件内容,当该文件没有可读内容时,会调用ReliableSpoolingFileEventReader.retireCurrentFile()->ReliableSpoolingFileEventReader.rollCurrentFile()
List<Event> events = reader.readEvents(batchSize);
if (events.isEmpty()) {
break;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();

try {
getChannelProcessor().processEventBatch(events);
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full, and cannot write data now. The " +
"source will try again after " + String.valueOf(backoffInterval) +
" milliseconds");
hitChannelException = true;
if (backoff) {
TimeUnit.MILLISECONDS.sleep(backoffInterval);
backoffInterval = backoffInterval << 1;
backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
backoffInterval;
}
continue;
}
backoffInterval = 250;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
}
} catch (Throwable t) {
//异常输出部分
logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
"Uncaught exception in SpoolDirectorySource thread. " +
"Restart or reconfigure Flume to continue processing.", t);
hasFatalError = true;
//这里将该异常重新封装成了RuntimeException,导致当前线程退出
Throwables.propagate(t);
}
}

# ReliableSpoolingFileEventReader.rollCurrentFile():

private void rollCurrentFile(File fileToRoll) throws IOException {

File dest = new File(fileToRoll.getPath() + completedSuffix);
logger.info("Preparing to move file {} to {}", fileToRoll, dest);

// Before renaming, check whether destination file name exists
if (dest.exists() && PlatformDetect.isWindows()) {
/*
* If we are here, it means the completed file already exists. In almost
* every case this means the user is violating an assumption of Flume
* (that log files are placed in the spooling directory with unique
* names). However, there is a corner case on Windows systems where the
* file was already rolled but the rename was not atomic. If that seems
* likely, we let it pass with only a warning.
*/
if (Files.equal(currentFile.get().getFile(), dest)) {
logger.warn("Completed file " + dest +
" already exists, but files match, so continuing.");
boolean deleted = fileToRoll.delete();
if (!deleted) {
logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
". It will likely be ingested another time.");
}
} else {
String message = "File name has been re-used with different" +
" files. Spooling assumptions violated for " + dest;
throw new IllegalStateException(message);
}

// Dest file exists and not on windows
} else if (dest.exists()) {
//这里抛出目标文件已经存在的异常
String message = "File name has been re-used with different" +
" files. Spooling assumptions violated for " + dest;
throw new IllegalStateException(message);

// Destination file does not already exist. We are good to go!
} else {
boolean renamed = fileToRoll.renameTo(dest);
if (renamed) {
logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);

// now we no longer need the meta file
deleteMetaFile();
} else {
/* If we are here then the file cannot be renamed for a reason other
* than that the destination file exists (actually, that remains
* possible w/ small probability due to TOC-TOU conditions).*/
String message = "Unable to move " + fileToRoll + " to " + dest +
". This will likely cause duplicate events. Please verify that " +
"flume has sufficient permissions to perform these operations.";
throw new FlumeException(message); } } }

现在基本清楚了异常栈的调用逻辑,那么和前面自定义解析器一样,我们可以重写ReliableSpoolingFileEventReader以及SpoolDirectorySource的相关实现,也就是自定义一个spooling source,在rollCurrentFile()重命名失败时,做些处理措施,比如将该文件重新命名为access_2015_10_01_16_30.log(2).COMPLETED(此时文件内容已经读取完毕了)继续处理(注意要是.COMPLETED结尾,不然flume会再次读取该文件)。

改写完成之后,就和前面自定义解析器的处理步骤一样了,打包放在plugins.d目录下,配置:

1
producer.sources.src-1.type = com.networkbench.flume.source.SpoolDirectoryExtSource

总结

基本上flume的各种组件都可以自定义开发,本人使用flume时间也没多久,截止到目前为止遇到问题还有以下几个:

消息重发

这个坑其实是自己挖的,当时想当然的理解flume的配置参数#producer.sinks.sink-1.requiredAcks = 1(默认是1),我设置成了10,当时使用的kafka sink,由于某个kafka节点出现了问题(还没有仔细验证,是否kafka正常时也会出现该问题?),导致flume一直重发某个时间点的数据,而最新的数据一直被阻塞(可能是被缓存在了channel中)。导致后台接收的一直是某个时间点的消息。后台想到自己改动的这个参数,改回1之后就正常了。下面是官方文档对该参数的说明:

requiredAcks 1 (默认值) How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.

channel溢出

chanel溢出是因为前面的消息重发导致的,当时使用的channle是File Channel,其中有几个配置项值得注意:

配置项默认值说明
transactionCapacity 10000 单个事务中可以写入或读取的事务的最大数量
maxFileSize 2146435071 每个数据文件的最大大小(字节),一旦文件达到这个大小(或一旦写入下个文件达到这个大小),该文件保存关闭并在那个目录下创建一个新的数据文件。如果此值设置为高于默认值,仍以默认值为准
minimumRequiredSpace 524288000 channel继续操作时每个卷所需的最少空间(字节),如果任何一个挂载数据目录的卷只有这么多空间剩余,channel将停止操作来防止损坏和避免不完整的数据被写入
capacity 1000000 channel可以保存的提交事件的最大数量
keep-alive 3 每次写入或读取应该等待完成的最大的时间周期(秒)

前面的channel溢出推测就是由capacity的达到了限制造成的。

原文地址:https://www.cnblogs.com/breg/p/5649363.html