.Net 4.0 Remoting ConcurrentQueue


/*
Remoting 异步队列实现,流程如下
1.并发若干客户端程序通过调用 Remoting ConcurrentQueue Server 提供的公开远程方法 Enqueue 将数据元素入队尾
2.Remoting ConcurrentQueue Server 发现队列不为空,则并发若干线程陆续 Dequeue 队首数据元素并处理
注意:
1.队列的数据元素定义需自行实现
2.对出列数据元素的处理程序需自行实现
不受 Remoting 生命周期租约过期的约束
*/
//AsyncQueue.cs
#define c4 //C# 4.0+
//#define c2
namespace Microshaoft
{
    using System;
    using System.Threading;
    using System.Diagnostics;
    using System.Collections.Generic;
#if c4
    using System.Collections.Concurrent;
#endif
    using Microshaoft;
    public class AsyncQueue<T>
                        where T : class
    {
        // Signature of the per-element callback; DequeueThreadProcess invokes OnDequeue with each element it removes.
        public delegate void QueueEventHandler(T element);
        // Raised on a worker thread for every element taken from the queue.
        public event QueueEventHandler OnDequeue;
        // Signature shared by all diagnostic events below; the argument is a preformatted log line.
        public delegate void QueueLogEventHandler(string logMessage);
        //public event QueueLogEventHandler OnQueueLog;
        // Raised at the beginning / end of QueueRunThreadProcess (the thread that spawns workers).
        public event QueueLogEventHandler OnQueueRunningThreadStart;
        public event QueueLogEventHandler OnQueueRunningThreadEnd;
        // Raised at the beginning / end of DequeueThreadProcess (a worker thread).
        public event QueueLogEventHandler OnDequeueThreadStart;
        public event QueueLogEventHandler OnDequeueThreadEnd;
        // Raised by a finishing worker when the worker counter drops to 0.
        public event QueueLogEventHandler OnDequeueAllThreadsEnd;
        // Signature of the error callback.
        public delegate void ExceptionEventHandler(Exception exception);
        // Raised from the catch blocks of Enqueue and DequeueThreadProcess instead of rethrowing.
        public event ExceptionEventHandler OnException;
        // The backing store is selected at compile time by the c2 / c4 symbols defined at the top of the file.
#if c2
        private Queue<T> _queue = new Queue<T>();
#elif c4
        private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
#endif
        // Only locked in the c2 code paths (Enqueue and DequeueThreadProcess).
        private object _syncQueueLockObject = new object();
        //private object _syncQueueRunningLockObject = new object();
        // Interlocked-accessed flag consulted by QueueRun before it starts QueueRunThreadProcess;
        // QueueRunThreadProcess writes 0 to it when it finishes.
        private long _isQueueRunning = 0;
        private long _concurrentDequeueThreadsCount = 0; //Microshaoft: used to limit the number of concurrent worker threads
        // Counter instances below are created by AttachPerformanceCounters and stay null until it has run;
        // every use is guarded by _isAttachedPerformanceCounters.
        private PerformanceCounter _enqueuePerformanceCounter;
        private PerformanceCounter _dequeuePerformanceCounter;
        private PerformanceCounter _dequeueProcessedPerformanceCounter;
        private PerformanceCounter _queueLengthPerformanceCounter;
        private PerformanceCounter _dequeueThreadStartPerformanceCounter;
        private PerformanceCounter _dequeueThreadEndPerformanceCounter;
        private PerformanceCounter _dequeueThreadsCountPerformanceCounter;
        private PerformanceCounter _queueRunningThreadStartPerformanceCounter;
        private PerformanceCounter _queueRunningThreadEndPerformanceCounter;
        private PerformanceCounter _queueRunningThreadsCountPerformanceCounter;
        // Set to true as the last statement of AttachPerformanceCounters.
        private bool _isAttachedPerformanceCounters = false;
        /// <summary>
        /// Recreates the performance counter category used by this queue and attaches one
        /// writable, process-lifetime counter instance per metric, each reset to 0.
        /// </summary>
        /// <remarks>
        /// An existing category with the same name is deleted and created again on every call.
        /// Once this method returns, the queue starts updating the counters.
        /// </remarks>
        /// <param name="instanceNamePrefix">
        /// Prefix of the counter instance name; the full name is "{prefix}-{current process name}".
        /// </param>
        public void AttachPerformanceCounters(string instanceNamePrefix)
        {
            // The misspelling ("Conurrent") is kept on purpose: it is the category name that
            // existing monitoring configurations would already refer to.
            const string category = "Microshaoft AsyncConurrentQueue Counters";

            const string enqueueCounter = "EnqueueCounter";
            const string dequeueCounter = "DequeueCounter";
            const string queueLengthCounter = "QueueLengthCounter";
            const string dequeueProcessedCounter = "DequeueProcessedCounter";
            const string dequeueThreadStartCounter = "DequeueThreadStartCounter";
            const string dequeueThreadEndCounter = "DequeueThreadEndCounter";
            const string dequeueThreadsCountCounter = "DequeueThreadsCountCounter";
            const string queueRunningThreadStartCounter = "QueueRunningThreadStartCounter";
            const string queueRunningThreadEndCounter = "QueueRunningThreadEndCounter";
            const string queueRunningThreadsCountCounter = "QueueRunningThreadsCountCounter";

            string processName;
            using (Process process = Process.GetCurrentProcess())
            {
                processName = process.ProcessName;
            }
            string instanceName = string.Format
                                    (
                                        "{0}-{1}"
                                        , instanceNamePrefix
                                        , processName
                                    );

            if (PerformanceCounterCategory.Exists(category))
            {
                PerformanceCounterCategory.Delete(category);
            }

            // Registration order is the same as in the original hand-written sequence.
            string[] counterNames = new string[]
                                        {
                                            enqueueCounter
                                            , dequeueCounter
                                            , queueLengthCounter
                                            , dequeueProcessedCounter
                                            , dequeueThreadStartCounter
                                            , dequeueThreadEndCounter
                                            , dequeueThreadsCountCounter
                                            , queueRunningThreadStartCounter
                                            , queueRunningThreadEndCounter
                                            , queueRunningThreadsCountCounter
                                        };
            CounterCreationDataCollection ccdc = new CounterCreationDataCollection();
            foreach (string counterName in counterNames)
            {
                ccdc.Add
                    (
                        PerformanceCounterHelper.GetCounterCreationData
                                                    (
                                                        counterName
                                                        , PerformanceCounterType.NumberOfItems64
                                                    )
                    );
            }
            PerformanceCounterCategory.Create
                                            (
                                                category,
                                                string.Format("{0} Category Help.", category),
                                                PerformanceCounterCategoryType.MultiInstance,
                                                ccdc
                                            );

            _enqueuePerformanceCounter = CreateWritableCounter(category, enqueueCounter, instanceName);
            _dequeuePerformanceCounter = CreateWritableCounter(category, dequeueCounter, instanceName);
            _dequeueProcessedPerformanceCounter = CreateWritableCounter(category, dequeueProcessedCounter, instanceName);
            _queueLengthPerformanceCounter = CreateWritableCounter(category, queueLengthCounter, instanceName);
            _dequeueThreadStartPerformanceCounter = CreateWritableCounter(category, dequeueThreadStartCounter, instanceName);
            _dequeueThreadEndPerformanceCounter = CreateWritableCounter(category, dequeueThreadEndCounter, instanceName);
            _dequeueThreadsCountPerformanceCounter = CreateWritableCounter(category, dequeueThreadsCountCounter, instanceName);
            _queueRunningThreadStartPerformanceCounter = CreateWritableCounter(category, queueRunningThreadStartCounter, instanceName);
            _queueRunningThreadEndPerformanceCounter = CreateWritableCounter(category, queueRunningThreadEndCounter, instanceName);
            _queueRunningThreadsCountPerformanceCounter = CreateWritableCounter(category, queueRunningThreadsCountCounter, instanceName);

            // Published last so other threads only touch the counters once all of them exist.
            _isAttachedPerformanceCounters = true;
        }

        /// <summary>
        /// Builds a writable, process-lifetime counter instance and resets its raw value to 0.
        /// </summary>
        private static PerformanceCounter CreateWritableCounter(string category, string counter, string instanceName)
        {
            PerformanceCounter performanceCounter = new PerformanceCounter();
            performanceCounter.CategoryName = category;
            performanceCounter.CounterName = counter;
            performanceCounter.InstanceLifetime = PerformanceCounterInstanceLifetime.Process;
            performanceCounter.InstanceName = instanceName;
            performanceCounter.ReadOnly = false;
            performanceCounter.RawValue = 0;
            return performanceCounter;
        }
        // Microshaoft: number of dequeue worker threads allowed to run at the same time; starts at 1.
        private int _maxConcurrentThreadsCount = 1;

        /// <summary>
        /// Gets or sets the upper bound on concurrently running dequeue worker threads.
        /// The value is stored as given, without validation.
        /// </summary>
        public int MaxConcurrentThreadsCount
        {
            get { return _maxConcurrentThreadsCount; }
            set { _maxConcurrentThreadsCount = value; }
        }
        /// <summary>
        /// Starts the dispatcher thread (<see cref="QueueRunThreadProcess"/>) when the worker limit
        /// has not been reached and no dispatcher is currently active.
        /// </summary>
        /// <remarks>
        /// Microshaoft: can be called as soon as the service has started. It is invoked after every
        /// enqueue, by each worker that handled at least one element, and by the dispatcher itself
        /// when it finds new elements after finishing.
        /// </remarks>
        private void QueueRun() //Microshaoft ThreadStart
        {
            if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentThreadsCount)
            {
                return;
            }
            // Claim the dispatcher role by flipping the flag 0 -> 1 atomically. CompareExchange takes
            // (location, newValue, comparand); the previous code passed (0, 1), which never set the
            // flag and therefore allowed any number of dispatcher threads to run at once.
            if (Interlocked.CompareExchange(ref _isQueueRunning, 1, 0) != 0)
            {
                return;
            }
            try
            {
                Thread dispatcher = new Thread(new ThreadStart(QueueRunThreadProcess));
                dispatcher.Name = "QueueRunningThreadProcess";
                dispatcher.Start();
            }
            catch
            {
                // The dispatcher never ran, so nobody else would release the flag.
                Interlocked.Exchange(ref _isQueueRunning, 0);
                throw;
            }
        }

