c# 生成者和消费者模式

概念:

  • 某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。
  • 产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
  • 单单抽象出生产者和消费者,还够不上是生产者/消费者模式。
  • 该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。
  • 生产者把数据放入缓冲区,而消费者从缓冲区取出数据。

 大概有三个特点:

 1.解耦

 2.支持并发

 3.

这里我提供了多个版本的模式;

基本模式;

1.生产者同步生产msg,进入如queene中,消费者同步从队列中取出数据,然后进行同步消费(这里主线程同步生产msg,一个子线程负责消费msg)

主线程负责往我们的qunen中添加msg,没添加一个就释放一个信号,让子线程去消费;

首先启动的是我们的子线程的状态;如果检车到队列为空,就等待,;

应该可以这么说,子线程先启动,出去等待信号的状态;

子线程在不断的while(true) { ........ }

线程的同步是,指有序的去访问公共变量;每次只能一个线程去访问;获取锁和是方法锁,必须是当前的同一个线程;

线程之间的互相通信,之间的协同操作,则是通过信号变量的;主线程负责释放开始的信号,子线程接受到信号之后,开始进行运动,

线程之间的通信,则用的是EventWaitHandle

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication38
{

    /// <summary>
    /// 生产者  消息  消费者;  这个三个应该是相互独立的对象;
    /// 在这里例子中,我们只有一个消费者; 后面我们可以将起改造成多个线程去消费我们的任务;
    /// 去处理其中的任务;
    /// 当队列中有的数据之后,我们要想法通知我们的消费者;
    /// 还要去看更多的实例;
    /// 看别人的各种设计想法和思路;
    /// To ensure thread safety, we used a lock to protect access to the Queue<string> collection.
    /// 
    /// 这个生成这的消费者模式;produce 和 consume 并不是同步的,也就是生产一个 消费一个
    /// 生产者由主线程不断的生成,然后加入到队列中, 消费者一个个的从队列中去取出数据,进行消费;
    /// 实现方式有多中滴呀;
    /// 线程的进入是同步的,    线程的消费也是同步的,而且只有一个线程;
    /// 如何做到多个线程异步生成  然后进入队列, 然后多个线程异步消费我们的额队列呢;
    /// </summary>

    public class ProducerConsumerQueue : IDisposable
    {

        EventWaitHandle _wh = new AutoResetEvent(false);
        Thread _worker;
        readonly object _locker = new object();     //这个锁,是用来同步读和写的;集合的读和写的;
        Queue<string> _tasks = new Queue<string>();     //一旦队列有了消息就要通知我们的线程去消费;


        /// <summary>
        /// 让线程跑起来
        /// </summary>
        public ProducerConsumerQueue()
        {
            _worker = new Thread(Work);
            _worker.Start();
        }

        /// <summary>
        /// 往线程中添加任务;一旦有了任务,就通知我们的消费者;
        /// 这里例子有很多问题滴呀;
        /// </summary>
        /// <param name="task"></param>
        public void EnqueueTask(string task)
        {
            lock (_locker) { _tasks.Enqueue(task); }
            _wh.Set();
        }

        public void Work()
        {
            while (true)  //相当于就是在不断的去轮询我们的队列中的信息;让这个线程一直处于执行的状态,应为只有一个线程,不能让它执行一完一次任务只有就停止了
            {
                string task = null;
                lock (_locker)   //这个锁是用来同步读和写的;
                {
                    if (_tasks.Count > 0) //队列中有数据,我们就从中取出数据;
                    {
                        task = _tasks.Dequeue();// 取出任务;
                        if (task == null) return;//任务为空就停止了;
                    }
                }
                if (task != null)
                {
                    //就取执行我们的任务;
                    Console.WriteLine("始终只有一个消费者:Task:{0} ThreadID{1} and now Quen length:{2}", task, Thread.CurrentThread.ManagedThreadId, _tasks.Count);
                    Thread.Sleep(1000);

                }
                else
                {
                    _wh.WaitOne(); //队列为空就等待
                   
                }

           } 
        }

        public void Dispose()
        {
            EnqueueTask(null);     // Signal the consumer to exit.
            _worker.Join();         // Wait for the consumer's thread to finish.
            _wh.Close();            // Release any OS resources.
            Console.WriteLine("对象销毁完毕...");

        }

    }


    class Program
    {

        static void Test()
        {
            //主线程在往我们的队列中添加消息。相当于就是我们的生产者; 往我们的队列中添加消息;
            using (ProducerConsumerQueue pc = new ProducerConsumerQueue())
            {
                pc.EnqueueTask("Hello");
                for (int i = 0; i < 10; i++) pc.EnqueueTask("Say " + i);
                pc.EnqueueTask("Goodbye!");
            }
        }

        static void Main(string[] args)
        {
            Test();
            Console.ReadLine();


        }
    }
}

