线程控制

一、单线程 去顺序执行接收到的数据

 概要:单线程执行,用一队列放置数据,获取队列中的数据并执行,无数据时 将线程至于空闲状态

队列线程基类:

 /// <summary>
    /// 队列线程基类
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public abstract class QueueCatheThread<T>
    {
        #region 字段

        /// <summary>
        /// 线程
        /// </summary>
        protected Thread thread;

        /// <summary>
        /// 运行标识
        /// </summary>
        private bool isrun = true;

        /// <summary>
        /// 线程队列长度
        /// </summary>
        protected int QueueMaxLength = 10000;

        /// <summary>
        /// 线程队列
        /// </summary>
        protected Queue<T> queue = new Queue<T>(1000);

        /// <summary>
        /// 生产者-消费者通知事件
        /// </summary>
        private AutoResetEvent reset = new AutoResetEvent(false);

        #endregion

        #region 构造函数

        protected QueueCatheThread(String name)
        {
            thread = new Thread(this.Run);
            thread.Name = name;
        }

        #endregion

        #region 子类重写

        protected virtual void OnStart() { }

        protected virtual void OnStop() { }

        protected virtual void OnWait() { }

        /// <summary>
        /// 处理数据逻辑
        /// </summary>
        /// <param name="data"></param>
        protected abstract void Use(T data);

        #endregion

        #region 私有方法

        /// <summary>
        /// 线程主函数
        /// </summary>
        void Run()
        {
            T data = default(T);
            bool towait = true;
            while (isrun)
            {
                try
                {
                    bool success;//获取到队列中数据成功标识
                    lock (queue)
                    {
                        if (queue.Count == 0)
                        {
                            success = false;
                        }
                        else
                        {
                            data = queue.Dequeue();
                            success = true;
                        }
                    }
                    if (success)
                    {
                        this.Use(data);
                    }
                    else if (towait)
                    {
                        Thread.Sleep(10);
                        towait = false;
                    }
                    else
                    {
                        towait = true;
                        this.OnWait();
                        reset.WaitOne();
                    }
                }
                catch (ThreadAbortException)
                {
                   // LogControl.Instance.Error("线程{0}强制中止", thread.Name);
                    break;
                }
                catch (Exception e)
                {
                  //  LogControl.Instance.Error("线程{0}处理数据时异常 原因:{1}", thread.Name, e.Message);
                }
            }
        }

        #endregion

        #region 公有方法

        /// <summary>
        /// 获取队列长度
        /// </summary>
        public Int32 Count
        {
            get
            {
                return this.queue.Count;
            }
        }

        /// <summary>
        /// 启动线程
        /// </summary>
        public void Start()
        {
            thread.Start();
            OnStart();
        }

        /// <summary>
        /// 停止线程
        /// </summary>
        /// <param name="abort"></param>
        public void Stop(bool abort)
        {
            if (abort)
            {
                this.isrun = false;
                this.thread.Abort();
            }
            else
            {
                this.isrun = false;
            }
            this.reset.Set();
            OnStop();

        }

        /// <summary>
        /// 处理数据
        /// </summary>
        /// <param name="data"></param>
        /// <returns></returns>
        public bool Enqueue(T data)
        {
            if (QueueMaxLength > 0 && this.Count > QueueMaxLength)
            {
                return false;
            }
            lock (queue)
            {
                this.queue.Enqueue(data);
            }
            this.reset.Set();
            return true;
        }

        public void Enqueue()
        {
            this.reset.Set();
        }

        /// <summary>
        /// 停止线程
        /// </summary>
        public void Stop()
        {
            this.Stop(false);
        }

        #endregion
    }
View Code

实现类,这里是项目中用到的一个串口发送数据的案列

 class ComData
    {
        public byte[] buff { get; set; }

        public bool needReplay { get; set; }
    }

    class ComThread : QueueCatheThread<ComData>
    {
        public ComThread(Object idriver) : base("ComThread")
        {
            this.driver = driver;
        }

        Object driver { get; set; }

        public ComThread()
            : base("ComThread")
        {

        }

        protected override void Use(ComData data)
        {
            // driver.TransPort(data.buff, data.needReplay);
            //执行 driver 下的相关业务方法
        }
    }
View Code

二、多线程控制

1、多线程启动控制

描述:新线程开启是不立即启动 让之收到启动信号才执行任务,在事件出发类中用的比较多,如下

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

using System.IO;


namespace EventManager
{
    sealed class EventManager
    {
        private Thread thread;

        private bool stop = false;

        private AutoResetEvent reset = new AutoResetEvent(false);

        private object o = new object();
     
        private bool isBusy = false;

        public EventManager( )
        {
           
        }

        public void Add( object data )
        {
            //  parking.Continue(false);
            // data.Continue(false);   实体 data 中可放入信号量 ,并且放入Continue方法  让原有挂起的线程继续执行
            lock (o)
            {
                if (stop)
                {
                    Start();
                    Thread.Sleep(10);
                }
            }
            reset.Set();
        }
     
