<Flume><Source Code><Flume源码阅读笔记>

Overview

  • source采集的日志首先会传入ChannelProcessor, 在其内首先会通过Interceptors进行过滤加工,然后通过ChannelSelector选择channel。

  • Source和Sink之间是异步的,sink只需要监听自己关系的Channel的变化即可。

  • sink存在写失败的情况,flume提供了如下策略:

    • 默认是一个sink,若写入失败,则该事务失败,稍后重试。

    • 故障转移策略:给多个sink定义优先级,失败时会路由到下一个优先级的sink。sink只要抛出一次异常就会被认为是失败了,则从存活Sink中移除,然后指数级时间等待重试,默认是等待1s开始重试,最大等待重试时间是30s.

  • flume还提供了负载均衡策略:默认提供轮训和随机两种算法。通过抽象一个类似ChannelSelector的SinkSelector进行选择。

  • 以上,对于Source和sink如何异步、channel如何实现事务机制,详见后面的具体源码分析。

The whole process

  • 首先是flume的启动, 提供了两种启动方式:使用EmbeddedAgent内嵌在Java应用中或使用Application单独启动一个进程。 一般使用Application起一个进程比较多,我们这里也主要分析这种方式。

  • 程序入口:org.apache.flume.node.Application的main方法。

  • 注:因为暂时还没有了解到Zookeeper原理,所以这里关于ZK的部分就跳过了。

  • flume启动流程大致如下:

    1. 设置默认值启动参数,参数是否是必须的

      Options options = new Options();
      ​
      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);
      ​
      option = new Option("f", "conf-file", true,
                "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);
      ......
    2. 解析命令行参数

      if (commandLine.hasOption('h')) {
         new HelpFormatter().printHelp("flume-ng agent", options, true);
         return;
      }
      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf"); // 是否reload配置文件
      ​
      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
          isZkConfigured = true;
      }
    3. Zookepper相关:暂时略

    4. 打开配置文件

         
       if (isZkConfigured) {
              ... // 若配置了zk,则使用zk参数启动
            } else {
              // 打开配置文件,如果不存在则快速失败
              File configurationFile = new File(commandLine.getOptionValue('f'));
      ​
              // 确保没有配置文件的时候agent会启动失败
              if (!configurationFile.exists()) {
                ...// If command line invocation, then need to fail fast
              }
              List<LifecycleAware> components = Lists.newArrayList();
      ​
              // 若需要定期reload配置文件
              if (reload) {
                // 使用EventBus事件总线, to allow publish-subscribe-style communication
                EventBus eventBus = new EventBus(agentName + "-event-bus");
                // 读取配置文件,使用定期轮训拉起策略,默认30s拉取一次
                PollingPropertiesFileConfigurationProvider configurationProvider =
                    new PollingPropertiesFileConfigurationProvider(
                        agentName, configurationFile, eventBus, 30);
                components.add(configurationProvider);
                // 向Application注册组件
                application = new Application(components);
                // 向EventBus注册本应用,EB会自动注册Application中使用@Subscribe声明的方法
                // TODO: EventBus, and why reload configuration
                eventBus.register(application);
              } else {
                // 若配置文件不支持定期reload
                PropertiesFileConfigurationProvider configurationProvider =
                    new PropertiesFileConfigurationProvider(agentName, configurationFile);
                application = new Application();
                // 直接使用配置文件初始化Flume组件
       application.handleConfigurationEvent(configurationProvider.getConfiguration());
              }
            }
    5. reload conf:若需要reload,则使用事件总线EventBus实现,Application的handleConfigurationEvent是事件订阅者,PollingPropertiesFileConfigurationProvider是事件发布者,其会定期轮训检查文件是否变更,如果变更则重新读取配置文件,发布配置文件事件变更,而handleConfigurationEvent会收到该配置变更重新进行初始化。

    6. handleConfigurationEvent:

       @Subscribe
        public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
          // MaterializedConfiguration存储Flume运行时需要的组件:Source、Channel、Sink、SourceRunner、SinkRunner等。其是通过ConfigurationProvider进行初始化获取,比如PollingPropertiesFileConfigurationProvider会读取配置文件然后进行组件的初始化。
          stopAllComponents(); // 停止所有组件
          startAllComponents(conf);// 使用配置文件初始化所有组件
        }
    7. startAllComponents

      • 要首先启动channels,等待所有channels启动才能继续。然后启动SinkRunner,准备好消费者。最后启动SourceRunner开始进行采集日志。

      • LifecycleSupervisor是组件守护哨兵,对这些组件进行守护,出问题时默认策略是自动重启。

      • 这里的启动都是supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); 这是如何启动的,我们后面再介绍。

      private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
          logger.info("Starting new configuration:{}", materializedConfiguration);
      ​
          this.materializedConfiguration = materializedConfiguration;
      ​
          // 启动channels。
          for (Entry<String, Channel> entry :
              materializedConfiguration.getChannels().entrySet()) {
            try {
              logger.info("Starting Channel " + entry.getKey());
              // TODO: LifecycleSupervisor启动
              // new SupervisorPolicy.AlwaysRestartPolicy():使用失败时总是重启的策略
              // LifecycleState.START: 初始化组件默认状态为START
              supervisor.supervise(entry.getValue(),
                  new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
              logger.error("Error while starting {}", entry.getValue(), e);
            }
          }
      ​
          /*
           * Wait for all channels to start.
           */
          for (Channel ch : materializedConfiguration.getChannels().values()) {
            while (ch.getLifecycleState() != LifecycleState.START
                && !supervisor.isComponentInErrorState(ch)) {
              try {
                logger.info("Waiting for channel: " + ch.getName() +
                    " to start. Sleeping for 500 ms");
                Thread.sleep(500);
              } catch (InterruptedException e) {
                logger.error("Interrupted while waiting for channel to start.", e);
                Throwables.propagate(e);
              }
            }
          }
      ​
          // 启动sinkRunner
          for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
            try {
              logger.info("Starting Sink " + entry.getKey());
              supervisor.supervise(entry.getValue(),
                  new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
              logger.error("Error while starting {}", entry.getValue(), e);
            }
          }
      ​
          // 启动SourceRunner   TODO: SourceRunner & SinkRunner
          for (Entry<String, SourceRunner> entry :
               materializedConfiguration.getSourceRunners().entrySet()) {
            try {
              logger.info("Starting Source " + entry.getKey());
              supervisor.supervise(entry.getValue(),
                  new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
            } catch (Exception e) {
              logger.error("Error while starting {}", entry.getValue(), e);
            }
          }
      ​
          this.loadMonitoring();
        }
    8. 之后main函数调用了application.start();

      /**
      其循环Application注册的组件,然后守护哨兵对它进行守护,默认策略是出现问题会自动重启组件,假设我们支持reload配置文件,则之前启动Application时注册过PollingPropertiesFileConfigurationProvider组件,即该组件会被守护哨兵守护着,出现问题默认策略自动重启
      **/
      public synchronized void start() {
        // private final List<LifecycleAware> components;
          for (LifecycleAware component : components) {
            // private final LifecycleSupervisor supervisor;
            supervisor.supervise(component,
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
          }
        }

      相应的stop函数。首先是main函数中:

      final Application appReference = application;
      // Runtinme.getRuntime(): Returns the runtime object associated with the current Java application.
      /** 
      addShutdownHook: 注册一个新的虚拟机关闭钩子。
      虚拟机shutdown有两种情况:1)当最后一个非守护进行户外那个退出或调用system.exit时,程序正常退出;2)JVM通过ctrl-c等被用户中断。
      **/
      Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
         @Override
         public void run() {
             appReference.stop();
         }
      });
        public synchronized void stop() {
          // 关闭守护哨兵和监控服务。
          supervisor.stop();
          if (monitorServer != null) {
            monitorServer.stop();
          }
        }
    9. 至此,Application整个流程就分析完了。

  • 整体流程可以总结为:

    1. 首先初始化命令行配置;

    2. 接着读取配置文件;

    3. 根据是否需要reload初始化配置文件中的组件;如果需要reload会使用EventBus进行发布订阅变化;

    4. 接着创建Application,创建守护哨兵LifecycleSupervisor,并先停止所有组件,接着启动所有组件;启动顺序:Channel、SinkRunner、SourceRunner,并把这些组件注册给守护哨兵、初始化监控服务MonitorService;停止顺序:SourceRunner、SinkRunner、Channel;

    5. 如果配置文件需要定期reload,则需要注册PollingPropertiesFileConfigurationProvider到守护哨兵;

    6. 最后注册虚拟机关闭钩子,停止守护哨兵和监控服务。

