Multithreading With C# Cookbook---Chapter4---使用任务并行库

概念

任务并行库(Task Parallel Library,TPL),在.NET Framework 4.0引入,4.5有所改进。我们可以把线程池认为是一个抽象层,其隐藏使用线程的细节;TPL可被认为是线程池上的又一个抽象层,其隐藏了与线程池交互的底层代码,并提供更方便的API。

更多内容

TPL核心概念是任务。一个任务代表一个异步操作,该操作可以通过多种方式运行,可以使用或不使用独立线程运行。TPL向用户隐藏任务的实现细节从而创建一个抽象层,我们无须知道任务实际是如何执行的。但如果不正确使用,将会导致诡异的错误。

一个任务可以通过多种方式与其他任务组合,其用于组合任务的便利的API也是一个与其他模式(APM,EAP)的一个关键优势。

处理任务中的异常:一个任务可能有多个任务组成,多个任务又有自己的子任务,通过AggregateException来捕获底层任务的所有异常,并允许单独处理这些异常。

准备工作

使用.NET Framework 4.5以上版本。

使用using

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using static System.Threading.Thread;
using System.Threading;
View Code

创建任务

Task.Run创建任务,再使用Start开启任务;Task.Factory.StartNew创建任务后会立即启动。当创建任务时标记任务为长时间运行TaskCreationOptions.LongRunning,将不会使用线程池线程,而是另开线程。

    class Program
    {
        static void Main(string[] args)
        {
            Task t1 = new Task(() => TaskMethod("Task1"));
            Task t2 = new Task(() => TaskMethod("Task2"));
            t1.Start();
            t2.Start();
            Task.Run(() => TaskMethod("Task3"));
            Task.Factory.StartNew(() => TaskMethod("Task4"));
            Task.Factory.StartNew(() => TaskMethod("Task5"), TaskCreationOptions.LongRunning);//标记该任务为长时间运行任务,将不会使用线程池线程

            Console.ReadLine();
        }

        static void TaskMethod(string name)
        {
            Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
        }
    }
View Code

使用任务执行基本操作

创建Task<int>任务,使用任务对象.Result属性来获取该任务返回值,在在任务返回值之前,主线程会阻塞等待完成。使用Task.IsCompleted来轮询任务状态,通过Task.RunAsynchornously()方法,来指定任务运行在主线程。

class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Task1 result:"+ TaskMethod("Task1")); 

            Task<int> task = CreatTask("Task2");
            task.Start();
            Console.WriteLine("Task2 result:" + task.Result);

            task = CreatTask("Task3");
            task.RunSynchronously();//运行在主线程
            Console.WriteLine("Task3 result:" + task.Result);

            task = CreatTask("Task4");
            Console.WriteLine(task.Status) ;
            task.Start();
            while (!task.IsCompleted)
            {
                Console.WriteLine(task.Status);
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            Console.WriteLine(task.Status);
            Console.WriteLine("Task4 result:" + task.Result);

            Console.ReadLine();
        }

        static Task<int> CreatTask(string name)
        {
            return new Task<int>(()=>TaskMethod(name));
        }

        static int TaskMethod(string name)
        {
            Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
            Thread.Sleep(TimeSpan.FromSeconds(2));
            return 42;
        }
    }
View Code

组合任务

创建一个任务,再给他创建一个后续任务(Task.ContinueWith),先执行该任务,完成后才能执行后续任务。TaskContinuationOptions.OnlyOnRanToCompletion属性指定该延续任务只能在前面任务完成后才能执行;TaskContinuationOptions.ExecuteSynchronously属性指定该延续任务会工作在创建此延续任务的线程上执行。

创建任务时指定TaskCreationOptions.AttachedToParent属性,指定该任务为父子任务,只有当所有子任务完成,父任务才算完成。

