scala快速入门04

Scala从入门开始04

1.分布式通信框架Akka

  • 对底层异步IO(NIO)封装,使用起来方便。Java中有Netty,Mina。Scala有Akka。

1.1什么是ACTORS

  • Actor是用来收发消息的,一个Actor就是一个实例,可以创建多个Actor实现并发。所有的Actor都有管理者,它用于创建Actor和管理Actor。以后根据发送带类型的消息进行匹配,处理不同逻辑。用到Scala实现用到技术点:case class/case object。 Akka Actors遵循Actor模型,Actors并发编程不需要锁,而是通过通信的机制实现并发。

1.2什么是Akka

  • Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。

    Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递

1.3Akka的特点

1. 单并发性  和  分布式
2. 可恢复的,弹性的
3. 高效的
4. 弹性,去中心化(分散)
5. 可扩展

1.4 Akka快速入门

  • 负责管理的角色:ActorSystem

  • 负责通信:Actor

  • 发送的消息:case class 和 case object

  • 发消息的方式: 异步、同步

  • 示例演示步骤:

    1. 创建一个Maven工程

      • idea中New --> project--> Maven --> Next --> 填写项目名
    2. 当前创建项目为Java项目,需要导入scala插件

      <build>
              <sourceDirectory>src/main/scala</sourceDirectory>
              <testSourceDirectory>src/test/scala</testSourceDirectory>
              <plugins>
                  <!-- 编译scala -->
                  <plugin>
                      <groupId>net.alchim31.maven</groupId>
                      <artifactId>scala-maven-plugin</artifactId>
                      <version>3.2.2</version>
                      <executions>
                          <execution>
                              <goals>
                                  <goal>compile</goal>
                                  <goal>testCompile</goal>
                              </goals>
                              <configuration>
                                  <args>
      <!--                                <arg>-make:transitive</arg>-->
                                      <arg>-dependencyfile</arg>
                                      <arg>${project.build.directory}/.scala_dependencies</arg>
                                  </args>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
      
                  <!-- 打包插件 -->
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>2.4.3</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <filters>
                                      <filter>
                                          <artifact>*:*</artifact>
                                          <excludes>
                                              <exclude>META-INF/*.SF</exclude>
                                              <exclude>META-INF/*.DSA</exclude>
                                              <exclude>META-INF/*.RSA</exclude>
                                          </excludes>
                                      </filter>
                                  </filters>
                                  <transformers>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                          <resource>reference.conf</resource>
                                      </transformer>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                          <mainClass>cn._51doit.rpc.Master</mainClass>
                                      </transformer>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
    3. 根据导入scala插件创建目录:main下创建scala文件夹,右键Make Directory as ---> Sources root。test下创建scala文件夹。右键Make Directory as ---> Sources test root。

    4. 右键项目->Add Frameworks Support->选择scala

    5. 引入scala依赖

      <!-- 导入scala的依赖 -->
          <properties>
              <maven.compiler.source>1.8</maven.compiler.source>
              <maven.compiler.target>1.8</maven.compiler.target>
              <encoding>UTF-8</encoding>
              <scala.version>2.11.12</scala.version>
              <scala.compat.version>2.11</scala.compat.version>
              <akka.version>2.4.17</akka.version>
          </properties>
      
      
          <dependencies>
              <!-- scala的依赖 -->
              <dependency>
                  <groupId>org.scala-lang</groupId>
                  <artifactId>scala-library</artifactId>
                  <version>${scala.version}</version>
              </dependency>
      
              <!-- akka actor依赖 -->
              <dependency>
                  <groupId>com.typesafe.akka</groupId>
                  <artifactId>akka-actor_2.11</artifactId>
                  <version>${akka.version}</version>
              </dependency>
      
              <!-- akka远程通信依赖 -->
              <dependency>
                  <groupId>com.typesafe.akka</groupId>
                  <artifactId>akka-remote_2.11</artifactId>
                  <version>${akka.version}</version>
              </dependency>
      
        </dependencies>
      
    6. 联网等待maven下载依赖。

    7. maven安装步骤:https://www.cnblogs.com/sunleejon/p/12391093.html

    8. idea强制清除Maven缓存:https://www.cnblogs.com/-beyond/p/11557196.html

1.5Akka RPC通信过程:

1.启动Master,内部启动一个定时器,定期检测超时Worker
2.启动所有Worker,Worker首先向Master建立连接,然后向Master注册,Worker把自身的信息发送给Master(ip,端口,资源)。
3.Master接收到Worker发送的注册信息,然后将Worker的信息保存起来,然后向Worker反馈信息,告诉Worker注册成功了。
4.Worker接收到了Master反馈注册成功信息,然后定期向Master发送心跳。
5.Master接收Worker的心跳信息,然后定期更新对应Worker的上一次心跳时间。

1.6 Akka创建Actor

  • Master.scala
package com._xjk.rpc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Master extends Actor {
  /*
  * 接收消息
  * */
  override def receive: Receive = {
    case "hi" => {
      println("hi")
    }
    case "hello" => {
      println("hello")
    }
  }
}