LifecycleSupervisor

  • 守护哨兵,负责监控和重启组件

  • My: 所有需要被监控和重启的组件都应implements LifecycleAware

    public class LifecycleSupervisor implements LifecycleAware {
      public LifecycleSupervisor() {
        lifecycleState = LifecycleState.IDLE;
        // 存放被监控的组件
        supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
        // 存放正在被监控的组件
        monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
        // 创建监控服务线程池
        monitorService = new ScheduledThreadPoolExecutor(10,
            new ThreadFactoryBuilder().setNameFormat(
                "lifecycleSupervisor-" + Thread.currentThread().getId() + "-%d")
                .build());
        monitorService.setMaximumPoolSize(20);
        monitorService.setKeepAliveTime(30, TimeUnit.SECONDS);
        // 定期清理被取消的组件
        purger = new Purger();
        // 默认不进行清理
        needToPurge = false;
      }
      ... // start() & stop()...
      
      // 进行组件守护
      public synchronized void supervise(LifecycleAware lifecycleAware,
          SupervisorPolicy policy, LifecycleState desiredState) {
        if (this.monitorService.isShutdown()
            || this.monitorService.isTerminated()
            || this.monitorService.isTerminating()) {
          ...// 如果哨兵已停止则抛出异常
        }
    ​
        // 初始化守护组件
        Supervisoree process = new Supervisoree();
        process.status = new Status();
    ​
        // 默认策略是失败重启
        process.policy = policy;
        process.status.desiredState = desiredState;  // 初始化组件默认状态,一般为START
        process.status.error = false;
    ​
        // 组件监控器,用于定时获取组件的最新状态,或重启组件。后面会介绍MonitorRunnable具体做什么。
        MonitorRunnable monitorRunnable = new MonitorRunnable();
        monitorRunnable.lifecycleAware = lifecycleAware;
        monitorRunnable.supervisoree = process;
        monitorRunnable.monitorService = monitorService;
    ​
        supervisedProcesses.put(lifecycleAware, process);
    ​
        // 以固定时间间隔执行monitorRunnable线程
        // scheduleWithFixedDelay: Creates and executes a periodic action. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor.
        // 所以需要把所有异常捕获,才能保证定时任务继续执行。
        ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
            monitorRunnable, 0, 3, TimeUnit.SECONDS);
        monitorFutures.put(lifecycleAware, future);
      }
  • MonitorRunnable:负责进行组件状态迁移或组件故障恢复

    public static class MonitorRunnable implements Runnable {
    ​
        public ScheduledExecutorService monitorService;
        public LifecycleAware lifecycleAware;
        public Supervisoree supervisoree;
    ​
        @Override
        public void run() {
          long now = System.currentTimeMillis();
    ​
          try {
            if (supervisoree.status.firstSeen == null) {
              logger.debug("first time seeing {}", lifecycleAware);
    ​
              supervisoree.status.firstSeen = now; // 记录第一次状态查看时间
            }
    ​
            supervisoree.status.lastSeen = now; // 记录最后一次状态查看时间
            synchronized (lifecycleAware) {
              // 如果守护组件被丢弃或出错了,则直接返回
              if (supervisoree.status.discard) {
                // 也就是此时已经调用了unsupervise
                logger.info("Component has already been stopped {}", lifecycleAware);
                return;
              } else if (supervisoree.status.error) {
                logger.info("Component {} is in error state, and Flume will not"
                    + "attempt to change its state", lifecycleAware);
                return;
              }
    ​
              // 更新最后一次查看到的状态
              supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
    ​
              // 如果组件的状态和守护组件看到的状态不一致,则以守护组件的状态为准,然后进行初始化
              if (!lifecycleAware.getLifecycleState().equals(
                  supervisoree.status.desiredState)) {
                switch (supervisoree.status.desiredState) {
                    // 如果是启动状态,则启动组件。# 最开始的时候组件应该就是这么启动的
                  case START:
                    try {
                      lifecycleAware.start();
                    } catch (Throwable e) {
                      logger.error("Unable to start " + lifecycleAware
                          + " - Exception follows.", e);
                      if (e instanceof Error) {
                        // This component can never recover, shut it down.
                        supervisoree.status.desiredState = LifecycleState.STOP;
                        try {
                          lifecycleAware.stop();
                          logger.warn("Component {} stopped, since it could not be"
                              + "successfully started due to missing dependencies",
                              lifecycleAware);
                        } catch (Throwable e1) {
                          logger.error("Unsuccessful attempt to "
                              + "shutdown component: {} due to missing dependencies."
                              + " Please shutdown the agent"
                              + "or disable this component, or the agent will be"
                              + "in an undefined state.", e1);
                          supervisoree.status.error = true;
                          if (e1 instanceof Error) {
                            throw (Error) e1;
                          }
                          // Set the state to stop, so that the conf poller can
                          // proceed.
                        }
                      }
                      supervisoree.status.failures++;
                    }
                    break;
                  case STOP:
                    try {
                      lifecycleAware.stop();
                    } catch (Throwable e) {
                      logger.error("Unable to stop " + lifecycleAware
                          + " - Exception follows.", e);
                      if (e instanceof Error) {
                        throw (Error) e;
                      }
                      supervisoree.status.failures++;
                    }
                    break;
                  default:
                    logger.warn("I refuse to acknowledge {} as a desired state",
                        supervisoree.status.desiredState);
                }
    ​
                if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
                  logger.error(
                      "Policy {} of {} has been violated - supervisor should exit!",
                      supervisoree.policy, lifecycleAware);
                }
              }
            }
          } catch (Throwable t) {
            logger.error("Unexpected error", t);
          }
          logger.debug("Status check complete");
        }
      }

