Akka中的Message Stash

    Akka是JVM平台上构建高并发、分布式以及高度容错应用的工具包,其基于Actor模型实现了m:n的线程模式(m大于n,m是actor实例的个数,n是线程数量)。akka程序中每个actor实例都扮演一种角色或者实现某一个功能,每一个actor实例都有对应的消息邮箱,actor从自己的邮箱中消费消息,actor之间通过向对方的邮箱发送消息互相交流。常常遇到这样一种情形:某个actor需要在A和B两种状态下切换,以及分别在每种状态下接收各自对应的信息Messsage-A类型和Messsage-B类型的消息实例。由于当处于A状态时,仍然会收到本该处于B状态时处理的Messsage-B类型的消息实例;当处于B状态时,亦是如此。
    解决思路是把当前无法处理的消息暂存(stash),在切换对应的状态前把所有的暂存的消息添加到消息队列最前方,这些被释放出来的消息仍然按照它们被接收到顺序依次处理。

简单应用

    实现一个akka集群监控功能(AkkaClusterHealthActor),等待http请求,在响应请求时向集群中所有其他节点请求健康状态,等待所有节点全部返回健康信息,汇总信息后,再完成http请求,最后回到最初状态:等待http请求。

编码分析

    以上问题可以大致分为两种状态:等待http请求、等待其他节点的健康状态,但是在等待其他节点健康状态时,仍会收到http请求,需要暂存http请求,稍后释放做进一步处理。

object SomeApp {
  ..... //程序初始化 ActorSystem、ExecutorService ....
  val clusterhealthActor = system.actorOf(Props[AkkaClusterHealthActor], "clusterhealthActor") 
  ...
  def main(args: Array[String]): Unit = {
    ...
    val router = 
      path("healthState"){ //akka-http dsl
        get{
          imperativelyComplete {
            httpCtx =>
              clusterhealthActor ! httpCtx //向该actor发送http请求实例
          }
        }
      }
    ...
  }
}
class AkkaClusterHealthActor extends Actor with Stash {

  val cluster = Cluster(context.system) //获取集群状态信息,包括节点的列表、地址、状态等等

  override def preStart(): Unit = context.become(waitingForHttpRequest) //actor初始化时

  override def receive: Receive = {
    case _ =>
  }

  def waitingForHttpRequest: Receive = {
    case httpCtx: ImperativeRequestContext => //等待http请求
      val members = cluster.state.members.toList //包含自身节点
      members.foreach { member =>
        context.actorSelection(s"${member.address}/user/akkaMonitorActor") ! "FetchState" //请求集群的节点健康状态 ,member.address akka.tcp://itoa@127.0.0.1:2551
      }
      context.become(waitingForAkkaNode(httpCtx, members.length, Nil)) //利用FSM避免了局部变量的产生,members.length 需要等待几个节点的返回,有可能超时
      context.setReceiveTimeout(3 seconds)
  }

  def waitingForAkkaNode(ctx: ImperativeRequestContext, memberNum: Int, results: List[StateResult]): Receive = { //集群个数 用于接收到的信息个数
    case nodeStateResult: StateResult =>
      (memberNum - 1) match {
        case 0 => //完成任务 取消超时设置
          context.setReceiveTimeout(Duration.Inf)
          ctx.complete(nodeStateResult)
          context.become(waitingForHttpRequest)
          unstashAll() //完成该请求,处理其他请求
        case _ => //仍有部分节点没有返回健康状态结果,继续等待
          context.become(waitingForAkkaNode(ctx, memberNum - 1, nodeStateResult :: results)) //这里的代码有点类似于 `尾递归`
      }
    case ReceiveTimeout =>
      context.setReceiveTimeout(Duration.Inf) //完成任务 取消超时设置
      ctx.complete(results) //返回部分结果,完成http请求
      context.become(waitingForHttpRequest)
      unstashAll() //完成该次请求,释放暂存的http请求,处理其他请求
    case _ =>
      stash() //处理某个请求中,但是收到了其他http请求,暂存消息
  }
}

    以上代码大致实现了具体逻辑,actor在waitingForHttpRequestwaitingForAkkaNode两种状态之间来回切换,让我们来看看akka中关于该功能的源码。

...
trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueueSemantics]

    重启之前(并不是重启之后)应当释放所有暂存的消息,同样的,而且需要在关闭之后释放暂存的消息。

...
trait UnrestrictedStash extends Actor with StashSupport {
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    try unstashAll() finally super.preRestart(reason, message)
  }
  override def postStop(): Unit = try unstashAll() finally super.postStop()
}
private[akka] trait StashSupport {

  private[akka] def context: ActorContext

  private[akka] def self: ActorRef

  //基于`scala.collection.immutable.Vector`,当存储的消息数量很大时,也可以获得很好的性能
  private var theStash = Vector.empty[Envelope]

  // ActorContext是ActorCell的子类
  private def actorCell = context.asInstanceOf[ActorCell] 
  
  //可为暂存的消息设置数量限制,通过`stash-capacity`配置,默认为-1,即容量不限
  private val capacity: Int =
    context.system.mailboxes.stashCapacity(context.props.dispatcher, context.props.mailbox)

  private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
    actorCell.mailbox.messageQueue match {
      case queue: DequeBasedMessageQueueSemantics ⇒ queue
      case other ⇒ throw ActorInitializationException
    }
  }
  
  //1. 获取当前的消息,不能将同一个消息存两次(不能调用两次`stash()`函数,`eq`用于比较对象之间的内存地址)
  //2. capacity默认-1,存储容量不限
  //3. 暂存的消息被添加到`theStash`集合的头部(遍历或者迭代时,从尾向头遍历)
  def stash(): Unit = {
    val currMsg = actorCell.currentMessage
    if (theStash.nonEmpty && (currMsg eq theStash.last))
      throw new IllegalStateException(s"Can't stash the same message $currMsg more than once")
    if (capacity <= 0 || theStash.size < capacity) 
      theStash = theStash :+ currMsg 
    else throw new StashOverflowException
  }

   //当`others`很小时,而`theStash`很大,该方法很高效
   //从头向尾依次将`others`中元素添加到`theStash`尾
  private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
    theStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)
    ...
  
  //`reverseIterator`反序输出`theStash`中的每一封邮件,依次添加到邮箱的头部
  def unstashAll(): Unit = {
    try {
      val i = theStash.reverseIterator 
      while (i.hasNext) enqueueFirst(i.next())
    } finally {
      theStash = Vector.empty[Envelope]
    }
  }
  ...
  
  //把该封信放在邮箱的第一个位置
  private def enqueueFirst(envelope: Envelope): Unit = {
    mailbox.enqueueFirst(self, envelope)
    envelope.message match {
      case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref)
      case _               ⇒
    }
  }
}

    基于scala.collection.immutable.Vector的实现,在存储数量很大时,仍然可以获得很好的性能。
    同一个消息不能被暂存两次,否则程序抛出IllegalStateException两次。
    theStash = theStash :+ currMsgtheStash = others.foldRight(theStash)((envelope, s) ⇒ envelope +: s)val i = theStash.reverseIterator这几句关于集合操作与迭代的代码确保了:这些被释放出来的消息依然按照它们被接收到顺序依次处理,先来后到规则要遵守。

知难行易
原创博文,请勿转载
我的又一个博客hangscer.win
原文地址:https://www.cnblogs.com/hangscer/p/8465686.html