FLUME SOURCE

FLUME SOURCE

记得在AbstractConfigurationProvider中的getConfiguration方法中会执行

loadSources(agentConf, channelComponentMap, sourceRunnerMap);

方法。这个方法会中会有个SourceFactory去创建不同类型的SourceRunnner。
这就是Source 的入口。

整个source的代码分在好几处:

  1. flume-ng-core的org.apache.flume包里面。
  2. flume-ng-core的org.apache.flume.source包里面。
  3. flume-ng-source里面。

Source根据分为两种,PollableSource (轮训拉取)和 EventDrivenSource (事件驱动),说白了一个是主动,一个是被动。
PollableSource 的类图如下:

这里先解释下LifecycleAware这个接口,它定义了每个组件的状态,启动的时候都会调用start方法,停止的时候都会调用stop方法。

public interface LifecycleAware {
  public void start();
  public void stop();
  public LifecycleState getLifecycleState();
}

Source接口的定义如下.里面说明了每个Source都需要指定对应的channel。

public interface Source extends LifecycleAware, NamedComponent {
  
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  
  public ChannelProcessor getChannelProcessor();

}

PollableSource 这个接口,增加了等待延时和最长延时的方法。

AbstractPollableSource 抽象类继承了BasicSourceSemantics ,
BasicSourceSemantics实现了组件在start和stop的时候需要做的一些公共的事情,例如设置LifecycleState 的状态。

整个AbstractPollableSource 将所有source需要做的和Flume环境相关的事情都写好了,其余的每个source在实现的时候,只需要实现这些source自己的启动方式就好了。以KafkaSource 为例,它的doStart()、doStop()、doConfigure()只需要考虑和kafka相关的实现就可以。

再看PollableSourceRunner这个类,它的作用就是创就按一个线程来启动不同的Source组件。

原文地址:https://www.cnblogs.com/SpeakSoftlyLove/p/6485278.html