        /// <summary>Gets the number of elements currently held in the queue.</summary>
        public int Count
        {
            get
            {
                return _queue.Count;
            }
        }

        /// <summary>Gets the current value of the dequeue worker thread counter.</summary>
        public long ConcurrentThreadsCount
        {
            get
            {
                // 64-bit reads are not atomic on 32-bit platforms; read it the same way the rest of the class does.
                return Interlocked.Read(ref _concurrentDequeueThreadsCount);
            }
        }

        /// <summary>
        /// Dispatcher thread body: while the queue holds elements and the worker limit allows it,
        /// starts worker threads running <see cref="DequeueThreadProcess"/>; then raises the end
        /// event, updates the counters and releases the dispatcher flag.
        /// </summary>
        private void QueueRunThreadProcess()
        {
            if (_isAttachedPerformanceCounters)
            {
                _queueRunningThreadStartPerformanceCounter.Increment();
                _queueRunningThreadsCountPerformanceCounter.Increment();
            }
            QueueLogEventHandler startHandler = OnQueueRunningThreadStart;
            if (startHandler != null)
            {
                startHandler
                    (
                        string.Format
                                (
                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3}({4}) at {5}"
                                    , "Queue Running Start ..."
                                    , Interlocked.Read(ref _concurrentDequeueThreadsCount)
                                    , _queue.Count
                                    , Thread.CurrentThread.Name
                                    , Thread.CurrentThread.ManagedThreadId
                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                )
                    );
            }
#if c2
            while ((_queue.Count > 0))
#elif c4
            while (!_queue.IsEmpty)
#endif
            {
                if (Interlocked.Read(ref _concurrentDequeueThreadsCount) >= _maxConcurrentThreadsCount)
                {
                    // Worker limit reached; running workers keep draining the queue.
                    break;
                }
                // Use the value returned by the atomic increment: re-reading the shared field could
                // observe a value already changed by a finishing worker.
                int threadID = (int) Interlocked.Increment(ref _concurrentDequeueThreadsCount);
                Thread worker = new Thread(new ThreadStart(DequeueThreadProcess));
                worker.Name = string.Format("ConcurrentDequeueProcessThread[{0}]", threadID);
                worker.Start();
            }
            QueueLogEventHandler endHandler = OnQueueRunningThreadEnd;
            if (endHandler != null)
            {
                int r = (int) Interlocked.Read(ref _concurrentDequeueThreadsCount);
                endHandler
                    (
                        string.Format
                                (
                                    "{0} Threads Count {1}, Queue Count {2}, Current Thread: {3}({4}) at {5}"
                                    , "Queue Running Stop ..."
                                    , r
                                    , _queue.Count
                                    , Thread.CurrentThread.Name
                                    , Thread.CurrentThread.ManagedThreadId
                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                )
                    );
            }
            if (_isAttachedPerformanceCounters)
            {
                _queueRunningThreadEndPerformanceCounter.Increment();
                _queueRunningThreadsCountPerformanceCounter.Decrement();
            }
            Interlocked.Exchange(ref _isQueueRunning, 0);
            // An element enqueued after the loop ended but before the flag was cleared had its
            // QueueRun call rejected; look once more so it is not stranded until the next enqueue.
#if c2
            if (_queue.Count > 0)
#elif c4
            if (!_queue.IsEmpty)
#endif
            {
                QueueRun();
            }
        }
        /// <summary>
        /// Appends <paramref name="element"/> to the tail of the queue, updates the enqueue and
        /// queue-length counters when attached, and then asks <c>QueueRun</c> to start processing.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown while enqueuing or updating counters are reported through
        /// <c>OnException</c> instead of being rethrown; <c>QueueRun</c> is called either way.
        /// </remarks>
        /// <param name="element">The element to add.</param>
        public void Enqueue(T element)
        {
            try
            {
#if c2
                // Original author's remark: "does this still count as concurrent?" - writers are serialized here.
                lock (_syncQueueLockObject)
#endif
                {
                    _queue.Enqueue(element);
                }
                if (_isAttachedPerformanceCounters)
                {
                    _enqueuePerformanceCounter.Increment();
                    _queueLengthPerformanceCounter.Increment();
                }
            }
            catch (Exception e)
            {
                // Take a snapshot of the delegate: a subscriber removed between the null check and
                // the call would otherwise cause a NullReferenceException inside this handler.
                ExceptionEventHandler handler = OnException;
                if (handler != null)
                {
                    handler(e);
                }
            }
            QueueRun();
        }
        /// <summary>
        /// Worker thread body: removes elements from the queue one at a time and passes each to
        /// OnDequeue. On exit it decrements the worker counter, raises the end events, updates the
        /// counters and, if it handled at least one element, calls QueueRun again.
        /// </summary>
        private void DequeueThreadProcess()
        {
            if (_isAttachedPerformanceCounters)
            {
                _dequeueThreadStartPerformanceCounter.Increment();
                _dequeueThreadsCountPerformanceCounter.Increment();
            }
            if (OnDequeueThreadStart != null)
            {
                int r = (int) Interlocked.Read(ref _concurrentDequeueThreadsCount);
                OnDequeueThreadStart
                                (
                                    string.Format
                                            (
                                                "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                , "Threads ++ !"
                                                , r
                                                , _queue.Count
                                                , Thread.CurrentThread.Name
                                                , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                            )
                                );
            }
            // Becomes true once this worker has dequeued at least one element; checked in finally.
            bool queueWasNotEmpty = false;
            // The try wraps the whole loop, so an exception (for example from an OnDequeue
            // subscriber) ends this worker's loop; the element being handled is already removed.
            try
            {
                // c2: loop until the locked section below finds the queue empty and breaks.
                // c4: loop while the concurrent queue reports elements.
#if c2
                while (true)
#elif c4
                while (!_queue.IsEmpty)
#endif
                {
                    T element = null;
#if c2
                    lock (_syncQueueLockObject)
                    {
                        if (_queue.Count > 0)
                        {
                            element = _queue.Dequeue();
                        }
                        else
                        {
                            // Queue is empty: leave the loop (the author's note: avoids QueueRun looping forever).
                            break;
                        }
                    }
#elif c4
                    // TryDequeue can fail if another worker took the last element first;
                    // the loop condition is then evaluated again.
                    if (_queue.TryDequeue(out element))
                    {
#elif c2
                        // NOTE(review): this branch can never be selected - when c2 is defined the
                        // "#if c2" at the top of this directive chain has already been taken.
                        if (element != null)
                        {
#endif
                            if (!queueWasNotEmpty)
                            {
                                queueWasNotEmpty = true;
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeuePerformanceCounter.Increment();
                                _queueLengthPerformanceCounter.Decrement();
                            }
                            if (OnDequeue != null)
                            {
                                OnDequeue(element);
                            }
                            if (_isAttachedPerformanceCounters)
                            {
                                _dequeueProcessedPerformanceCounter.Increment();
                            }
                    // The closing braces differ per symbol: under c2 the single brace below closes
                    // the while loop; under c4 the two braces close the TryDequeue "if" and the loop.
#if c2
                        }
#elif c4
                    }
                }
#endif
            }
            catch (Exception e)
            {
                // Report instead of rethrowing so the finally block can restart processing.
                if (OnException != null)
                {
                    OnException(e);
                }
            }
            finally
            {
                // r is the number of workers still counted after this one has left.
                int r = (int) Interlocked.Decrement(ref _concurrentDequeueThreadsCount);
                if (OnDequeueThreadEnd != null)
                {
                    OnDequeueThreadEnd
                                (
                                    string.Format
                                            (
                                                "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                , "Threads--"
                                                , r
                                                , _queue.Count
                                                , Thread.CurrentThread.Name
                                                , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                            )
                                );
                }
                if (r == 0)
                {
                    if (OnDequeueAllThreadsEnd != null)
                    {
                        OnDequeueAllThreadsEnd
                                    (
                                        string.Format
                                                (
                                                    "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}"
                                                    , "All Threads End"
                                                    , r
                                                    , _queue.Count
                                                    , Thread.CurrentThread.Name
                                                    , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff")
                                                )
                                    );
                    }
                }
                if (_isAttachedPerformanceCounters)
                {
                    _dequeueThreadEndPerformanceCounter.Increment();
                    _dequeueThreadsCountPerformanceCounter.Decrement();
                }
                // Only workers that actually processed something trigger another dispatch round.
                if (queueWasNotEmpty)
                {
                    QueueRun(); // the original author's open question: "infinite loop???"
                }
                
            }
        }
    }
}
namespace Microshaoft
{
    using System;
    using System.Diagnostics;
    /// <summary>
    /// Helper for building performance counter registration data.
    /// </summary>
    public static class PerformanceCounterHelper
    {
        /// <summary>
        /// Creates a <see cref="CounterCreationData"/> for the given counter whose help text is
        /// the counter name followed by " Help".
        /// </summary>
        /// <param name="counterName">Name of the counter.</param>
        /// <param name="performanceCounterType">Type of the counter.</param>
        /// <returns>The populated creation data.</returns>
        public static CounterCreationData GetCounterCreationData(string counterName, PerformanceCounterType performanceCounterType)
        {
            // Property order (name, help, type) mirrors the sequence of the assignments it replaces.
            return new CounterCreationData
                        {
                            CounterName = counterName,
                            CounterHelp = string.Format("{0} Help", counterName),
                            CounterType = performanceCounterType
                        };
        }
    }
}
//Server.cs
namespace Microshaoft
{
    using System;
    using System.Threading;
    using System.Data;
    using System.Data.SqlClient;
    using Microshaoft.RemotingObjects.Share;
    /// <summary>
    /// Owns an <c>AsyncQueue&lt;Item&gt;</c>, subscribes console-logging handlers to it and
    /// exposes <see cref="Enqueue"/> as the entry point for new items.
    /// </summary>
    public class AsyncQueueProcessor
    {
        private AsyncQueue<Item> _queue;

