akka监控框架设计

  本博客介绍一种AOP、无侵入的akka监控方案,方便大家在生产使用akka的过程中对akka进行监控。

  对于自身javaer来说,AOP三个字母基本就解释清楚了akka监控框架的原理。哈哈哈,不过我这里还是啰嗦一点,把相关的方案和框架介绍一下。

  无侵入监控方案,不管是对akka还是其他java应用来说,经常听到动态代理、静态代理、AOP这些词汇。AOP为Aspect Oriented Programming的缩写,意为:面向切面编程。使用Spring的童鞋一定听过这个词。虽然AOP一般用在spring上,但用在我们的akka监控来说也是非常合适的。静态代理、动态代理只不过是AOP的一种实现方式,就是用统一的代理类来拦截特定对象的执行过程,并加以监控。但这有一个比较严重的缺陷,至少在akka的监控中是这样的,那就是代理类会影响目标类的类型,比如通过classOf获取到的是代理类的类型。字节码注入是另一种AOP的实现方式,它是通过编译期或加载期修改目标类的字节码,来进行监控的。简单来说就是修改了Java的class文件,从而在不修改源码的情况下,修改编译结果。字节码注入有很多框架,但我们这里选择了ASPECTJ,这个工具可以通过注解的形式定义注入点,非常方便。

  好了,基本的知识点就讲解清楚了,下面直接上代码及其框架。

  监控框架工程命名为akkamon,意为akka的监控,哈哈。分为四个模块:

  • akkamon-core。核心模块,定义基本类型,主要用来定义注入点函数,就是aspectJ中的pointCut。
  • akkamon-injector。注入器。定义并实现ActorSystem的一个AkkaInjectorExtension,将注入点拦截到的信息以消息的形式发送给特定的actor
  • akkamon-java-shell。pointCut实现模块。因为aspectj-maven-plugin对scala支持不太友好,此处用Java实现对pointCut的具体实现。
  • akkamon-target。测试模块

  下面我们分模块详细讲解各个类的定义及其意义。首先是akkamon-core

trait Instrumentation

   Instrumentation意为仪器、仪表,代表一组注入函数集合。

trait ActorCellInstrumentation extends Instrumentation {
  def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef ):Unit
  def beforeActorTerminate(cell:ActorCell):Unit
}

   比如ActorCellInstrumentation代表ActorCell的监控仪表,它定义了两个函数,分别是ActorCell创建前和销毁前。

trait ActorSystemInjectPoint extends ListenerInstrumentation
  with ActorCellInstrumentation
  with ActorSystemInstrumentation
  with MessageInstrumentation

   ActorSystemInjectPoint代表ActorSystem的所有注入点,它主要是所有Instrumentation的一个汇总。

class ActorSystemInjectPointImpl extends ActorSystemInjectPoint

   ActorSystemInjectPoint有一个实现类,它来实现ActorSystemInjectPoint,主要是把拦截到的消息通知给listener,我们来看看其中一个函数的实现。

override def beforeActorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, parent: ActorRef): Unit = {
    val message = notifyMessage(ActorCellCreation(system.name,ref,props,parent))
    log.debug(s"beforeActorCellCreation $message")
  }

   很明显在拦截到ActorCell创建的时候,会构建ActorCellCreation这个case class,然后通过notifyMessage函数把消息通知给listener。

  notifyMessage是ActorSystemInstrumentationListeners的一个方法,ActorSystemInstrumentationListeners维护了ActorSystemInstrumentationListener列表,在notifyMessae内部,依次调用所有listener的onMessage函数。

abstract class ActorSystemInstrumentationListener{
  def onMessage(message:InjectMessage):Unit
  def close():Unit
}

   那么到这里,可能会有读者问,ActorSystemInstrumentationListener是什么时候注册到ActorSystemInstrumentationListeners里面的呢?答案就是监控ActorSystemInstrumentationListener构造函数的调用,然后把它register到listeners列表的。那Instrumentation定义的那些函数又是啥时候调用的呢?答案就在akkamon-java-shell模块。

@Aspect
public class InstrumentationJavaShell implements ActorSystemInjectPoint 

   这个模块只有一个InstrumentationJavaShell类,它继承了ActorSystemInjectPoint,所以它会实现你所有定义的注入点,并把相应的函数调用转发给ActorSystemInjectPointImpl

private static ActorSystemInjectPointImpl actorSystemInstrumentation = new ActorSystemInjectPointImpl();

   ActorSystemInjectPointImpl是InstrumentationJavaShell里面的一个静态类。

@Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, *, parent)")
    public void actorCellCreation(ActorSystem system , ActorRef ref,Props props , ActorRef parent){}

    @Override
    @Before("actorCellCreation(system,ref,props,parent)")
    public void beforeActorCellCreation(ActorSystem system , ActorRef ref, Props props , ActorRef parent) {
        actorSystemInstrumentation.beforeActorCellCreation(system,ref,props,parent);
    }

   上面两个函数分别定义了一个Pointcut和一个advice,Pointcut是对akka.actor.ActorCell.new的注入,Before这个advice是在akka.actor.ActorCell.new执行之前调用,可以看到beforeActorCellCreation就是简单的把相关的参数传给actorSystemInstrumentation.beforeActorCellCreation,而beforeActorCellCreation里面是通知各个listener。

  下面是listener的注册过程。

@Pointcut("execution(com.gabry.akkamon.listener.ActorSystemInstrumentationListener.new(..)) && this(listener)")
    public void listenerCreation(ActorSystemInstrumentationListener listener) {

    }

    @Override
    @After("listenerCreation(listener)")
    public void afterListenerCreation(ActorSystemInstrumentationListener listener) {
        actorSystemInstrumentation.afterListenerCreation(listener);
    }

   listener的注册过程其实也是通过AOP来实现的。

class ActorListener(actorRef:ActorRef) extends ActorSystemInstrumentationListener{
  override def onMessage(message: InjectMessage): Unit = {
    actorRef ! message
  }

  override def close(): Unit = {

  }
}

   上面是一个listener的实现,其实就是简单的把消息通知给了相关的actor。

  至此,一个akka的监控框架就实现了,是不是很简单呢?框架很简单,麻烦的是Instrumentation的定义和实现啊。

  读者问怎么执行?那就是java命令行的-javaagent参数啊。

原文地址:https://www.cnblogs.com/gabry/p/9543990.html