简单易用的线程池实现

0 前言

最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手。

这个线程池在创建时,即按照最大的线程数生成线程。

然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空。

补充:再回头看这个设计,其实算不了线程池,最多算是多线程执行任务。

把所有的任务描述信息先加入到CThreadPool的任务队列里面,然后调用CThreadPool的run函数去创建指定数量的线程,每个线程互斥的从任务队列里面取出任务去执行。

1 线程池设计

简单描述如下(假设任务类名为CTasklet):

1、CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);

2、创建任务,并把任务加入到线程池

    CTasklet *pTask1 = new CTasklet();

    CTasklet *pTask2 = new CTasklet();

    ...

    thread_pool.add_task(pTask1);

    thread_pool.add_task(pTask2);

    ...

3、调用线程池的run方法开始执行任务

    thread_pool.run();

4、等待任务执行完成

    thread_pool.join_thread();

2 源码

下面给出完整的线程池代码

 1 /*
 2  * file: thread_pool.h
 3  * desc: 简单的线程池,一次性初始化任务队列和线程池。
 4  * 
 5  */
 6 
 7 #ifndef _THREAD_POOL_H_
 8 #define _THREAD_POOL_H_
 9 
10 #include <pthread.h>
11 #include <vector>
12 
13 using namespace std;
14 
15 template<typename workType>
16 class CThreadPool
17 {
18     public:
19         typedef void * (thread_func)(void *);
20 
21         CThreadPool(int thread_num, size_t stack_size = 10485760);
22         ~CThreadPool();
23 
24         // 向任务队列中添加任务
25         int add_task(workType *pTask);
26 
27         // 创建新线程并执行
28         int run();
29 
30         // 等待所有的线程执行结束
31         int join_thread();
32 
33     private:
34         int init_thread_attr();
35         int destroy_thread_attr();
36 
37         int set_thread_stacksize(size_t stack_size);
38         int set_thread_joinable();
39 
40     protected:
41         // 线程池执行函数,必须为static
42         static void start_routine(void *para);
43 
44     private:
45         pthread_attr_t attr_;
46         static pthread_mutex_t mutex_lock_;
47         static list<workType *> list_task_;
48 
49         int thread_num_; // 最大线程数
50         vector<pthread_t> thread_id_vec_;
51 };
52 #endif
View Code
  1 #include "pthread_pool.h"
  2 
  3 template<typename workType>
  4 pthread_mutex_t CThreadPool<workType>::mutex_lock_;
  5 
  6 template<typename workType>
  7 list<workType*> CThreadPool<workType*>::list_task_;
  8 
  9 template<typename workType>
 10 CThreadPool<workType>::CThreadPool(int thread_num, size_t stack_size)
 11 {
 12     thread_num_ = thread_num;
 13     pthread_mutex_init(&mutex_lock_, NULL);
 14 
 15     init_thread_attr();
 16     set_thread_stacksize(stack_size);
 17     set_thread_joinable();
 18 }
 19 
 20 template<typename workType>
 21 CThreadPool<workType>::~CthreadPool()
 22 {
 23     destroy_thread_attr();
 24 }
 25 
 26 template <typename workType>
 27 int init_thread_attr()
 28 {
 29     return pthread_attr_init(&m_attr); 
 30 }
 31 
 32 template <typename workType>
 33 int CThreadPool<workType>::destroy_thread_attr()
 34 {
 35     return pthread_attr_destroy(&attr_);
 36 }
 37 
 38 template <typename workType>
 39 int CThreadPool<workType>::set_thread_stacksize(size_t stack_size)
 40 {
 41     return pthread_attr_setstacksize(&attr_, stack_size);
 42 }
 43 
 44 template <typename workType>
 45 int CThreadPool<workType>::set_thread_joinable()
 46 {
 47     return pthread_attr_setdetachstate(&attr_, PTHREAD_CREATE_JOINABLE);
 48 }
 49 
 50 template <typename workType>
 51 void CThreadPool<workType>::start_routine(void *para)
 52 {
 53     workType *pWorkType = NULL;
 54 
 55     while (1) {
 56         pthread_mutex_lock(&mutex_lock_);
 57 
 58         if (list_task_.empty()) {
 59             pthread_mutex_unlock(mutex_lock_);
 60             return;
 61         }
 62 
 63         pWorkType = *(list_task_.begin());
 64         list_task_.pop_front();
 65         pthread_mutex_unlock(&mutex_lock_);
 66 
 67         pWorkType->run();
 68         delete pWorkType;
 69         pWorkType = NULL;
 70     }
 71 }
 72 
 73 template <typename workType>
 74 int CThreadPool<workType>::add_task(workType *pTask)
 75 {
 76     pthread_mutex_lock(&mutex_lock_);
 77     list_task_.push_back(pTask);
 78     pthread_mutex_unlock(&mutex_lock_);
 79     return 0;
 80 }
 81 
 82 template <typename workType>
 83 int CThreadPool<workType>::run()
 84 {
 85     int rc;
 86     pthread_t tid;
 87     for (int i = 0; i < thread_num_; ++i) {
 88         rc = pthread_create(&tid, &attr_, (thread_func)start_routine, NULL);
 89         thread_id_vec_.push_back(tid);
 90     }
 91     return rc;
 92 }
 93 
 94 template <typename workType>
 95 int CThreadPool<workType>::join_thread()
 96 {
 97     int rc = 0;
 98     vector<pthread_t>::iterator iter;
 99     for (iter = thread_id_vec_.begin(); iter != thread_id_vec_.end(); ++iter) {
100         rc = pthread_join((*iter), NULL);
101     }
102     thread_id_vec_.clear();
103     return rc;
104 }
View Code