        /// <summary>
        /// Subscribes the handlers of this class to <paramref name="queue"/> and sets its
        /// worker limit to 64.
        /// </summary>
        /// <param name="queue">The queue to drive.</param>
        public AsyncQueueProcessor(AsyncQueue<Item> queue)
        {
            _queue = queue;
            // Only the dequeue, worker-end, all-workers-end and exception events are observed.
            _queue.OnDequeue += _queue_OnDequeue;
            _queue.OnDequeueAllThreadsEnd += _queue_OnQueueLog;
            _queue.OnDequeueThreadEnd += _queue_OnQueueLog;
            _queue.OnException += _queue_OnException;
            _queue.MaxConcurrentThreadsCount = 64;
        }

        /// <summary>Gets the queue passed to the constructor.</summary>
        public AsyncQueue<Item> Queue
        {
            get { return _queue; }
        }

        /// <summary>Forwards <paramref name="item"/> to the underlying queue.</summary>
        public void Enqueue(Item item)
        {
            _queue.Enqueue(item);
        }

        // Writes a queue diagnostic line to the console.
        void _queue_OnQueueLog(string logMessage)
        {
            Console.WriteLine(logMessage);
        }

        // Handles one dequeued item. The actual work is a placeholder; only timing information
        // (whole seconds) is printed.
        void _queue_OnDequeue(Item item)
        {
            DateTime dequeueBeginTime = DateTime.Now;

            // Processing placeholder. The sample that used to be commented out here opened a
            // SqlConnection with item.ConnectionString and executed item.SqlCommandText as a text command.
            // NOTE(review): running client-supplied SQL as-is would need validation before being enabled.

            DateTime dequeueEndTime = DateTime.Now;
            long secondsWaitedInQueue = (dequeueBeginTime.Ticks - item.EnqueueTime.Ticks) / TimeSpan.TicksPerSecond;
            long secondsSpentProcessing = (dequeueEndTime.Ticks - dequeueBeginTime.Ticks) / TimeSpan.TicksPerSecond;
            Console.WriteLine
                        (
                            "QueueRemainCount {0}, Enqueue {1}, Dequeue {2},[{3}], End {4},[{5}],[{6}]"
                            , _queue.Count
                            , item.EnqueueTime
                            , dequeueBeginTime
                            , secondsWaitedInQueue
                            , dequeueEndTime
                            , secondsSpentProcessing
                            , _queue.ConcurrentThreadsCount
                        );
        }

