C++实现线程池(二)

toc

背景

19年时,写了一篇线程池的博客,那篇文章介绍得比较详细(是什么?为什么?怎么做?),但也比较啰嗦,看着比较费劲,其实我现在看着也烦,那时候左大括号还换行来着,后面被go语言教育了。。。。。。。。哈哈哈哈哈哈哈哈哈。。。。
现在用尽量简洁清晰的方式再写一遍,同时加上任务取消功能,并做一些优化

实现

再次简要介绍

  • 线程池内部存在一个线程安全队列
    • 线程池内的数个消费线程,不断取出其中的任务去执行,队列为空时阻塞,直到有新任务进来(阻塞线程得不到调度,不耗CPU)
    • 外部生产线程往其中放入数据,达到异步执行的目的
      • 若队列存在上限,生产线程可发生阻塞,或立即得到失败通知

需注意点

  • 生命周期问题,保证线程池执行时,访问到的对象仍处于生命周期内,涉及对象包括
    • 传入的任务访问到的对象——生产线程保证其生命周期(任务为外部代码)
    • 线程池本身成员对象——线程池对象析构前所有线程必须停止

任务取消

  • 为每个任务(可执行对象)分配对应ID添加操作将返回ID,通过ID将还未执行的任务删除

代码

任务对象

  • 任务对象内部封装了任务ID、任务类型与行为,为了满足任务取消及退出单个线程的需求,需要使用此任务对象
  • 如果没有此需求,直接使用std::function<>即可,std::function<>本身就是可调用对象
  • 相对于上一任务对象,去掉了考虑不周的成员
    • 去掉了带参数版本:A线程申请,B线程释放,内存释放不清晰;可能跨Node,对numa架构不友好
    • 去掉了命令模式版本:成员为指针,也存在怎么释放命令成员的问题(上篇文章存在内存泄露taskcommand)
    • 二者存在的目的是处理参数问题,然而,std::function<>、lambda、std::bind相互配合也能带参,二者存在意义不大
constexpr bool ThreadExit = true;

class ThreadTask final{
public:
    enum ThreadTaskType{
        Exec,
        Exit
    };

    ThreadTask() : TaskID(-1), TaskType(ThreadTaskType::Exit), CB(nullptr){
    }
    ThreadTask(int iTaskID, const std::function<void()>& f) : TaskID(iTaskID), 
        TaskType(ThreadTaskType::Exec),CB(f){
    }
    ThreadTask(int iTaskID, std::function<void()>&& f) : TaskID(iTaskID), TaskType(ThreadTaskType::Exec),
        CB(std::move(f)){
    }
    ThreadTask(ThreadTask&& rhs): TaskID(rhs.TaskID), TaskType(rhs.TaskType), CB(std::move(rhs.CB)){
    }
    ThreadTask& operator=(ThreadTask& rhs){    //为了移动赋值CB、未加Const
        if(this != &rhs){
            TaskID = rhs.TaskID;
            TaskType = rhs.TaskType;
            CB = std::move(rhs.CB);
        }
        return *this;
    }
    bool operator()(){
        bool bExit = false;
        switch(TaskType){
        case ThreadTaskType::Exec:
            CB();
            break;
        case ThreadTaskType::Exit:
        default:
            bExit = ThreadExit;
            break;
        }
        return bExit;
    }
    bool operator<(int iTaskID){
        return TaskID < iTaskID;
    }


private:
    int TaskID;
    ThreadTaskType TaskType;
    std::function<void()> CB; 
};
  • 默认构造函数会构造一个携带退出消息的任务对象,执行该任务的线程会退出
  • 增加了两个参数构造函数,一个移动std::function<>,一个拷贝std::function<>
function(function&& _Right)
    {    // construct holding moved copy of _Right
    this->_Reset_move(_STD move(_Right));
    }
function(const function& _Right)
    {    // construct holding copy of _Right
    this->_Reset_copy(_Right);
    }
  • 增加的移动构造函数,使用到了std::move,std::move只是单纯做右值类型转换,由具体的移动构造函数实现具体的移动语义
  template<typename _Tp>
    constexpr typename std::remove_reference<_Tp>::type&&
    move(_Tp&& __t) noexcept
    { return static_cast<typename std::remove_reference<_Tp>::type&&>(__t); }
  • 原来的执行函数变为了重载函数调用符,看起来更像一个函数
  • 该移动构造可以用std::forward等价: 缺点是没有std::move简洁