class Program
    {
        static void Main(string[] args)
        {
            var t1 = new Task<int>(() => TaskMethod("task1", 3));
            var t2 = new Task<int>(() => TaskMethod("task2", 2));

            t1.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion);//该属性指定该延续任务只能在前面任务完成后才能执行

            t1.Start();
            t2.Start();

            Thread.Sleep(TimeSpan.FromSeconds(4));

            Task continuation = t2.ContinueWith(t => Console.WriteLine($"The first answer is {t.Result}. Thread id :{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}"), TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);//ExecuteSynchronously属性指定该延续任务会工作在创建此延续任务的线程上执行

            continuation.GetAwaiter().OnCompleted(() => Console.WriteLine($"Continuation Task Completed! Thread id is: {CurrentThread.ManagedThreadId}. Is thread pool thread: {CurrentThread.IsThreadPoolThread}"));

            Thread.Sleep(TimeSpan.FromSeconds(2));
            Console.WriteLine();

            t1 = new Task<int>(() =>
            {
                var innerTask = Task.Factory.StartNew(() => TaskMethod("second Task", 5), TaskCreationOptions.AttachedToParent);
                innerTask.ContinueWith(t => TaskMethod("Third Task", 2), TaskContinuationOptions.AttachedToParent);
                return TaskMethod("First Task", 2);
            });

            t1.Start();

            while (!t1.IsCompleted)
            {
                Console.WriteLine(t1.Status);
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
            }
            Console.WriteLine(t1.Status);

            Console.ReadLine();



            Console.ReadLine();
        }

        static int TaskMethod(string name, int seconds)
        {
            Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            return seconds * 42;
        }
    }
View Code

APM(异步编程)模式转任务

 将APM API转换为任务。代码示例Task.Factory.FromAsync三种重载方法。