        // Prints exceptions reported by the queue.
        void _queue_OnException(Exception e)
        {
            Console.WriteLine(e.ToString());
        }
    }
}
namespace Microshaoft.RemotingObjects.Server
{
    using System;
    using System.Threading;
    using System.Collections;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Channels.Tcp;
    using System.Runtime.Serialization.Formatters;
    using System.ServiceProcess;
    using System.ComponentModel;
    using System.Configuration.Install;
    using System.Security.Principal;
    using Microshaoft.RemotingObjects;
    using Microshaoft.RemotingObjects.Share;
    using Microshaoft.Win32;
    using Microshaoft;
    //using eHome.UserProfile;
    /// <summary>
    /// Windows service hosting the remoting queue. Started with "/console" as the first argument
    /// it runs interactively in an allocated console instead of under the service control manager.
    /// </summary>
    public class ServiceHost : ServiceBase
    {
        /// <summary>Name under which the service is registered.</summary>
        public static readonly string serviceName = "RemotingAsyncConcurrentQueueService";
        private static AsyncQueueProcessor _asyncQueueProcessor;

        /// <summary>Gets the processor created by <see cref="OnStart"/>; null before the service has started.</summary>
        public static AsyncQueueProcessor AsyncQueueProcessor
        {
            get
            {
                return _asyncQueueProcessor;
            }
        }