CB(std::forward<std::function<void()>>(f))
  • 重载了赋值操作符,并以移动语义替换拷贝语义,来得到更好的性能
  • 为了能查找任务,重载了小于比较操作符

任务存储

两个新需求

  1. 为线程池增加任务上限,达到上限阻塞或拒掉新任务
  2. 添加任务成功时,返回任务ID,需要时,根据任务ID取消还未执行的任务

使用前面升级的同步队列来达成需求1,配合线程池新代码,实现需求2
互斥、同步操作由同步队列控制,线程池仅需从里面取出任务执行。

线程池

与之前的线程池相比:

  • 仅保留了支持std::function的添加方法
  • 使用了同步队列替代原队列
  • 改为headonly方式实现
  • 增加了任务取消功能
//线程的各种个数最好由配置文件决定,配置文件根据具体场景配置
class ThreadPool{
public:
    //线程池可能需要多个不同实例(如线程池隔离的情况,不同的线程池做不同类型的事情),所以为非单例
    ThreadPool(int iInitThreadCount, int iHoldThreadCount, int iMaxThreadCount, int iMaxTaskCount) :
        m_iInitThreadCount(iInitThreadCount), m_aiHoldThreadCount(iHoldThreadCount),
        m_aiMaxThreadCount(iMaxThreadCount), m_aiCurThreadCount(0), m_abStopFlag(true),
        m_aiTaskID(0){
    }
    ThreadPool(int iMaxTaskCount) : ThreadPool(4, 4, 4, iMaxTaskCount){
    }
    ThreadPool() : ThreadPool(4, 4, 4, 100){
    }
    ~ThreadPool(){
        Stop();
    }

    int AddTask(const std::function<void()>& Task){
        return AppendTask(Task);
    }

    int AddTask(std::function<void()>&& Task){
        return AppendTask(std::forward<std::function<void()>>(Task));
    }

    int TryAddTask(const std::function<void()>& Task){
        return TryAppendTask(Task);
    }

    int TryAddTask(std::function<void()>&& Task){
        return TryAppendTask(std::forward<std::function<void()>>(Task));
    }

    bool CancelTask(int iTaskID){
        return m_TaskQueue.Delete(std::mem_fn(&ThreadTask::operator<), iTaskID);
    }

    void Start(){
        m_abStopFlag = false;
        for(int i = 0; i < m_iInitThreadCount; ++i){
            AddThread();
        }
    }

    void Stop(){
        do{
            SetStop();
        } while(0 != GetCurThreadCount());
    }

    int GetCurTaskCount(){
        return m_TaskQueue.Size();
    }

    int GetCurThreadCount(){
        return m_aiCurThreadCount;
    }

    void SetHoldThreadCount(int iCount){
        m_aiHoldThreadCount = iCount;
    }

    void SetMaxThreadCount(int iCount){
        m_aiMaxThreadCount = iCount;
    }

private:
    template<typename U>
    int AppendTask(U&& Task){
        if(!Task){
            return -1;
        }
        int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
        m_TaskQueue.Enqueue(iTaskID, std::forward<U>(Task));
        AddThreadSuitably();
        return iTaskID;
    }

    template<typename U>
    int TryAppendTask(U&& Task){
        if(!Task){
            return -1;
        }
        int iTaskID = m_aiTaskID.fetch_add(1, std::memory_order_relaxed);
        if(!m_TaskQueue.TryEnqueue(iTaskID, std::forward<U>(Task))){
            return -1;
        }
        AddThreadSuitably();
        return iTaskID;
    }

    void AddThread(){
        std::thread th(&ThreadPool::Run, this);
        th.detach();
        m_aiCurThreadCount++;
    }
    //带条件添加线程:当前线程数未达最大,并且任务数多于线程数时,添加线程,以便尽快处理任务
    void AddThreadSuitably(){
        if(!m_abStopFlag && m_aiCurThreadCount < m_aiMaxThreadCount && m_aiCurThreadCount <= GetCurTaskCount()){
            AddThread();
        }
    }
    //退出一个线程
    void ReduceThread(){
        m_TaskQueue.Enqueue(ThreadTask());
    }
    //线程池主循环
    void Run(){
        bool bExit = false;
        do{
            ThreadTask Task;
            m_TaskQueue.Dequeue(Task);
            bExit = Task();
        } while(!m_abStopFlag && !bExit);
        m_aiCurThreadCount--;
    }
    //监视空闲线程,当前线程数多于保持线程数,并空闲一定时间时,将当前线程数减少到保持线程数
    //后续发布定时器时再添加处理逻辑
    void MontorIdle(){
        if(m_aiHoldThreadCount < m_aiCurThreadCount){
            //利用定时器控制线程池里空闲线程的个数
            ReduceThread();
        }
    }

