scala学习之实现RPC通信

最近在学习 Scala,个人感觉它非常灵活,借助 Akka 实现 RPC 通信非常简单,不过函数式编程比较烧脑。

1.搭建工程 创建scala maven 工程  

项目pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xiaot</groupId>
  <artifactId>scala-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.10.6</scala.version>
  </properties>

  <repositories>
    <!-- Additional repository consulted when resolving dependencies.
         NOTE(review): the scala-tools.org host is believed to have been retired;
         confirm it still resolves before relying on it. -->
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <!-- Additional repository consulted when resolving build plugins.
         NOTE(review): the scala-tools.org host is believed to have been retired;
         confirm it still resolves before relying on it. -->
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <!-- Scala standard library; the version comes from the scala.version property. -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Test-only dependencies. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
    <!-- Tied to scala.version so it can never drift away from scala-library. -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-actors</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
    <!-- The _2.10 suffix is the Scala binary version and must match scala.version. -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.10</artifactId>
      <version>2.3.14</version>
    </dependency>

    <!-- Remoting module; kept on the same version as akka-actor. -->
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.10</artifactId>
      <version>2.3.14</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- Compiles main and test Scala sources with the compiler version given by scala.version. -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <!-- jvm-1.5 targets are deprecated in scalac 2.10 and are below the Java 6
                 baseline of the Akka 2.3 dependencies, so emit Java 6 bytecode. -->
            <arg>-target:jvm-1.6</arg>
          </args>
        </configuration>
      </plugin>
      <!-- Generates Eclipse project files with the Scala builder, nature and classpath container. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <!-- The Scala plugin is also registered for reporting, using the same scala.version
           as the build section. -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

2. 创建 Master.scala:作为主节点与 Worker 保持通信,负责保存 Worker 的注册信息并进行心跳超时检测

package com.xiaot.master

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.xiaot.worker.WorkerInfo
import scala.concurrent.duration._
import scala.collection.mutable


/**
  * Master node of the demo RPC cluster (created by xiaot on 2018/4/2).
  *
  * Keeps a registry of workers, records their heartbeats and periodically
  * evicts every worker whose last heartbeat is older than `CHECK_TIME_OUT`.
  *
  * @param host host name embedded in the master URL that is returned to workers
  * @param port port embedded in the master URL that is returned to workers
  */
class Master(val host: String, val port: Int) extends Actor {

  // Registered workers, keyed by worker id.
  val idToWork = new mutable.HashMap[String, WorkerInfo]()
  // The same workers as a set; this is what the timeout check scans.
  val workers = new mutable.HashSet[WorkerInfo]()
  // Heartbeat timeout in milliseconds; also used as the period of the timeout check.
  val CHECK_TIME_OUT = 15000

  // Handle of the periodic timeout check, kept so that it can be cancelled in postStop.
  private var timeoutCheck: Option[akka.actor.Cancellable] = None

  /** Starts the periodic liveness check by sending `CheckTimeOutWorker` to this actor. */
  override def preStart(): Unit = {
    println("master preStart ")
    import context.dispatcher
    timeoutCheck = Some(
      context.system.scheduler.schedule(Duration.Zero, CHECK_TIME_OUT.millis, self, CheckTimeOutWorker))
  }

  /**
    * Cancels the periodic check. Without this the timer outlives the actor, and a
    * restart (which runs postStop followed by preStart) would stack a second timer.
    */
  override def postStop(): Unit = {
    timeoutCheck.foreach(_.cancel())
    timeoutCheck = None
  }

  override def receive: Receive = {
    case RegisterWorker(id, memory, cores) =>
      if (!idToWork.contains(id)) {
        val workerInfo = new WorkerInfo(id, memory, cores)
        // Registration counts as the first sign of life. Otherwise a timeout check that
        // runs before the first heartbeat arrives would evict the worker immediately.
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWork(id) = workerInfo
        workers += workerInfo
      }
      println("worker:" + id + " regist to master success")
      // The acknowledgement is sent even when the id was already registered.
      sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/master")

    case HeartBeat(id) =>
      // Heartbeats from ids that are not registered are ignored.
      idToWork.get(id).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // filter builds a new set, so removing from `workers` while iterating it is safe.
      val expired = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_TIME_OUT)
      expired.foreach { w =>
        idToWork -= w.id
        workers -= w
      }
      println(workers.size)
  }
}

/** Command-line entry point that boots the master actor system. */
object Master {