        /// <summary>
        /// Application entry point.
        /// </summary>
        /// <param name="args">Command line arguments; "/console" (any casing) selects console mode.</param>
        static void Main(string[] args)
        {
            ServiceHost service = new ServiceHost();
            // Ordinal comparison: the switch is a fixed token and must not depend on the current culture.
            bool runInConsole = args != null
                                && args.Length > 0
                                && string.Equals(args[0], "/console", StringComparison.OrdinalIgnoreCase);
            if (runInConsole)
            {
                // Console mode: not running as a service.
                NativeMethods.AllocConsole();
                try
                {
                    Console.WriteLine("Alloc Console ...");
                    Console.WriteLine("Current User Identity: {0}", WindowsIdentity.GetCurrent().Name);
                    Console.WriteLine(".Net Framework version: {0}", Environment.Version.ToString());
                    Console.Title = "Server";
                    Console.WriteLine("Console");
                    service.OnStart(null);
                    Console.ReadLine();
                }
                finally
                {
                    // Previously unreachable: the flag guarding this code was only set in the
                    // branch that returned early, so the allocated console was never released.
                    Console.WriteLine("Free Console ...");
                    NativeMethods.FreeConsole();
                }
                return;
            }
            Console.WriteLine("Service");
            ServiceBase.Run(service);
        }

