C++线程池实现-代码分析

废话不多说,请见注释

  1 #ifndef THREAD_POOL_H
  2 #define THREAD_POOL_H
  3 
  4 #include <vector>
  5 #include <queue>
  6 #include <memory>
  7 #include <thread>
  8 #include <mutex>
  9 #include <condition_variable>
 10 #include <future>
 11 #include <functional>
 12 #include <stdexcept>
 13 
 14 class ThreadPool {
 15 public:
 16     explicit ThreadPool(size_t);
 17     template<class F, class... Args>//可变参数模版
 18     //值得注意的是这里F&&表示universal reference而不是右值引用
 19     //如果存在推断类型如template或auto那么&&即表示universal reference,具体是左值引用还是右值引用由初始化决定
 20     auto enqueue(F&& f, Args&&... args)//f是函数名,args是参数
 21     ->std::future<decltype(f(args...))>;//尾置返回类型,返回 函数f返回值类型的future
 22     ~ThreadPool();
 23 private:
 24     // need to keep track of threads so we can join them
 25     std::vector< std::thread > workers;
 26     // the task queue
 27     std::queue< std::function<void()> > tasks;//std::function通用的函数封装,要求一个返回值类型为void的无参函数
 28 
 29     // synchronization
 30     std::mutex queue_mutex;//锁,负责保护任务队列和stop
 31     std::condition_variable condition;//条件变量
 32     bool stop;
 33 };
 34 
 35 // the constructor just launches some amount of workers
 36 inline ThreadPool::ThreadPool(size_t threads)//构造时设定线程数量
 37         :   stop(false)
 38 {
 39     for(size_t i = 0;i<threads;++i)
 40         workers.emplace_back(//push_back的优化版本
 41                 [this]//lambda表达式捕获this指针
 42                 {
 43                     for(;;)//比while(1)更优
 44                     {
 45                         std::function<void()> task;
 46                         {//{}内相当于新的作用域
 47                             std::unique_lock<std::mutex> lock(this->queue_mutex);
 48                             //在等待任务队列中出现任务的过程中解锁queue_mutex
 49                             //由notify_one或notify_all唤醒
 50                             //线程池初始化后将有threads个线程在此处等待,每个线程执行完分配到的任务将执行循环,再取任务执行或等待任务加入队列
 51                             /* 我们需要知道这么做的目的是,std::thread本身仅能绑定一个函数,而我们需要仅用threads个线程去帮我们执行m个任务,
 52                              * 而不是每执行一个任务创建一个线程,如果这样我们将创建m个线程,而创建线程是需要开销的,这引起了不必要的浪费,线程池就是为此而生的
 53                              * 通过这种方式,每个std::thread仍然是只绑定了一个函数,但是这一个函数会执行我们想要的多个任务
 54                              */
 55                             this->condition.wait(lock,
 56                                                  [this]{ return this->stop || !this->tasks.empty(); });
 57                             if(this->stop && this->tasks.empty())//stop=true,仍需执行任务队列中剩余任务
 58                                 return;
 59                             task = std::move(this->tasks.front());//std::move避免拷贝
 60                             this->tasks.pop();
 61                         }
 62 
 63                         task();//执行任务
 64                     }
 65                 }
 66         );
 67 }
 68 // add new work item to the pool
 69 template<class F, class... Args>
 70 auto ThreadPool::enqueue(F&& f, Args&&... args)
 71 -> std::future<decltype(f(args...))>
 72 {
 73     using return_type = decltype(f(args...));
 74 
 75     //基本类型是std::shared_ptr,指向类型是std::packaged_task,类型是返回值类型为return_type的无参函数
 76     auto task = std::make_shared< std::packaged_task<return_type()> >(
 77             /* 现在该说说为什么std::packaged_task的类型是一个返回值为return_type的无参数函数了
 78              * 返回值是return_type这没有问题,至于参数消失的原因是因为:std::bind
 79              * 在这里它创建了一个无参数(参数均被指定)版本的函数f
 80              */
 81             //std::forward配合universal reference使用,完美转发,实际效果是如果是右值引用那么还是右值引用,如果是左值引用那么还是左值引用
 82             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
 83     );
 84 
 85     std::future<return_type> res = task->get_future();//任务函数实际执行后的返回值
 86     {
 87         std::unique_lock<std::mutex> lock(queue_mutex);
 88 
 89         // don't allow enqueueing after stopping the pool
 90         if(stop)
 91             throw std::runtime_error("enqueue on stopped ThreadPool");
 92 
 93         tasks.emplace([task](){ (*task)(); });//往tasks队列压入一个无参无返回值的函数,函数体内调用task(不要忘记task是shared_ptr类型)
 94     }
 95     //任务压入队列,唤醒等待的线程
 96     condition.notify_one();
 97     return res;
 98 }
 99 
100 // the destructor joins all threads
101 inline ThreadPool::~ThreadPool()
102 {
103     {
104         std::unique_lock<std::mutex> lock(queue_mutex);
105         stop = true;
106     }
107     condition.notify_all();//唤醒所有等待的进程
108     for(std::thread &worker: workers)
109         worker.join();//等待所有线程结束
110 }
111 
112 #endif
常常因身处温室而不自知,因而懈怠; 及时当勉励,岁月不待人!
原文地址:https://www.cnblogs.com/veasky/p/14620224.html