flume-ng 自定义sink消费flume source

如何从一个已经存在的Flume source消费数据

1.下载flume

wget https://archive.apache.org/dist/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
tar -xzf apache-flume-1.5.2-bin.tar.gz && cd apache-flume-1.5.2-bin   # 后续命令均在 flume 安装目录下执行

2.创建一个自己的ConsoleSink.java

import java.nio.charset.StandardCharsets;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * A minimal Flume sink that prints each event body to standard output.
 *
 * <p>Each call to {@link #process()} takes up to {@code batchSize} events from the channel inside
 * one channel transaction. The transaction is committed only after every taken event has been
 * printed; on any failure it is rolled back so the taken events are not removed from the channel.
 *
 * <p>Optional configuration: {@code batchSize} (default 100), the maximum number of events per
 * transaction. NOTE(review): it should not exceed the channel's {@code transactionCapacity}.
 */
public class ConsoleSink extends AbstractSink implements Configurable {
    /** Batch size used when {@code batchSize} is not configured (the original hard-coded value). */
    private static final int DEFAULT_BATCH_SIZE = 100;

    /** Maximum number of events taken from the channel per transaction. */
    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Reads the optional {@code batchSize} property.
     *
     * @param context the sink's configuration context
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    @Override
    public void configure(Context context) {
        int configured = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
        if (configured <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + configured);
        }
        batchSize = configured;
    }

    /**
     * Takes up to {@code batchSize} events in one transaction and prints each body as UTF-8 text.
     *
     * @return {@link Status#BACKOFF} if the channel ran out of events during this batch,
     *     otherwise {@link Status#READY}
     * @throws EventDeliveryException if taking or printing fails; the transaction is rolled back
     *     first so the events remain in the channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // Channel is empty: report BACKOFF so the runner can wait before polling again.
                    status = Status.BACKOFF;
                    break;
                }
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
            }
            tx.commit();
        } catch (Throwable t) {
            // Roll back (never commit) on failure, otherwise the events taken so far are lost.
            tx.rollback();
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new EventDeliveryException("Failed to deliver events to console", t);
        } finally {
            tx.close();
        }
        return status;
    }
}

3.编译

javac -classpath lib/flume-ng-core-1.5.2.jar:lib/flume-ng-sdk-1.5.2.jar:lib/flume-ng-configuration-1.5.2.jar ConsoleSink.java
jar -cvf console-sink.jar ConsoleSink.class
rm -rf ConsoleSink.class
mv console-sink.jar lib/   # 编译打包后要放到 flume 的 lib 目录里

4.配置文件

conf/example.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
# 这里填写你自己 Sink 的完整类名（包名.类名）；本例中 ConsoleSink 没有包名。
# 注意：properties 配置文件不支持行尾注释，注释必须单独成行，否则会被当成取值的一部分。
a1.sinks.k1.type = ConsoleSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

5.启动

bin/flume-ng  agent -c conf -f conf/example.conf -n a1

6.在上游（产生数据、需要被消费的）Flume agent 上添加一个 avro sink，把数据发送到第5步启动的 agent

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
# 这里填写第5步中启动 agent 的机器地址
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 44444
原文地址:https://www.cnblogs.com/23lalala/p/4303743.html