        /// <summary>Initializes the service name and declares pause/continue support.</summary>
        public ServiceHost()
        {
            CanPauseAndContinue = true;
            ServiceName = ServiceHost.serviceName;
        }

        /// <summary>
        /// Creates the queue with performance counters attached (instance prefix "Q1"), wraps it
        /// in the shared processor and starts remoting for <c>RemotingAsyncQueue</c>.
        /// </summary>
        /// <param name="args">Not used.</param>
        protected override void OnStart(string[] args)
        {
            Console.WriteLine(Environment.Version.ToString());
            AsyncQueue<Item> queue = new AsyncQueue<Item>();
            queue.AttachPerformanceCounters("Q1");
            // Assigned before remoting starts so remote Enqueue calls always find a processor.
            _asyncQueueProcessor = new AsyncQueueProcessor(queue);
            RemotingHelper.StartRemoting<RemotingAsyncQueue>
                                (
                                    "queueurl"
                                    , 8080
                                );
            Console.WriteLine("Server . , Press Enter key to exit.");
        }
    }
    /// <summary>
    /// Installer that registers the service under <c>ServiceHost.serviceName</c>, running as
    /// LocalSystem with manual start.
    /// </summary>
    [RunInstaller(true)]
    public class ProjectInstaller : Installer
    {
        private ServiceInstaller serviceInstaller;
        private ServiceProcessInstaller processInstaller;

