当队列中没有元素时,读线程将阻塞自己,直到队列有元素为止;
当队列满了,写线程同样阻塞自己,直到有空位为止。
.NET下面好像并没有这样的数据结构,Queue虽然是线程安全,但却不能竞争读写(当读一个空队列的时候会抛出异常)。
但是很多时候却需要这样的东东,例如生产者/消费者的结构……
于是我用AutoResetEvent,Queue写了一个简单的FIFO实现,终于可以支持Vocal中的FIFO的特性了。
1using System;
2using System.Collections;
3using System.Threading;
4
5namespace eXcel.Fifo
6{
7 /// <summary>
8 /// 线程安全的先进先出队列 Fifo.cs
9 /// eXcel Wong 2005-5-22
10 /// 支持多个读线程和多个写线程同时访问,
11 /// 当fifo为空时,读线程自动堵塞,直到fifo有数据为止,
12 /// 当fifo达到最大容量时,写线程自动堵塞,直到fofo有空余为止
13 /// </summary>
14 public class Fifo
15 {
16
17 private System.Collections.Queue m_queue; //队列
18 private AutoResetEvent m_eventReadAsync; //读线程使用的自动事件
19 private AutoResetEvent m_eventWriteAsync; //写线程使用的自动事件
20 private int m_iCurrentCount; //队列的当前元素数目
21 private int m_iMaxCount; //队列的最大元素数目
22
23 /// <summary>
24 /// 缺省构造函数,队列容量为int的最大值
25 /// </summary>
26 public Fifo()
27 {
28 m_iCurrentCount=0;
29 m_iMaxCount=int.MaxValue-1;
30
31 m_queue=Queue.Synchronized(new Queue());
32 m_eventReadAsync = new AutoResetEvent(false);
33 m_eventWriteAsync = new AutoResetEvent(false);
34 }
35
36 /// <summary>
37 /// 构造函数,由用户指定队列容量
38 /// </summary>
39 public Fifo(int iMaxCount)
40 {
41 m_iCurrentCount=0;
42 m_iMaxCount=iMaxCount;
43 if(m_iMaxCount<1 || m_iMaxCount>=int.MaxValue)
44 m_iMaxCount=int.MaxValue-1;
45
46 m_queue=Queue.Synchronized(new Queue());
47 m_eventReadAsync = new AutoResetEvent(false);
48 m_eventWriteAsync = new AutoResetEvent(false);
49 }
50
51 /// <summary>
52 /// 往队列插入元素
53 /// </summary>
54 public void Push(object o)
55 {
56 if(m_iCurrentCount>=m_iMaxCount) //如果已经达到队列容量
57 m_eventWriteAsync.WaitOne(); //阻塞自己,等待别的线程唤醒
58 lock(this)
59 {
60 m_queue.Enqueue(o); //插入新元素
61 }
62 if(Interlocked.Increment(ref m_iCurrentCount) >= m_iMaxCount) //如果插入队列之后,达到队列容量
63 m_eventWriteAsync.Reset(); //唤醒由于队列为空而阻塞的读线程
64 m_eventReadAsync.Set();
65 }
66
67 /// <summary>
68 /// 从队列弹出元素
69 /// </summary>
70 public object Pop()
71 {
72 object o;
73 if(m_iCurrentCount<=0)
74 m_eventReadAsync.WaitOne();
75 lock(this)
76 {
77 o=m_queue.Dequeue();
78 }
79 if (Interlocked.Decrement(ref m_iCurrentCount) <= 0)
80 m_eventReadAsync.Reset();
81 m_eventWriteAsync.Set();
82 return o;
83 }
84
85 /// <summary>
86 /// 返回当前队列的元素数目
87 /// </summary>
88 public int Count
89 {
90 get{return m_iCurrentCount;}
91 }
92 }
93
94}
95
2using System.Collections;
3using System.Threading;
4
5namespace eXcel.Fifo
6{
7 /// <summary>
8 /// 线程安全的先进先出队列 Fifo.cs
9 /// eXcel Wong 2005-5-22
10 /// 支持多个读线程和多个写线程同时访问,
11 /// 当fifo为空时,读线程自动堵塞,直到fifo有数据为止,
12 /// 当fifo达到最大容量时,写线程自动堵塞,直到fofo有空余为止
13 /// </summary>
14 public class Fifo
15 {
16
17 private System.Collections.Queue m_queue; //队列
18 private AutoResetEvent m_eventReadAsync; //读线程使用的自动事件
19 private AutoResetEvent m_eventWriteAsync; //写线程使用的自动事件
20 private int m_iCurrentCount; //队列的当前元素数目
21 private int m_iMaxCount; //队列的最大元素数目
22
23 /// <summary>
24 /// 缺省构造函数,队列容量为int的最大值
25 /// </summary>
26 public Fifo()
27 {
28 m_iCurrentCount=0;
29 m_iMaxCount=int.MaxValue-1;
30
31 m_queue=Queue.Synchronized(new Queue());
32 m_eventReadAsync = new AutoResetEvent(false);
33 m_eventWriteAsync = new AutoResetEvent(false);
34 }
35
36 /// <summary>
37 /// 构造函数,由用户指定队列容量
38 /// </summary>
39 public Fifo(int iMaxCount)
40 {
41 m_iCurrentCount=0;
42 m_iMaxCount=iMaxCount;
43 if(m_iMaxCount<1 || m_iMaxCount>=int.MaxValue)
44 m_iMaxCount=int.MaxValue-1;
45
46 m_queue=Queue.Synchronized(new Queue());
47 m_eventReadAsync = new AutoResetEvent(false);
48 m_eventWriteAsync = new AutoResetEvent(false);
49 }
50
51 /// <summary>
52 /// 往队列插入元素
53 /// </summary>
54 public void Push(object o)
55 {
56 if(m_iCurrentCount>=m_iMaxCount) //如果已经达到队列容量
57 m_eventWriteAsync.WaitOne(); //阻塞自己,等待别的线程唤醒
58 lock(this)
59 {
60 m_queue.Enqueue(o); //插入新元素
61 }
62 if(Interlocked.Increment(ref m_iCurrentCount) >= m_iMaxCount) //如果插入队列之后,达到队列容量
63 m_eventWriteAsync.Reset(); //唤醒由于队列为空而阻塞的读线程
64 m_eventReadAsync.Set();
65 }
66
67 /// <summary>
68 /// 从队列弹出元素
69 /// </summary>
70 public object Pop()
71 {
72 object o;
73 if(m_iCurrentCount<=0)
74 m_eventReadAsync.WaitOne();
75 lock(this)
76 {
77 o=m_queue.Dequeue();
78 }
79 if (Interlocked.Decrement(ref m_iCurrentCount) <= 0)
80 m_eventReadAsync.Reset();
81 m_eventWriteAsync.Set();
82 return o;
83 }
84
85 /// <summary>
86 /// 返回当前队列的元素数目
87 /// </summary>
88 public int Count
89 {
90 get{return m_iCurrentCount;}
91 }
92 }
93
94}
95
测试程序如下:
1using System;
2using System.Collections;
3using System.Threading;
4
5namespace eXcel.Fifo
6{
7 /// <summary>
8 /// FifoTest 的摘要说明。
9 /// </summary>
10 class FifoTest
11 {
12 /// <summary>
13 /// 应用程序的主入口点。
14 /// </summary>
15 [STAThread]
16 static void Main(string[] args)
17 {
18 FifoTest test=new FifoTest();
19 Thread t1 = new Thread(new ThreadStart(test.t1Start));
20 Thread t2 = new Thread(new ThreadStart(test.t2Start));
21 Thread t3 = new Thread(new ThreadStart(test.t3Start));
22 Thread t4 = new Thread(new ThreadStart(test.t4Start));
23 t1.Start();
24 t2.Start();
25 t3.Start();
26 t4.Start();
27 }
28
29 private Fifo fifo;
30
31 public FifoTest()
32 {
33 fifo=new Fifo(10);
34 }
35
36 public void t1Start()
37 {
38 int i=1;
39 while(true)
40 {
41 Console.WriteLine("Thread1 Push: {0} Current Count={1}",i,fifo.Count);
42 fifo.Push(i);
43 i++;
44 Thread.Sleep(1000);
45 }
46 }
47 public void t2Start()
48 {
49 int i=1001;
50 while(true)
51 {
52 Console.WriteLine("Thread2 Push: {0} Current Count={1}",i,fifo.Count);
53 fifo.Push(i);
54 i++;
55 Thread.Sleep(1000);
56 }
57 }
58 public void t3Start()
59 {
60 while(true)
61 {
62 Console.WriteLine("Thread3 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
63 //Thread.Sleep(1000);
64 }
65 }
66 public void t4Start()
67 {
68 while(true)
69 {
70 Console.WriteLine("Thread4 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
71 //Thread.Sleep(1000);
72 }
73 }
74 }
75}
76
2using System.Collections;
3using System.Threading;
4
5namespace eXcel.Fifo
6{
7 /// <summary>
8 /// FifoTest 的摘要说明。
9 /// </summary>
10 class FifoTest
11 {
12 /// <summary>
13 /// 应用程序的主入口点。
14 /// </summary>
15 [STAThread]
16 static void Main(string[] args)
17 {
18 FifoTest test=new FifoTest();
19 Thread t1 = new Thread(new ThreadStart(test.t1Start));
20 Thread t2 = new Thread(new ThreadStart(test.t2Start));
21 Thread t3 = new Thread(new ThreadStart(test.t3Start));
22 Thread t4 = new Thread(new ThreadStart(test.t4Start));
23 t1.Start();
24 t2.Start();
25 t3.Start();
26 t4.Start();
27 }
28
29 private Fifo fifo;
30
31 public FifoTest()
32 {
33 fifo=new Fifo(10);
34 }
35
36 public void t1Start()
37 {
38 int i=1;
39 while(true)
40 {
41 Console.WriteLine("Thread1 Push: {0} Current Count={1}",i,fifo.Count);
42 fifo.Push(i);
43 i++;
44 Thread.Sleep(1000);
45 }
46 }
47 public void t2Start()
48 {
49 int i=1001;
50 while(true)
51 {
52 Console.WriteLine("Thread2 Push: {0} Current Count={1}",i,fifo.Count);
53 fifo.Push(i);
54 i++;
55 Thread.Sleep(1000);
56 }
57 }
58 public void t3Start()
59 {
60 while(true)
61 {
62 Console.WriteLine("Thread3 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
63 //Thread.Sleep(1000);
64 }
65 }
66 public void t4Start()
67 {
68 while(true)
69 {
70 Console.WriteLine("Thread4 Pop: {0} Current Count={1}",(int)fifo.Pop(),fifo.Count);
71 //Thread.Sleep(1000);
72 }
73 }
74 }
75}
76