C#中的PLINQ(并行LINQ)

.NET Framework 3.5 中引入了语言集成查询 (LINQ),它具有统一的模型,以类型安全方式查询任何 System.Collections.IEnumerable或 System.Collections.Generic.IEnumerable<T> 数据源。关于LINQ函数的使用,可以参考:https://www.cnblogs.com/zhaotianff/p/6236062.html

并行 LINQ (PLINQ) 是 LINQ 模式的并行实现

注意:以下示例代码仅是演示用,其运行速度可能不如等效的顺序LINQ查询快

1、如何进行PLINQ查询

ParallelEnumerable类提供了一个扩展方法AsParallel()可以并行执行LINQ查询

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
14 
15             var result = list.AsParallel().Where(x => x > 30);
16         }
17     }
18 }

2、PLINQ查询的参数

WithDegreeOfParallelism:指定 PLINQ 应用于并行化查询的处理器的最大数量。

WithExecutionMode:指定 PLINQ 应如何并行化查询(即使是当默认行为是按顺序运行查询时)。

WithMergeOptions:提供有关 PLINQ 应如何(如果可能)将并行结果合并回使用线程上的一个序列的提示。

ParallelMergeOptions 枚举值如下:

AutoBuffered 2

利用系统选定大小的输出缓冲区进行合并。 在向查询使用者提供结果之前,会先将结果累计到输出缓冲区中。

Default 0

使用默认合并类型,即 AutoBuffered。

FullyBuffered 3

利用整个输出缓冲区进行合并。 在向查询使用者提供任何结果之前,系统会先累计所有结果。

NotBuffered 1

不利用输出缓冲区进行合并。 一旦计算出结果元素,就向查询使用者提供这些元素。

WithCancellation:指定 PLINQ 应定期监视请求取消时所提供的取消标记的状态以及取消执行。(这个操作和线程的取消操作是一致的,如果不理解线程中的取消操作,可以访问:https://docs.microsoft.com/zh-cn/dotnet/standard/threading/cancellation-in-managed-threads

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {           
13             var cts = new System.Threading.CancellationTokenSource();
14 
15             try
16             {
17                 var result2 = list.AsParallel().Where(x => x > 30)
18                 .WithDegreeOfParallelism(Environment.ProcessorCount)
19                 .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
20                 .WithMergeOptions(ParallelMergeOptions.Default)
21                 .WithCancellation(cts.Token);
22 
23                 PrintResult(result2);
24             }
25             catch(OperationCanceledException)
26             {
27                 Console.WriteLine("并行查询已被取消");
28             }
29         }
30 
31 
32         static void PrintResult(IEnumerable<int> collection)
33         {
34             foreach (var item in collection)
35             {
36                 Console.WriteLine(item);
37             }
38         }
39     }
40 }

3、PLINQ查询中常用的函数

AsOrdered:指定 PLINQ 应为查询的其余部分保留源序列的排序,或直到例如通过使用 orderby(在 Visual Basic 中为 Order By)子句更改排序为止。

AsUnordered:指定保留源序列的排序不需要查询其余部分的 PLINQ。

AsSequential:指定查询的其余部分应像非并行的 LINQ 查询一样按顺序运行。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static Func<int, bool> query => x => x > 30;
12 static void Main(string[] args) 13 { 14 var orderResult = list.AsParallel().AsOrdered().Where(query);//asc 15 //list.AsParallel().Where(query).OrderBy(x=>x);//asc 16 //list.AsParallel().Where(query).OrderByDescending(x => x);//desc 17 PrintResult(orderResult); 18 19 var unorderResult = list.AsParallel().AsUnordered().Where(query); 20 PrintResult(unorderResult); 21 22 list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x)); 23 Console.WriteLine(" "); 24 25 var linqResult = list.Where(query); 26 PrintResult(linqResult); 27 28 var pinqSequentialResult = list.AsParallel().AsSequential().Where(query); 29 PrintResult(pinqSequentialResult); 30 31 var forceExecutionMode = list.AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism).AsSequential().Where(query); 32 PrintResult(forceExecutionMode); 33 } 34 35 36 static void PrintResult(IEnumerable<int> collection) 37 { 38 foreach (var item in collection) 39 { 40 Console.WriteLine(item); 41 } 42 43 Console.WriteLine(" "); 44 } 45 } 46 }

4、ForAll

ForAll是一种多线程枚举方法,与循环访问查询结果不同,它允许在不首先合并回使用者线程的情况下并行处理结果。

如果并行执行查询,PLINQ 对源序列进行分区,以便多个线程能够并发处理不同部分,通常是在不同的线程中。 如果要在一个线程(例如,foreach)中使用结果,必须将每个线程的结果合并回一个序列中。 PLINQ 执行的合并类型具体视查询中的运算符而定。 例如,对结果强制施加新顺序的运算符必须缓冲所有线程中的全部元素。 从使用线程(以及应用用户)的角度来看,完全缓冲查询可能会运行很长时间,才能生成第一个结果。 默认情况下,其他运算符进行部分缓冲,并分批生成结果。 默认不缓冲的一个运算符是 ForAll。 它会立即生成所有线程中的所有元素。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static Func<int, bool> query => x => x > 30;
12         static void Main(string[] args)
13         {
14             var list = new List<int>() { 12, 21, 13, 31, 14, 41, 15, 51, 16, 61 };
15             list.AsParallel().Where(query).ForAll(x=> Console.WriteLine(x));
16             Console.WriteLine("

");        
17         }
18     }
19 }

5、处理PLINQ查询中常用的异常

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace PLINQ_demo
 8 {
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13             //普通 LINQ
14             try
15             {
16                 tempList.Select(x => 100 / x).ToList();
17             }
18             catch(DivideByZeroException ex)
19             {
20                 Console.WriteLine(ex.Message);
21             }
22 
23             //PLINQ
24             try
25             {
26                  tempList.AsParallel().Select(x => 100 / x).ToList();              
27             }
28             catch(DivideByZeroException ex)
29             {
30                 Console.WriteLine(ex.Message);
31             }
32             catch(AggregateException ex)
33             {
34                 Console.WriteLine(ex.Message);
35             }
36 
37         }
38     }
39 }

使用顺序LINQ查询:当除以0时,得到了DevideByZeroException异常 

使用并行LINQ查询:当使用Asparallel()运行,除以0时,得到了AggregateException,因为现在是并行的方式运行,AggregateException将包含运行PLINQ查询期间的所有异常,可以使用Flatten和Handle方法来处理内部的DivideByZeroException

示例如下:

 1 try
 2             {
 3                 tempList.AsParallel().Select(x => 100 / x).ToList();              
 4             }
 5             catch(DivideByZeroException ex)
 6             {
 7                 Console.WriteLine(ex.Message);
 8             }
 9             catch(AggregateException ex)
10             {
11                 var exceptions = ex.Flatten().InnerExceptions;
12 
13                 foreach (var item in exceptions)
14                 {
15                     Console.WriteLine(item);
16                 }
17             }

 6、数据分区

若要并行执行对数据源的操作,关键步骤之一是,将数据源分区 成多个部分,以供多个线程同时访问。 PLINQ 和任务并行库 (TPL) 提供了默认分区程序,在用户编写并行查询或 ForEach 循环时透明运行。 对于更高级的方案,可以插入自己的分区程序。

这里有点难,还需要再学习理解一下,后面再更新

原文地址:https://www.cnblogs.com/zhaotianff/p/12658004.html