Akka(二)

1. future的所有方法都是非阻塞立即返回的

(1)future都要有TimeOut和ExecutionContextExecutor这2个隐士参数
(2)打印future

object HelloWorld extends App{

  val system = ActorSystem.apply()
  val hello: ActorRef = system.actorOf(Props[Hello],"helloactor")
  println(hello.path)
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  implicit val timeout: Timeout = Timeout(5 seconds)
  val future = hello ? "wodetianna"    // 隐士参数timeout

  // future的onFailure方法接受一个PartialFunction
  future onFailure({  // 此方法立即返回,含有隐士参数ExecutionContextExecutor
    case e:Exception => println("failure...")
  })

  println("go on . . ")

  val finalFuture: Future[Any] = future.fallbackTo(Future(111)) // 此方法立即返回。fallback表示如果future成功返回,则不会返回Future(111)。二选一,有限返回前面成功地future

  println("go on 2 ...")

  finalFuture foreach println   // 遍历future的结果

  system.terminate
}

/**
  * akka://default/user/helloactor
    go on . .
    go on 2 ...
    wodetianna
    111
    failure...
  */

class Hello extends Actor{

  override def receive: Receive = {
    case msg:String => {
      Thread.sleep(2000)
      println(msg)
      throw new RuntimeException("my exception")   //此处抛出异常,则下面的sender() ! "yes"并不会执行,future.fallbackTo(Future(111))的结果是Future(111)
      sender() ! "yes"
    }
  }
}

2. 用Await.result等待future返回

object Test2 extends App{

  val system = ActorSystem.apply()
  val actorOf: ActorRef = system.actorOf(Props[MyIntActor],"helloactor")

  implicit val timeout: Timeout = Timeout(5 seconds)
  implicit val ec = system.dispatcher

  val future1 = ask(actorOf,1)
  val future2 = ask(actorOf,2)  //等同于actorOf ? 2

  println("go on ..")

  val eventualInt: Future[Int] = for {
    a <- future1.mapTo[Int]
    b <- future2.mapTo[Int]
    c <- Future(a + b).mapTo[Int]
  } yield c

  Await.result(eventualInt,timeout.duration) //阻塞情况要加上Await.result。否则future的方法全是立即返回
  eventualInt foreach println  //立即返回

  println("done")
  system.terminate
}

/** 结果
  * go on ..
    done
    3
  */

3. 使actor停止的kill与poisionpill信号

case class spark()
case class hadoop()

object TEst3 extends App{
  val system = ActorSystem.apply()
  val hello: ActorRef = system.actorOf(Props[myActor],"myactor")

  hello ! spark
}

class myActor extends Actor{
  override def receive: Receive = {
    case msg:spark => {
      println("spark")
      self ! Kill   // mailbox未处理的消息持久化存储起来,等待下次启动时重新初六老消息
    }
    case msg:hadoop => {
      println("haha")
      self ! PoisonPill    // 放弃正在处理和mailbox中的未处理信息,通知子actor终止,听之前执行poststop方法
      self ! Stop          // stop方法和PoisionPill类似,但是会先处理掉当前的任务后再停止
    }
  }

  @scala.throws[Exception](classOf[Exception])
  override def postStop(): Unit = {
    println("destory")
  }
}
原文地址:https://www.cnblogs.com/72808ljup/p/5606336.html