.net线程池

分享一个自制的 .net线程池

自制背景

 

由于项目需求,需要开发一些程序去爬取一些网站的信息,算是小爬虫程序吧。爬网页这东西是要经过网络传输,如果程序运行起来串行执行请求爬取,会很慢,我想没人会这样做。为了提高爬取效率,必须使用多线程并行爬取。这时候就需要线程池了。池的概念,我想做开发的都应该知道,目的就是对资源的合理运用。刚开始的时候,我首先想到的就是 .net 框架下的线程池 ThreadPool,毕竟是自带的,在性能、稳定性方面肯定没问题。但在琢磨了几天后,.net 框架下自带的这个 ThreadPool 让我很不放心。1.ThreadPool 是一个静态类!!也就是说,当程序运行起来以后,这个池是整个应用程序域共享的,.net 框架很大,程序运行了以后,除了咱们自己往这个共享池里塞任务,谁知道有没有其他的类、代码、任务也会往里塞 workItem 呢?也就是说,假如我设置这个共享池大小为 10,但实际为我们工作的线程会不到 10 个,这就会导致程序运行时达不到我们预期的效果。2.目前我们的爬虫程序设计是像一个服务一样挂着,只要程序启动了以后就会一直运行着,除非手动停止。因此,在没有爬取任务的时候,需要减少甚至清空池内的所有线程,以免池内线程一直挂着占用系统资源。由于 .net 自带的这个线程池是共享的,我还真不敢随意调整它的大小,对于我这种控制欲极强的程序员来说,这是万万接受不了的。虽然.net 自带的 ThreadPool 用法简单,功能强大,而且它还可以智能的调节池内线程池数量,但我还是决定抛弃它,因为,我需要一个可控的线程池!于是开始到网上到处查找有没有其它现成的线程池。百度、谷歌了好久,发现在.net界比较成熟的就 SmartThreadPool,对 SmartThreadPool  简单了解以后,还是觉得它不是我想要的,于是决定,自造一个。于是,借助强大的网络,又开始网上到处搜索有关线程池如何实现以及相关注意事项之类的信息,也拜读过一些网上开源的项目,如 .net 的 SmartThreadPool、java 里的 ThreadPoolExecutor 等,虽然没接触过 java,但 java 和 C# 犹如亲兄弟,大同小异,让我这 .net coder 读起来不是很费劲。基于前人的实现思路,再融入自己的思想,脑中自己的池也慢慢浮现…

 

线程池:IThreadPool

 

根据需求,首先定义基本接口

 

 View Code

 

可以看到,我定义的线程池接口成员就几个(命名都是来自 .net 自带 ThreadPool,哈哈),它们的用途上面代码也都带有注释。接下来,我们看下核心实现。首先,介绍类 WorkerThread:

 

 View Code

 

这个类主要是对线程的封装,我称之为 WorkerThread。它主要负责接收线程池分配的一个任务,然后执行,线程池内维护的就是这么一个类对象。介绍下它的几个核心成员:

 

  • _action:一个 Action 类型的字段。也就是一个委托,也就是 WorkerThread 要执行的任务,通过 SetWork(Action act) 方法设置值
  • _thread:一个 Thread 对象。也就是真正执行任务的线程
  • _waitEvent:一个 AutoResetEvent 对象。线程池的作用就是池内维护一定数量可重复利用的线程,线程执行每个任务以后,并不是直接“自然”结束,而是继续执行下一个任务,当没有可执行的任务的时候就会驻留在线程池内等待有可执行的任务。所以,如何让一个线程处于等待状态,而不是直接“自然”结束呢?我是通过这个 AutoResetEvent 对象去控制的。这个对象主要有 WaitOne() 和 Set() 两个方法,WaitOne() 用于“卡”住线程,让线程处于停滞状态,Set() 就是用于通知线程继续执行(关于这两个方法的使用以及介绍我就通俗的说明下,如果不熟悉这个类的同学可以自行查 msdn)。这两个方法,在配合一个 while 循环,基本就实现了线程的复用,具体看 Run() 方法。
  • Complete:一个 Action<WorkerThread> 类型的事件。每次执行完任务都会调用该事件,作用就是通知其所在线程池,说明”我“已经执行完”你“分配的任务了。
  • SetWork(Action act):设置线程要执行的任务,其实就是设置字段 _action 的值。这个方法是提供给线程池用的,每次给 WorkerThread 分配任务都是通过调用这个方法。
  • Activate():激活 WorkerThread 执行任务。调用了 SetWork(Action act) 分配任务了以后,就会调用该方法执行任务。这方法里主要就是调用 _waitEvent.Set() 方法触发 _thread 线程继续执行。
  • Run():这是 WorkerThread 对象的核心。创建 _thread 时,给 _thread 设置执行的就是这个 Run() 方法。这个方法内实现就是一个 while 循环,每循环一次就会调用 _waitEvent.WaitOne() “卡“住线程,直到被调用 Activate() 才会执行后续代码,后续代码也就是执行真正的任务 _action。执行完任务了以后进入到下一个循环等待,直到接收下一个任务和被再次调用 Activate()…如此循环…. 从而达到了我们循环利用线程的目的

 

