SendMessage

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Manager.Common
{
    public enum EngineResult
    {
        Success,
        FaildAndSuspend,
        FaildWithoutSuspend
    }

    //消息传递引擎
    public class RelayEngine<T>
    {
        private Thread _RelayThread;
        private AutoResetEvent _ItemArriveEvent = new AutoResetEvent(false);
        private ManualResetEvent _ResumeEvent = new ManualResetEvent(true);
        private WaitHandle[] _WaitHandles;
        private bool _Stop = false;

        private LinkedList<T> _Buffer = new LinkedList<T>();
        private Func<T, bool> _RelayFunc;
        private Func<T, EngineResult> _RelayFunc2;
        private Action<Exception> _HandleException;
        public bool IsSuspend = true;

        public RelayEngine(Func<T, bool> relayFunc, Action<Exception> handleException, Func<T, EngineResult> relayFunc2 = null)
        {
            this._WaitHandles = new WaitHandle[] { this._ItemArriveEvent, this._ResumeEvent };
            this._RelayFunc = relayFunc;
            this._RelayFunc2 = relayFunc2;
            this._HandleException = handleException;
            this._RelayThread = new Thread(this.Run) { IsBackground = true };
            this._RelayThread.Start();
            this.IsSuspend = false;
        }

        public void AddItem(T item)
        {
            lock (this)
            {
                this._Buffer.AddLast(item);
            }
            this._ItemArriveEvent.Set();
        }

        public void Suspend()
        {
            this.IsSuspend = true;
            this._ResumeEvent.Reset();
        }

        public void Resume()
        {
            this.IsSuspend = false;
            this._ResumeEvent.Set();
        }

        public void Stop()
        {
            this.IsSuspend = true; //线程挂起
            this._Stop = true;    //线程停止
            this._ItemArriveEvent.Set();
            this._ResumeEvent.Set();
        }

        private void Run()
        {
            try
            {
                while (true)
                {
                    if (this._Buffer.Count == 0)
                    {
                        WaitHandle.WaitAll(this._WaitHandles);
                    }
                    else
                    {
                        this._ResumeEvent.WaitOne(); //队列没有消息阻塞线程,知道收到信号
                    }

                    if (this._Stop) break;

                    if (this._Buffer.Count > 0)
                    {
                        T item = this._Buffer.First.Value; //先进先出
                        EngineResult result;
                        if (this._RelayFunc2 == null)
                        {
                            result = this._RelayFunc(item) ? EngineResult.Success : EngineResult.FaildAndSuspend;
                        }
                        else
                        {
                            result = this._RelayFunc2(item);
                        }
                        if (result == EngineResult.Success)
                        {
                            lock (this)
                            {
                                this._Buffer.RemoveFirst();
                            }
                        }
                        else
                        {
                            if (result == EngineResult.FaildAndSuspend) this.Suspend();
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                this._HandleException(ex);
            }
        }
    }
}
View Code
原文地址:https://www.cnblogs.com/feige/p/5994896.html