这几种的设计思路:都是开辟一个线程,执行while(true){} 的循环,让线程一直处于run的状态,然后去poll(轮询我们的quen;) 这个去qunen

实现方式二:主线push, 三个或者多个线程pull,前提还是要将我们的子线程先run起来,然后处于等待状态,等待push进queue,然后进一个,释放一个信号,然后子线程

消费一个;

这里就是我们的示例代码:

 首先这里涉及到一个Montior.wait 和 Monitor.Pluse 的基本用法:https://www.codeproject.com/Articles/28785/Thread-synchronization-Wait-and-Pulse-demystified

这里有一个很好的栗子,俩解释我们的monitor.wait  and monitor.pulse;

    //there is a good example monitor wait  and monitor pualse;
    class Program
    {
        private static readonly object _phone = new object();

        static void DoWork()
        {

            Console.WriteLine("do some work...");
        }

        static void Worker()
        {
            lock (_phone)
            {
                while (true)
                {
                    Monitor.Wait(_phone);  // Wait for a signal from the boss
                    DoWork();
                    Monitor.PulseAll(_phone); // Signal boss we are done

                }
            }

        }

        static void Boss()
        {
            lock (_phone) // Grab the phone when I have something ready for the worker
            {
                Monitor.PulseAll(_phone); // Signal worker there is work to do
                Monitor.Wait(_phone); // Wait for the work to be done
            }
        }

        static void Main(string[] args)
        {
            Worker();
            Thread.Sleep(100); //首先得让我们的worker 处于待命的状态;
            Boss(); //然后我们的boss 上场;

        }
    }

然后我们看到上面的栗子,得让我们的worker先处于wait的状态,然后我们boss的pulse才有效的滴呀; 则就是我们的Thread.sleep(100) 的目的;那么,我们有没有更好饿办法呢;

然后........

略微复杂版本的控制;

        //这里同样可以添加一个复杂版本的call
        public void SmartWorker()
        {
            lock (_phone)
            {
                while (true)
                {
                    if (Monitor.Wait(_phone, 1000)) // Wait for one second at most
                    {
                        DoWork();
                        Monitor.PulseAll(_phone); // Signal boss we are done
                    }
                    else
                    {
                        // do something else.....
                    }
                }
            }
        }
        /// <summary>
        /// 没哟耐性的老板 窝草
        /// </summary>
        public static void Impatient_Boss()
        {
            lock (_phone)
            {
                Monitor.PulseAll(_phone); // Signal worker there is work to do
                if (Monitor.Wait(_phone, 1000)) // Wait for one second at most
                    Console.WriteLine("Good work!");
            }
        }

到这里,你可能会发现,wait 和 pulse 都和我们的同一个_locker 相关联滴呀;