        /// <summary>Creates both installers and adds them to <c>Installers</c>.</summary>
        public ProjectInstaller()
        {
            // The service process runs under the system account.
            processInstaller = new ServiceProcessInstaller
                                    {
                                        Account = ServiceAccount.LocalSystem
                                    };
            // The service has to be started manually.
            serviceInstaller = new ServiceInstaller
                                    {
                                        StartType = ServiceStartMode.Manual,
                                        ServiceName = ServiceHost.serviceName
                                    };
            Installers.AddRange(new Installer[] { serviceInstaller, processInstaller });
        }
    }
}
namespace Microshaoft.Win32
{
    using System.Runtime.InteropServices;
    // P/Invoke declarations for the kernel32 console functions used by the service host.
    public class NativeMethods
    {
        /// <summary>
        /// Allocates a console for the calling process.
        /// </summary>
        /// <returns>The native result as a bool; presumably true on success - see the Win32 documentation.</returns>
        [DllImport("kernel32.dll")]
        public static extern bool AllocConsole();
        /// <summary>
        /// Releases the console of the calling process.
        /// </summary>
        /// <returns>The native result as a bool; presumably true on success - see the Win32 documentation.</returns>
        [DllImport("kernel32.dll")]
        public static extern bool FreeConsole();
    }
}
namespace Microshaoft.RemotingObjects
{
    using System;
    using System.IO;
    using System.Net;
    using System.Web;
    using System.Text;
    using System.Threading;
    using System.Configuration;
    using System.Collections.Generic;
    using Microshaoft;
    using Microshaoft.RemotingObjects.Share;
    using Microshaoft.RemotingObjects.Server;
    /// <summary>
    /// Remotable entry point for the queue. The object keeps no state of its
    /// own; every call is forwarded to the static
    /// <c>ServiceHost.AsyncQueueProcessor</c>.
    /// </summary>
    public class RemotingAsyncQueue : MarshalByRefObject
    {
        /// <summary>
        /// Hands <paramref name="item"/> to the host's queue processor.
        /// </summary>
        /// <param name="item">Element to enqueue.</param>
        public void Enqueue(Item item)
        {
            // Microshaoft: the queue element type (Item) has to be defined by the user.
            var processor = ServiceHost.AsyncQueueProcessor;
            processor.Enqueue(item);
        }
    }
}
//=============================================================
//===============================================
// Share.cs
//Server、Client 均需引用此 share.dll
//C:\WINDOWS\Microsoft.NET\Framework\v2.0.50727\csc.exe /t:library share.cs
//TO DO
//队列的数据元素定义需自行实现,示例如下:
namespace Microshaoft.RemotingObjects.Share
{
    using System;
    /// <summary>
    /// Sample queue element: an enqueue timestamp, a SQL command text and a
    /// connection string.
    /// </summary>
    [Serializable]
    public class Item
    {
        // NOTE(review): the type is [Serializable], so the backing-field names are
        // presumably part of the serialized form exchanged between client and
        // server; they are intentionally left unchanged.
        private DateTime _EnqueueTime;
        private string _sql;
        private string _connectionString;

        /// <summary>Gets or sets the stored enqueue timestamp.</summary>
        public DateTime EnqueueTime
        {
            get { return this._EnqueueTime; }
            set { this._EnqueueTime = value; }
        }

        /// <summary>Gets or sets the stored SQL command text.</summary>
        public string SqlCommandText
        {
            get { return this._sql; }
            set { this._sql = value; }
        }

        /// <summary>Gets or sets the stored connection string.</summary>
        public string ConnectionString
        {
            get { return this._connectionString; }
            set { this._connectionString = value; }
        }
    }
}
// remoting helper
//Share.cs
namespace Microshaoft
{
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Channels.Tcp;
    using System.Runtime.Serialization.Formatters;
    using System.Text;
    /// <summary>
    /// Helpers for publishing a well-known Remoting object over a TCP channel
    /// and for obtaining a client-side proxy to one.
    /// </summary>
    public static class RemotingHelper
    {
        /// <summary>
        /// Registers a TCP channel on <paramref name="Port"/> and publishes
        /// <paramref name="RemotingType"/> as a singleton well-known object
        /// under <paramref name="Url"/>.
        /// </summary>
        /// <param name="RemotingType">Type of the object to publish.</param>
        /// <param name="Url">Object URI passed to the well-known registration.</param>
        /// <param name="Port">Value stored under the channel's "port" property.</param>
        public static void StartRemoting(Type RemotingType, string Url, int Port)
        {
            // NOTE(review): TypeFilterLevel.Full is the most permissive
            // deserialization level; confirm that only trusted clients can
            // reach this endpoint.
            BinaryServerFormatterSinkProvider sinkProvider = new BinaryServerFormatterSinkProvider
            {
                TypeFilterLevel = TypeFilterLevel.Full
            };

            IDictionary channelProperties = new Hashtable();
            channelProperties["port"] = Port;

            // NOTE(review): the channel is registered with ensureSecurity = false.
            TcpChannel channel = new TcpChannel(channelProperties, null, sinkProvider);
            ChannelServices.RegisterChannel(channel, false);

            RemotingConfiguration.RegisterWellKnownServiceType
                                    (
                                        RemotingType
                                        , Url
                                        , WellKnownObjectMode.Singleton
                                    );
            Console.WriteLine("Remoting Object Started ...");
        }