    void SetStop(){
        if(!m_abStopFlag){
            m_abStopFlag = true;
            m_TaskQueue.Stop();
        }
    }

    ThreadPool(const ThreadPool& rhs) = delete;
    ThreadPool(ThreadPool&& rhs) = delete;
    ThreadPool& operator=(const ThreadPool& rhs) = delete;
    ThreadPool& operator=(ThreadPool&& rhs) = delete;

private:
    int m_iInitThreadCount;                    //初始线程个数
    std::atomic<int> m_aiHoldThreadCount;      //保持线程个数
    std::atomic<int> m_aiMaxThreadCount;       //最大线程个数
    std::atomic<int> m_aiCurThreadCount;       //当前线程个数

    std::atomic<bool> m_abStopFlag;

    std::atomic<int> m_aiTaskID;
    SyncQueue<ThreadTask> m_TaskQueue;
};
  • 类中有三个构造函数,其初始化逻辑都是一样的,仅仅是参数列表不同,使用委托构造函数的方式,来避免重复代码
  • AddTask添加方式,在队列满时,添加操作会被阻塞TryAddTask方式满时任务会被拒掉,并的到添加失败信息,两种添加方式均支持任务拷贝或移动,任务添加成功后返回ID返回的ID是取消任务的依据
  • CancelTask为取消任务接口,其依赖同步队列删除方法实现,传入成员函数包装器任务ID,此处包装的是小于操作符,因为需要根据任务ID成员与目标ID大小,来确定具体任务,确定后执行删除操作
  • Start后线程池才开始创建取消费任务的线程,个数为初始个数
  • Stop必须等到线程池内没有正在运行的线程才返回,为先停线程后析构做保证(升级后的线程池,消费线程退出很快,并且std::this_thread::sleep_for将引发调度,性价比不高,并且睡眠时长会收系统时间影响,存在风险,所以直接选择循环)
  • AppendTask与TryAppendTask为添加任务核心逻辑,以自增原子变量作为任务的ID,与任务一起存入同步队列,随后根据任务数与线程数决定是否添加消费线程
    • 宽松内存序具有原子性、修改顺序一致性,多线程环境可无误自增,同时,它不限制编译器、CPU重排附近指令,相比其他顺序有最佳性能,对仅自增ID而言,是最佳选择
    • 当线程池处于运行模式,任务数比线程数大,且未达最大线程数限制时,会增加线程,以尽快处理完任务
  • Run为中消费线程主循环,循环内只做了两件事,取任务与执行任务
    • 线程池停止时,退出全部消费线程
    • ReduceThread()投递退出任务后,收到退出任务的消费线程退出
    • 每当有线程退出,递减线程计数原子变量

测试

测试代码

在测试代码中,我们将所有未执行的任务取消掉

std::vector<std::function<void()>> PrepareForThreadPoolTask(){
    int a = 0;
    std::vector<std::function<void()>> vecTask(10000);
    for(auto& elem : vecTask){
        elem = std::bind(std::move([](int a){
            std::cout << "************************" << a << "************************" << std::endl;
        }), a++);
    }

    return vecTask; //确切的说是NRVO
}

void doTestThreadPool()
{
    std::unique_ptr<ThreadPool> pThreadPool(new ThreadPool(1, 1, 1, 1000));

    pThreadPool->SetMaxThreadCount(std::thread::hardware_concurrency());

    std::vector<std::function<void()>> vecTask = PrepareForThreadPoolTask();
    for(auto & elem : vecTask){
        auto iTaskID = pThreadPool->AddTask(std::move(elem));
        std::cout << "Cancel Task, ID : " << iTaskID << std::endl;
        pThreadPool->CancelTask(iTaskID);
    }
    pThreadPool->Start();

    do{
        std::this_thread::sleep_for(std::chrono::seconds(1));
    } while(pThreadPool->GetCurTaskCount());

    pThreadPool->Stop();
    std::system("pause");
}

int main(int argc, char **argv){
    doTestThreadPool();

    return 0;
}

运行结果

9年老本运行结果如下





原创不易,转载请注明出处,谢谢
原文地址:https://www.cnblogs.com/Keeping-Fit/p/15154744.html