在学习同步之前,我们先用多线程实现一个简单的功能,看看并发会出现什么问题?
我们创建一个共享的字节数组对象和一个子线程Worker,同时向这个字节数组对象写入数据,主线程写入1到10这10个int数据,子线程写入11到20这10个int数据;然后我们将这个字节数组的所有内容打印出来。
这里存在一个问题,由于我们无法准确的控制子线程开始运行的时间,所以要做到两个线程同时向这个字节数组对象写入数据是件很困难的事,如果不做处理,我们根本看不出任何并发导致的问题;因为写入10个int数据对处理器来说太简单了,一瞬间即可完成;所以我们更可能看见打印出的内容是1到20(主线程先运行),或者看到11到20之后跟着1到10(子线程先运行)。
为了能看见并发导致的同步问题,我们需要加入一个特别耗时的方法,同时将一口气写入10个数字改为,先写入5个数字后调用耗时方法,然后再写入后5个数字,这样处理器极可能在耗时方法运行中切换另一个线程运行,这样我们就能看出并发可能出现的问题。
新建一个UnsafeThreadDemo项目:
文档类UnsafeThreadDemo源码:
1 package 2 { 3 import flash.display.Sprite; 4 import flash.events.Event; 5 import flash.system.Worker; 6 import flash.system.WorkerDomain; 7 import flash.system.WorkerState; 8 import flash.utils.ByteArray; 9 import flash.utils.getTimer; 10 import flash.utils.setTimeout; 11 12 public class UnsafeThreadDemo extends Sprite 13 { 14 //共享字节数组 15 private var _shareBytes:ByteArray; 16 //子线程 17 private var _myWorker:Worker; 18 19 public function UnsafeThreadDemo() 20 { 21 //创建共享字节数组 22 _shareBytes = new ByteArray(); 23 _shareBytes.shareable = true; 24 25 //创建子线程并运行 26 _myWorker = WorkerDomain.current.createWorker(Workers.MyWorker); 27 _myWorker.addEventListener(Event.WORKER_STATE, workerStateHandler); 28 _myWorker.setSharedProperty("shareBytes", _shareBytes); 29 _myWorker.start(); 30 } 31 32 private function workerStateHandler(event:Event):void 33 { 34 //子线程创建并开始运行后 35 if(_myWorker.state == WorkerState.RUNNING) 36 { 37 //开始向共享字节数组写入数据 38 writeNumberToBytes(); 39 //3秒后显示共享字节数组的内容 40 setTimeout(showBytes, 3000); 41 } 42 } 43 44 /** 45 * 本方法为线程不安全方法, 这里我们希望向共享字节数组中写入1到10这10个连续的数字. 46 */ 47 private function writeNumberToBytes():void 48 { 49 writeNumberToBytes1(); 50 /* 51 * 模拟一个特别耗时的计算方法, 这样不会导致 writeNumberToBytes 方法一下 52 * 子就运行结束, 子线程则会在这个耗时的方法运行中某一时刻开始运行. 53 */ 54 largeComputationalCost(); 55 writeNumberToBytes2(); 56 } 57 58 private function writeNumberToBytes1():void 59 { 60 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 61 _shareBytes.position = _shareBytes.length; 62 63 for(var i:int = 1; i < 6; i++) 64 { 65 _shareBytes.writeInt(i); 66 } 67 } 68 69 private function writeNumberToBytes2():void 70 { 71 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 72 _shareBytes.position = _shareBytes.length; 73 74 for(var i:int = 6; i < 11; i++) 75 { 76 _shareBytes.writeInt(i); 77 } 78 } 79 80 /** 81 * 如果没有看到数字被截断说明你电脑性能太好了! 把 10000000 改得更大一点吧. 82 */ 83 private function largeComputationalCost():void 84 { 85 var time:int = getTimer(); 86 for(var i:int = 0; i < 10000000;) 87 i++; 88 trace("巨大的运算耗时:", getTimer() - time, "毫秒"); 89 } 90 91 private function showBytes():void 92 { 93 _shareBytes.position = 0; 94 var result:String = ""; 95 while(_shareBytes.bytesAvailable >= 4) 96 { 97 result += _shareBytes.readInt() + ", "; 98 } 99 trace(result); 100 } 101 } 102 }
子线程MyWorker源码:
1 package 2 { 3 import flash.display.Sprite; 4 import flash.system.Worker; 5 import flash.utils.ByteArray; 6 import flash.utils.getTimer; 7 8 public class MyWorker extends Sprite 9 { 10 //共享字节数组 11 private var _shareBytes:ByteArray; 12 13 public function MyWorker() 14 { 15 //获取共享字节数组的引用 16 _shareBytes = Worker.current.getSharedProperty("shareBytes") as ByteArray; 17 //开始写入数据 18 writeNumberToBytes(); 19 } 20 21 /** 22 * 本方法为线程不安全方法, 这里时不管三七二十一, 直接就开始向共享字节数组写数据. 23 */ 24 private function writeNumberToBytes():void 25 { 26 writeNumberToBytes1(); 27 /* 28 * 模拟一个特别耗时的计算方法, 这样不会导致 writeNumberToBytes 方法一下 29 * 子就运行结束, 主线程则会在这个耗时的方法运行中某一时刻开始运行. 30 */ 31 largeComputationalCost(); 32 writeNumberToBytes2(); 33 } 34 35 private function writeNumberToBytes1():void 36 { 37 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 38 _shareBytes.position = _shareBytes.length; 39 40 for(var i:int = 11; i < 16; i++) 41 { 42 _shareBytes.writeInt(i); 43 } 44 } 45 46 private function writeNumberToBytes2():void 47 { 48 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 49 _shareBytes.position = _shareBytes.length; 50 51 for(var i:int = 16; i < 21; i++) 52 { 53 _shareBytes.writeInt(i); 54 } 55 } 56 57 /** 58 * 如果没有看到数字被截断说明你电脑性能太好了! 把 10000000 改得更大一点吧. 59 */ 60 private function largeComputationalCost():void 61 { 62 var time:int = getTimer(); 63 for(var i:int = 0; i < 10000000;) 64 i++; 65 trace("巨大的运算耗时:", getTimer() - time, "毫秒"); 66 } 67 } 68 }
我们运行该程序,3秒后会输出字节数组的内容,我电脑上的输出是:
1 巨大的运算耗时: 846 毫秒 2 巨大的运算耗时: 872 毫秒 3 1, 2, 3, 4, 5, 11, 12, 13, 14, 15, 6, 7, 8, 9, 10, 16, 17, 18, 19, 20,
问题出来了,我们看见这里是主线程先执行writeNumberToBytes方法,但是当执行到耗时方法largeComputationalCost时,子线程也开始运行,于是出现了字节数组先写入了1到5后,变成子线程的11到15,由于子线程也有一个largeComputationalCost方法,所以之后又是主线程的6到10,最后才是子线程的16到20。
这并不是我们期望的结果,我们希望writeNumberToBytes方法不会被其他线程的writeNumberToBytes方法方法打断(注意可以被其他没有操作共享数据的线程打断)那么该怎么办呢?
认识Mutex类
在flashplayer11.5中Adobe为我们提供了一个新的包:flash.concurrent。其在帮助文档的介绍为:flash.concurrent 包中包含用于支持 Flash Platform 应用程序中的并发的类。
而在这个包中则提供了两个新的类:
1.Condition:Condition 对象是 worker 之间实现资源共享的工具,它还可以暂停执行,直到满足特定的条件。
2.Mutex:Mutex("mutual exclusion"(互斥)的简写)类提供一种方式,可以确保同一时间只有一组代码在对特定内存块或其他共享资源进行操作。
而我们要实现as3中的同步,就需要使用到Mutex类,至于Condition类,我们可以理解为扩展了Mutex类功能的扩展类,我们会在后面讨论。
如果看官你学过Java,我可以这么告诉你,AS3的Mutex类就是Java的ReentrantLock类,呵呵,相信你不用继续向下看了,如果不熟悉Java的话就继续吧;
Mutex类可以调用lock方法锁定一段代码,如果其它线程的该Mutex类(还记得准备篇里说的么:Mutex在线程间是引用传递哦)也调用了lock方法则会判断该Mutex类是否已经在别的地方锁定了代码,如果锁定了该线程的执行就会暂停直到被锁定的Mutex对象调用unlock方法进行解锁后才会执行;那么我们使用Mutex锁定代码应该是这样的:
1 var _mutex:Mutex = new Mutex(); 2 _mutex.lock(); 3 //被锁定的代码写在这里 4 _mutex.unlock();
但是如果被锁定的代码抛出了异常导致我们的unlock方法没有被调用则会导致Mutex对象一直处于锁定状态,所以为了保证不出现任何的问题,正确的写法应该如下:
1 var _mutex:Mutex = new Mutex(); 2 _mutex.lock(); 3 try 4 { 5 //被锁定的代码写在这里 6 } 7 finally 8 { 9 _mutex.unlock(); 10 }
下面我们用Mutex类对之前的项目进行修改,我们创建一个名为SafeThreadDemo新项目:
文档类SafeThreadDemo源码:
1 package 2 { 3 import flash.concurrent.Mutex; 4 import flash.display.Sprite; 5 import flash.events.Event; 6 import flash.system.Worker; 7 import flash.system.WorkerDomain; 8 import flash.system.WorkerState; 9 import flash.utils.ByteArray; 10 import flash.utils.getTimer; 11 import flash.utils.setTimeout; 12 13 public class SafeThreadDemo extends Sprite 14 { 15 //互斥对象 16 private var _mutex:Mutex; 17 //共享字节数组 18 private var _shareBytes:ByteArray; 19 //子线程 20 private var _myWorker:Worker; 21 22 public function SafeThreadDemo() 23 { 24 //创建互斥对象 25 _mutex = new Mutex(); 26 27 //创建共享字节数组 28 _shareBytes = new ByteArray(); 29 _shareBytes.shareable = true; 30 31 //创建子线程并运行 32 _myWorker = WorkerDomain.current.createWorker(Workers.MyWorker); 33 _myWorker.addEventListener(Event.WORKER_STATE, workerStateHandler); 34 _myWorker.setSharedProperty("mutex", _mutex); 35 _myWorker.setSharedProperty("shareBytes", _shareBytes); 36 _myWorker.start(); 37 } 38 39 private function workerStateHandler(event:Event):void 40 { 41 //子线程创建并开始运行后 42 if(_myWorker.state == WorkerState.RUNNING) 43 { 44 //开始向共享字节数组写入数据 45 writeNumberToBytes(); 46 //3秒后显示共享字节数组的内容 47 setTimeout(showBytes, 3000); 48 } 49 } 50 51 /** 52 * 本方法为线程安全方法, 因为我们加入了互斥对象. 53 */ 54 private function writeNumberToBytes():void 55 { 56 /* 57 * 锁定该互斥对象, 其他线程再调用本互斥对象的 lock 方法时如果该互斥方法 58 * 还没有解锁则会阻塞线程等待解锁后才会继续运行. 59 */ 60 _mutex.lock(); 61 try 62 { 63 writeNumberToBytes1(); 64 /* 65 * 模拟一个特别耗时的计算方法, 这样不会导致 writeNumberToBytes 方法一下 66 * 子就运行结束, 子线程则会在这个耗时的方法运行中某一时刻开始运行. 67 */ 68 largeComputationalCost(); 69 writeNumberToBytes2(); 70 } 71 finally 72 { 73 /* 74 * 解锁互斥对象, 注意要使用 try finally 关键字, 避免由于抛出异常而 75 * 未能解锁的情况发生. 76 */ 77 _mutex.unlock(); 78 } 79 } 80 81 private function writeNumberToBytes1():void 82 { 83 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 84 _shareBytes.position = _shareBytes.length; 85 86 for(var i:int = 1; i < 6; i++) 87 { 88 _shareBytes.writeInt(i); 89 } 90 } 91 92 private function writeNumberToBytes2():void 93 { 94 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 95 _shareBytes.position = _shareBytes.length; 96 97 for(var i:int = 6; i < 11; i++) 98 { 99 _shareBytes.writeInt(i); 100 } 101 } 102 103 private function largeComputationalCost():void 104 { 105 var time:int = getTimer(); 106 for(var i:int = 0; i < 10000000;) 107 i++; 108 trace("巨大的运算耗时:", getTimer() - time, "毫秒"); 109 } 110 111 private function showBytes():void 112 { 113 _shareBytes.position = 0; 114 var result:String = ""; 115 while(_shareBytes.bytesAvailable >= 4) 116 { 117 result += _shareBytes.readInt() + ", "; 118 } 119 trace(result); 120 } 121 } 122 }
子线程MyWorker源码:
1 package 2 { 3 import flash.concurrent.Mutex; 4 import flash.display.Sprite; 5 import flash.system.Worker; 6 import flash.utils.ByteArray; 7 import flash.utils.getTimer; 8 9 public class MyWorker extends Sprite 10 { 11 //互斥对象 12 private var _mutex:Mutex; 13 //共享字节数组 14 private var _shareBytes:ByteArray; 15 16 public function MyWorker() 17 { 18 //获取互斥对象 19 _mutex = Worker.current.getSharedProperty("mutex") as Mutex; 20 //获取共享字节数组的引用 21 _shareBytes = Worker.current.getSharedProperty("shareBytes") as ByteArray; 22 //开始写入数据 23 writeNumberToBytes(); 24 } 25 26 /** 27 * 本方法为线程安全方法, 因为我们加入了互斥对象. 28 */ 29 private function writeNumberToBytes():void 30 { 31 //如果已被其他线程锁定则这里的线程阻塞, 等待解锁后才会继续执行 32 _mutex.lock(); 33 try 34 { 35 writeNumberToBytes1(); 36 /* 37 * 模拟一个特别耗时的计算方法, 这样不会导致 writeNumberToBytes 方法一下 38 * 子就运行结束, 主线程则会在这个耗时的方法运行中某一时刻开始运行. 39 */ 40 largeComputationalCost(); 41 writeNumberToBytes2(); 42 } 43 finally 44 { 45 _mutex.unlock(); 46 } 47 } 48 49 private function writeNumberToBytes1():void 50 { 51 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 52 _shareBytes.position = _shareBytes.length; 53 54 for(var i:int = 11; i < 16; i++) 55 { 56 _shareBytes.writeInt(i); 57 } 58 } 59 60 private function writeNumberToBytes2():void 61 { 62 //共享字节数组会被其他线程修改, 所以保证数据会添加到最后 63 _shareBytes.position = _shareBytes.length; 64 65 for(var i:int = 16; i < 21; i++) 66 { 67 _shareBytes.writeInt(i); 68 } 69 } 70 71 private function largeComputationalCost():void 72 { 73 var time:int = getTimer(); 74 for(var i:int = 0; i < 10000000;) 75 i++; 76 trace("巨大的运算耗时:", getTimer() - time, "毫秒"); 77 } 78 } 79 }
我们运行该程序,3秒后会输出字节数组的内容,我电脑上的输出是:
1 巨大的运算耗时: 897 毫秒 2 巨大的运算耗时: 813 毫秒 3 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
我们看见这次是子线程先执行writeNumberToBytes方法,并且两个线程的writeNumberToBytes方法不会同时执行,只能一个执行完成后再执行另一个。
工程源码的下载地址: