.NET Core3.1 并行LINQ

书接上文:LINQ 标准查询操作符

概述

  PLINQ(Parallel LINQ,)并行LINQ。

  syetem.Linq名称空间中包含的类ParallelEnumerable可以分解查询的工作,使其分布在多个线程上。尽管Enumerable类给IEnumerable<T>接口定义了扩展方法,但ParallelEnumerable类的大多数扩展方法是ParallelQuery<TSource>类的扩展。一个重要的例外是AsParallel()方法,它扩展了IEnumerable<TSource>接口,返回ParallelQuery<TSource>类,所以正常的集合都可以以并行的方式查询。

并行查询

  需要一个大型集合。 对于可以放在CPU的缓存中的小集合,并行LINQ看不出效果。示例中,用随机值填充一个大型的int集合,然后使用LINQ筛选数据,进行一些计算,获取所筛选数据的平均值。和上一节的例子中的唯一区别就是调用了AsParallel()方法:

 1         private static void ParallelLinqQuery()
 2         {
 3             const int arraySize = 500000000;
 4             var r = new Random();
 5             var data = Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList();
 6             var query = (from s in data.AsParallel()
 7                          where Math.Log(s) < 4
 8                          select s).Average();
 9             Console.WriteLine(query);
10         }

    运行结果:

    

     程序运行期间,我们看看CPU运行情况,可以看出所有的CPU都在忙碌:

    

分区器

   AsParallel()方法不仅扩展了IEnumerable<T>接口,还扩展了Partitioner类。通过他,可以影响要创建的分区。

  Partitioner类在名称空间System.Collection.Concurrent中定义,有多个不同的变体。Create()方法接受实现了IList<T>类的数组或对象。根据这一点,以及Boolean类型的参数loadBalance和该方法的一些重载版本,会返回一个不同的Partitioner类型。

  修改上述事例中的代码,手工创建一个分区器,而不是使用默认的分区器:

 1         private static void ParallelLinqQueryPartitioner()
 2         {
 3             const int arraySize = 500000000;
 4             var r = new Random();
 5             var data = Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList();
 6             var query = (from s in Partitioner.Create(data, true).AsParallel()
 7                          where Math.Log(s) < 4
 8                          select s
 9                          ).Average();
10             Console.WriteLine(query);
11         }

  也可以调用WithExecutionMode()和WithDegreeOfParallelism()方法来影响并行机制。

  WithExecutionMode()方法,可以传递ParallelExecutionMode的一个Default值或者ForceParallelism值。

  默认情况下,PLINQ避免使用系统开销很高的并行机制。对于WithDegreeOfParallelism(),可以传递一个整数值,以指定应并行运行的最大任务数。查询不应使用全部CPU,这个方法会很有用。

取消

  要取消长时间运行的查询,可以给查询添加WithCancellation()方法,并传递一个CancellationToken令牌作为参数。CancellationToken令牌从CancellationTokenSource类中创建。该查询在单独的线程中运行,在该线程中,捕获一个OperationCancellationException类型的异常。如果取消了查询,就触发这个异常。在主线程中调用CancellationTokenSource累的Cancel()方法可以取消任务:

 1         private static void UseCancellation()
 2         {
 3             var cts = new CancellationTokenSource();
 4             Task.Run(() =>
 5             {
 6                 try
 7                 {
 8                     const int arraySize = 500000000;
 9                     var r = new Random();
10                     var data = Enumerable.Range(0, arraySize).Select(x => r.Next(140)).ToList();
11                     var res = (from s in data.AsParallel().WithCancellation(cts.Token)
12                                where Math.Log(s) < 4
13                                select s).Average();
14                     Console.WriteLine(res);
15                 }
16                 catch (OperationCanceledException ex)
17                 {
18                     Console.WriteLine(ex.Message);
19                 }
20             });
21             
22             Console.WriteLine("启动查询!");
23             Console.WriteLine("取消? 确定取消请输入Y/y");
24             
25             string input = Console.ReadLine();
26             if (input.ToLower().Equals("y"))
27             {
28                 cts.Cancel();
29             }
30         }

下面,我们将学习LINQ的表达式树

原文地址:https://www.cnblogs.com/hiwuchong/p/14135500.html