// Generic AsyncQueue


namespace Microshaoft
{
    using System;
    using System.Threading;
    using System.Collections.Generic;
    /// <summary>
    /// An in-memory queue whose elements are delivered to <see cref="OnDequeue"/> subscribers
    /// on dedicated worker threads.
    /// </summary>
    /// <remarks>
    /// <para>
    /// <see cref="Enqueue"/> appends an element and, when no dispatcher thread is active,
    /// starts one. The dispatcher hands elements to at most
    /// <see cref="MaxConcurrentThreadsCount"/> worker threads; each worker processes the
    /// element it was given and then keeps draining the queue until it is empty.
    /// </para>
    /// <para>
    /// All shared state (the queue, the dispatcher flag and the worker counter) is guarded by
    /// one private per-instance lock. Event handlers are always invoked outside that lock.
    /// </para>
    /// </remarks>
    /// <typeparam name="T">Reference type of the queued elements.</typeparam>
    public class AsyncQueue<T>
                        where T : class
    {
        /// <summary>Handler invoked on a worker thread for every dequeued element.</summary>
        public delegate void QueueEventHandler(T element);
        /// <summary>Raised on a worker thread for every element taken off the queue.</summary>
        public event QueueEventHandler OnDequeue;
        /// <summary>Handler receiving diagnostic messages about dispatcher/worker activity.</summary>
        public delegate void QueueLogEventHandler(string logMessage);
        /// <summary>Raised with a diagnostic message when the dispatcher or a worker changes state.</summary>
        public event QueueLogEventHandler OnQueueLog;
        /// <summary>Handler receiving exceptions caught while enqueueing or processing elements.</summary>
        public delegate void ExceptionEventHandler(Exception exception);
        /// <summary>Raised when enqueueing fails or an <see cref="OnDequeue"/> handler throws.</summary>
        public event ExceptionEventHandler OnException;

        // Shared layout of every log message: text, worker count, queue length, thread name.
        private const string LogFormat = "{0} Threads Count {1},Queue Count {2},Current Thread: {3}";

        private readonly Queue<T> _queue = new Queue<T>();
        // Per-instance lock (the lock used to be static, which made unrelated queues of the
        // same T contend with each other). Also used as the Monitor wait/pulse object.
        private readonly object _syncLockObject = new object();
        private int _concurrentThreadsCount = 0; // number of live worker threads; guarded by the lock
        private bool _queueRuning = false;       // true while a dispatcher thread owns the queue; guarded by the lock
        private int _maxConcurrentThreadsCount = 1; // by default a single worker thread processes elements
        private long _EnqueueCount = 0; // enqueue counter
        private long _DequeueCount = 0; // dequeue counter

        /// <summary>
        /// Upper bound for the number of worker threads processing elements at the same time.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// The assigned value is smaller than 1; such a value would prevent any element from
        /// ever being dispatched.
        /// </exception>
        public int MaxConcurrentThreadsCount
        {
            set
            {
                if (value < 1)
                {
                    throw new ArgumentOutOfRangeException("value", "MaxConcurrentThreadsCount must be at least 1.");
                }
                lock (_syncLockObject)
                {
                    _maxConcurrentThreadsCount = value;
                    // A dispatcher waiting for a free slot must re-evaluate the new limit.
                    Monitor.PulseAll(_syncLockObject);
                }
            }
            get
            {
                lock (_syncLockObject)
                {
                    return _maxConcurrentThreadsCount;
                }
            }
        }

        /// <summary>Total number of elements successfully enqueued so far.</summary>
        public long EnqueueCount
        {
            get
            {
                // Interlocked.Read keeps the 64-bit read atomic on 32-bit platforms.
                return Interlocked.Read(ref _EnqueueCount);
            }
        }

        /// <summary>Total number of elements handed to <see cref="OnDequeue"/> so far.</summary>
        public long DequeueCount
        {
            get
            {
                return Interlocked.Read(ref _DequeueCount);
            }
        }

        /// <summary>Number of elements currently waiting in the queue.</summary>
        public int Count
        {
            get
            {
                lock (_syncLockObject)
                {
                    return _queue.Count;
                }
            }
        }

        /// <summary>Number of worker threads that are currently alive.</summary>
        public int ConcurrentThreadsCount
        {
            get
            {
                lock (_syncLockObject)
                {
                    return _concurrentThreadsCount;
                }
            }
        }

        /// <summary>
        /// Appends <paramref name="element"/> to the queue and starts the dispatcher thread
        /// when none is running.
        /// </summary>
        /// <param name="element">Element to enqueue; must not be null.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="element"/> is null. A null element cannot be told apart from
        /// "nothing dequeued" and previously leaked a worker slot for good.
        /// </exception>
        /// <remarks>
        /// Failures of the underlying queue are reported through <see cref="OnException"/>
        /// instead of being thrown.
        /// </remarks>
        public void Enqueue(T element)
        {
            if (element == null)
            {
                throw new ArgumentNullException("element");
            }
            bool startDispatcher = false;
            try
            {
                lock (_syncLockObject)
                {
                    _queue.Enqueue(element);
                    // Claim the dispatcher role inside the same critical section that adds
                    // the element. The dispatcher releases the role only while holding the
                    // lock and seeing an empty queue, so exactly one dispatcher exists and
                    // no element can be stranded between "queue empty" and "flag cleared".
                    if (!_queueRuning)
                    {
                        _queueRuning = true;
                        startDispatcher = true;
                    }
                }
                Interlocked.Increment(ref _EnqueueCount);
            }
            catch (Exception e)
            {
                RaiseException(e);
            }
            if (startDispatcher)
            {
                QueueRun();
            }
        }

        // Starts the dispatcher thread. The caller must already have set _queueRuning.
        private void QueueRun()
        {
            try
            {
                Thread t = new Thread(new ThreadStart(QueueRunThreadProcess));
                t.Name = "QueueRunThreadProcess";
                t.Start();
            }
            catch
            {
                // The dispatcher never started: release the role so that a later Enqueue
                // can retry, then let the caller see the failure as before.
                lock (_syncLockObject)
                {
                    _queueRuning = false;
                }
                throw;
            }
        }

        // Dispatcher thread body: hands queued elements to new worker threads while
        // respecting the concurrency limit, and exits once the queue is empty.
        private void QueueRunThreadProcess()
        {
            LogCurrentState("Queue Runing Start ...");
            int threadsCount = 0;
            int queueCount = 0;
            while (true)
            {
                T element = null;
                bool threadsFull = false;
                lock (_syncLockObject)
                {
                    // Sleep instead of spinning while every worker slot is taken. Workers
                    // pulse the lock when they finish; they also drain the queue themselves,
                    // so it may be empty by the time this thread wakes up.
                    while (_queue.Count > 0 && _concurrentThreadsCount >= _maxConcurrentThreadsCount)
                    {
                        Monitor.Wait(_syncLockObject);
                    }
                    threadsCount = _concurrentThreadsCount;
                    queueCount = _queue.Count;
                    if (queueCount == 0)
                    {
                        // Release the dispatcher role under the lock (see Enqueue).
                        _queueRuning = false;
                        break;
                    }
                    threadsCount = ++_concurrentThreadsCount;
                    threadsFull = threadsCount >= _maxConcurrentThreadsCount;
                    element = _queue.Dequeue();
                }
                // Log outside the lock so that a slow or re-entrant handler cannot block
                // producers and workers. queueCount is the length before the dequeue.
                if (threadsFull)
                {
                    Log("Threads is Full!", threadsCount, queueCount);
                }
                Log("Threads ++ !", threadsCount, queueCount);
                ThreadProcessState tps = new ThreadProcessState(this, element);
                Thread t = new Thread(new ThreadStart(tps.ThreadProcess));
                t.Name = string.Format("ConcurrentThread[{0}]", threadsCount);
                t.Start();
            }
            Log("Queue Runing Stopped!", threadsCount, queueCount);
        }

        // Worker thread body: processes the element it was started with, then keeps
        // draining the queue. Handler exceptions end this worker and are reported through
        // OnException; the worker slot is always released.
        private void OnDequeueThreadProcess(T element)
        {
            try
            {
                ProcessElement(element);
                DequeueProcess();
            }
            catch (Exception e)
            {
                RaiseException(e);
            }
            finally
            {
                int threadsCount;
                int queueCount;
                lock (_syncLockObject)
                {
                    threadsCount = --_concurrentThreadsCount;
                    queueCount = _queue.Count;
                    // Wake a dispatcher that is waiting for a free worker slot.
                    Monitor.PulseAll(_syncLockObject);
                }
                if (threadsCount == 0)
                {
                    Log("All Threads Finished!", threadsCount, queueCount);
                }
                Log("Threads -- !", threadsCount, queueCount);
            }
        }

        // Processes queued elements on the calling worker thread until the queue is empty.
        private void DequeueProcess()
        {
            while (true)
            {
                T element;
                lock (_syncLockObject)
                {
                    if (_queue.Count == 0)
                    {
                        return;
                    }
                    element = _queue.Dequeue();
                }
                ProcessElement(element);
            }
        }

        // Delivers one element to the OnDequeue subscribers and counts it as dequeued.
        private void ProcessElement(T element)
        {
            // Copy the delegate first: a subscriber may unsubscribe on another thread
            // between the null check and the invocation.
            QueueEventHandler handler = OnDequeue;
            if (handler != null)
            {
                handler(element);
            }
            Interlocked.Increment(ref _DequeueCount);
        }

        // Reports an exception to the OnException subscribers, if any.
        private void RaiseException(Exception exception)
        {
            ExceptionEventHandler handler = OnException;
            if (handler != null)
            {
                handler(exception);
            }
        }

        // Formats and emits one log message from counters captured by the caller.
        private void Log(string message, int threadsCount, int queueCount)
        {
            QueueLogEventHandler handler = OnQueueLog;
            if (handler != null)
            {
                handler
                    (
                        string.Format
                            (
                                LogFormat
                                , message
                                , threadsCount
                                , queueCount
                                , Thread.CurrentThread.Name
                            )
                    );
            }
        }

        // Emits a log message carrying a consistent snapshot of the current counters.
        private void LogCurrentState(string message)
        {
            if (OnQueueLog == null)
            {
                return; // nobody is listening: skip taking the lock
            }
            int threadsCount;
            int queueCount;
            lock (_syncLockObject)
            {
                threadsCount = _concurrentThreadsCount;
                queueCount = _queue.Count;
            }
            Log(message, threadsCount, queueCount);
        }

        // Binds a queue and one element together so that a parameterless ThreadStart
        // delegate can run the worker for that element.
        private class ThreadProcessState
        {
            private readonly AsyncQueue<T> _sender;
            private readonly T _element;

            public ThreadProcessState(AsyncQueue<T> sender, T element)
            {
                _sender = sender;
                _element = element;
            }

            public AsyncQueue<T> Sender
            {
                get
                {
                    return _sender;
                }
            }

            public T element
            {
                get
                {
                    return _element;
                }
            }

            public void ThreadProcess()
            {
                _sender.OnDequeueThreadProcess(_element);
            }
        }
    }
}
namespace Test
{
    using System;
    using System.Threading;
    using Microshaoft;
    /// <summary>
    /// Console stress driver for <see cref="AsyncQueue{T}"/>: 1000 producer threads each
    /// enqueue 1000 <see cref="Item"/> instances while a monitor thread prints queue
    /// statistics on demand.
    /// </summary>
    public class Class1
    {
        static AsyncQueue<Item> _queue;

