从无到有实现.net协程(二)

上一篇实现了最基本的协程框架,但缺点是只有一个线程在执行协程(虽然nodejs也是单线程,早期的go也是),对于当前的多核CPU来说,不充分利用多核资源,是很浪费的。之前也说过,协程最大的应用场景是在web程序里面,因此,在这一篇文章里,将会实现一个简单的web服务器,运行在多线程协程上面,

首先,需要重新实现一个基于多线程的协程容器

  1     /// <summary>
  2     /// 基于多线程的协程容器实现
  3     /// </summary>
  4     public class CoroutineContainerMultiple : ICoroutineContainer
  5     {
  6 
  7         /// <summary>
  8         /// 存储多线程协程单元项
  9         /// </summary>
 10         private List<MultipleItem> _multipleItems = new List<MultipleItem>();
 11 
 12         /// <summary>
 13         /// 设定的线程数
 14         /// </summary>
 15         private int _threadCount;
 16         /// <summary>
 17         /// 错误处理
 18         /// </summary>
 19         private Action<ICoroutineUnit, Exception> _errorHandle;
 20 
 21         public CoroutineContainerMultiple(int threadCount, Action<ICoroutineUnit, Exception> errorHandle)
 22         {
 23             _threadCount = threadCount;
 24             _errorHandle = errorHandle;
 25         }
 26         public void Register(ICoroutineUnit unit)
 27         {
 28             //随机分摊请求到不同的协程处理线程
 29             Random random = new Random();
 30             var index=random.Next(0, _threadCount);
 31 
 32             lock (_multipleItems[index].AddUnits)
 33             {
 34                 _multipleItems[index].AddUnits.Add(new UnitItem() { Unit = (HttpAction)unit, UnitResult = null });
 35             }
 36 
 37         }
 38 
 39         public void Run()
 40         {
 41             ///根据设定的线程数启动协程处理线程
 42             for(var index=0;index<=_threadCount-1;index++)
 43             {
 44                 var multipleItem = new MultipleItem() { Units = new List<UnitItem>(), AddUnits = new List<UnitItem>() };
 45                 _multipleItems.Add(multipleItem);
 46 
 47                 Task.Run(() =>
 48                 {
 49                     while (true)
 50                     {
 51 
 52                         lock (multipleItem.AddUnits)
 53                         {
 54                             foreach (var addItem in multipleItem.AddUnits)
 55                             {
 56                                 multipleItem.Units.Add(addItem);
 57                             }
 58                             multipleItem.AddUnits.Clear();
 59                         }
 60 
 61 
 62                         foreach (var item in multipleItem.Units)
 63                         {
 64                             if (item.UnitResult == null)
 65                             {
 66                                 var result = item.Unit.Do();
 67 
 68                                 try
 69                                 {
 70                                     result.MoveNext();
 71                                 }
 72                                 catch (Exception ex)
 73                                 {
 74                                     _errorHandle(item.Unit, ex);
 75 
 76                                     multipleItem.Units.Remove(item);
 77 
 78                                     break;
 79                                 }
 80 
 81                                 item.UnitResult = result;
 82                             }
 83                             else
 84                             {
 85                                 if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted)
 86                                 {
 87                                     var nextResult = true;
 88                                     try
 89                                     {
 90                                         nextResult = item.UnitResult.MoveNext();
 91                                     }
 92                                     catch (Exception ex)
 93                                     {
 94                                         _errorHandle(item.Unit, ex);
 95 
 96                                         multipleItem.Units.Remove(item);
 97 
 98                                         break;
 99                                     }
100                                     if (!nextResult)
101                                     {
102 
103                                         multipleItem.Units.Remove(item);
104 
105                                         break;
106                                     }
107                                 }
108                             }
109                         }
110 
111                     }
112                 });
113 
114             }
115 
116 
117 
118         }
119 
120 
121 
122 
123         private class UnitItem
124         {
125             public HttpAction Unit { get; set; }
126             public IEnumerator<Task> UnitResult { get; set; }
127         }
128 
129 
130         /// <summary>
131         /// 多线程协程单元项
132         /// </summary>
133         private class MultipleItem
134         {
135             public List<UnitItem> Units
136             {
137                 get;set;
138             }
139 
140             public List<UnitItem> AddUnits
141             {
142                 get;set;
143             }
144 
145         }
146 }

接着,实现用于处理Http请求的协程单元

 1 /// <summary>
 2     /// 用来处理Http请求的协程单元
 3     /// </summary>
 4     public class HttpAction : ICoroutineUnit
 5     {
 6         protected HttpListenerRequest _request;
 7         protected HttpListenerResponse _response;
 8 
 9 
10         /// <summary>
11         /// http响应
12         /// </summary>
13         public HttpListenerResponse Response
14         {
15             get { return _response; }
16         }
17 
18         /// <summary>
19         /// 构造函数
20         /// </summary>
21         /// <param name="request">http请求</param>
22         /// <param name="response">http响应</param>
23         public HttpAction(HttpListenerRequest request, HttpListenerResponse response)
24         {
25             _request = request;
26             _response = response;
27         }
28         /// <summary>
29         /// 执行处理
30         /// 负责http请求的处理,这里仅仅是根据url地址返回不同的字符串
31         /// 在实际应用中,需要处理诸如路由、拦截、过滤等等一系列复杂处理
32         /// </summary>
33         /// <returns></returns>
34         public IEnumerator<Task> Do()
35         {
36            
37             var strUri=_request.RawUrl;
38             string strContent;
39             if (strUri.Contains("page1"))
40             {
41                 strContent = "访问页面1";
42             }
43             else
44             {
45                 strContent = "访问其他页面";
46             }
47 
48             _response.ContentType = "text/html";
49             var bytes = UTF8Encoding.UTF8.GetBytes(strContent);
50             yield return _response.OutputStream.WriteAsync(bytes, 0, bytes.Length);
51             _response.OutputStream.Close();
52 
53 
54         }
55 
56 
57        
58     }

在主程序中监听分发请求

 1 static void Main(string[] args)
 2         {
 3 
 4             //错误处理,将错误原因返回给客户端
 5             Action<ICoroutineUnit, Exception> errorHandle = (unit, ex) =>
 6             {
 7                 var response = ((HttpAction)unit).Response;
 8                 response.ContentType = "text/html";
 9                 string str = "访问出错,错误原因:" + ex.ToString();
10                 var bytes = UTF8Encoding.UTF8.GetBytes(str);
11                 response.OutputStream.Write(bytes, 0, bytes.Length);
12                 response.OutputStream.Close();
13             };
14             
15             //初始化协程容器,设定线程数为2
16             ICoroutineContainer coroutineContainerMultiple = new CoroutineContainerMultiple(2, errorHandle);
17             coroutineContainerMultiple.Run();
18 
19             //监听本地8000端口
20              HttpListener listener = new HttpListener();
21 
22              listener.Start();
23              listener.Prefixes.Add("http://localhost:8000/");
24 
25             //循环监听请求
26              while (true)
27              {
28                  var httpContext = listener.GetContext();
29                 //创建Http请求的协程单元
30                  HttpAction action = new HttpAction(httpContext.Request, httpContext.Response);
31                 //注册分发协程单元
32                  coroutineContainerMultiple.Register(action);
33              }
34              
35 
36 
37 
38         }
39 
40 
41     }

监听请求后分发给协程容器处理

程序执行后,

打开浏览器访问,获得结果

在此方案中,一个监听线程用来监听所有请求,转发给后端多个线程分摊请求的协程处理

至此,基于多线程的协程简单框架完成

原文地址:https://www.cnblogs.com/rhwleo/p/6853767.html