WorkerThread 这个类代码也不是很多,百来行而已。总结来说它作用就是 SetWork(Action act) –> Activate() –> _action() –> WaitOne() –> SetWork(Action act) –> Activate()…无限循环…希望大家看得明白。

 

了解 WorkerThread 了以后,再来看下真正实现 IThreadPool 接口的类 WorkerThreadPool。前面提到,线程池的作用就是池内维护一定数量可重复利用的线程,WorkerThreadPool 负责 WorkerThread 的创建、接收用户任务并将任务分配给池内空闲的线程去执行用户任务,功能其实就这么简单。废话不多说,直接上代码:

 

 View Code

 

这个类代码还是挺多的(貌似有点占篇幅- - ),虽然代码里都加上了注释,但我还是想给大家简单说说其实现思路以及内部一些核心相关成员,方便大家更快的理解。

 

  • _threads:一个类型为 int 的字段。表示当前池的大小
  • _allThreads:一个类型为 List<WorkerThread> 的字段。用于存储池内创建的所有 WorkerThread
  • _workingTreads:一个类型为 List<WorkerThread> 的字段。用于存储正在执行任务的 WorkerThread
  • _freeTreads:一个类型为 Queue<WorkerThread> 的字段。用于存储处于空闲状态的 WorkerThread
  • _workQueue:一个类型为 Queue<WorkItem> 的字段。用于存储用户往当前池里塞的所有任务
  • SetPoolSize(int threads):设置线程池大小。这个方法主要做了两件事:1.设置线程池大小,也就是字段 _threads 的值。2.调整线程池内线程。当设置的值小于当前池内的大小时,则释放掉多出的空闲线程;当设置的值大于当前池大小时,如果 _workQueue 队列有待处理的任务的话,会尝试着创建新的 WorkerThread 去执行 _workQueue 队列里的任务,目的就是为了使当前池一直处于满负荷状态。
  • bool QueueWorkItem(WaitCallback callback, object state):向线程池中添加任务。每次调用这个方法,都会将 callback 和 state 封装成一个 WorkItem,然后将封装的 WorkItem 对象放入 _workQueue 队列。然后尝试调用 TryGetWorkerThreadAndWorkItem 方法获取可用的 WorkerThread 以及 _workQueue 队列第一个任务,如果获取成功(即有可用的 WorkerThread 和待处理的 WorkItem),就会将取出的 WorkItem 分配给取出的 WorkerThread 去执行。
  • bool TryGetWorkerThreadAndWorkItem(out WorkerThread workerThread, out WorkItem workItem, bool workerThreadCall):尝试从池内取出一个处于空闲的 WorkerThread 和待处理的 WorkItem。这个方法的实现不是很复杂,如果池内有空闲的 WorkerThread 和待处理的 WorkItem,则返回 true,否则返回 false。目前我们这个线程池内 WorkerThread 的创建不是伴随线程池创建而创建,而是真正需要用到的时候才会去创建。即当有任务往池里塞的时候,首先会判断 _freeTreads 集合内是否有空闲的 WorkerThread,如果有,则弹出一个空闲的 WorkerThread 去执行任务,同时将弹出的 WorkerThread 添加到 _workingTreads 集合中,没有的话才会去创建新的 WorkerThread 去执行任务,同时也会将新建的 WorkerThread 添加到 _workingTreads 集合中。
  • ActivateWorkerThread(WorkerThread workerThread, WorkItem workItem):这个方法体内的实现很简单,就是将 workItem 分配给 workerThread,同时调用 workerThread.Activate() 激活线程执行任务,调用 workerThread.Activate()会将当前池内的方法 WorkComplete(WorkerThread workerThread) 绑定到 workerThread 定义的 Complete 事件上,每当 workerThread 执行完任务以后,都会触发 workerThread.Complete 事件,以通知其所在的线程池。
  • WorkComplete(WorkerThread workerThread):每当 workerThread 执行完任务以后,都会调用该方法。该方法参数是一个 WorkerThread 对象,也就是说每个 workerThread 执行完任务后都会将自己作为参数调用这个方法。在这个方法内主要是做三件事:1.将执行完任务的 workerThread 从 _workingTreads 集合中移除,然后将 workerThread 添加到空闲线程队列 _freeTreads 中。2.调整线程池线程(如果有必要的话),为什么在这要进行调整线程池呢?因为会出现这种情况,比如当前线程池大小是 10,正在工作的线程为 6 个,空闲线程也就是 4 个,这时候我们调用 SetPoolSize(5),也就是将线程池大小设置为 5,减少了线程池的容量,虽然在 SetPoolSize 方法内会调整了一遍线程池大小,但 SetPoolSize 方法内只会销毁掉空闲的线程,也就是 4 个空闲线程会被销毁,这时候池内其实还是存在 6 个线程。所以还需要销毁一个,这时候怎么办呢?不可能在 SetPoolSize 方法内把正在执行任务的线程给终止掉吧?因此,workerThread 每次执行完任务后都要执行一次调整线程池的操作,以保证池内的线程数量是正确的。3.调用 TryGetWorkerThreadAndWorkItem 方法,如果有待处理的任务的话,则继续处理下一个任务,这样就达到了持续处理 _workQueue 队列内任务的目的。

 