  /**
    * Starts the "MasterSystem" actor system with remoting bound to the given
    * host and port, creates the `master` actor and blocks until the system terminates.
    *
    * @param args `args(0)` is the host name to bind, `args(1)` the port
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: Master <host> <port>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("MasterSystem", config)
    // The actor name "master" is part of the path that workers look up.
    actorSystem.actorOf(Props(new Master(host, port)), "master")

    actorSystem.awaitTermination()
  }
}

Master运行参数:依次为 <host> <port>(本机绑定的主机名和端口)

3. 创建 Worker.scala:作为工作节点,启动时向 Master 注册节点信息,并定期向 Master 发送心跳

package com.xiaot.worker

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
import com.xiaot.master.{HeartBeat, RegisterWorker, RegisteredWorker, SendHeartBeat}

/**
  * Worker node of the demo RPC cluster (created by xiaot on 2018/4/3).
  *
  * Registers itself with the master when it starts and, once the master has
  * acknowledged the registration, sends a heartbeat every `HEART_INTERAL` milliseconds.
  *
  * @param masterHost host name of the master actor system
  * @param port       port of the master actor system (used to build the master address)
  * @param memory     memory figure reported to the master on registration
  * @param cores      core count reported to the master on registration
  */
class Worker(val masterHost: String, val port: Int, val memory: Int, val cores: Int) extends Actor {

  // Selection pointing at the master actor; assigned in preStart.
  var master: ActorSelection = _
  // Random id that identifies this worker instance to the master.
  val workerId = UUID.randomUUID().toString
  // Heartbeat period in milliseconds.
  val HEART_INTERAL = 10000

  // Handle of the heartbeat timer, kept so it is scheduled only once and can be cancelled.
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  /** Resolves the master address and sends the registration request. */
  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$port/user/master")
    master ! RegisterWorker(workerId, memory, cores)
  }

  /**
    * Cancels the heartbeat timer. Without this the timer outlives the actor, and a
    * restart (which registers again from preStart) would add a second timer.
    */
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    case RegisteredWorker(masterUrl) =>
      println("worker:" + workerId + " 已经成功注册到" + masterUrl)
      // Start the heartbeat only once, even if the acknowledgement arrives again.
      if (heartbeatTask.isEmpty) {
        import context.dispatcher
        // The timer sends SendHeartBeat to this actor, which forwards a HeartBeat to the master.
        heartbeatTask = Some(
          context.system.scheduler.schedule(Duration.Zero, HEART_INTERAL.millis, self, SendHeartBeat))
      }

    case SendHeartBeat =>
      println("send heartbeat to master")
      master ! HeartBeat(workerId)
  }
}
/** Command-line entry point that boots a worker actor system. */
object Worker {

  /**
    * Starts the "WorderSystem" actor system with remoting bound to the given
    * host and port, creates one `Worker` actor and blocks until the system terminates.
    *
    * @param args in order: host, port, masterHost, masterPort, memory, cores
    */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 6) {
      System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
      sys.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin

    val config = ConfigFactory.parseString(configStr)
    // The system name is left exactly as it was, because it is a runtime identifier.
    val workerSystem = ActorSystem("WorderSystem", config)
    workerSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)))
    workerSystem.awaitTermination()
  }
}

Worker运行参数:依次为 <host> <port> <masterHost> <masterPort> <memory> <cores>

WorkerInfo.scala 用于保存工作节点的信息

package com.xiaot.worker

/**
  * Mutable record describing one worker (created by xiaot on 2018/4/4).
  *
  * @param id     unique id of the worker
  * @param memory memory figure reported by the worker
  * @param cores  core count reported by the worker
  */
class WorkerInfo(val id: String, val memory: Int, val cores: Int) {

  /**
    * Time of the most recent sign of life, in epoch milliseconds.
    *
    * Starts at the creation time rather than 0, so a freshly created record is
    * not mistaken for a worker whose last heartbeat was decades ago.
    */
  var lastHeartbeatTime: Long = System.currentTimeMillis()
}

 RemoteMessage.scala 用于远程信息发送,其中包括心跳检测等样例类

package com.xiaot.master

/**
  * Message protocol shared by master and worker (created by xiaot on 2018/4/4).
  *
  * Messages that are sent between master and worker extend this serializable marker trait.
  */
trait RemoteMessage extends Serializable

/** Worker to master: registration request carrying the worker's id and resources. */
case class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessage

/** Master to worker: registration acknowledgement carrying the master's URL. */
case class RegisteredWorker(masterUrl: String) extends RemoteMessage

/** Worker to master: periodic heartbeat carrying the worker's id. */
case class HeartBeat(id: String) extends RemoteMessage

/** Worker to itself: timer tick that triggers sending a heartbeat. */
case object SendHeartBeat

/** Master to itself: timer tick that triggers the worker timeout check. */
case object CheckTimeOutWorker

4. 执行结果

原文地址:https://www.cnblogs.com/shoutn/p/8717750.html