Source

SourceRunner

  • 首先是SourceRunner,它控制how a source is driven。

  • 它是一个用来实例化derived classes(派生类)的抽象类。 根据指定的source,来通过其内的static factory method 来实例化runner。

      // 根据指定source的类型来实例化一个source runner的静态工厂方法    
      // 输入是要运行的source,返回可以运行指定source的runner
      public static SourceRunner forSource(Source source) {
        SourceRunner runner = null;
    ​
        if (source instanceof PollableSource) {
          runner = new PollableSourceRunner();
          ((PollableSourceRunner) runner).setSource((PollableSource) source);
        } else if (source instanceof EventDrivenSource) {
          runner = new EventDrivenSourceRunner();
          ((EventDrivenSourceRunner) runner).setSource((EventDrivenSource) source);
        } else {
          throw new IllegalArgumentException("No known runner type for source "
              + source);
        }
    ​
        return runner;
      }

EventDrivenSourceRunner

  • starts、stops and manages EventDrivenSource event-driven sources

  • 其内有如下几个方法:

    • 构造方法

      public EventDrivenSourceRunner() {
          lifecycleState = LifecycleState.IDLE;
        }
    • start()

      @Override
        public void start() {
          Source source = getSource(); //获取Source
          ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
          cp.initialize(); //初始化Channel处理器
          source.start();  //启动Source
          lifecycleState = LifecycleState.START; //本组件状态改成启动状态
        }
    • stop()、toString()、getLifecycleState()

PollableSourceRunner

public class PollableSourceRunner extends SourceRunner {
 @Override
 public void start() {
  PollableSource source = (PollableSource) getSource(); //获取Source
  ChannelProcessor cp = source.getChannelProcessor(); //Channel处理器
  cp.initialize(); //初始化channel处理器
  source.start();  //启动source
​
  runner = new PollingRunner();  //新建一个PollingRunner线程来拉取数据
  runner.source = source;
  runner.counterGroup = counterGroup;
  runner.shouldStop = shouldStop;
​
  runnerThread = new Thread(runner);
  runnerThread.setName(getClass().getSimpleName() + "-" + 
      source.getClass().getSimpleName() + "-" + source.getName());
  runnerThread.start(); 
​
  lifecycleState = LifecycleState.START;
 }
}
  • PollingRunner线程

@Override
  public void run() {
    while (!shouldStop.get()) { //如果没有停止,则一直在死循环运行
      counterGroup.incrementAndGet("runner.polls"); //原子操作
​
      try {
        //调用PollableSource的process方法进行轮训拉取,然后判断是否遇到了失败补偿
        if (source.process().equals(PollableSource.Status.BACKOFF)) {/
          counterGroup.incrementAndGet("runner.backoffs");
​
          //失败补偿时暂停线程处理,等待超时时间之后重试
          Thread.sleep(Math.min(
              counterGroup.incrementAndGet("runner.backoffs.consecutive")
              * source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
        } else {
          counterGroup.set("runner.backoffs.consecutive", 0L);
        }
      } catch (InterruptedException e) {
                }
      }
    }
  }
}
  • TODO

