并行编程中的取消任务、共享状态,等等

在面对相互独立的数据或者相互独立的任务时,也许正是Parallel登场的时候。

比如说有一个盒子的集合,分别让盒子旋转一定的角度。

void RotateBox(IEnumerable<Box> boxes, float degree)
{
    Parallel.ForEach(boxes, box => box.Rotate(degree));
}

如果并行任务中的一个任务出现异常,需要结束出问题的任务呢?

 

Parallel.ForEach为我们提供了一个重载方法,可以控制任务是否继续。

void RotateBoxes(IEnumerable<Box> boxes)
{
    Parallel.ForEach(boxes, (box, state) => {
        if(!box.IsInvertible)
        {
            state.Stop();
        }
        else
        {
            box.Invert();
        }
    });
}

如果想取消整个并行任务呢?

 

Parallel.ForEach也为我们提供一个重载方法,可以接收一个ParallelOption的形参,通过设置ParallelOption的CancellationToken属性来取消整个并行过程。

void RotateBoxes(IEnumerable<Box> boxes,float degrees, CancellationToken token)
{
    Paralle.ForEach(boxes, 
    new ParallelOptions{CancellationToken=token},
    box => box.Rotate(degrees));
}

在使用的时候,一般先定义个全局CancellationTokenSource类型的变量。

static CancellationTokenSource token = new CancellationTokenSource();

然后,在某个并行任务中设置取消。

token.Cancel();

最后,再把这个token赋值给以上方法的CancellationToken属性。

各个并行任务如何共享状态,共享一个变量呢?

int InvertBoxes(IEnumerable<Box> boxes)
{
    object mutex = new object();//用来锁
    int nonInvertibleCount = 0; //各任务共享的变量
    Paralle.ForEach(boxes, box =>{
        if(box.IsInvertible){
            box.Invert();
        }
        else
        {
            lock(mutex)
            {
                ++nonInvertibleCount;
            }
        }
    });
    return nonInvertibleCount;
}

可见,对于各并行线程共享的变量,需要加一个线程锁,以防止多个线程同时操作共享变量。

另外,Parallel.ForEach提供了一个重载,其中localFinally形参接收一个委托类型,通过该委托让并行任务共享一个变量。比如:

static int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(
        source: values,
        LocalInit: () => 0,
        body: (item, state, localVlaue) => localValue + item,
        localFinally: localValue => {
            lock(mutex)
            {
                result += localValue;
            }
        }
    );
    return result;
}

当然,也别忘了PLINQ也支持并行

static int ParalleSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}

PLINQ的Aggregate方法也可实现:

static int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item;
    );
}

以上,是对相互独立数据的处理。

那么,如何处理相互独立的多个任务呢?

 

通过Parallel.Invoke方法可以实现。

static voiD ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length/2),
        () => ProcessPartialArray(array, array.Length/2, array.Length)
    );
}
static void ProcessPartialArray(double[] array, int begin, int end)
{}

使用Parallel.Invoke方法还可以让一个Action或这方法执行多次。

 

static void Do20(Action action)
{
    //让某个Action执行20次
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(actions);
}

Parallel.Invoke方法也提供了重载,接收ParallelOption类型的形参,用来控制取消整个并行过程。

static void Do20(Action action)
{
    //让某个Action执行20次
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(new ParallelOptions{CancellationToken = token},actions);
}

参考资料:C#并发编程经典实例

原文地址:https://www.cnblogs.com/darrenji/p/4714771.html