测试代码如下

 1 #include <unistd.h>
 2 
 3 #include <iostream>
 4 #include <list>
 5 
 6 using namespace std;
 7 
 8 class CTasklet
 9 {
10     public:
11         CTasklet(int num) {
12             num_ = num;
13             cout << "CTasklet ctor create num: " << num_ << endl;
14         }
15 
16         ~CTasklet() {
17             cout << "CTasklet dtor delete num: " << num_ << endl;
18         }
19 
20         int run() {
21             cout << "CTasklet sleep begin: " << num_ << endl;
22             sleep(num_);
23             cout << "CTasklet sleep end: " << num_ << endl;
24         }
25 
26     private:
27         int num_;
28 };
29 
30 #define MAX_THREAD_NUM 3
31 int main(int argc, char **argv)
32 {
33     // Step1. 创建线程池
34     CThreadPool<CTasklet> thread_pool(MAX_THREAD_NUM);
35 
36     // Step2. 创建任务,并加入到线程池中
37     for (int i = 0; i < 6; ++i) {
38         CTasklet *pTask = new CTasklet(i);
39         thread_pool.add_task(pTask);
40     }
41     // Step3. 开始执行任务
42     thread_pool.run();
43     // Step4. 等待任务结束
44     thread_pool.join_thread();
45 
46     return 0;
47 
48 }
View Code

3 总结

上面的线程池属于最简单的一类线程池,即相当于程序运行时候就开启n个线程来执行任务。真正的线程池需要考虑的方面比较多,比如1、线程池中的线程数应该能动态变化;2、线程池能动态调度线程来运行任务,以达到均衡;3、线程池还应该能记录任务的运行时间,防止超时等等。

不过,起码我们已经开了个头,实现了一个简单的线程池,接下来,让我们在这个基础上一步步调整、完善。

PS:对于线程池的考虑,我能想到的有动态增减线程数、超时机制、负载均衡。不知道大家理解线程池还需要考虑什么场景。

原文地址:https://www.cnblogs.com/i4oolish/p/3975728.html