中的来说,还是挺绕的一个概念;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication43
{

    //这里的qune就不再装得是消息了;
    //而是我们的action
    //使用的也是基于我们的 Monitor.wait(locker) 和 Monitor.Pulse(locker) 方法进行同步滴呀;
   //先让线程进入等待的状态,然后有信息(action) 进入队列的时候,我们就释放一个locker;然后另外一段就获取locker 进行消费;
   //直接将要执行的任务封装成了我们的action,
   //要懂得区分几个不同的概念:同步 异步  并行 并发  这个几个基本的概念;
   //这个例子是让消费者处于运动额状态,让后去poll 队列中的信息;


     public class PCQueue
    {
        Thread[] _threads;
        Queue<Action> _itemQ = new Queue<Action>();
        private readonly object _locker = new object();
        //初始化线程,让其进行执行;

        public  PCQueue(int count)
        {
            _threads = new Thread[count]; //先尽心初始化;

            for(var i = 0; i < count; i++)
            {
               ( _threads[i] = new Thread(Consume)).Start();  //初始化一定数量的工作者线程;
            }
        }

        public void ShutDown(bool waitForWorkers)
        {
            //是否等待线程执行完再关闭呢; 退出线程,不让线程一直在哪里pool 一个永远为空的 队列,我们就先进行 
            //Enqueue one null item per worker to make each exit
            foreach (var thread in _threads)
            {
                EnqueueItem(null);
                
            }
            if (waitForWorkers)
            {
                foreach (Thread thread in _threads)
                {
                    thread.Join();  //这个主要是为了堵塞主线程,让其进行等待;
                }
            }
                    
        }

        public void EnqueueItem(Action  action)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(action); //一旦进入了队列之后;
                Monitor.Pulse(_locker); //重新激活; 
            }
            
        }

        private void Consume()
        {
            //让所有的线程进行等待的状态;
            //消费的方法,就不断冲队列中取出msg 然后进行各种消费低啊;
            //一开始就要让我们的线程给跑起来;
            while (true)   
            {
                Action item;
                lock (_locker) //锁住取数据的步骤; 这里会造成堵塞; 线程序列化的访问;
                {
                    //去轮询消息队列;
                    while (_itemQ.Count == 0)
                    {
                        Monitor.Wait(_locker); //队列为空,让出locker; 等待激活后重新loop condtion  让出这个锁,造成当前线程的堵塞;
                       //堵塞当前钱线程并且暂时四方锁,一直到其他线程pulse通知 
                       //释放锁之后,还是当前线程获取锁,
                       //三个线程都处于等待状态;
                    }
                    item = _itemQ.Dequeue(); 
                }
                //取出来之后,进行读取;
                if (item == null) { return; }    // This signals our exit 让线程退租的信号;
                item();

            }
            
        }

    }
    class Program
    {
        static void Main(string[] args)
        {
            PCQueue p = new PCQueue(3); //tdd ,基于测试驱动开发,这是一种倒退的编程模式进行的开发; 尝试着进行倒退额方式进行编程;
                                        //相当于初始化三个线程,在线程池中 处于运动的状态
                                        //这里模拟,初始化三个线程去消费是个方法;?如何去停止这三个方法;

            //主线程push msg 几年对垒;
            Console.WriteLine("Enqueuing 10 items...");
            for (int i = 0; i < 10; i++)
            {
                int itemNumber = i;
                p.EnqueueItem(()=>{
                    //这里就是我们消费者,需要进行消费的各种方法;
                    Thread.Sleep(1000);    //simulate time-consuming work
                    Console.Write(" Task" + itemNumber);
                });

            }

            p.ShutDown(true);  //结束完之后,我们可以调用这个方法;
            Console.WriteLine();
            Console.WriteLine("Workers complete!");

        }
    }
}

多线程,者坛子水,很深的哦,一步小心就搞错了;

然后我们还有一种新的方法来实现这种模式;

c# 中当然给我们提供这样的集合不用我们手动的去写这么多的方法和实现的呀;下面这个栗子就是我们简易版的实现;

值得注意的一点就是我们的构造函数参数传递的问题:

ecause we didn’t pass anything into BlockingCollection’s constructor, it instantiated a concurrent queue automatically. 

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication49
{

   public class PCQueue : IDisposable
    {

        //If you call the constructor without passing in a collection, the class will automatically instantiate a ConcurrentQueue<T>
        BlockingCollection<Action> _tasks = new BlockingCollection<Action>();

        public PCQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public void EnqueueTask(Action action)
        {
            _tasks.TryAdd(action);
        }

        private void Consume()
        {
            // This sequence that we’re enumerating will block when no elements
           // are available and will end when CompleteAdding is called.
            foreach (Action action in _tasks.GetConsumingEnumerable())
            {
                action();
            }

        }

        public void Dispose()
        {

            _tasks.CompleteAdding();  //标记不再接受任何的添加
        }

    }



    class Program
    {
        static void Main(string[] args)
        {

            //模拟一个快速生成这;

            //连个慢速消费者;
            PCQueue p = new PCQueue(2);  //运行起来后,两个thread 将处于 被堵塞的状态;

            //主线程负责push

            for (int i = 0; i < 10; i++)
            {
                int value = i;
                p.EnqueueTask(() =>
                {
                    Console.WriteLine($"{value} 被消费了 ");  //队列中有消息之后,我们就能通知 将集合改为非堵塞;
                });
            }

            //Thread.Sleep(3000);

            Console.ReadLine();
           

        }
    }
}