//
object Master {
  def main(args: Array[String]): Unit = {
    // 报错:Error:scalac:bad option: "-make:transitive"
    // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
    val host = args(0)
    val port = args(1).toInt
    // 字符串传入配置信息
    val confStr =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname="$host"
        |akka.remote.netty.tcp.port="$port"
      """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // 创建一个 ActorSystem(创建管理Actor,单例)
    val actorSystem = ActorSystem("Master-Actor-System", conf)
    // 通过ActorSystem 创建Actor
    val master = actorSystem.actorOf(Props[Master], "Master-Acotory")
    // master给自己发送消息
    // ! 表示异步发送
    master ! "hello"
  }
}
  • 配置Master.scala 的Program arguments,运行即可。
localhost 8888

1.7 Akka 中Master和Worker建立连接

  • Akka 中Master和Worker建立连接 并发送一条消息
package com._xjk.rpc

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Worker extends Actor{
  var masterRef: ActorSelection = _
  // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
  override def preStart(): Unit = {
    // 跟Master 建立连接, 拿到Master代理对象
    // 此处指定Master地址是akka的通信协议,是长链接,书写方式: 地址/user/Master名字
    masterRef = context.actorSelection("akka.tcp://Master-Actor-System@localhost:8888/user/Master-Actor")
    // 代理对象发送消息
    masterRef ! "hello"
  }
  // masterRef 为 代理对象,并不是 真正Master的引用,因为Master和Worker不在同一个进程中,Worker实际拿到是Master代理对象。
  override def receive: Receive = {
    case "hello" => {
      println("recv hello!")
    }
  }
}


object Worker {
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    // 字符串传入配置信息
    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname="$host"
         |akka.remote.netty.tcp.port="$port"
      """.stripMargin
      val config: Config = ConfigFactory.parseString(confStr)
      // 创建ActorSystem
      val actorSystem = ActorSystem.apply("Worker-Actor-System", config)
      // 创建Actor, Props指定类型
      val worker = actorSystem.actorOf(Props[Worker], "Worker-Actor")
//      worker ! "hello"
    }
}
  • Worker将注册信息发送Master,定义case class里面放置信息。
// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)



// Worker发送:
val WORKER_ID = UUID.randomUUID().toString
// 向Master发送注册消息: WorkerId,内存,核数等
masterRef ! RegisterWorker(WORKER_ID, 10240, 24)

// Master接收消息并打印:
class Master extends Actor {
  /*
  * 接收消息
  * */
  override def receive: Receive = {
    case RegisterWorker(workerId, memory, cores) => {
      println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
    }
  }
}

  • Master保存注册信息,并返回注册成功消息