        /// <summary>Program entry point: prints the runtime version and starts the demo.</summary>
        public static void Main()
        {
            Console.Title = "Client";
            Console.WriteLine(Environment.Version.ToString());
            Class1 a = new Class1();
            a.Run();
            // Deliberately no Console.ReadLine() here. The monitor thread started by Run()
            // must be the only reader of standard input; a second reader on the main thread
            // would race with it and could swallow the "q" meant to stop the monitor.
            // Threads created with new Thread(...) are foreground threads unless IsBackground
            // is set, so the monitor and producer threads keep the process alive after Main returns.
        }

        /// <summary>
        /// Creates the shared queue, wires up its events, then starts the console monitor
        /// thread and the producer threads.
        /// </summary>
        public void Run()
        {
            _queue = new AsyncQueue<Item>();
            _queue.OnDequeue += new AsyncQueue<Item>.QueueEventHandler(_queue_OnDequeue);
            _queue.OnQueueLog += new AsyncQueue<Item>.QueueLogEventHandler(_queue_OnQueueLog);
            _queue.OnException += new AsyncQueue<Item>.ExceptionEventHandler(_queue_OnException);
            _queue.MaxConcurrentThreadsCount = 200;
            Thread t = new Thread(new ThreadStart(ConsoleMonitor));
            t.Start();
            // The time-consuming main workload: 1000 producers running ThreadProcess.
            for (int i = 0; i < 1000; i++)
            {
                Thread x = new Thread(new ThreadStart(ThreadProcess));
                x.Start();
            }
        }

