C++ 线程池的实现

写了一个简易线程池,

原理简单介绍下,就是设置一个任务队列queue,用来放要执行的函数,还有一个线程数组vector,用来存放所有的线程。

线程创建以后就存放在相应的vector里,空闲的线程去queue里去取要执行的函数地址,在run函数中执行,假如一个线程的run函数执行好后,

发现队列没有任务可取,则阻塞该线程,通过conidtion_variable变量的wait()函数进行阻塞,等待新的任务被添加进来后,会有一个cond变量的notify_one()

函数来唤醒阻塞中的run函数。

现在放代码吧!

线程池头文件Thread_Pool.h

/********************************************
            线程池头文件

        Author:十面埋伏但莫慌
        Time:2020/05/03

*********************************************/
#pragma once
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<thread>
#include<queue>
#include<mutex>
#include<atomic>
#include<vector>
#include<condition_variable>

typedef std::function<void()> Func;//定义线程执行函数类型,方便后面编码使用。
//任务类
template<typename T>
class Task {
public:
    Task() {}
    ~Task() {}
    int push(T func)//添加任务;
    {
        try {
            tasks.emplace(func);
        }
        catch (std::exception e)
        {
            throw e;
            return -1;
        }
        return 1;
    }
    int getTaskNum()//获得当前队列中的任务数;
    {
        return tasks.size();
    }
    T pop()//取出待执行的任务;
    {
        T temp;
        if (tasks.empty())
            return temp;
        else
        {
            temp = tasks.front();
            tasks.pop();
            return temp;
        }
    }

private:
    
    std::queue<T> tasks;//任务队列
};
//线程池类
class Thread_Pool {
public:
    Thread_Pool() :IsStart(false) {}
    ~Thread_Pool();
    int addTasks(Func&& tasks);//添加任务;
    void start();//开启线程池;
    void stop();//关闭线程池;
    int getTaskNum();//获得当前队列中的任务数;
private:
    void run();//线程工作函数;

private:
    static const int maxThreadNum = 3;//最大线程数为3;
    std::mutex mx;//锁;
    std::condition_variable cond;//条件量;
    std::vector<std::thread*> threads;//线程向量;
    bool IsStart;//原子变量,判断线程池是否运行;
    Task<Func> tasks;//任务变量;
};
#endif

线程池实现文件 Thread_Pool.cpp

/********************************************
            线程池CPP文件

        Author:十面埋伏但莫慌
        Time:2020/05/03

*********************************************/
#include"Thread_Pool.h"
#include<iostream>



int Thread_Pool::addTasks(Func&& func)
{
    std::unique_lock<std::mutex> lock(mx);
    int ret = tasks.push(func);
    if (ret == 1)
    {
        std::cout << "添加任务成功" << std::endl;
        cond.notify_one();
    }
    
    return ret;
}
void Thread_Pool::start() {
    if (!IsStart) {    
        {
            std::unique_lock<std::mutex> lock(mx);
            IsStart = true;
        }

        threads.reserve(maxThreadNum);
        for (int i = 0; i < maxThreadNum; i++)
        {
            threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this)));            
        }
        
    }
}

void Thread_Pool::run()
{
    while (IsStart)
    {
        Func f;
        if (tasks.getTaskNum() == 0 && IsStart)
        {
            std::unique_lock<std::mutex> lock(mx);
            cond.wait(lock);
        }
        {
            std::unique_lock<std::mutex> lock(mx);
            f = tasks.pop();
        }

        if (f)
            f();

    }
}
int Thread_Pool::getTaskNum() {
    return tasks.getTaskNum();
}
void Thread_Pool::stop() {
        {
            std::unique_lock<std::mutex> lock(mx);
            IsStart = false;
        }    
        cond.notify_all();
        for (auto T : threads) {
            std::cout << "线程 " << T->get_id() << " 已停止。" << std::endl;
            T->join();
            if (T != nullptr)
            {
                delete T;
                T = nullptr;
            }
        }
    std::cout << "所有线程已停止。" << std::endl;
}
Thread_Pool::~Thread_Pool() {
    if (IsStart)
    {
        stop();
    }
}

测试用的main.cpp文件

#include<iostream>
#include"Thread_Pool.h"
using namespace std;
void string_out_one() {
    int n = 500;
    while (n--)
    {
        cout << "One!" << endl;
    }
}
void string_out_two() {
    int n = 500;
    while (n--)
        cout << "Two!" << endl;
}
void string_out_three() {
    int n = 500;
    while(n--)
        cout << "Three!" << endl;
}
void string_out_four() {
    int n = 500;
    while (n--)
        cout << "Four!" << endl;
}
int main() {
    clock_t start, finish;
    double totaltime;
    start = clock();
    {
        /*实验对比代码段1开始处*/
        Thread_Pool Pool;
        try {
            Pool.start();
        }
        catch (std::exception e)
        {
            throw e;
            cout << "线程池创建失败。" << endl;
        }
        Pool.addTasks(move(string_out_one));
        Pool.addTasks(move(string_out_two));
        Pool.addTasks(move(string_out_three));
        Pool.addTasks(move(string_out_four));
    
        /*实验对比代码段1结束处*/
        /*实验对比代码段2开始处*/
        //string_out_one();
        //string_out_two();
        //string_out_three();
        //string_out_four();
        /*实验对比代码段2结束处*/
        finish = clock();
        totaltime = (double)(finish - start) / CLOCKS_PER_SEC;
        cout << "
此程序的运行时间为" << totaltime << "秒!" << endl;
        getchar();
     }
    getchar();
    return 0;
}

总结下这个线程池,主要是要注意锁的防止位置,放太多,就变成单线程,执行效率还不如单线程,放太少,可能会造成多线程之间的误读,放的位置不对,会造成死锁。。。是真的麻烦。

两个想改进的地方,

  一、希望可以在添加任务时确定任务的类型,而不是在Thread_Pool类中就确定task的类型,并且能支持传入函数形参。

  二、程序的优化做的不是很好吧,虽然不知道但是觉得肯定还有优化空间。

若有不足之处欢迎指出。

原文地址:https://www.cnblogs.com/leo-lv/p/12846661.html