Scala-Akka 实例

Akka 实例

18.1需求分析

 

  实现一个分布式模型,Master 保持所有 Worker 节点的信息,根据
Worker 的心跳信息维持与 Worker 的连接,Worker 启动时向 Master 节点进行
注册,Master 节点回复 ACK 信息。 

18.2项目源代码

18.2.1 新建 Maven 项目 AkkaSystem

pom.xml 文件如下: 
 
 
 
 
 
 
 

18.2.2 WorkInfo 类抽象 

class WorkerInfo(val id : String, val workerHost : String, val memory :
String, val cores : String) {
  var lastHeartbeat : Long = System.currentTimeMillis()
  override def toString = s"WorkerInfo($id, $workerHost, $memory, $cores)"

}

18.2.3 ActorMessage 

case class RegisterWorker(val id : String, val workerHost : String, val memory : String, val cores : String)
case class HeartBeat(val workid : String)
case class CheckOfTimeOutWorker()
case class RegisteredWorker(val workerHost : String)
case class SendHeartBeat()

18.2.4 Master

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global


class Master extends Actor{
  //保存 WorkerID 和 Work 信息的 map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //保存所有 Worker 信息的 Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker 超时时间
  val WORKER_TIMEOUT = 10 * 1000
  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //启动定时器,定时执行
    context.system.scheduler.schedule(5 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }
  //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息
  override def receive: Receive = {
    //Worker 向 Master 发送的注册消息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        println("new register worker: "+worker)
        sender ! RegisteredWorker(worker.id)
      }
    }
    //Worker 向 Master 发送的心跳消息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      println("get heartbeat message from: "+workerInfo)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }
    //Master 自己向自己发送的定期检查超时 Worker 的消息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker <- toRemove){
        workers -= worker
        idToWorker.remove(worker.id)
      }
      println("worker size: " + workers.size)
    }
  }
}
object Master {
  //程序执行入口
  def main(args: Array[String]) {
    val host = "localhost"
    val port = 8888
    //创建 ActorSystem 的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem 是单例的,用来创建 Actor
    val actorSystem = ActorSystem.create("MasterActorSystem", config)
    //启动 Actor,Master 会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Master], "Master")
  }
} 

18.2.5 Worker 

import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global


class Worker extends Actor{
  //Worker 端持有 Master 端的引用(代理对象)
  var master: ActorSelection = null

  //生成一个 UUID,作为 Worker 的标识
  val id = UUID.randomUUID().toString
  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //Worker 向 MasterActorSystem 发送建立连接请求
    master =
      context.system.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/use
        r/Master")
      //Worker 向 Master 发送注册消息
      master ! RegisterWorker(id, "localhost", "10240", "8")
      }
      //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息
      override def receive: Receive = {
      //Master 向 Worker 的反馈信息
      case RegisteredWorker(masterUrl) => {
      //启动定时任务,向 Master 发送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self,
      SendHeartBeat)
      }
      case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
      }
      }
      }
      object Worker {
      def main(args: Array[String]) {
      val clientPort = 8889
      //创建 WorkerActorSystem 的必要参数
      val configStr =
      s"""
                  |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
                  |akka.remote.netty.tcp.port = $clientPort
""".stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //启动 Actor,Master 会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}

18.3项目运行 

Master: 
 
Worker:
原文地址:https://www.cnblogs.com/LXL616/p/11136054.html