// Master.scala
class Master extends Actor {
  val id2Worker = new mutable.HashMap[String, WorkerInfo]()
  val workers = new mutable.HashSet[WorkerInfo]()
  /*
  * 接收消息
  * */
  override def receive: Receive = {
    case RegisterWorker(workerId, memory, cores) => {
      println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
      val workerInfo = new WorkerInfo(workerId,memory,cores)
      // 保存Worker信息到Map
//      id2Worker += ((workerId, workerInfo))
//      id2Worker += (workerId -> workerInfo)
      id2Worker(workerId) = workerInfo
      // 保存Worker信息到Set中
      workers += workerInfo
      // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
      // 消息接收者,可以通过sender方法获取消息发送者的连接
      sender() ! RegisterdWorker
    }
  }
}

// Worker.scala
case RegisterdWorker => {
      println("worker 接收到注册成功消息!")
    }
// WorkerInfo.scala
package com._xjk.rpc

class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {

}
// Messages
package com._xjk.rpc
// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)
// Master向 Worker发送注册成功消息
case object RegisterdWorker
  • Worker定期向Master发送心跳。

2.完整Akka练习

  • Master和Worker建立连接,Worker向Master发送注册消息,Master保存注册信息并返回注册成功消息,Worker定期向Master发送心跳,Master定期检测超时Worker并移除。
  • Master.scala
package com._xjk.rpc

import java.util.Date

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

import scala.collection.mutable

class Master extends Actor {
  val id2Worker = new mutable.HashMap[String, WorkerInfo]()
  val workers = new mutable.HashSet[WorkerInfo]()
  // 检测时间
  val CHECK_INTERVAL = 15000
  // 启动定时器,定期检测超时Worker
  override def preStart(): Unit = {
    // 导入隐式转换
    import context.dispatcher
    // 启动定时器
    context.system.scheduler.schedule(
      0 millis,
      CHECK_INTERVAL millis,
      self,
      CheckTimeoutWorker
    )
  }

  /*
  * 接收消息
  * */
  override def receive: Receive = {
    case RegisterWorker(workerId, memory, cores) => {
      println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
      val workerInfo = new WorkerInfo(workerId,memory,cores)
      // 保存Worker信息到Map
//      id2Worker += ((workerId, workerInfo))
//      id2Worker += (workerId -> workerInfo)
      id2Worker(workerId) = workerInfo
      // 保存Worker信息到Set中
      workers += workerInfo
      // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
      // 消息接收者,可以通过sender方法获取消息发送者的连接
      sender() ! RegisterdWorker
    }
    // Worker发送给Master的心跳消息
    case Heartbeat(workerId) => {
      println(new Date() + "," + workerId)
      // 根据workerId取出对应worker的Info
      val info = id2Worker(workerId)
      // 获取当前时间:
      val currentTime = System.currentTimeMillis()
      // 更新workerInfo 的心跳时间
      info.lastHeartbeatTime = currentTime

    }
    // 发给自己消息,用于检查Worker超时消息
    case CheckTimeoutWorker => {
      // 获取当前时间:
      var currentTime = System.currentTimeMillis()
      // 过滤超时workers
      val deadWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      // 删除出现问题的worker
//      for (workerInfo <- deadWorkers) {
//        val workerId= workerInfo.workerId
//        // 从map中删除
//        id2Worker -= workerId
//        // 从set中删除
//        workers -= workerInfo
//      }
      deadWorkers.foreach(workerInfo => {
        val workerId= workerInfo.workerId
        // 从map中删除
        id2Worker -= workerId
        // 从set中删除
        workers -= workerInfo
        println(s"当前worker的个数为:${id2Worker.size}")
      })
    }
  }
}


//
object Master {

  val MASTER_ACTOR_SYSTEM = "Master-Actor-System"
  val MASTER_ACTOR_NAME = "Master-Actor"