        /// <summary>
        /// Generic convenience overload: publishes <typeparamref name="T"/>
        /// by delegating to the <see cref="Type"/>-based overload.
        /// </summary>
        /// <typeparam name="T">Type of the object to publish.</typeparam>
        /// <param name="Url">Object URI passed to the well-known registration.</param>
        /// <param name="Port">Value stored under the channel's "port" property.</param>
        public static void StartRemoting<T>(string Url, int Port)
        {
            Type remotingType = typeof(T);
            StartRemoting(remotingType, Url, Port);
        }

        /// <summary>
        /// Obtains a proxy for the well-known object at <paramref name="Url"/>
        /// and casts it to <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Type the proxy is requested as and cast to.</typeparam>
        /// <param name="Url">Full object URL, e.g. "tcp://127.0.0.1:8080/queueUrl".</param>
        /// <returns>The object returned by <c>Activator.GetObject</c>, cast to <typeparamref name="T"/>.</returns>
        public static T GetRemotingLocalClientProxyObject<T>(string Url)
        {
            object proxy = Activator.GetObject(typeof(T), Url);
            return (T) proxy;
        }
    }
}
//============================================================================
//============================================
// Client.cs
//C:\WINDOWS\Microsoft.NET\Framework\v2.0.50727\csc.exe client.cs /r:share.dll
namespace Microshaoft.RemotingObjects.Client
{
    using System;
    using System.Collections;
    using System.Runtime.Remoting;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Remoting.Channels.Tcp;
    using System.Runtime.Serialization.Formatters;
    using System.Threading;
    using System.Data;
    using System.Data.SqlClient;
    using Microshaoft.RemotingObjects;
    using Microshaoft.RemotingObjects.Share;
    /// <summary>
    /// Demo client: obtains a proxy to the remote queue and enqueues items
    /// from several threads, printing the elapsed time of each call.
    /// </summary>
    public class Class1
    {
        // Proxy shared by all worker threads; assigned in Run().
        static RemotingAsyncQueue _queue;

        /// <summary>
        /// Entry point: sets the console title, prints the runtime version
        /// and starts the client run.
        /// </summary>
        public static void Main()
        {
            Console.Title = "Client";
            Console.WriteLine(Environment.Version.ToString());
            new Class1().Run();
        }

        /// <summary>
        /// Resolves the remote queue proxy and starts 20 threads that each
        /// execute <see cref="ThreadProcess"/>.
        /// </summary>
        public void Run()
        {
            _queue = RemotingHelper.GetRemotingLocalClientProxyObject<RemotingAsyncQueue>("tcp://127.0.0.1:8080/queueUrl");
            //Microshaoft: the time-consuming main program follows
            const int workerCount = 20;
            int started = 0;
            while (started < workerCount)
            {
                Thread worker = new Thread(ThreadProcess);
                worker.Start();
                started++;
            }
        }

        /// <summary>
        /// Worker body: enqueues 1000 items one after another and, after each
        /// call, prints the enqueue time together with the elapsed ticks
        /// divided by 10000.
        /// </summary>
        public void ThreadProcess()
        {
            const int itemsPerThread = 1000;
            int sent = 0;
            while (sent < itemsPerThread)
            {
                Item item = new Item();
                DateTime enqueuedAt = DateTime.Now;
                item.EnqueueTime = enqueuedAt;
                item.SqlCommandText = @"
                        --==========================
                        declare @ varchar(10)
                        set @ = 'aaa'
                        exec zsp_test @
                        --==========================
                ";
                item.ConnectionString = "";
                _queue.Enqueue(item);

                // Elapsed time around the remote call: ticks / 10000.
                long elapsed = (DateTime.Now.Ticks - enqueuedAt.Ticks) / 10000;
                Console.WriteLine("Enqueue: {0},[{1}]", enqueuedAt, elapsed);
                sent++;
            }
        }
    }
}
//Microshaoft =========================================
//Microshaoft Remoting Object Client Local Proxy
namespace Microshaoft.RemotingObjects
{
    using System;
    using Microshaoft.RemotingObjects.Share;
    /// <summary>
    /// Client-side local proxy declaration of the remote queue object.
    /// It declares only the <c>Enqueue</c> operation the client calls.
    /// </summary>
    public interface RemotingAsyncQueue
    {
        /// <summary>
        /// Enqueues <paramref name="item"/> on the remote queue.
        /// </summary>
        /// <param name="item">Element to enqueue.</param>
        void Enqueue(Item item);
    }
}

// 原文地址: https://www.cnblogs.com/Microshaoft/p/1725435.html