class Program
    {
        static void Main(string[] args)
        {
            int threadId;
            AsynchronouTask a = Test;
            IncompatibleAsynchronousTask c = Test;

            Console.WriteLine("Option1");
            Task<string> task1 = Task<string>.Factory.FromAsync(a.BeginInvoke("Async 1",callBack,"async1 state"),a.EndInvoke);
            task1.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
            while (!task1.IsCompleted)
            {
                //Console.WriteLine(task1.Status);
            }
            Console.WriteLine(task1.Status);

            Console.WriteLine("----------------------------");
            Console.WriteLine();

            Console.WriteLine("Option2");
            Task<string> task2 = Task<string>.Factory.FromAsync(a.BeginInvoke,a.EndInvoke,"Async 2","async2 state");
            task2.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
            while (!task2.IsCompleted)
            {
                //Console.WriteLine(task2.Status);
            }
            Console.WriteLine(task2.Status);

            Console.WriteLine("----------------------------");
            Console.WriteLine();

            Console.WriteLine("Option3");
            IAsyncResult ar = c.BeginInvoke(out threadId,callBack,"async3 state");
            Task<string> task3 = Task<string>.Factory.FromAsync(ar,_=>c.EndInvoke(out threadId,ar));
            task2.ContinueWith(t => Console.WriteLine($"Callback is finished. now running a continuation!result:{t.Result}"));
            while (!task3.IsCompleted)
            {
                //Console.WriteLine(task3.Status);
            }
            Console.WriteLine(task3.Status);

            Console.ReadLine();
        }

        delegate string AsynchronouTask(string threadName);

        delegate string IncompatibleAsynchronousTask(out int threadId);

        static void callBack(IAsyncResult ar)
        {
            Console.WriteLine("Starting callback……");
            Console.WriteLine("State paseed to a callback:"+ar.AsyncState);
            Console.WriteLine("Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
        }

        static string Test(string threadName)
        {
            Console.WriteLine("TN: Starting Test……");
            Console.WriteLine("Is thread pool thread:"+CurrentThread.IsThreadPoolThread);
            Sleep(2000);
            CurrentThread.Name = threadName;
            return "Thread name is:" + CurrentThread.Name;
         }

        static string Test(out int threadId)
        {
            Console.WriteLine("TI: Starting Test……");
            Console.WriteLine("Is thread pool thread:" + CurrentThread.ManagedThreadId);
            Sleep(2000);
            threadId = CurrentThread.ManagedThreadId;
            return "Thread Id is:" + threadId;
        }
    }
View Code

EAP(基于事件的异步操作)模式转任务

class Program
    {
        static void Main(string[] args)
        {
            var tcs = new TaskCompletionSource<int>();

            var worker = new BackgroundWorker();
            worker.DoWork += (sender, evenArgs) =>
              {
                  evenArgs.Result = TaskMethod("BackgroundWorker",3);
              };
            worker.RunWorkerCompleted += (sender, evenArgs) =>
              {
                  if (evenArgs.Error != null)
                  {
                      tcs.SetException(evenArgs.Error);
                  }
                  else if (evenArgs.Cancelled)
                  {
                      tcs.SetCanceled();
                  }
                  else
                  {
                      tcs.SetResult((int)evenArgs.Result);
                  }
              };
            worker.RunWorkerAsync();
            int result = tcs.Task.Result;
            Console.WriteLine(result);

            Console.ReadLine();
        }

        static int TaskMethod(string name,int seconds)
        {
            Console.WriteLine($"Task {name} on a thread id:{CurrentThread.ManagedThreadId},is thread pool thread:{CurrentThread.IsThreadPoolThread}");
            Sleep(TimeSpan.FromSeconds(seconds));
            return seconds * 42;
        }     
    }
View Code

实现取消选项

取消基于任务的异步操作。

class Program
    {
        static void Main(string[] args)
        {
            var cts = new CancellationTokenSource();
            var longTask = new Task<int>(() =>
                TaskMethod("Task1", 10, cts.Token), cts.Token);
            Console.WriteLine(longTask.Status);
            cts.Cancel();
            //longTask.Start();//异常
            Console.WriteLine(longTask.Status);
            Console.WriteLine("First task hai been cancelled before execution.");

            cts = new CancellationTokenSource();
            longTask = new Task<int>(() =>
                  TaskMethod("Task2", 10, cts.Token), cts.Token);
            longTask.Start();
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
                Console.WriteLine(longTask.Status);
            }
            cts.Cancel();
            Console.WriteLine("Task2 is canceled.");
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(0.5));
                Console.WriteLine(longTask.Status);
            }
            Console.WriteLine($"A task has been completed with result {longTask.Result}");
            Console.ReadLine();
        }

        static int TaskMethod(string name, int seconds, CancellationToken token)
        {
            Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");

            for (int i = 0; i < seconds; i++)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                if (token.IsCancellationRequested)
                    return -1;
            }

            return seconds * 42;
        }
    }
View Code

并行运行任务

借助Task.WhenAll方法,创建第三个方法,该方法在所有任务完成之后执行,并返回一个结果集;使用Task.WhenAny方法,任务列表中每完成一个任务,就从列表删除并等待其他任务完成。可以使用一个任务来计时,超时取消其他所有任务,来模拟超时管理。

class Program
    {
        static void Main(string[] args)
        {
            var t1 = new Task<int>(() => TaskMethod("Task1", 2));
            var t2 = new Task<int>(() => TaskMethod("Task2", 3));
            var whenAllTask = Task.WhenAll(t1, t2);

            whenAllTask.ContinueWith(t => Console.WriteLine($"The first task answer is {t.Result[0]}. The second task answer is {t.Result[1]}"), TaskContinuationOptions.OnlyOnRanToCompletion);
            t1.Start();
            t2.Start();

            Thread.Sleep(TimeSpan.FromSeconds(5));

            var tasks = new List<Task<int>>();
            for (int i = 0; i < 4; i++)
            {
                int counter = i;
                var task = new Task<int>(() => TaskMethod("Tasks:Task " + counter, counter));
                tasks.Add(task);
                task.Start();
            }

            while (tasks.Count > 0)
            {
                var completedTask = Task.WhenAny(tasks).Result;
                tasks.Remove(completedTask);
                Console.WriteLine($"A task has been completed with result {completedTask.Result}");
            }

            Console.ReadLine();
        }

        static int TaskMethod(string name, int seconds)
        {
            Console.WriteLine($"Task {name} is running on a thread id: {CurrentThread.ManagedThreadId}.Is thread pool thread: {CurrentThread.IsThreadPoolThread}");
            Thread.Sleep(TimeSpan.FromSeconds(seconds));
            return seconds * 42;
        }
    }