  def main(args: Array[String]): Unit = {
    // 报错:Error:scalac:bad option: "-make:transitive"
    // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
    val host = args(0)
    val port = args(1).toInt
    // 字符串传入配置信息
    val confStr =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname="$host"
        |akka.remote.netty.tcp.port="$port"
      """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // 创建一个 ActorSystem(创建管理Actor,单例)
    val actorSystem = ActorSystem(MASTER_ACTOR_SYSTEM, conf)
    // 通过ActorSystem 创建Actor
    val master = actorSystem.actorOf(Props[Master], MASTER_ACTOR_NAME)
    // master给自己发送消息
    // ! 表示异步发送
//    master ! "hello"
  }
}
  • Messages
package com._xjk.rpc


// 注册worker信息
case class RegisterWorker(workerId:String, memory:Int, cores:Int)

// Master向 Worker发送注册成功消息
case object RegisterdWorker

// Worker发给Master心跳消息
case class Heartbeat(workerId:String)

// Worker发送给自己的消息
case object  SendHeartbeat


// Master发送给自己检测Worker超时消息
case object CheckTimeoutWorker
  • Worker.scala
package com._xjk.rpc

import java.util.UUID
// 导入时间单位
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Worker (var masterHost:String, var masterPort:Int, var memory:Int, var cores:Int) extends Actor{
  var masterRef: ActorSelection = _
  // worker 的 ID
  val WORKER_ID = UUID.randomUUID().toString
  var HEARTBEAT_INTERVAL = 10000
  // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
  override def preStart(): Unit = {

    // 跟Master 建立连接, 拿到Master代理对象
    masterRef = context.actorSelection(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR_NAME}")
    // 代理对象发送消息
//    masterRef ! "hello"
    // 向Master发送注册消息: WorkerId,内存,核数等
    masterRef ! RegisterWorker(WORKER_ID, memory, cores)
  }

  override def receive: Receive = {
    case "hello" => {
      println("recv hello!")
    }
    // Master返回给Worker注册成功的消息
    case RegisterdWorker => {
      println("worker 接收到注册成功消息!")
      // 导入隐式转换
      import context.dispatcher
      // 上下文中定义延时0秒,每10秒给自己发送消息。self 表示发送给自己。
      context.system.scheduler.schedule(
        0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat
      )
    }
      // 接收自己发送消息
    case SendHeartbeat => {
      // 判断是否断开连接
      // 判断是否Master发生变化
      // 将心跳消息发送给master
      masterRef ! Heartbeat(WORKER_ID)
    }
  }
}


object Worker {
  val WORKER_ACTOR_SYSTEM = "Worker-Actor-System"
  val WORKER_ACTOR_NAME = "Worker-Actor"
  def main(args: Array[String]): Unit = {
    val masterHost = args(0)
    val masterHort = args(1).toInt

    val workerHost = args(2)
    val workerPort = args(3).toInt

    val workerMemory = args(4).toInt
    val workerCores = args(5).toInt



    // 字符串传入配置信息
    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname="$workerHost"
         |akka.remote.netty.tcp.port="$workerPort"
      """.stripMargin
      val config: Config = ConfigFactory.parseString(confStr)
      // 创建ActorSystem
      val actorSystem = ActorSystem.apply(WORKER_ACTOR_SYSTEM, config)
      // 创建Actor, Props指定类型
      val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterHort, workerMemory, workerCores)), WORKER_ACTOR_NAME)
//      worker ! "hello"
    }
}
  • WorkerInfo.scala
package com._xjk.rpc

class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
  var lastHeartbeatTime: Long = _
}

  • 通过maven打jar包.Lifecycle->package.(注意路径有中文会有错误)

  • 运行Master

java -jar .AkkaRPC-Master-1.0.jar 127.0.0.1 8888
  • 运行Worker
java -jar .AkkaRPC-Worker-1.0.jar 127.0.0.1 8888 127.0.0.1 9999 1024 2
原文地址:https://www.cnblogs.com/xujunkai/p/14413186.html