C#多线程之解决多线程编程中大并发数等待唤醒的问题

在移动交通流调查项目的一个算法分析程序中,碰到一个业务问题:用户采集上传的基站定位数据需要进行分析预处理,方案是先按预定格式解析文件并从中 提取出成百上千个基站定位数据记录,并合并相同的基站点,根据获取到的基站位置信息作为参数,去请求google 基站定位 api,从而得到对应的基站定位经纬度等信息,接下来再加上华工的算法分析。

      在执行华工算法分析逻辑之前,调用谷歌api这一步必需全部完成;网络请求是个耗时的过程,故对每一个请求开启单独的线程(同时请求可能数百个,这里通过Semaphore信号量来控制每次发出请求的最大数,该部分的讨论不再本话题之类)。

      问题出来了,那么如何知道所有的网络请求全部完成了,可以进行下一步算法分析呢?答案是利用前面讲的ManualResetEvent来处理;于是有下面的写法

 
1 //针对每个线程 绑定初始化一个ManualResetEvent实例
2 ManualResetEvent doneEvent = new ManualResetEvent(false);
3 //通过ThreadPool.QueueUserWorkItem(网络请求方法HttpRequest,doneEvent ) 来开启多线程
4   
5 //将等待事件一一加入事件列表
 
01 List<ManualResetEvent> listEvent = new List<ManualResetEvent>(); 
02 for(int i=0;i<请求线程数;i++){
03         listEvent.Add(doneEvent);
04 }
05   
06 //主线程等待网络请求全部完成
07 WaitHandle.WaitAll(listEvent.ToArray());
08 //....接下去的算法分析
09   
10   
11 //在网络请求方法HttpRequest的子线程中调用
12 doneEvent.Set();//通知主线程 本网络请求已经完成

运行好像没有问题,程序按原定计划执行;但是当线程数大于64个之后抛出异常

WaitHandles must be less than or equal to 64

原来WaitHandle.WaitAll(listEvent.ToArray()); 这里listEvent线程数不能超过64个

以前解决方法:

下面是吴建飞以前的方案:既然WaitHandle.WaitAll方法只能唤醒64个ManualResetEvent对象,那么就采用

 
1 List<List<ManualResetEvent>> _listLocEventList = new List<List<ManualResetEvent>>();

采用这种复杂集合;集合的每个元素也是一个集合(内部每个集合包含最大64个ManualResetEvent对象);和上面一样 把每个线程相关的ManualResetEvent对象添加到该集合;

//主线程等待网络请求全部完成

 
1 foreach (List<ManualResetEvent> listEvent in _listLocEventList)
2 {
3                 WaitHandle.WaitAll(listEvent.ToArray());
4 }

该方案运用起来比较复杂,而且会导致创建大量的ManualResetEvent对象;

现在的设计目标是这种对文件的分析是多任务同时进行的,也就是说会产生的ManualResetEvent对象List<List<ManualResetEvent>>.Size() * 任务数(N个文件上传)

改进的解决方法:

原理:封装一个ManualResetEvent对象,一个计数器current,提供SetOne和WaitAll方法;

主线程调用WaitAll方法使ManualResetEvent对象等待唤醒信号;

各个子线程调用setOne方法 ,setOne每执行一次current减1,直到current等于0时表示所有子线程执行完毕 ,调用ManualResetEvent的set方法,这时主线程可以执行WaitAll之后的步骤。

目标:减少ManualResetEvent对象的大量产生和使用的简单性。

