基于C++11的100行实现简单线程池

基于C++11的100行实现简单线程池

 

1 线程池原理

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。

线程池组成部分:

线程池管理器:用于创建并管理线程池

工作线程: 线程池中实际执行的线程

任务接口: 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。

任务队列:线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。

2 并发下线程池的最佳数量

如果是CPU密集型应用,则线程池大小设置为N+1;(对于计算密集型的任务,在拥有N个处理器的系统上,当线程池的大小为N+1时,通常能实现最优的效率。(即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保CPU的时钟周期不会被浪费。摘自《Java Concurrency In Practise》)。因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。I
如果是IO密集型应用,则线程池大小设置为2N+1;IO密集型任务可以使用稍大的线程池,一般为2*CPU核心数。IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。

2.1 线程池是否一定比单线程高效?

答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:多线程带来线程上下文切换开销,单线程就没有这种开销 锁

当然“Redis很快”更本质的原因在于: 
Redis基本都是内存操作,这种情况下单线程可以很高效地利用CPU。而多线程适用场景一般是:存在相当比例的IO和网络操作。

3 线程池设计的C++11的特性

  • std::vector
  • std::thread
  • std::mutex
  • std::future
  • std::condition_variable

C++代码:

threadpool.h

#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdexcept>

const int MAX_THREADS = 100; // 线程池阈值

template <typename T>
class threadpool
{
public:
    threadpool(int number = 1); // 线程池构造函数,传入线程数,默认为1
    ~threadpool();                // 线程池析构函数
    bool append(T* request);    // 添加任务到任务队列

private:
    static void* worker(void* arg); // 工作线程数组
    void run();                        // 工作线程的执行函数

private:
    std::vector<std::thread> work_threads;  // 工作线程
    std::queue<T*> task_queues;                // 任务队列
    std::mutex queue_mutex;                    // 队列互斥锁
    std::condition_variable condition;        // 条件变量
    bool stop;                                // 停止标志
};

template <typename T>
threadpool<T>::threadpool(int number) :stop(false)
{
    if (number <= 0 || number > MAX_THREADS) // 如果number小于等于0或者大于线程池阈值,抛出异常
        throw std::exception();

    for (int i = 0; i < number; i++)
    {
        std::cout << "创建第" << i << "个线程" << std::endl;
        // 添加worker函数到线程中
        work_threads.emplace_back(worker, this); // Q:this是什么作用,为什么没了this会出错?
    }
}

template <typename T>
inline threadpool<T>::~threadpool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex); // 拿锁
        stop = true;                                    // 停止标志置位true
    }
    condition.notify_all();                                // 通知所有工作线程,唤醒后因为stop为true了,所有都会结束
    for (auto& ww : work_threads)
        ww.join();
}

template <typename T>
bool threadpool<T>::append(T* request) // 添加一个新的工作任务到任务队列
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        task_queues.emplace(request);  // 将任务添加到任务队列
    }
    condition.notify_one();               // 发送通知,唤醒某一个工作线程执行任务
    return true;
}

template <typename T>
void* threadpool<T>::worker(void* arg)
{
    threadpool* pool = (threadpool*)arg;
    pool->run();
    return pool;
}

template <typename T>
void threadpool<T>::run()
{
    while (!stop)
    {
        std::unique_lock<std::mutex> lk(queue_mutex);                            // 拿锁(独占所有权式)
        this->condition.wait(lk, [this] {return !this->task_queues.empty(); }); // 等待条件成立
        /*
            执行条件变量等待的时候,已经拿到了锁(即lock已经拿到锁,没有阻塞)
            这里将会unlock释放锁,其他线程可以继续拿锁,但此处仍然阻塞,等待条件成立
            一旦收到其他线程notify_*唤醒,则再次lock,然后进行条件判断
            当{return !this->task_queues.empty(); }的结果为false将阻塞
            条件为true时候解除阻塞。此时lock依然为锁住状态
        */
        if (this->task_queues.empty())            // 如果任务队列为空,继续
            continue;
        else 
        {
            T* request = task_queues.front();   // 取得任务队列首任务
            task_queues.pop();                    // 从队列中移除
            if (request)
                request->process();                // 执行任务
        }
    }
}

 

main.cpp

#include <iostream>
#include "threadpool.h"

class Task
{
public:
    void process()
    {
        static int a = 0;
        ++a;
        std::cout << "run......" << a << std::endl;
    }
};

int main()
{
    threadpool<Task> tp(2);

    while (1)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));

        Task* t = new Task();
        tp.append(t);
        delete t;
    }
}

 

参考:

https://blog.csdn.net/gcola007/article/details/78750220

https://blog.csdn.net/qq_34417408/article/details/78895573

 

原文地址:https://www.cnblogs.com/zkfopen/p/11236537.html