        /// <summary>
        /// Prints the queue length, live worker count, enqueue counter and dequeue counter
        /// every time a line is entered; entering "q" ends the loop.
        /// </summary>
        public void ConsoleMonitor()
        {
            // The loop reads whole lines, so the prompt asks for Enter and names the exit command.
            Console.WriteLine("press Enter to check queue status, or type q and press Enter to stop monitoring ...");
            while (Console.ReadLine() != "q")
            {
                Console.WriteLine
                            (
                                "Queue elements: {0},Threads count: {1},{2},{3}"
                                , _queue.Count
                                , _queue.ConcurrentThreadsCount
                                , _queue.EnqueueCount
                                , _queue.DequeueCount
                            );
            }
        }

        // Prints the message of an exception reported by the queue in yellow.
        void _queue_OnException(Exception e)
        {
            Console.ForegroundColor = ConsoleColor.Yellow;
            Console.WriteLine(e.Message);
            Console.ResetColor();
        }

        // Queue diagnostics are intentionally discarded in this demo.
        // To trace dispatcher/worker activity, write logMessage to the console here.
        void _queue_OnQueueLog(string logMessage)
        {
        }

        // Simulates a small amount of work for every dequeued element.
        void _queue_OnDequeue(Item element)
        {
            Thread.Sleep(1);
        }

        /// <summary>
        /// Producer thread body: enqueues 1000 items, each stamped with its creation time.
        /// </summary>
        public void ThreadProcess()
        {
            for (int i = 0; i < 1000; i++)
            {
                Item x = new Item();
                DateTime EnqueueTime = DateTime.Now;
                x.Name = EnqueueTime.ToString();
                x.EnqueueTime = EnqueueTime;
                _queue.Enqueue(x);
            }
        }
    }
}
namespace Test
{
    using System;
    /// <summary>
    /// Sample payload carried through the queue: a display name plus the moment the
    /// producer created it.
    /// </summary>
    public class Item
    {
        private string _nameValue;
        private DateTime _enqueuedAt;

        /// <summary>Display name of the item; null until assigned.</summary>
        public string Name
        {
            get { return _nameValue; }
            set { _nameValue = value; }
        }

        /// <summary>Timestamp assigned by the producer; default(DateTime) until assigned.</summary>
        public DateTime EnqueueTime
        {
            get { return _enqueuedAt; }
            set { _enqueuedAt = value; }
        }
    }
}

// Original article: https://www.cnblogs.com/Microshaoft/p/1285784.html