Flume wasn't able to parse timestamp header

来自:http://caiguangguang.blog.51cto.com/1652935/1384187

flume bucketpath的bug一例

测试的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
agent-server1.sources= testtail
agent-server1.sinks = hdfs-sink
agent-server1.channels= hdfs-channel
agent-server1.sources.testtail.type = netcat
agent-server1.sources.testtail.bind = localhost
agent-server1.sources.testtail.port = 9999
agent-server1.sinks.hdfs-sink.hdfs.kerberosPrincipal = hdfs/_HOST@KERBEROS_HADOOP
agent-server1.sinks.hdfs-sink.hdfs.kerberosKeytab = /home/vipshop/conf/hdfs.keytab
agent-server1.channels.hdfs-channel.type = memory
agent-server1.channels.hdfs-channel.capacity = 200000000
agent-server1.channels.hdfs-channel.transactionCapacity = 10000
agent-server1.sinks.hdfs-sink.type = hdfs
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume/%Y%m%d
agent-server1.sinks.hdfs-sink.hdfs.rollInterval = 60
agent-server1.sinks.hdfs-sink.hdfs.rollSize = 0
agent-server1.sinks.hdfs-sink.hdfs.rollCount = 0
agent-server1.sinks.hdfs-sink.hdfs.threadsPoolSize = 10
agent-server1.sinks.hdfs-sink.hdfs.round = false
agent-server1.sinks.hdfs-sink.hdfs.roundValue = 30
agent-server1.sinks.hdfs-sink.hdfs.roundUnit = minute
agent-server1.sinks.hdfs-sink.hdfs.batchSize = 100
agent-server1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent-server1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent-server1.sinks.hdfs-sink.hdfs.callTimeout = 60000
agent-server1.sinks.hdfs-sink.hdfs.idleTimeout = 100
agent-server1.sinks.hdfs-sink.hdfs.filePrefix = ip
agent-server1.sinks.hdfs-sink.channel = hdfs-channel
agent-server1.sources.testtail.channels = hdfs-channel

在启动服务后,使用telnet进行测试,发现如下报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
14/03/24 18:03:07 ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing.
 Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more
14/03/24 18:03:07 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to
resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:461)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
        ... 3 more
Caused by: java.lang.NumberFormatException: null
        at java.lang.Long.parseLong(Long.java:375)
        at java.lang.Long.valueOf(Long.java:525)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
        ... 5 more

从调用栈的信息来看,错误出在org.apache.flume.formatter.output.BucketPath类的replaceShorthand方法。
在org.apache.flume.sink.hdfs.HDFSEventSink类中,使用process方法来生成hdfs的url,其中主要是调用了BucketPath类的escapeString方法来进行字符的转换,并最终调用了replaceShorthand方法。
其中replaceShorthand方法的相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  public static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown) {
    String timestampHeader = headers.get("timestamp");
    long ts;
    try {
      ts = Long.valueOf(timestampHeader);
    } catch (NumberFormatException e) {
      throw new RuntimeException("Flume wasn't able to parse timestamp header"
        + " in the event to resolve time based bucketing. Please check that"
        + " you're correctly populating timestamp header (for example using"
        + " TimestampInterceptor source interceptor).", e);
    }
    if(needRounding){
      ts = roundDown(roundDown, unit, ts);
    }
........

从代码中可以看到,timestampHeader 的值如果取不到,在向ts赋值时就会报错。。
这其实是flume的一个bug,bug id:
https://issues.apache.org/jira/browse/FLUME-1419
解决方法有3个:
1.更改配置,更新hdfs文件的路径格式

1
agent-server1.sinks.hdfs-sink.hdfs.path = hdfs://bipcluster/tmp/flume

但是这样就不能按天来存放日志了
2.通过更改相关的代码
(patch:https://issues.apache.org/jira/secure/attachment/12538891/FLUME-1419.patch)
如果在headers中获取不到timestamp的值,就给它一个当前timestamp的值。
相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
     String timestampHeader = headers.get("timestamp");
     long ts;
     try {
      if (timestampHeader == null) {
        ts = System.currentTimeMillis();
      } else {
        ts = Long.valueOf(timestampHeader);
      }
     } catch (NumberFormatException e) {
       throw new RuntimeException("Flume wasn't able to parse timestamp header"
         + " in the event to resolve time based bucketing. Please check that"
         + " you're correctly populating timestamp header (for example using"
                  + " TimestampInterceptor source interceptor).", e);
}

3.为source定义基于timestamp的interceptors 
在配置中增加两行即可:

1
2
agent-server1.sources.testtail.interceptors = i1
agent-server1.sources.testtail.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

一个技巧:
在debug flume的问题时,可以在flume的启动参数中设置把debug日志打到console中。

1
-Dflume.root.logger=DEBUG,console,LOGFILE
原文地址:https://www.cnblogs.com/sunxucool/p/3820499.html