View Code

UI访问线程

第一个按钮Sync点击后整个界面卡死,一段时间后返回错误“调用线程无法访问此对象。另一个线程拥有该对象。”,因为我们不允许从创建UI的线程之外的线程访问UI;第二个按钮Async点击后,界面没有卡死,但依然会报错;第三个按钮AsyncOk点击后,可以成功返回结果,并且等待过程中,界面依然可以相应其他事件。

WPF XAML:

<Window x:Class="TaskSchedulerTest.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
        xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
        xmlns:local="clr-namespace:TaskSchedulerTest"
        mc:Ignorable="d"
        Title="MainWindow" Height="350" Width="525">
    <Grid>
        <TextBlock Name="ContentTextBlock" HorizontalAlignment="Left" Margin="44,134,0,0" VerticalAlignment="Top"
                   Width="425" Height="40"/>
        <Button Name="btnSync" Content="Sync" HorizontalAlignment="Left" Margin="45,190,0,0" VerticalAlignment="Top" Width="75" Click="btnSync_Click"/>
        <Button Name="btnAsync" Content="Async" HorizontalAlignment="Left" Margin="165,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsync_Click"/>
        <Button Name="btnAsyncOk" Content="AsyncOk" HorizontalAlignment="Left" Margin="285,190,0,0" VerticalAlignment="Top" Width="75" Click="btnAsyncOk_Click"/>
    </Grid>
</Window>
View Code

后台程序:

public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();
        }

        private void btnSync_Click(object sender, RoutedEventArgs e)
        {
            ContentTextBlock.Text = string.Empty;
            try
            {
                string result = TaskMethod().Result;
                ContentTextBlock.Text = result;
            }
            catch (Exception ex)
            {

                ContentTextBlock.Text = ex.InnerException.Message;
            }
        }

        private void btnAsync_Click(object sender, RoutedEventArgs e)
        {
            ContentTextBlock.Text = string.Empty;
            Mouse.OverrideCursor = Cursors.Wait;
            Task<string> task = TaskMethod();
            task.ContinueWith(t=>
            {
                ContentTextBlock.Text = t.Exception.InnerException.Message;
                Mouse.OverrideCursor = null;
            }, CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,TaskScheduler.FromCurrentSynchronizationContext());
            
        }

        private void btnAsyncOk_Click(object sender, RoutedEventArgs e)
        {
            ContentTextBlock.Text = string.Empty;
            Mouse.OverrideCursor = Cursors.Wait;
            Task<string> task = TaskMethod(TaskScheduler.FromCurrentSynchronizationContext());

            task.ContinueWith(t=>Mouse.OverrideCursor=null,CancellationToken.None,TaskContinuationOptions.None,TaskScheduler.FromCurrentSynchronizationContext());
        }

        private Task<string> TaskMethod()
        {
            return TaskMethod(TaskScheduler.Default);
        }
        private Task<string> TaskMethod(TaskScheduler scheduler)
        {
            Task delay = Task.Delay(TimeSpan.FromSeconds(5));

            return delay.ContinueWith(d=>
            {
                string str = $"Task is running on thread id :{CurrentThread.ManagedThreadId}. Is thread pool: {CurrentThread.IsThreadPoolThread}";
                ContentTextBlock.Text = str;
                return str;
            },scheduler);
        }
    }
View Code

注:本文是在阅读《C#多线程编程实战》后所写,部分内容引用该书内容,这是一本不错的书,感谢! 

原文地址:https://www.cnblogs.com/EasonDongH/p/8487911.html