Source

public interface Source extends LifecycleAware, NamedComponent {
  public void setChannelProcessor(ChannelProcessor channelProcessor);
  public ChannelProcessor getChannelProcessor();
} 
  • 继承了LifecycleAware接口,然后只提供了ChannelProcessor的setter和getter接口。其中:

    • 它的的所有逻辑的实现应该在LifecycleAware接口的start和stop中实现;

    • ChannelProcessor用来进行日志流的过滤和Channel的选择及调度。

  • 由上述的Runner我们知道,Source 提供了两种机制: PollableSource (轮训拉取)和 EventDrivenSource (事件驱动)

  • Source作用就是监听日志,采集,然后交给ChannelProcessor处理。

EventDrivenSource

  • 事件驱动型source不需要外部driver来获取event,EventDriven是一个implement Source的空接口。

  • 从这里开始~~~‘

  

Channel

  • 通过 Channel 实现了 Source 和 Sink 的解耦,可以实现多对多的关联,和 Source 、 Sink 的异步化

  • Channel exposes a transaction interface that can be used by its clients to ensure automic put(Event) and take() semantics.

ChannelProcesoor

  • 前面我们了解到Source采集日志后会交给ChannelProcessor处理,so接下来我们从ChannelProcessor入手,其依赖如下组件:

    private final ChannelSelector selector;  //Channel选择器,.flume.ChannelSelector
    private final InterceptorChain interceptorChain; //拦截器链,.flume.interceptor.InterceptorChain
    private ExecutorService execService; //用于实现可选Channel的ExecutorService,默认是单线程实现 [注:这个我在某个博客上看到的,但这个组件我在ChannelProcessor中没有搜到]
  • 我们来看ChannelProcessor是如何处理Event的:

    // Attempts to put the given event into each configured channel 
    public void processEvent(Event event) {
    ​
        event = interceptorChain.intercept(event); //首先进行拦截器链过滤,TODO:intercep...
      // InterceptorChain实现了Interceptor接口,调用a list of other Interceptors. 实现event的过滤和加工。具体见后面
        if (event == null) {
          return;
        }
    ​
        // Process required channels
        //通过Channel选择器获取必须成功处理的Channel,然后事务中执行.
        List<Channel> requiredChannels = selector.getRequiredChannels(event);
        for (Channel reqChannel : requiredChannels) {
          Transaction tx = reqChannel.getTransaction();  // 继承自Channel接口的类要实现getTransaction()方法,TODO:getTransaction
          Preconditions.checkNotNull(tx, "Transaction object must not be null");
          try {
            tx.begin(); //开始事务
    ​
            reqChannel.put(event);  // 将event放到reqChannel
    ​
            tx.commit();   //提交事务
          } catch (Throwable t) {
            tx.rollback();  // 如果捕捉到throwable(including Error & Exception),则回滚事务
            if (t instanceof Error) {
              LOG.error("Error while writing to required channel: " + reqChannel, t);
              throw (Error) t;
            } else if (t instanceof ChannelException) {
              throw (ChannelException) t;
            } else {
              throw new ChannelException("Unable to put event on required " +
                  "channel: " + reqChannel, t);  //TODO: Channelexception可能会被handle,不然如何保证RequiredChannel的成功处理? 
            }
          } finally {
            if (tx != null) {
              tx.close();  // 最后如果事务非空,还得关闭该事务
            }
          }
        }
    ​
        // Process optional channels
        //通过Channel选择器获取可选的Channel,这些Channel失败是可以忽略,不影响其他Channel的处理
        List<Channel> optionalChannels = selector.getOptionalChannels(event);
        for (Channel optChannel : optionalChannels) {
          Transaction tx = null;
          try {
            tx = optChannel.getTransaction();
            tx.begin();
    ​
            optChannel.put(event);
    ​
            tx.commit();
          } catch (Throwable t) {
            tx.rollback();
            LOG.error("Unable to put event on optional channel: " + optChannel, t);
            if (t instanceof Error) {
              throw (Error) t;
            }
          } finally {
            if (tx != null) {
              tx.close();
            }
          }
        }
      }
  • 看下flume内实现的channel类

Channel接口

public interface Channel extends LifecycleAware, NamedComponent {
  // put() and get() must be invoked within an active Transaction boundary
  public void put(Event event) throws ChannelException;
  public Event take() throws ChannelException;
  // @return: the transaction instance associated with this channel
  public Transaction getTransaction();
}

AbstractChannel

  • abstract class AbstractChannel implements Channel, LifecycleAware, Configurable

  • 实现了lifecycleStatus的改变(在构造、start()和stop()方法中),实现了空configure()方法。没有做什么具体的channel相关的处理。

BasicChannelSemantics

  • 基本Channel语义的实现,包括Transaction类的thread-local语义的实现。

