Flume 自定义 组件

一、自定义Source

附上Maven依赖

 <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>

    </dependencies>

前提条件

MySource extends AbstractSource implements Configurable, PollableSource

主要实现

主要实现逻辑是在process()里,将封装好的对象传给ChannelProcessor,ChannelProcessor自己通过事务逻辑传递参数

代码示例

public class MySource extends AbstractSource implements Configurable, PollableSource {

    private String name;

    // 最核心的方法,读取数据,封装为event,写入到channel
    // 如果读到数据,封装为event,返回ready,否则如果当前没有读到数据,返回backoff
    // 每间隔5s,自动封装10个event,10个event的内容为{atguigu:i}
    @Override
    public Status process() throws EventDeliveryException {

        //声明默认返回的状态
        Status status= Status.READY;

        //封装event
        List<Event> events=new ArrayList<>();

        for (int i = 0; i < 10 ; i++) {

            SimpleEvent e = new SimpleEvent();

            //封装数据
            e.setBody((name+i).getBytes());

            events.add(e);

        }

       try {
           // 获取当前source对应的channel的channelProcessor
           ChannelProcessor channelProcessor = getChannelProcessor();
           //由ChannelProcessor将event放入到channel
           channelProcessor.processEventBatch(events);

           //间隔5s
           Thread.sleep(5000);
       }catch (Exception e){
           status=Status.BACKOFF;
            e.printStackTrace();
       }

        return status;
    }

    // 当source无法读到新的数据时,此时可以让Source所在的PollableSourceRunner线程休息会
    // 休息的时间取决于getBackOffSleepIncrement() 和  getMaxBackOffSleepInterval()
    @Override
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 5000;
    }

    // 从agent的配置文件中读取指定的参数的值
    @Override
    public void configure(Context context) {
        name=context.getString("name","atguigu:");
    }
}

二、自定义Sink

继承类

MySink extends AbstractSink implements Configurable

主要实现

在process()里实现逻辑,先获取Channel,再从中得到event值,与Source不同的是,这里要手动完成事务的逻辑

代码示例

public class MySink extends AbstractSink implements Configurable {

    private String prefix;
    private String suffix;

    private Logger logger= LoggerFactory.getLogger(MySink.class);

    //最核心的方法,这个方法负责从channel中获取event,将event写到指定的设备
    // 如果成功传输了一个或多个event,就返回ready,否则如果从channel中获取不到event,返回backoff
    @Override
    public Status process() throws EventDeliveryException {

        //自定义默认的返回值
        Status status=Status.READY;

        //获取sink对应的channel
        Channel c = getChannel();

        Event e=null;

        //从channel中获取take事务
        Transaction transaction = c.getTransaction();

        try {
            //开启事务
            transaction.begin();

            //从channel 获取一个event
             e = c.take();

             //如果成功获取,e就指向event对象,如果没有成功获取,此时e为null,说明channel里面没有event了!
            if (e==null){

                status=Status.BACKOFF;

            }else{

                //取到数据,将数据写到控制台
                logger.info("Header:"+e.getHeaders()+prefix+new String(e.getBody())+suffix);

            }

            //提交事务
            transaction.commit();

        }catch (Exception ex){
            //回滚事务
            transaction.rollback();

            status=Status.BACKOFF;

            ex.printStackTrace();

        }finally {
            //关闭事务
            transaction.close();
        }

        return status;
    }

    //从agent的配置文件中获取配置
    @Override
    public void configure(Context context) {
        prefix=context.getString("prefix","===>atguigu:");
        suffix=context.getString("suffix",":go!");
    }
}

三、自定义Interceptor

实现类

MyInterceptor implements Interceptor

主要实现

在拦截events组 的intercept()方法里调用拦截单个event的intercept()方法,这样可以省去很多代码,,,关键一点在于要创建一个内部类Builder,通过他来返回一个拦截器的实例

这里记录下我遇到的问题,如果我只定义了拦截单个event的intercept()方法,那么通过拦截器进入Channel的event就全是空值

代码示例

public class MyInterceptor implements Interceptor {

    // 初始化,会在拦截器创建完成后,调用一次
    @Override
    public void initialize() {

    }

    //拦截单个event,真正实现拦截的逻辑
    @Override
    public Event intercept(Event event) {

        //为event的header中添加 timestamp=时间戳
        Map<String, String> headers = event.getHeaders();

        headers.put("timestamp",System.currentTimeMillis()+"");

        return event;
    }

    //拦截一组event
    @Override
    public List<Event> intercept(List<Event> events) {

        for (Event event : events) {
            intercept(event);
        }

        return events;
    }

    //拦截器在关闭时调用
    @Override
    public void close() {

    }

    // 通过实现Builder来返回拦截器的一个实例
    public static class Builder implements Interceptor.Builder {

        //返回拦截器的实例
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        //从agent的配置文件中读取参数
        @Override
        public void configure(Context context) {

        }
    }
}

四、配置

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 自定义source,type必须是类的全类名
a1.sources.r1.type = com.atguigu.flume.custom.MySource
a1.sources.r1.name = atguigu:

#为source添加拦截器
a1.sources.r1.interceptors = i1
#type必须写Bulider的全类名(因为是内部类,所以需用$符)
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.custom.MyInterceptor$Builder

# 配置sink
a1.sinks.k1.type = com.atguigu.flume.custom.MySink
a1.sinks.k1.prefix = ***atguigu:
a1.sinks.k1.suffix = :go!

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# 绑定和连接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
原文地址:https://www.cnblogs.com/yangxusun9/p/12489852.html