        private void run()
        {
            try
            {
                while (!stop)
                {
                    reset.WaitOne();
                    lock (o)
                    {
                        isBusy = true;
                        try
                        {
                            // todo  执行要做业务
                          }
                        catch (Exception e)
                        {
                          //todo  记录错误日志
                        }
                        isBusy = false;
                    }
                }
            }
            catch { }
        }

       public void Start()
        {
            stop = false;
            thread = new Thread(this.run);
            thread.IsBackground = true;
            thread.Start();
        }

        public void Stop()
        {
            stop = true;
            Thread.Sleep(10);
            reset.Set();
        }
    }
}
View Code

 应用场景:

此应用场景用于并发不多的场景,若多的话,资源开销比较大。用线程池比较合适,

2、或者加入信号量(Semaphore)来协调,降低线程开销 或者调用线程池

 通道加入信号量来控制达到类似线程池的效果 来降低线程开销

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

using System.IO;


namespace EventManager
{
    sealed class EventManager
    {
        private Thread thread;

        private bool stop = false;

        private AutoResetEvent reset = new AutoResetEvent(false);
        static Semaphore sema = new Semaphore(5, 5);// 后面必须完成一个后才能进行另一个  因为总线最大程数量 是5。这里第一个5标识初始接收 的并发线程请求。 也可用SemaphoreSlim类,SemaphoreSlim sema=new SemaphoreSlim(5)
        private object o = new object();
     
        private bool isBusy = false;

        public EventManager( )
        {
           
        }

        public void Add( object data )
        {
            //  parking.Continue(false);
            // data.Continue(false);   实体 data 中可放入信号量 ,并且放入Continue方法  让原有挂起的线程继续执行
            lock (o)
            {
                if (stop)
                {
                    Start();
                    Thread.Sleep(10);
                }
            }
            sema.Release();//第一次继续,接触第一次等待,
        }
     
        private void run()
        {
            try
            {
                while (!stop)
                {
                    sema.WaitOne();//第一次等待
                    sema.WaitOne();//等待指导后续有完成的线程位置
                    lock (o)
                    {
                        isBusy = true;
                        try
                        {
                            // todo  执行要做业务
                          }
                        catch (Exception e)
                        {
                          //todo  记录错误日志
                        }
                        isBusy = false;
                    }
                    sema.Release();
                }
            }
            catch { }
        }

       public void Start()
        {
            stop = false;
            thread = new Thread(this.run);
            thread.IsBackground = true;
            thread.Start();
        }

        public void Stop()
        {
            stop = true;
            Thread.Sleep(10);
            sema.Release();
        }
    }
}
View Code

 另外注意:Task是.NET4.0加入的,跟线程池ThreadPool的功能类似,用Task开启新任务时,会从线程池中调用线程。

 三、将数据分配到指定的线程中,被指定线程顺序执行 分发过来的数据。

增加队列线程 接口类

 /// <summary>
    /// 线程队列接口
    /// </summary>

    public interface IQueueThread
    {
        string ThreadName { get; }
        void ExcuteCmd(Object driver);
    }
View Code

 改造一下 第一项中的  串口发送线程类

  class ComThread : QueueCatheThread<ComData>, IQueueThread
    {
        public ComThread(Object idriver) : base("ComThread")
        {
        }

        Object driver { get; set; }

        public string ThreadName
        {
            get { return "ComThread"; }
        }

        public ComThread()
            : base("ComThread")
        {

        }

        protected override void Use(ComData data)
        {
            // driver.TransPort(data.buff, data.needReplay);
            //执行 driver 下的相关业务方法
          
        }

        public void ExcuteCmd(object driver)
        {
            //执行 driver 下的相关业务方法
        }
    }
View Code

加入线程队列工厂类

 sealed class QueueThreadFactory
    {
        #region one instance

        private static readonly QueueThreadFactory instance = new QueueThreadFactory();

        static QueueThreadFactory() { }

        public static QueueThreadFactory Instance
        {
            get
            {
                return instance;
            }
        }

        #endregion

        #region 私有字段

        Dictionary<String, IQueueThread> queueThreadDictionary = new Dictionary<String, IQueueThread>();

        #endregion

        #region 构造函数

        private QueueThreadFactory()
        {
            Assembly processes = Assembly.GetCallingAssembly();
            Type[] ts = processes.GetTypes();
            foreach (Type t in ts)
            {
                if (t.IsClass && !t.IsAbstract && t.GetInterface(typeof(IQueueThread).FullName) != null)
                {
                    IQueueThread p = processes.CreateInstance(t.FullName) as IQueueThread;
                    if (p != null)
                    {
                        processDictionary.Add(p.ThreadName, p);
                    }
                }
            }
        }

        #endregion

        #region 公有方法

        public IQueueThread GetQueueThread(string commandName)
        {
            IQueueThread p;
            if (!queueThreadDictionary.TryGetValue(commandName, out p))
            {
            }
            return p;
        }

        #endregion
    }
View Code

调用时 如下

  var queueThread=  QueueThreadFactory.Instance.GetQueueThread("ComThread");
            object data = "";//是需要的数据  自己定义
            queueThread.ExcuteCmd(data);
原文地址:https://www.cnblogs.com/musexiaoluo/p/8727326.html