


 * Implementation class of the “ask” pattern enrichment of ActorRef
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

   * INTERNAL API: for binary compatibility
  protected def ask(message: Any, timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

   * INTERNAL API: for binary compatibility
  protected def ?(message: Any)(implicit timeout: Timeout): Future[Any] =
    internalAsk(message, timeout, ActorRef.noSender)

  def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] =
    internalAsk(message, timeout, sender)

   * INTERNAL API: for binary compatibility
  private[pattern] def internalAsk(message: Any, timeout: Timeout, sender: ActorRef) = actorRef match {
    case ref: InternalActorRef if ref.isTerminated ⇒
      actorRef ! message
      Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
    case ref: InternalActorRef ⇒
      if (timeout.duration.length <= 0)
        Future.failed[Any](new IllegalArgumentException(s"""Timeout length must be positive, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
      else {
        val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef, message.getClass.getName, sender)
        actorRef.tell(message, a)
    case _ ⇒ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))



implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef(actorRef)


def apply(provider: ActorRefProvider, timeout: Timeout, targetName: Any, messageClassName: String,
            sender: ActorRef = Actor.noSender, onTimeout: String ⇒ Throwable = defaultOnTimeout): PromiseActorRef = {
    val result = Promise[Any]()
    val scheduler = provider.guardian.underlying.system.scheduler
    val a = new PromiseActorRef(provider, result, messageClassName)
    implicit val ec = a.internalCallingThreadExecutionContext
    val f = scheduler.scheduleOnce(timeout.duration) {
      result tryComplete Failure(
        onTimeout(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "${a.messageClassName}"."""))
    result.future onComplete { _ ⇒ try a.stop() finally f.cancel() }



override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = state match {
    case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message
    case _ ⇒
      if (message == null) throw InvalidMessageException("Message is null")
      if (!(result.tryComplete(
        message match {
          case Status.Success(r) ⇒ Success(r)
          case Status.Failure(f) ⇒ Failure(f)
          case other             ⇒ Success(other)
        }))) provider.deadLetters ! message

   很明显,接收消息的Actor会通过!返回对应的消息,消息的处理一般会命中 case other,这其实就是给result赋值,在超时之前的赋值。如果在!方法内部给result赋值的时候,刚好已经超时或已经赋过值,会把返回的消息发送给deadLetters。