public abstract class BasicChannelSemantics extends AbstractChannel {
​
  // 1. 事务使用ThreadLocal存储,保证事务线程安全
  private ThreadLocal<BasicTransactionSemantics> currentTransaction
      = new ThreadLocal<BasicTransactionSemantics>();
​
  private boolean initialized = false;
​
  protected void initialize() {}   // 2. 进行一些初始化工作
​
  // 3.提供给实现类(子类)的创建事务的回调
  // 用于new Transaction对象,该对象必须继承自BasicTransactionSemantics
  // 比如MemoryChannel覆盖了该方法,方法体内new了一个实例,该实例为其内私有类MemoryTransaction,该私有类继承了BasicTransactionSemantics。
  // MemoryTransaction内部用两条双向并发阻塞队列LinkedBlockingDeque实现putList和takeList。具体的稍后看,会介绍到MemoryChannel   TODO
  protected abstract BasicTransactionSemantics createTransaction();
​
  // 4. 往Channel中放Event,其直接委托给事务的put方法
  // 确保该thread存在一个事务,然后将put方法委托给该线程的BasicTransactionSemantics实例
  @Override
  public void put(Event event) throws ChannelException {
    // ThreadLocal<BasicTransactionSemantics>的实例currentTransaction
    // 即取得当前线程的事务实例
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    transaction.put(event);
  }
​
  // 5.从Channel获取Event,也是直接委托给事务的take方法实现
  @Override
  public Event take() throws ChannelException {
    BasicTransactionSemantics transaction = currentTransaction.get();
    Preconditions.checkState(transaction != null,
        "No transaction exists for this thread");
    return transaction.take();
  }
​
  @Override
  public Transaction getTransaction() {
    // 1. 如果channel is not ready, then 初始化该channel
    if (!initialized) {
      synchronized (this) {
        if (!initialized) {
          initialize();
          initialized = true;
        }
      }
    }
​
     // 2. 如果当前线程没有open的事务(无事务或已关闭),则创建一个,并绑定到currentTransaction中
    BasicTransactionSemantics transaction = currentTransaction.get();
    if (transaction == null || transaction.getState().equals(
            BasicTransactionSemantics.State.CLOSED)) {
      transaction = createTransaction();
      currentTransaction.set(transaction);
    }
    return transaction;
  }
}

