.Net 并发异步处理总结

    在处理很多IO逻辑时或者想在做后台计算时,我们都会用到并发。并发简单的分两种类型,一种是计算密集型,一种是IO密集型, 下面先介绍一下。

1. 计算密集型:
    如果我们把线程里面处理的事情看做一个任务,计算密集型就是这个任务主要做的事情是处理计算和逻辑,换句话说就是主要吃CPU。计算密集型的一个典型场景是在客户端程序后台做一些耗时的计算来避免界面假死。计算密集型任务并发处理,除了能避免某些线程阻塞外(比如界面假死),还能充分利用CPU资源,现在的PC机和笔记本一般都是双核或者4核,服务器更可能有好几个CPU,这样并发执行可以让计算机同时执行多个任务。

2. IO密集型:IO密集型任务里面一般要处理大量IO操作,当然,只包含一个IO操作的任务我们也可以看成IO密集型,原因是IO操作一般都比较耗时。IO密集型的场景更多,比如我们需要读取多个文件或者请求多个WebService等。IO任务并发执行优点更多,主要是节省CPU资源和加快处理速度。因为很多IO操作都比较耗时,但它们又基本用不到CPU,所以如果我们用单线程来处理,CPU可能长时间处于阻塞状态,啥活也不干,这样很浪费CPU资源。同时单线程使得IO操作只能一个一个执行,时间上也会更长。

    下面用几个简单的例子介绍如何在.Net里用线程池和异步来实现并发(当日你也可以直接创建线程来做)。

1. 最土鳖的例子,单任务计算密集型

   将一个计算密集型任务加入线程池,线程池会在有线程闲的时候执行该任务:
   

        private static void Sample2()
{
ThreadPool.QueueUserWorkItem(count => Run((int)count), 20);
}

private static void Run(int count)
{
for (int i = 0; i < count; ++i)
{
Console.WriteLine(i);
Thread.Sleep(100);
}
Console.WriteLine("finished");
}

上面的Run可以有任意的参数组合的委托,但是能加入QueueUserWorkItem的方法只能带一个object参数,需要打包一个参数类,然后在lambda里面分解后传给Run,也可以直接用一个字典当参数类。如果需要在计算完成时做一些处理,可以用下面的异步方法。 

private static void Sample1()
{
Action<int> a = c =>
{
Run(c);
};

//异步执行操作,注意在Callback里面调用EndInvoke,
//否则可能会有资源泄漏
a.BeginInvoke(20, ar =>
{
a.EndInvoke(ar);
}, null);
}

  完成处理的代码可以跟在a.EndInvoke(ar);后面。注意:EndInvoke最好在会掉里面调用,不然可能会造成资源泄漏,或某些内存无法回收。

2. 还是很土鳖的例子,异步读取一个文件添加行号然后打印
   这样在文件读取的IO操作过程中,CPU可以空出来做其他事情。
   

    public class AddLineNum
{
/// <summary>
/// 异步方式读取文件并加行号显示
///
/// 总共分三步:
/// 1. 前期准备工作,开始读文件,使用当前线程执行
/// 2. 文件异步读取,这个步骤将不占用线程和CPU资源
/// 3. 读取数据处理和显示,在第三个线程里执行
/// </summary>
/// <param name="file"></param>
public AddLineNum(string file)
{
_file = file;
}

/// <summary>
/// 1. 开始读取文件
/// </summary>
public void Begin()
{
long fileSize = new FileInfo(_file).Length;
_buffer = new byte[fileSize];
FileStream reader = File.OpenRead(_file);
reader.BeginRead(_buffer, 0, (int)fileSize, ar =>
{
int count = reader.EndRead(ar);
Console.WriteLine("Read length:{0}", count);
End();//处理读取数据
}, null);
}

/// <summary>
/// 2. 处理读到的数据
/// </summary>
private void End()
{
string text = Encoding.UTF8.GetString(_buffer);
string[] lines = text.Split(new string[]{Environment.NewLine}, StringSplitOptions.RemoveEmptyEntries);
for (int i = 0; i < lines.Length; ++i)
{
Console.WriteLine(String.Format("{0}:{1}", i, lines[i]));
}
}

private string _file = "";
private byte[] _buffer = null;
}

调用代码:

     AddLineNum apmOne = new AddLineNum("AddLineNum.cs");
