Scala并发编程【进阶】

 1 package com.dingxin.entrance
 2 
 3 import java.text.SimpleDateFormat
 4 import java.util.Date
 5 
 6 import scala.actors.Actor
 7 import scala.actors.Actor._
 8 /**
 9   * Created by zhen on 2019/1/24.
10   */
11 object My_Actor_Receive extends Actor{
12   def act(){
13     while(true){
14       receive{
15         case str : String => print(str + " ") // 模式匹配
16         case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
17         case _ => println("My heart will go on !")
18       }
19     }
20   }
21 }
22 object Actor_Receive {
23   def main(args: Array[String]) {
24     val getMessage = actor{
25       while(true){
26         receive{
27           case str : String => print(str) // 模式匹配
28           case dat : Date => println(new SimpleDateFormat("yyyy").format(dat))
29           case _ => My_Actor_Receive ! null // 消息转发
30         }
31       }
32     }
33     val sendMessage = actor{
34       while(true){
35         receive{
36           case str : String => getMessage ! str + " " // 消息转发
37           case dat : Date => getMessage ! dat
38           case _ => getMessage ! null
39         }
40       }
41     }
42     sendMessage ! "Scala"
43     sendMessage ! new Date()
44     sendMessage ! 2020
45 
46     // 这种方式必须执行start开启,且都是并行执行,不确定先后顺序
47     My_Actor_Receive.start()
48     My_Actor_Receive ! "Spark"
49   }
50 }

结果1:

  

结果2:

  

 信息交互

 1 package big.data.analyse.scala
 2 
 3 import scala.actors.Actor
 4 import scala.actors.Actor._
 5 /**
 6   * 消息发送与接收,可用于流计算测试的输入
 7   * Created by zhen on 2018/4/15.
 8   */
 9 object ActorTest {
10   def main(args: Array[String]) {
11     val actor = new HelloActor
12     actor.start//启动actor消息机制
13     var counter = 0
14     while(counter<10){
15       actor ! "Step " + counter //发送消息
16       counter += 1
17       Thread.sleep(2000)
18       self.receive{case msg => println("返回结果:"+msg)} // 获取子线程的消息
19     }
20   }
21 }
22 class HelloActor extends Actor{
23   def act(): Unit ={
24     while(true){
25       receive{
26         case content : String => println("Message : " + content)
27           sender ! content.split(" ")(1) // 向主线程发送消息
28       }
29     }
30   }
31 }

结果3:

  

loop+react

 1 package big.data.analyse.scala.actor
 2 
 3 import java.net.{UnknownHostException, InetAddress}
 4 import scala.actors.Actor
 5 import scala.actors.Actor._
 6 
 7 /**
 8   * Created by zhen on 2019/6/19.
 9   */
10 object NameResolver extends Actor{
11   def act(){
12     loop {
13       react {
14         case Net (name, actor) => actor ! getIp(name)
15         case msg => println("Unhandled message : " + msg)
16       }
17     }
18   }
19   def getIp(name : String) : Option[InetAddress] = {
20     try{
21       println(name)
22       Some(InetAddress.getByName(name))
23     } catch {
24       case _ : UnknownHostException => None
25     }
26   }
27 }
28 
29 case class Net(name : String, actor: Actor)
30 
31 object Actor_More_Effective {
32   def main(args: Array[String]) {
33     NameResolver.start
34     NameResolver ! Net("www.baidu.com", self)
35     NameResolver ! "www.xiaomi.com"
36 
37     for(i <- 1 until 10){
38       NameResolver ! "小米" + i
39     }
40     println(self.receiveWithin(1000){case x => x})
41   }
42 }

结果4:

  

 Actor详解

  1.Actor是一个通信模型,Scala提供了Actor的实现

  2.Spark1.6之前集群节点之间通信使用的是Akka,Akka底层是Actor实现的。Spark1.6之后,节点的通信变成Netty

  3.Actor相当于我们理解的Thread,Actor的出现主要解决的是代码锁的问题

  4.Actor底层通信实现用到了模式匹配

原文地址:https://www.cnblogs.com/yszd/p/10313578.html