[实用源码] 线程安全/竞争读写的先进先出队列

 非常怀念当年在UNIX下面曾经用过的VOCAL函数库,里面有一个FIFO类支持这样的特性:
    当队列中没有元素时,读线程将阻塞自己,直到队列有元素为止;
    当队列满了,写线程同样阻塞自己,直到有空位为止。

.NET下面好像并没有这样的数据结构,Queue虽然是线程安全,但却不能竞争读写(当读一个空队列的时候会抛出异常)。
但是很多时候却需要这样的东东,例如生产者/消费者的结构……

于是我用AutoResetEvent,Queue写了一个简单的FIFO实现,终于可以支持Vocal中的FIFO的特性了。

 1using System;
 2using System.Collections;
 3using System.Threading;
 4
 5namespace eXcel.Fifo
 6{
 7    /// <summary>
 8    /// 线程安全的先进先出队列 Fifo.cs
 9    /// eXcel Wong 2005-5-22
10    ///        支持多个读线程和多个写线程同时访问,
11    ///        当fifo为空时,读线程自动堵塞,直到fifo有数据为止,
12    ///        当fifo达到最大容量时,写线程自动堵塞,直到fofo有空余为止
13    /// </summary>

14    public class Fifo
15    {
16        
17        private System.Collections.Queue m_queue;    //队列    
18        private AutoResetEvent m_eventReadAsync;    //读线程使用的自动事件
19        private AutoResetEvent m_eventWriteAsync;    //写线程使用的自动事件
20        private int m_iCurrentCount;                //队列的当前元素数目
21        private int m_iMaxCount;                    //队列的最大元素数目
22
23        /// <summary>
24        /// 缺省构造函数,队列容量为int的最大值
25        /// </summary>

26        public Fifo()
27        {
28            m_iCurrentCount=0;
29            m_iMaxCount=int.MaxValue-1;
30            
31            m_queue=Queue.Synchronized(new Queue());
32            m_eventReadAsync = new AutoResetEvent(false);
33            m_eventWriteAsync = new AutoResetEvent(false);
34        }

35
36        /// <summary>
37        /// 构造函数,由用户指定队列容量
38        /// </summary>

39        public Fifo(int iMaxCount)
40        {
41            m_iCurrentCount=0;
42            m_iMaxCount=iMaxCount;
43            if(m_iMaxCount<1 || m_iMaxCount>=int.MaxValue)
44                m_iMaxCount=int.MaxValue-1;
45
46            m_queue=Queue.Synchronized(new Queue());
47            m_eventReadAsync = new AutoResetEvent(false);
48            m_eventWriteAsync = new AutoResetEvent(false);
49        }

50        
51        /// <summary>
52        /// 往队列插入元素
53        /// </summary>

54        public void Push(object o)
55        {
56            if(m_iCurrentCount>=m_iMaxCount) //如果已经达到队列容量
57                m_eventWriteAsync.WaitOne(); //阻塞自己,等待别的线程唤醒
58            lock(this)
59            {
60                m_queue.Enqueue(o); //插入新元素
61            }

62            if(Interlocked.Increment(ref m_iCurrentCount) >= m_iMaxCount) //如果插入队列之后,达到队列容量
63                m_eventWriteAsync.Reset(); //唤醒由于队列为空而阻塞的读线程
64            m_eventReadAsync.Set();
65        }

66
67        /// <summary>
68        /// 从队列弹出元素
69        /// </summary>

70        public object Pop()
71        {
72            object o;
73            if(m_iCurrentCount<=0)
74                m_eventReadAsync.WaitOne();
75            lock(this)
76            {
77                o=m_queue.Dequeue();
78            }

79            if (Interlocked.Decrement(ref m_iCurrentCount) <= 0)
80                m_eventReadAsync.Reset();
81            m_eventWriteAsync.Set();
82            return o;
83        }

84
85        /// <summary>
86        /// 返回当前队列的元素数目
87        /// </summary>

88        public int Count
89        {
90            get{return m_iCurrentCount;}
91        }

92    }

93
94}

95

测试程序如下:
 1using System;
 2using System.Collections;
 3using System.Threading;
 4
 5namespace eXcel.Fifo
 6{
 7    /// <summary>
 8    /// FifoTest 的摘要说明。
 9    /// </summary>

10    class FifoTest
11    {
12        /// <summary>
13        /// 应用程序的主入口点。
14        /// </summary>

15        [STAThread]
16        static void Main(string[] args)
17        {
18            FifoTest test=new FifoTest();
19            Thread t1  = new Thread(new ThreadStart(test.t1Start));
20            Thread t2  = new Thread(new ThreadStart(test.t2Start));
21            Thread t3  = new Thread(new ThreadStart(test.t3Start));
22            Thread t4  = new Thread(new ThreadStart(test.t4Start));
23            t1.Start();
24            t2.Start();
25            t3.Start();
26            t4.Start();
27        }

28
29        private Fifo fifo;    
30        
31        public FifoTest()
32        {
33            fifo=new Fifo(10);
34        }

35
36        public void t1Start()
37        {
38            int i=1;
39            while(true)
40            {
41                Console.WriteLine("Thread1 Push: {0} Current Count={1}",i,fifo.Count);
42                fifo.Push(i);
43                i++;
44                Thread.Sleep(1000);
45            }

46        }

47        public void t2Start()
48        {
49            int i=1001;
50            while(true)
51            {
52                Console.WriteLine("Thread2 Push: {0} Current Count={1}",i,fifo.Count);
53                fifo.Push(i);
54                i++;
55                Thread.Sleep(1000);
56            }

57        }

58        public void t3Start()
59        {
60            while(true)
61            {
62                Console.WriteLine("Thread3 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
63                //Thread.Sleep(1000);
64            }

65        }

66        public void t4Start()
67        {
68            while(true)
69            {
70                Console.WriteLine("Thread4 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
71                //Thread.Sleep(1000);
72            }

73        }

74    }

75}

76
原文地址:https://www.cnblogs.com/eXcel/p/160626.html