apmOne.Begin();

如注释里面所说,在处理文件读取时不占用CPU,异步带来的好处。

    

3. 稍微不土鳖的例子,同时开始多个web请求,在所有请求结束时汇总结果
   

   public class CheckSiteStatus
{
public CheckSiteStatus(IList<string> sites)
{
_sites = sites;
}

public void Begin()
{
_finishedCount = _sites.Count;
foreach (string site in _sites)
{
HttpWebRequest req = (HttpWebRequest)HttpWebRequest.Create("http://" + site);
req.Method = "HEAD";
req.BeginGetResponse(ar =>
{
HttpWebRequest request = ar.AsyncState as HttpWebRequest;
try
{
HttpWebResponse res = (HttpWebResponse)(request.EndGetResponse(ar));
_statusDic[request.RequestUri.Host] = res.StatusCode;
}
catch (WebException exp)
{
HttpWebResponse errRes = exp.Response as HttpWebResponse;
if (errRes != null)
{
_statusDic[request.RequestUri.Host] = errRes.StatusCode;
}
else
{
_statusDic[request.RequestUri.Host] = HttpStatusCode.NotFound;
}
}

//完成一个
Interlocked.Decrement(ref _finishedCount);
if (_finishedCount <= 0)
End();
}, req);
}
}

private void End()
{
int goodCount = 0;
foreach (var pair in _statusDic)
{
if (pair.Value == HttpStatusCode.OK)
{
++goodCount;
}
Console.WriteLine("site:{0}, status:{1}", pair.Key, pair.Value);
}

Console.WriteLine("{0} of {1} can be connected.", goodCount, _statusDic.Keys.Count);
}

private Dictionary<string, HttpStatusCode> _statusDic = new Dictionary<string, HttpStatusCode>();
private IList<string> _sites;
private int _finishedCount = 0;
}

调用代码:

        CheckSiteStatus checker = new CheckSiteStatus(new string[] {
"www.baidu.com",
"www.google.com",
"www.soso.com",
"www.idiot.com/"
});
checker.Begin();

例子中用_finishedCount跟踪执行完成个数,然后用Interlocked的方法实现同步控制。

4. 和上面例子3实现的功能相同,本土鳖常用的方法,主线程会阻塞浪费点CPU,但是代码更简洁清楚。

    class CheckSiteStatus2
{
public static void Check(IList<string> sites)
{
//:开始请求
Dictionary<string, HttpWebRequest> reqs = new Dictionary<string, HttpWebRequest>();
Dictionary<string, IAsyncResult> asyncs = new Dictionary<string,IAsyncResult>();
foreach (string site in sites)
{
HttpWebRequest req = (HttpWebRequest)HttpWebRequest.Create("http://" + site);
req.Method = "HEAD";
reqs[site] = req;
asyncs[site] = req.BeginGetResponse(null, null);
}

//:结束请求,更新状态
Dictionary<string, HttpStatusCode> statusDic = new Dictionary<string, HttpStatusCode>();
foreach (var pair in asyncs)
{
HttpWebRequest request = reqs[pair.Key];
try
{
HttpWebResponse res = (HttpWebResponse)(request.EndGetResponse(pair.Value));
statusDic[request.RequestUri.Host] = res.StatusCode;
}
catch (WebException exp)
{
HttpWebResponse errRes = exp.Response as HttpWebResponse;
if (errRes != null)
{
statusDic[request.RequestUri.Host] = errRes.StatusCode;
}
else
{
statusDic[request.RequestUri.Host] = HttpStatusCode.NotFound;
}
}
}

//:全部处理完时汇总处理
int goodCount = 0;
foreach (var pair in statusDic)
{
if (pair.Value == HttpStatusCode.OK)
{
++goodCount;
}
Console.WriteLine("site:{0}, status:{1}", pair.Key, pair.Value);
}

Console.WriteLine("{0} of {1} can be connected.", goodCount, statusDic.Keys.Count);
}
}

调用代码:

CheckSiteStatus2.Check(new string[] {
"www.baidu.com",
"www.google.com",
"www.soso.com",
"www.idiot.com/"
});


上面直接在一个静态方法里面用一段代码实现,各个步骤也比较清楚。
 

原文地址:https://www.cnblogs.com/alala666888/p/2380054.html