Creating a blocking Queue<T> in .NET

Creating a blocking Queue<T> in .NET

 这里的代码来自我们的StackOverflow 中的代码示例,我添加了一些中文注释;

 //there I will create blocking quene;
    class SizeQueue<T>
    {
        private readonly Queue<T> queue = new Queue<T>();
        private readonly int _maxSize;
        private readonly object _locker = new object();
        private static bool closing;

        public SizeQueue(int size)
        {
            this._maxSize = size;
        }

        public void Enquen(T item)
        {

            lock (_locker)
            {
                //这里要重点理解使用while 而不是 我们的if
                //应为当另外一个线程Pulse 之后,又可能另外一个线程获得锁,而不是当前等待的线程;
                while (this._maxSize <= this.queue.Count)  //为什么这里要用while 而不是if呢;  应为当 pulse 之后释放锁,很有可能另外一个线程活到锁; 然后进入到队列,添加,出来之后返现,队列还是慢慢的;
                {
                    Monitor.Wait(_locker); //进入等待队列;
                }
                //否则就进入我们的 队列中;
                queue.Enqueue(item);
                Monitor.PulseAll(_locker); // wake up any blocked dequeue 同时所有的线程去争夺锁,然后 取出数据;

            }

        }

        public T Eequeue()
        {
            lock (_locker)
            {
                while (queue.Count == 0)
                {
                    Monitor.Wait(queue);
                }
                T item = queue.Dequeue();
                if (queue.Count == _maxSize - 1) //队列装满了之后,
                {
                    // wake up any blocked enqueue
                    Monitor.PulseAll(queue); //通知线程继续写入;
                }
                return item;

            }
        }

       //empty queue just returns(rather than blocking):
       //锁,独占锁,确保一次只有一个线程能够操作我们的队列;
       //特别是在我们的读写的时候
       //读的时候,不允许写
       //写的时候,不允许读;

       //这里我们添加一个close 和 tryDequeue的方法;

        public void Close()
        {
            lock (_locker)
            {
                closing = true;
                Monitor.PulseAll(_locker);
            }
        }

        /// <summary>
        /// 考虑到有关系的情况;
        /// </summary>
        /// <param name="value"></param>
        /// <returns></returns>
        public bool TryDequeue(out T value)
        {
            lock (_locker)
            {

                while (queue.Count == 0)
                {
                    if (closing)
                    {
                        value = default(T);
                        return false;
                    }
                    Monitor.Wait(_locker);
                }
                value = value = queue.Dequeue();
                if (queue.Count == this._maxSize - 1)
                {
                    Monitor.PulseAll(_locker);
                }
                return true;
            }

        }

    }

当然这里还有我们的第二个版本的使用;

    class BlockingQueue<T>
    {
        /// <summary>
        ///队列最大值
        /// </summary>
        private readonly int _maxSize;

        /// <summary>
        /// 我们的队列
        /// </summary>
        private Queue<T> _Queue = new Queue<T>();

        /// <summary>
        /// 我们的独占锁
        /// </summary>
        private readonly object _locker = new object();


        /// <summary>
        /// 是否停止操作;
        /// </summary>
        private bool _Quit = false;


        /// <summary>
        /// 初始化队列的大小;
        /// </summary>
        public BlockingQueue(int maxSize)
        {
            this._maxSize = maxSize;
        }

        /// <summary>
        /// 取消操作
        /// </summary>
        public void Quit()
        {
            lock (_locker)
            {
                _Quit = true;

                Monitor.PulseAll(_locker);
            }
        }

        /// <summary>
        /// 队列的进入;
        /// </summary>
        /// <param name="t"></param>
        public bool Enqueue(T t)
        {

            lock (_locker) //确保线程的安全;
            {
                while (!this._Quit && this._Queue.Count >= this._maxSize) Monitor.Wait(_locker); //进入等待的队列中;

                if (_Quit) return false;
                //进入队列
                _Queue.Enqueue(t);

                //释放信号;
                Monitor.PulseAll(_locker);  //一旦队列中有了,就通知 去去;

            }
            return true;

        }

        /// <summary>
        ///取出队列中的值;
        /// </summary>
        /// <param name="t"></param>
        /// <returns></returns>
        public bool Dequeue(out T t)
        {
            t = default(T);

            lock (_locker)
            {
                while (!this._Quit && this._Queue.Count == 0) Monitor.Wait(_locker);

                if (_Queue.Count == 0) return false;  //这个大概是以多余的安全机制吧;

                t = this._Queue.Dequeue();

                Monitor.PulseAll(_locker);
            }
            return true;

        }
        //整体来说,这个方法还挺好使用使用的,效果不错的感觉;


        //线程的执行是没有太多额顺序的的; 如果三个线程读,两个线程写; 可能window 线程如何去协调这些线程执行呢;


    }


    class Program
    {
        //with one fast producer and two slow consumers:
        static void Test()
        {
            var q = new BlockingQueue<int>(4);
            //一个线程生成;
            new Thread(() =>
            {
                for (int x = 0; ; x++)
                {
                    if (!q.Enqueue(x)) break;
                    Trace.WriteLine(x.ToString("0000") + " >");

                }
                Trace.WriteLine("Producer quitting");
            }).Start();


            //消费者;
            // Consumers
            for (int i = 0; i < 2; i++)
            {
                new Thread(() =>
                {
                    for (;;)
                    {
                        Thread.Sleep(100);
                        int x = 0;
                        if (!q.Dequeue(out x)) break;
                        Trace.WriteLine("     < " + x.ToString("0000"));
                    }
                    Trace.WriteLine("Consumer quitting");
                }).Start();
            }

            Thread.Sleep(3000);

            Trace.WriteLine("Quitting");

            q.Quit();


        }

        static void Main(string[] args)
        {


            Test();


        }
    }

要锻炼自己写源代码的能力,所以要多看,多学习,看多额,才能写点东西出来的呀;

这样的效果还是挺好;

原文地址:https://www.cnblogs.com/mc67/p/7569004.html