辣么,如果我们需要如下的功能;我们又如何进行相关的完善呢;

The producer/consumer that we just wrote is inflexible in that we can’t track work items after they’ve been enqueued. It would be nice if we could:

  • Know when a work item has completed.
  • Cancel an unstarted work item.
  • Deal elegantly with any exceptions thrown by a work item

An ideal solution would be to have the EnqueueTask method return some object giving us the functionality just described.

//这几年的发展对一个人的性格和价值观形成了很大的不同;所以加油吧,骚年;

 首先,这里我们添加了任务完成的通知事件

  可以取消一个没有开始的任务项目; 

  还要我们一个异常的捕获,

具备以上的功能之后,我们才能称之为一基本完整的

这里涉及到一个TaskCompleteSource<TResult>的基本用法

这里是它的一个基本用法的介绍的呀:https://msdn.microsoft.com/en-us/library/dd449174(v=vs.110).aspx

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication50
{
    public class PCQueue : IDisposable
    {

        //封装一个workItem;里面有三个东西 TaskCompleteSource Action CancleToken
        class WorkItem
        {
            public readonly TaskCompletionSource<object> TaskSource;
            public readonly Action Action;
            public readonly CancellationToken? CancleToken;

            public WorkItem(TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancleToken)
            {
                this.TaskSource = taskSource;
                this.Action = action;
                this.CancleToken = cancleToken;
            }
        }

        //然后是我们堵塞队列;
        BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

        //初始化消费者数量;
        public PCQueue(int workerCount)
        {
            for (int i = 0; i < workerCount; i++)
            {
                Task.Factory.StartNew(Consume);
            }
        }

        public Task EnqueueTask(Action action)
        {
            return EnqueueTask(action,null);
        }

        public Task EnqueueTask(Action action, CancellationToken? cancelToken)
        {
            //lets us later control the task that we return to the consumer  返回了我们执行的结果,这样方便我么进行控制滴啊;效果很好;
            var tcs=new TaskCompletionSource<object>();
            _taskQ.Add(new WorkItem(tcs,action, cancelToken));
            return tcs.Task; //返回的就是我们的这个Task滴呀;
        }
        
        //消费者的方法,这样我们的基本任务就算基本实现滴呀;效果还是挺好的;
        private void Consume()
        {

            foreach(WorkItem workItem in this._taskQ.GetConsumingEnumerable())
            {
                if(workItem.CancleToken.HasValue && workItem.CancleToken.Value.IsCancellationRequested)
                {
                    workItem.TaskSource.SetCanceled();
                }
                else
                {

                    //要执行的基本任务;
                    try
                    {
                        workItem.Action();
                        workItem.TaskSource.SetResult(null);   // Indicate completion

                    }
                    catch (OperationCanceledException ex)
                    {
                        if (ex.CancellationToken == workItem.CancleToken)
                        {
                            workItem.TaskSource.SetCanceled();
                        }
                        else
                        {
                            workItem.TaskSource.SetException(ex);
                        }

                    }
                    catch(Exception ex)
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }

        }

        public void Dispose()
        {
            _taskQ.CompleteAdding();
        }
    }


    class Program
    {
        static void Test()
        {
            var pcQ= new PCQueue(1);
            Task task = pcQ.EnqueueTask(() =>
             {
                 Console.WriteLine("easy  done,producer and consumer.....cool task!");
                 int value = 0;
                 var result = 1 / value;

             });

            //等搞task的时候,我们再深入的理解一哈这个方法;
            Console.WriteLine(task.IsCompleted);
            Console.WriteLine(task.Status);
            Console.WriteLine(task.Exception);
            Console.WriteLine(task);
            Console.ReadLine();


        }

        static void Main(string[] args)
        {
            Test();
        }
    }
}

我们的生产者,消费者就这样一步一步的完善到这里了;效果整体来说不错滴呀;

使用我们的 BlockingCollection:

处于以下的几个原因和解决方案;

第一个:必须是线程安全的;

第二个:buffer的大小应该有一个限制;也就是提供集合的边界问题;

第三个:Producer should discard the data or enter into sleep mode when queue size is full

第四个:Consumer should go to sleep more when buffer is empty

第五个: resume its operation when items are available in queue for processing

//下面这三个集合都继承自我们的:IProducerConsumerCollection
//都可以用于我们的 生成者和消费者模式的使用;

ConcurrentQueue<Task> conQueue = new ConcurrentQueue<Task>();
ConcurrentStack<Task> conStack= new ConcurrentStack<Task>(); //我们的这几个线程安全的集合,都继承自我们的IProducerConsumerCollection
ConcurrentBag<Task> conBag = new ConcurrentBag<Task>();

接下来,我们再添加一种写法;

 这里还有一个示例:

http://www.nullskull.com/a/1464/producerconsumer-queue-and-blockingcollection-in-c-40.aspx

还要我们的异步方式去实现这个东西;等我学了dataflow 之后,我们回过头来研究这个东西;

https://blog.stephencleary.com/2012/11/async-producerconsumer-queue-using.html

当我们的吧 dataflow的方式学习了,之后,我们随后就开启了 actor的大门,然后就是我们 akka.net 还有我们

函数编程的大门,简直就是发现了我们的新大陆的感觉;

随后,我们就要使用我们的dataflow 来实现我们的消费者和生产者模式的拉;(this is simple demo)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication54
{

    //use data flow to create producer and consumer dataFlow pattern;

    public class DataFlowProducerConsumer
    {
        public static  readonly Random random = new Random();

        /// <summary>
        /// The Produce method calls the Post method in a loop to synchronously write data to the target block
        /// </summary>
        /// <param name="target"></param>
        public static void Produce(ITargetBlock<byte[]> target)
        {

            for (int i = 0; i < 100; i++)
            {
                byte[] buffer = new byte[1024];

                //Fill the buffer with random bytes
                random.NextBytes(buffer);

                target.Post(buffer);
            }
            // After the Produce method writes all data to the target block, it calls the Complete method to indicate that the block will never have additional data available
            target.Complete(); // Post the result to the message block
        }

        //就不用你再去开启线程去干一些事儿了;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="source"></param>
        /// <returns></returns>
        public static async Task<int> ConsumeTaskAsync(ISourceBlock<byte[]> source)
        {
            int result = 0;
            //Consume method uses the async and await operators to asynchronously compute the total number of bytes that are received from the 
            //异步的去消费我们的data
            while (await source.OutputAvailableAsync().ConfigureAwait(false))  //在控制台中这么配置低呀;效果还不错滴呀;
            {

                //统一子线程来做剩下的事情;
                byte[] data = source.Receive();
                result += data.Length;
                Console.WriteLine($"thread id {Thread.CurrentThread.ManagedThreadId}");
            }

            return result;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {

            //The Produce method writes arrays that contain random bytes of data to a ITargetBlock<TInput> object and the Consume method reads bytes from a ISourceBlock<TOutput> object.

            Console.WriteLine($"Main Thread id {Thread.CurrentThread.ManagedThreadId}");
            var buffer = new BufferBlock<byte[]>();

            //相当于先开启我们的消费者线程;
            var consumer = DataFlowProducerConsumer.ConsumeTaskAsync(buffer);

            DataFlowProducerConsumer.Produce(buffer);

            consumer.Wait();
            Console.WriteLine("Processed {0} bytes.", consumer.Result);

        }
    }
}

This example uses just one consumer to process the source data. If you have multiple consumers in your application, use the TryReceive method to read data from the source block, as shown in the following example. 

然后按做一点点小小的改造;效果还挺不错滴呀;

        /// <summary>
        /// the TryReceive method returns False when no data is available. 
        /// When multiple consumers must access the source block concurrently,
        /// this mechanism guarantees that data is still available 
        /// after the call to OutputAvailableAsync.
        /// </summary>
        /// <param name="source"></param>
        /// <returns></returns>
        public static async Task<int> MoreConsumeTaskAsync(IReceivableSourceBlock<byte[]> source)
        {

            int result = 0;
            while(await source.OutputAvailableAsync())
            {

                byte [] data;
                while (source.TryReceive(out data))
                {
                    result += data.Length;
                    Console.WriteLine($"thread id {Thread.CurrentThread.ManagedThreadId}");
                }
            }

            return result;

        }
原文地址:https://www.cnblogs.com/mc67/p/7561785.html