MemoryChannel

  • 当写入硬盘不实际或不需要数据持久化时,推荐使用。或在单元测试时使用。

  • 大部分channel会把put和take委托给事务去完成。

  • 纯内存的Channel实现,整个事务操作都是在内存中完成的。

  • 每个事务都有一个TakeList和PutList,分别用于存储事务相关的取数据和放数据,等事务提交时才完全同步到Channel Queue,或者失败把取数据回滚到Channel Queue。 TODO:整体理解何时commit、rollback。

    public class MemoryChannel extends BasicChannelSemantics {
      // TODO: about factory
      private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
      ...//一些常量定义:缺省值defaultCapacity、defaultTransCapacity、byteCapacitySlotSize..
    ​
        // 内部类,继承自BasicTransactionSemantics。TODO: About BasicTransactionSemantics
      private class MemoryTransaction extends BasicTransactionSemantics {
        // 每个事务都有两条双向并发阻塞队列,TODO: LinkedBlockingDeque
        private LinkedBlockingDeque<Event> takeList;
        private LinkedBlockingDeque<Event> putList;
        private final ChannelCounter channelCounter; 
        ...//
        public MemoryTransaction(int transCapacity, ChannelCounter counter) {
          putList = new LinkedBlockingDeque<Event>(transCapacity);
          takeList = new LinkedBlockingDeque<Event>(transCapacity);
    ​
          channelCounter = counter;
        }
    ​
        // 将event放到putList中
        // 整个doPut操作相对来说比较简单,就是往事务putList队列放入Event,如果满了则直接抛异常回滚事务;否则放入putList暂存,等事务提交时转移到Channel Queue。另外需要增加放入队列的字节数计数器,以便之后做字节容量限制
        @Override
        protected void doPut(Event event) throws InterruptedException {
          // channelCounter是一个计数器,记录当前队列放入Event数、取出event数、成功数等。
          channelCounter.incrementEventPutAttemptCount(); // 增加放入event计数器
          // estimateEventSize计算当前Event body大小,ceil():向上取整
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    ​
          // 往事务队列的putList中放入Event,如果满了,则抛异常回滚事务
          if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " +
                putList.size() + " full, consider committing more frequently, " +
                "increasing capacity or increasing thread count");
          }
          putByteCounter += eventByteSize; // 增加放入队列字节数计数器
        }
    ​
        // 从Channel Queue中取event放到takeList中
        @Override
        protected Event doTake() throws InterruptedException {
          channelCounter.incrementEventTakeAttemptCount();
          // 如果takeList队列没有剩余容量,即当前事务已经消费了最大容量的Event
          if (takeList.remainingCapacity() == 0) {
            throw new ChannelException("Take list for MemoryTransaction, capacity " +
                takeList.size() + " full, consider committing more frequently, " +
                "increasing capacity, or increasing thread count");
          }
          // queueStored试图获取一个信号量,超时直接返回null 
          if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
          }
          Event event;
          // 从Channel Queue获取一个Event, 对Channel Queue的操作必须加queueLock
          synchronized (queueLock) {
            event = queue.poll();
          }
          // 因为信号量的保证,Channel Queue不应该返回null,出现了就不正常了
          Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
              "signalling existence of entry");
          // 暂存到事务的takeList队列 
          takeList.put(event);
          // 计算当前Event body大小并增加取出队列字节数计数器
          int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
          takeByteCounter += eventByteSize;
    ​
          return event;
        }
    ​
        // 等事务提交时,才将当前事务的put list同步到Channel Queue
        @Override
        protected void doCommit() throws InterruptedException {
          // /1、计算改变的Event数量,即取出数量-放入数
          int remainingChange = takeList.size() - putList.size();
          if (remainingChange < 0) {
            // bytesRemaining是字节容量信号量,超出容量则回滚事务
            if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
              throw new ChannelException("Cannot commit transaction. Byte capacity " +
                  "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
                  "reached. Please increase heap space/byte capacity allocated to " +
                  "the channel as the sinks may not be keeping up with the sources");
            }
            if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
              bytesRemaining.release(putByteCounter);
              throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
                  " Sinks are likely not keeping up with sources, or the buffer size is too tight");
            }
          }
          int puts = putList.size();
          int takes = takeList.size();
          synchronized (queueLock) {
            if (puts > 0) {
              while (!putList.isEmpty()) {
                if (!queue.offer(putList.removeFirst())) { // offer:添加一个元素并返回true
                  throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
                }
              }
            }
            putList.clear();
            takeList.clear();
          }
          bytesRemaining.release(takeByteCounter);
          takeByteCounter = 0;
          putByteCounter = 0;
    ​
          queueStored.release(puts);
          if (remainingChange > 0) {
            queueRemaining.release(remainingChange);
          }
          if (puts > 0) {
            channelCounter.addToEventPutSuccessCount(puts);
          }
          if (takes > 0) {
            channelCounter.addToEventTakeSuccessCount(takes);
          }
    ​
          channelCounter.setChannelSize(queue.size());
        }
    ​
        // 事务失败时,将take list数据回滚到Channel Queue
        // 在回滚时,需要把takeList中暂存的事件回滚到Channel Queue,并回滚queueStored信号量。
        @Override
        protected void doRollback() {
          int takes = takeList.size();
          synchronized (queueLock) {
            Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
                "Not enough space in memory channel " +
                "queue to rollback takes. This should never happen, please report");
            while (!takeList.isEmpty()) {
              queue.addFirst(takeList.removeLast());
            }
            putList.clear();
          }
          bytesRemaining.release(putByteCounter);
          putByteCounter = 0;
          takeByteCounter = 0;
    ​
          queueStored.release(takes);
          channelCounter.setChannelSize(queue.size());
        }
    ​
      }
    ​
      private Object queueLock = new Object();
      // 在操作Channel Queue时都需要锁定,因为Channel Queue可能动态扩容(会被重新new)。用法就是synchronized(queueLock){...操作queue}
      @GuardedBy(value = "queueLock")  // 用@GuardedBy注解告诉维护者这个变量被哪个锁保护着
      private LinkedBlockingDeque<Event> queue; // 由一个Channel Queue存储整个Channel的Event数据
    ​
      // Semaphore可控制某资源可被同时访问的个数,acquire()获取一个许可,若无等待,而release()释放一个许可
      // queueRemaining表示可存储事件容量。在提交事务时增加或减少该信号量
      // 1. 首先在configure()函数中初始化为一个capacity大小的信号量
      // 2. 在resize的时候,如果要缩容则要看是否还能acquire到oldCapacity - capacity个许可,不能则不允许缩容(很合理啊,不然就丢失数据了)。若是扩容,则queueRemaining.release(capacity - oldCapacity)
      // 3. 提交事务时,如果takeList.size() < putList.size(),则要检查是否有足够的queueRemaining
      private Semaphore queueRemaining;
    ​
      // 表示ChannelQueue已存储事件容量
      // 2. 在configure()中初始化为一个大小为0的信号量
      // 3. 在doTake()时tryAcquire是否有许可
      // 4. 在commit()时release(puts)增加puts个许可
      // 5. 在rollback()时release(takes)个许可
      private Semaphore queueStored;
    ​
      // maximum items in a transaction queue
      private volatile Integer transCapacity;
      private volatile int keepAlive;
      private volatile int byteCapacity;
      private volatile int lastByteCapacity;
      private volatile int byteCapacityBufferPercentage;
      private Semaphore bytesRemaining;
      private ChannelCounter channelCounter;
    ​
      public MemoryChannel() {
        super();
      }
    ​
      @Override
      public void configure(Context context) {
        // Read parameters from context
        // capacity、transactionCapacity、byteCapacity、byteCapacityBufferPercentage...
      }
    ​
      // 因为多个事务要操作ChannelQueue,还要考虑ChannelQueue的扩容问题,因此MemoryChannel使用了锁来实现;而容量问题则使用了信号量来实现。
      // 改变queue的容量,是通过新建一个LinkedBlockingDeque来实现的,并将原queue的东西加进来。
      private void resizeQueue(int capacity) throws InterruptedException {
        int oldCapacity;
        // 计算原queue的capacity,注意该方法需加锁
        synchronized (queueLock) {
          oldCapacity = queue.size() + queue.remainingCapacity();
        }
    ​
        if (oldCapacity == capacity) {
          return;
        } else if (oldCapacity > capacity) {
          // tryAcquire():从该信号量中获取指定数量的许可
          //首先要预占老容量-新容量的大小,以便缩容容量。如果获取失败,默认是记录日志,然后忽略
          if (!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
            LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
          } else {
            //否则,直接缩容,然后复制老Queue的数据,缩容时需要锁定queueLock,这一系列操作要线程安全
            synchronized (queueLock) {
              LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
              newQueue.addAll(queue);
              queue = newQueue;
            }
          }
        } 

Interceptor

  • flume内部实现了很多自定义的Interceptor,如下图:

  • 同时还实现了InterceptorChain用来链式处理event。

InterceptorChain

  • Implementation of Interceptor that calls a list of other Interceptors

  • Interptor接口: 用于过滤、加工Event,然后返回一个新的Event

  • 相比之下,InterceptorChain就是对event逐个(链式)调用其内的Interceptor(接口子类)实例的各个方法。

    public class InterceptorChain implements Interceptor {
    ​
      // list of interceptors that will be traversed, in order
      private List<Interceptor> interceptors;
    ​
      public InterceptorChain() {
        interceptors = Lists.newLinkedList();  // 构造方法,type LinkedList
      }
      public void setInterceptors(List<Interceptor> interceptors) {
        this.interceptors = interceptors;  // set方法
      }
    ​
      // Interceptor接口的intercept方法: Interception of a single Event.事件拦截
      // @return: Original or modified event, or null if the Event is to be dropped.
      @Override
      public Event intercept(Event event) {   
        for (Interceptor interceptor : interceptors) {
          if (event == null) {
            return null;
          }
          event = interceptor.intercept(event);  // 注意:该类的实例会调用上面的set方法初始化intercptors,其中的intercptor是Interceptor接口子类的实例。所以这里的intercept()方法调用的是Interceptor的某个接口所覆盖的方法。[Interceptor有很多子类,下面有一个demo子类的分析,可以往下看HostInterceptor]
        }
        return event;
      }
    ​
      // Interceptor接口: Interception of a batch of events
      // @return: Output list of events
      @Override
      public List<Event> intercept(List<Event> events) {
        ... // 基本同上面的方法,不过调用的是interceptor.intercept(events);
      }
    ​
      // Interceptor: Any initialization / startup needed by the Interceptor.
      @Override
      public void initialize() {
        Iterator<Interceptor> iter = interceptors.iterator();
        while (iter.hasNext()) {
          Interceptor interceptor = iter.next();
          interceptor.initialize();  // 挨个对linkedlist中的interceptor实例进行initialize
        }
      }
    ​
      @Override
      public void close() {
        ...// 挨个对linkedlist中的interceptor实例进行close
    }

HostInterceptor

  • implements Interceptor

  • 功能:在所有拦截的events的header中上加上本机的host name或IP

    public class HostInterceptor implements Interceptor {
      ... // 一些private变量
      /**
       * Only {@link HostInterceptor.Builder} can build me
       */
      // private的构造方法,so只能通过下面的静态方法Builder实例化
      private HostInterceptor(boolean preserveExisting,
          boolean useIP, String header) {
        // 用xx.conf内的值初始化这些变量
        this.preserveExisting = preserveExisting;
        this.header = header;
        InetAddress addr;
        try {
          addr = InetAddress.getLocalHost(); //Returns the address of the local host.
          if (useIP) {
            //Returns the IP address string in textual presentation
            host = addr.getHostAddress(); 
          } else {
            // Gets the fully qualified domain name for this IP address. 
            host = addr.getCanonicalHostName();
          }
        } catch (UnknownHostException e) {
          logger.warn("Could not get local host address. Exception follows.", e);
        }
    ​
      }
    ​
      @Override
      public void initialize() {
        // no-op
      }
    ​
      /**
       * Modifies events in-place.
       */
      @Override
      public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
    ​
        // 如果要要保存当前的'host‘值并且当前已有头部,那么就不处理直接返回。
        if (preserveExisting && headers.containsKey(header)) {
          return event;
        }
        if (host != null) {
          headers.put(header, host); //将host添加到头部
        }
    ​
        return event;
      }
    ​
      @Override
      public List<Event> intercept(List<Event> events) {
        ... // 为events中的每一个event调用intercept(Event event)
      }
    ​
      @Override
      public void close() {
        // no-op
      }
    ​
      /**
       * Builder which builds new instances of the HostInterceptor.
       */
      public static class Builder implements Interceptor.Builder {
    ​
        private boolean preserveExisting = PRESERVE_DFLT;
        private boolean useIP = USE_IP_DFLT;
        private String header = HOST;
    ​
        @Override
        public Interceptor build() {
          return new HostInterceptor(preserveExisting, useIP, header);
        }
    ​
        @Override
        public void configure(Context context) {
          preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
          useIP = context.getBoolean(USE_IP, USE_IP_DFLT);
          header = context.getString(HOST_HEADER, HOST);
        }
      }
    ​
      public static class Constants {
        public static String HOST = "host";
        ... // 一些配置的缺省值
      }
    }
  • demo Usage in xx.conf: more details see User Guide

    agent.sources.r1.interceptors = i1
    agent.sources.r1.interceptors.i1.type = host
    # preserveExisting: 是否保存当前已存在的'host'值,缺省是不保存
    agent.sources.r1.interceptors.i1.preserveExisting = true
    agent.sources.r1.interceptors.i1.useIP = false
    agent.sources.r1.interceptors.i1.hostHeader = hostname

Selector

  • 先上一张所有selector的继承关系图

    可见ChannelSelector默认提供了两种实现:复制和多路复用。默认实现是ReplicatingChannelSelector。

ChannelSelector

  • interface

  • 基于不同实现政策,允许在channels的集合中选取channels子集。

    // NamedComponent接口:用于给component附加一个名字,包括setName()和getName()方法
    public interface ChannelSelector extends NamedComponent, Configurable {
    ​
      // @param channels:all channels the selector could select from.
      public void setChannels(List<Channel> channels);
    ​
      /**
       * Returns a list of required channels. 这些channels的写入失败会传达回接收事件的source.
       * @param: event
       * @return: the list of required channels that this selector has selected for
       * the given event.
       */
      public List<Channel> getRequiredChannels(Event event);
    /**
     * Returns a list of optional channels. 这些channels的写入失败会被忽略。
     * @param: event
     * @return: the list of optional channels that this selector has selected for
     * the given event.
     */
    public List<Channel> getOptionalChannels(Event event);
    ​
    /**
     * @return the list of all channels that this selector is configured to work
     * with.
     */
    public List<Channel> getAllChannels();
    }
    
    ​
    ## AbstractChannelSelector
    ​
    * abstract class
    ​
      ```java
      public abstract class AbstractChannelSelector implements ChannelSelector {
    ​
        private List<Channel> channels;
        private String name;
        
        ...// override ChannelSelctor的getAllChannels()、setChannels(List<Channel> channels)、setName(String name)、getName()方法。
        
        //@return: A map of name to channel instance.
        protected Map<String, Channel> getChannelNameMap() {
          Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
          for (Channel ch : getAllChannels()) {
            // 对每一个Channel, 将Channel和其名字放到HashMap中
            channelNameMap.put(ch.getName(), ch);  
          }
          return channelNameMap;
        }
    ​
        /**
         * Given a list of channel names as space delimited string,
         * returns list of channels.
         * @return List of {@linkplain Channel}s represented by the names.
         */
        // 根据(space分隔的channel名字的)字符串, 返回相应的channel,利用名字-channel的HashMap
        protected List<Channel> getChannelListFromNames(String channels,
                Map<String, Channel> channelNameMap) {
          List<Channel> configuredChannels = new ArrayList<Channel>();
          if (channels == null || channels.isEmpty()) { // 判空
            return configuredChannels;
          }
          String[] chNames = channels.split(" ");
          for (String name : chNames) {
            Channel ch = channelNameMap.get(name);
            if (ch != null) {
              configuredChannels.add(ch);
            } else {
              throw new FlumeException("Selector channel not found: "
                      + name);
            }
          }
          return configuredChannels;
        }
    ​
      }

ReplicatingChannelSelector

  • ChannelSelector的一个具体实现,即把接收到的消息复制到每一个Channel。【与之对应的,MultiplexingChannelSelector会根据 Event Header 中的参数进行选择,以此来选择使用哪个 Channel】

  • Replicating channel selector. 允许event被放置到source所配置的所有channels中。

  • 实际的实现方式是,默认将所有channel加入requiredChannels中,optionalChannels为空。然后根据配置的"optional"将该配置对应的channel加入optionalChannels,并从requiredChannels中移除(添加和移除是在configure方法中实现的)。 TODO:看一下这个配置如何实现

    public class ReplicatingChannelSelector extends AbstractChannelSelector {
    ​
      // Configuration to set a subset of the channels as optional.
      public static final String CONFIG_OPTIONAL = "optional";
      List<Channel> requiredChannels = null;  // 在configure()中被设置为getAllChannels()的返回值,即所有配置的channels
      List<Channel> optionalChannels = new ArrayList<Channel>();
    ​
      @Override
      public List<Channel> getRequiredChannels(Event event) {
        /*
         * Seems like there are lot of components within flume that do not call
         * configure method. It is conceiveable that custom component tests too
         * do that. So in that case, revert to old behavior.
         */
        // 如果component没有调用configure(),那么requiredChannels为null,此时再调用一次。
        // TODO: configure()方法是在哪里调用的? 同样的问题在很多class中都存在
        if (requiredChannels == null) {
          return getAllChannels();
        }
        return requiredChannels;
      }
    ​
      @Override
      public List<Channel> getOptionalChannels(Event event) {
        return optionalChannels;
      }
    ​
      @Override
      public void configure(Context context) {
        String optionalList = context.getString(CONFIG_OPTIONAL);
        requiredChannels = new ArrayList<Channel>(getAllChannels());
        Map<String, Channel> channelNameMap = getChannelNameMap();
        // 根据OptionList(String, 是空格分隔的channel名字),得到相应的Channel,并将channel放到optionalChannel&& 从requiredChannels中移除。
        if (optionalList != null && !optionalList.isEmpty()) {
          for (String optional : optionalList.split("\s+")) {
            Channel optionalChannel = channelNameMap.get(optional);
            requiredChannels.remove(optionalChannel);
            if (!optionalChannels.contains(optionalChannel)) {
              optionalChannels.add(optionalChannel);
            }
          }
        }
      }
    }

Sink

Sink Runner

  • A driver for sinks that polls them, attempting to process events if any are available in the Channel. All sinks are polled.

    public class SinkRunner implements LifecycleAware {
      private PollingRunner runner; // 内部类,实现了Runnable接口
      private SinkProcessor policy;    // 
    }

Sink Processor

  • 分为两类:

    • DefaultSinkProcessor处理单sink,直接传送不附加任何处理。

      public void start() {
          Preconditions.checkNotNull(sink, "DefaultSinkProcessor sink not set");
          sink.start();    // start()方法直接启动single sink
          lifecycleState = LifecycleState.START;
        }
      // stop()方法类似,configure()方法为空
      public Status process() throws EventDeliveryException {
          return sink.process();   // 直接调用sink的process()
        }
      public void setSinks(List<Sink> sinks) {
          Preconditions.checkNotNull(sinks);
          Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
              + "only handle one sink, "
              + "try using a policy that supports multiple sinks");
          sink = sinks.get(0);
        }
    • 多sink处理(AbstractSinkProcessor),其中又包括两种:

      • FailoverSinkProcessor:故障切换—>通过维持一个sinks的优先级list —> 把故障sinks降级放到一个pool中被赋予一个冷冻周期。必须先调用setSinks()再configure()

        public void setSinks(List<Sink> sinks) {
            // needed to implement the start/stop functionality
            super.setSinks(sinks);
        ​
            this.sinks = new HashMap<String, Sink>();
            for (Sink sink : sinks) {
              this.sinks.put(sink.getName(), sink);
            }
          }
        private Sink moveActiveToDeadAndGetNext() {
            Integer key = liveSinks.lastKey();
            failedSinks.add(new FailedSink(key, activeSink, 1)); // 把当前liveSinks里的第一优先级key移除到failedSinks中
            liveSinks.remove(key);
            if (liveSinks.isEmpty()) return null;
            if (liveSinks.lastKey() != null) {
              return liveSinks.get(liveSinks.lastKey());
            } else {
              return null;
            }
          }
        ...
      • LoadBalancingSinkProcessor: 提供在多个sinks之间负载均衡的能力—> 维持一个active sinks的索引序列(load需分布在这些sinks上) —> 算法包括ROUND_ROBIN(default)和RANDOM选择机制。

        内部通过一个private interface SinkSelector实现。该接口下实现了两个私有静态类RoundRobinSinkSelectorRandomOrderSinkSelector.

满地都是六便士,她却抬头看见了月亮。
原文地址:https://www.cnblogs.com/wttttt/p/6873519.html