上面就是 WorkerThreadPool 的一些核心字段以及方法,至于其它的成员就不做详细说明了。 为了方便管理,池内用了 _freeTreads 和 _workingTreads 两个集合来维护池内线程状态。所以每次从空闲线程 _freeTreads 取出 workerThread 执行任务的时候,都必须将 workerThread 添加到 _workingTreads 集合中;每个 workerThread 执行完任务都会将自己从 _workingTreads 移除,同时将自己置为空闲线程添加到 _freeTreads 集合中等待接受下一个任务来临,所以 WorkComplete 方法体内最后都要调用 TryGetWorkerThreadAndWorkItem 方法获取可用的 WorkerThread 以及一个待处理的任务,然后执行,这样就形成了一个循环,只要有任务,池内就会一直处于满负荷状态。

 

开篇提到一个需求:没有爬取任务的时候,需要减少甚至清空池内的所有线程,以免池内线程一直挂着占用系统资源。因此我给 IThreadPool 加了一个属性:KeepAliveTime。通过这个属性,可以给线程池设定一个时间,即线程池在指定的时间内都没有接收到任何任务,则会自行将池内的线程给销毁。在 WorkerThreadPool 中这个功能的实现很简单,在最后一个任务被执行完了以后,会自动从池内取出一个空闲的 workerThread 执行计时操作,也就是 WorkerThreadPool.Tick 方法,其实现也就是自旋计时,如果过了指定时间后都没有接受到任务,则自动将池内的线程给销毁。这个计时实现很简陋- - ,技术有限,想不到其它好办法了。

实现线程池应该使用系统的Thread Pool API(https://msdn.microsoft.com/en-us/library/windows/desktop/ms686766),而不应该自己管理线程。因为系统提供的ThreadPool可以很好地支持异步IO(IOCP),降低系统资源消耗。

原文地址:https://www.cnblogs.com/Leo_wl/p/5488611.html