在这里我写了个封装类:

 
01 /********************************************************************************
02  * Copyright © 2001 - 2010Comit. All Rights Reserved.
03  * 文件:MutipleThreadResetEvent.cs
04  * 作者:杨柳
05  * 日期:2010年11月13日
06  * 描述:封装 ManualResetEvent ,该类允许一次等待N(N>64)个事件执行完毕
07  
08  *       解决问题:WaitHandle.WaitAll(evetlist)方法最大只能等待64个ManualResetEvent事件
09  * *********************************************************************************/
10 using System;
11 using System.Collections.Generic;
12 using System.Linq;
13 using System.Text;
14 using System.Threading;
15   
16 namespace TestMutipleThreadRestEvent
17 {
18     /// <summary>
19     ///  封装ManualResetEvent
20     /// </summary>
21     public class MutipleThreadResetEvent : IDisposable
22     {
23         private readonly ManualResetEvent done;
24         private readonly int total;
25         private long current;
26   
27         /// <summary>
28         /// 构造函数
29         /// </summary>
30         /// <param name="total">需要等待执行的线程总数</param>
31         public MutipleThreadResetEvent(int total)
32         {
33             this.total = total;
34             current = total;
35             done = new ManualResetEvent(false);
36         }
37   
38         /// <summary>
39         /// 唤醒一个等待的线程
40         /// </summary>
41         public void SetOne()
42         {
43             // Interlocked 原子操作类 ,此处将计数器减1
44             if (Interlocked.Decrement(ref current) == 0)
45             {
46                 //当所以等待线程执行完毕时,唤醒等待的线程
47                 done.Set();
48             }
49         }
50   
51         /// <summary>
52         /// 等待所以线程执行完毕
53         /// </summary>
54         public void WaitAll()
55         {
56             done.WaitOne();
57         }
58   
59         /// <summary>
60         /// 释放对象占用的空间
61         /// </summary>
62         public void Dispose()
63         {
64             ((IDisposable)done).Dispose();
65         }
66     
67   
68 }

注释写的很清楚了:本质就是只通过1个ManualResetEvent 对象就可以实现同步N(N可以大于64)个线程

下面是测试用例:

 
01 using System;
02 using System.Collections.Generic;
03 using System.Linq;
04 using System.Text;
05 using System.Threading;
06   
07 namespace TestMutipleThreadRestEvent
08 {
09     /// <summary>
10     /// 测试MutipleThreadResetEvent
11     /// </summary>
12     class Program
13     {
14         static int i = 0;
15   
16         /// <summary>
17         /// 主方法
18         /// </summary>
19         /// <param name="args">参数</param>
20         static void Main(string[] args)
21         {
22             //假设有100个请求线程
23             int num = 100;
24   
25             //使用 MutipleThreadResetEvent
26             using (var countdown = new MutipleThreadResetEvent(num))
27             {
28                 for (int i=0;i<num;i++)
29                 {
30                     //开启N个线程,传递MutipleThreadResetEvent对象给子线程
31                     ThreadPool.QueueUserWorkItem(MyHttpRequest, countdown);
32                 }
33   
34                 //等待所有线程执行完毕
35                 countdown.WaitAll();
36             }
37   
38             Console.WriteLine("所有的网络请求以及完毕,可以继续下面的分析...");
39             Console.ReadKey();
40         }
41   
42         /// <summary>
43         /// 假设的网络请求
44         /// </summary>
45         /// <param name="state">参数</param>
46         private static void MyHttpRequest(object state)
47         {
48            // Thread.Sleep(1000);
49             Console.WriteLine(String.Format("哈哈:{0}",++i));
50   
51             MutipleThreadResetEvent countdown = state as MutipleThreadResetEvent;
52             //发送信号量 本线程执行完毕
53             countdown.SetOne();
54         }
55     }
56 }

输出:

      …  省略 ...  

从结果上看线程执行的完成的时间顺序是不固定的;并且只有在所有100个网络请求任务完成后,才显示可以继续下面的分析。

与上面的方案是一样的效果,但是本方案使用非常简单,出错的概念小,免去了创建大量 ManualResetEvent 对象的烦恼

该解决方案可以适用与.net framework 2.0 以上的运行时。

tips:在.net framework 4.0 中有一个CountdownEvent对象可以实现类似的功能;

          不过目前公司大多数项目运行时还是基于.net framework 2.0 和 3.5

原文地址:https://www.cnblogs.com/xyqCreator/p/2854467.html