TPL Dataflow库的几个扩展函数

TPL Dataflow是微软面向高并发应用而推出的新程序库。借助于异步消息传递与管道,它可以提供比线程池更好的控制。本身TPL库在DataflowBlock类中提供了不少扩展函数,用起来还是非常方便的,但感觉还是不够全(当然,MS没必要设计大而全的接口),前段时间写个小程序的时候用到了它,当时顺便写了几个扩展函数,这里记录一下,如果后续有扩展再继续补充。

    static class DataFlowExtension
    {
        /// <summary>
        ///
同步发送所有数据至
TargetBlock
        /// </summary>
        public static void PostAll<T>(this ITargetBlock<T> target, IEnumerable<T> source)
        {
            var isSuccess = source.All(i => target.Post(i));
            if (!isSuccess)
            {
                throw new InvalidOperationException();
            }
            target.Complete();
        }

        /// <summary>
        ///
异步发送所有数据至TargetBlock
        /// </summary>
        public static async Task PostAllAsync<T>(this ITargetBlock<T> target, IEnumerable<T> source)
        {
            foreach (var item in source)
            {
                await target.SendAsync(item);
            }
            target.Complete();
        }
        
        /// <summary>
        ///
同步从数据源中获取所有数据

        /// </summary>
        public static IReadOnlyList<T> ReceiveAll<T>(this IReceivableSourceBlock<T> source)
        {
            IList<T> output;
            if (!source.TryReceiveAll(out output))
            {
                throw new InvalidOperationException();
            }

            return output as IReadOnlyList<T>;
        }

        /// <summary>
        ///
步从数据源中获取所有数据

        /// </summary>
        public static async Task<IReadOnlyList<T>> ReceiveAllAsync<T>(this ISourceBlock<T> source)
        {
            List<T> output = new List<T>();
            while (await source.OutputAvailableAsync())
            {
                output.Add(source.Receive());
            }
            return output;
        }
    }

这几个扩展函数本身是对DataflowBlock类中的函数二次封装,没有太多的功能,基本上每个函数都只有几行,主要为了使用更加方便罢了,由于实现简单,扩充它也是非常方便的。

 

原文地址:https://www.cnblogs.com/TianFang/p/2829024.html