Monitor 实现阻塞队列 + 生产消费者实例

转载至 https://www.codeproject.com/Articles/28785/Thread-synchronization-Wait-and-Pulse-demystified

/*
 * 这是一个完整源代码示例,演示了此模式的多功能性。它实现了一个可以停止的阻塞队列。
 * 阻塞队列是固定大小的队列。如果队列已满,则尝试添加项目将被阻止。
 * 如果队列为空,则尝试删除项目将阻止。
 * 当Quit()被调用时,队列停止。这意味着您无法再添加任何项目,但可以删除现有项目,直到队列为空。此时,队列已完成。
 * 这是一组非常复杂的条件。您可以使用更高级别的结构组合来实现它,但它会更难。该模式使得该实现相对简单。
 * 
 */
using System;
using System.Threading;
using System.Collections.Generic;

namespace TestBlockQueue
{ 
    public class BlockingQueue<T>
    {
        readonly int _Size = 0;
        readonly Queue<T> _Queue = new Queue<T>();
        readonly object _Key = new object();
        bool _Quit = false;

        public BlockingQueue(int size)
        {
            _Size = size;
        }

        public void Quit()
        {
            lock (_Key)
            {
                _Quit = true;

                Monitor.PulseAll(_Key);
            }
        }

        public bool Enqueue(T t)
        {
            lock (_Key)
            {
                while (!_Quit && _Queue.Count >= _Size) Monitor.Wait(_Key);

                if (_Quit) return false;

                _Queue.Enqueue(t);

                Monitor.PulseAll(_Key);
            }

            return true;
        }

        public bool Dequeue(out T t)
        {
            t = default(T);

            lock (_Key)
            {
                while (!_Quit && _Queue.Count == 0) Monitor.Wait(_Key);

                if (_Queue.Count == 0) return false;

                t = _Queue.Dequeue();

                Monitor.PulseAll(_Key);
            }

            return true;
        }
    }

    /// <summary>
    /// 对于任意数量的生产者和消费者的并发访问,此实现是安全的。以下是一个快速生产者和两个慢速消费者的示例
    /// </summary>
    public class Program
    {
        public static void Main(string[] args)
        {
            var q = new BlockingQueue<int>(4);

            // Producer
            new Thread(() =>
            {
                for (int x = 0; ; x++)
                {
                    if (!q.Enqueue(x)) break;
                    Console.WriteLine(x.ToString("0000") + " >");
                }
                Console.WriteLine("Producer quitting");
            }).Start();

            // Consumers
            for (int i = 0; i < 2; i++)
            {
                new Thread(() =>
                {
                    for (; ; )
                    {
                        Thread.Sleep(100);
                        int x = 0;
                        if (!q.Dequeue(out x)) break;
                        Console.WriteLine("     < " + x.ToString("0000"));
                    }
                    Console.WriteLine("Consumer quitting");
                }).Start();
            }

            Thread.Sleep(1000);

            Console.WriteLine("Quitting");

            q.Quit();
        }
    }
}
原文地址:https://www.cnblogs.com/jshchg/p/11512571.html